
Parsing MySQL Binlog Logs with Python

January 23, 2025 · Python

1. Purpose of the Analysis

The binlog records MySQL data changes and is commonly used for point-in-time recovery, data flashback, and diagnosing "strange" problems.

For example: are there any large transactions? Which table receives the most updates? Are there transactions that were not committed promptly? All of these questions can be answered by analyzing the binlog.

2. Code Logic

The first step in collecting the data is parsing the binlog file. A binlog is made up of events: GTID events, XID events, table map events, DML events, and so on. Once we have all the events, we can derive a great deal of information.

Each event consists of two parts, the event header and the event body. The event header is laid out as follows:

Field           Size
timestamp       4 bytes
type_code       1 byte
server_id       4 bytes
event_length    4 bytes
next_position   4 bytes
flags           2 bytes

The header occupies 19 bytes in total. From it we can learn the event type, the time it occurred, the event length, and the start position of the next event.
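The header layout above can be sketched with Python's struct module. This is a minimal illustration with a fabricated header, not part of the article's script:

```python
import struct

# Unpack one 19-byte binlog event header ('<' = little-endian, standard sizes):
# timestamp (4), type_code (1), server_id (4), event_length (4),
# next_position (4), flags (2).
def parse_event_header(raw: bytes) -> dict:
    timestamp, type_code, server_id, event_length, next_position, flags = \
        struct.unpack('<IBIIIH', raw)
    return {'timestamp': timestamp, 'type_code': type_code,
            'server_id': server_id, 'event_length': event_length,
            'next_position': next_position, 'flags': flags}

# Fabricated header bytes, just to demonstrate the round trip.
raw = struct.pack('<IBIIIH', 1700000000, 19, 1, 64, 1234, 0)
header = parse_event_header(raw)
```

Knowing `next_position` from the header is what lets a parser skip from event to event without understanding every event body.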

After reading the header, we can use next_position to jump to the start of the next event and read its header in turn. When we encounter a table_map event, a DML transaction is about to begin. So what does a table map event store?

From the table map we can obtain the database and table that the DML will operate on, which lets us pinpoint which table each DML statement touches. When a transaction starts, the binlog first records the table_map events (one per table involved, so a transaction spanning several tables produces several table_map events), then the DML events, and finally an XID event marking the commit.

The script detects the start of a transaction by parsing the query_event, the tables involved by parsing the table map events, and the end of the transaction via the xid event.
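As a sketch of the table_map layout just described (a fixed part, then length-prefixed, NUL-terminated names), the database and table names can be pulled out of an event body like this; the input bytes are fabricated for demonstration:

```python
# Decode database and table names from a table_map event body, assuming:
# table_id (6 bytes) + flags (2 bytes), then for each name a 1-byte length,
# the name itself, and a NUL terminator.
def parse_table_map_names(body: bytes):
    pos = 8                                                        # skip table_id + flags
    db_len = body[pos]; pos += 1
    db_name = body[pos:pos + db_len].decode(); pos += db_len + 1   # +1 skips the NUL
    tbl_len = body[pos]; pos += 1
    tbl_name = body[pos:pos + tbl_len].decode()
    return db_name, tbl_name

# Fabricated body: 8 filler bytes, then 'test' and 't1' as the two names.
body = b'\x00' * 8 + bytes([4]) + b'test\x00' + bytes([2]) + b't1\x00'
names = parse_table_map_names(body)
```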

3. Hands-on Analysis

Let's go straight to the code.

The pandas module must be installed; it makes data processing very convenient. If you have not used it before, it is well worth exploring and can noticeably improve your productivity.

# -*- coding: utf-8 -*-
import sys
import math
import time
import struct
import argparse
import pandas as pd
from datetime import datetime

BINLOG_QUERY_EVENT_STERN = 4        # trailing checksum bytes of a query event
BINLOG_EVENT_FIX_PART = 13          # fixed part of the query event body
TABLE_MAP_EVENT_FIX_LENGTH = 8      # fixed part of the table_map event body
BINLOG_FILE_HEADER = b'\xfe\x62\x69\x6e'
BINLOG_EVENT_HEADER_LEN = 19


class BinlogEvent:
    UNKNOWN_EVENT = 0
    START_EVENT_V3 = 1
    QUERY_EVENT = 2
    STOP_EVENT = 3
    ROTATE_EVENT = 4
    INTVAR_EVENT = 5
    LOAD_EVENT = 6
    SLAVE_EVENT = 7
    CREATE_FILE_EVENT = 8
    APPEND_BLOCK_EVENT = 9
    EXEC_LOAD_EVENT = 10
    DELETE_FILE_EVENT = 11
    NEW_LOAD_EVENT = 12
    RAND_EVENT = 13
    USER_VAR_EVENT = 14
    FORMAT_DESCRIPTION_EVENT = 15
    XID_EVENT = 16
    BEGIN_LOAD_QUERY_EVENT = 17
    EXECUTE_LOAD_QUERY_EVENT = 18
    TABLE_MAP_EVENT = 19
    PRE_GA_WRITE_ROWS_EVENT = 20
    PRE_GA_UPDATE_ROWS_EVENT = 21
    PRE_GA_DELETE_ROWS_EVENT = 22
    WRITE_ROWS_EVENT = 23
    UPDATE_ROWS_EVENT = 24
    DELETE_ROWS_EVENT = 25
    INCIDENT_EVENT = 26
    HEARTBEAT_LOG_EVENT = 27
    IGNORABLE_LOG_EVENT = 28
    ROWS_QUERY_LOG_EVENT = 29
    WRITE_ROWS_EVENT_V2 = 30
    UPDATE_ROWS_EVENT_V2 = 31
    DELETE_ROWS_EVENT_V2 = 32
    GTID_LOG_EVENT = 33
    ANONYMOUS_GTID_LOG_EVENT = 34
    PREVIOUS_GTIDS_LOG_EVENT = 35


class BinlogEventGet(object):
    def __init__(self, binlog_path, outfile_path):
        self.file_handle = open(binlog_path, 'rb')
        # destination of the exported analysis file
        self.outfile_path = outfile_path

    def __del__(self):
        self.file_handle.close()

    def read_table_map_event(self, event_length, next_position):
        """
        fix_part = 8
            table_id : 6 bytes
            reserved : 2 bytes
        variable_part:
            database_name_length : 1 byte
            database_name : database_name_length bytes + 1 (NUL terminator)
            table_name_length : 1 byte
            table_name : table_name_length bytes + 1 (NUL terminator)
            columns_count : 1 byte
            columns_type_array : one byte per column
            metadata_length : 1 byte
            metadata : only present for variable-length column types
                       (varchar: 2 bytes; text/blob: 1 byte; time/timestamp/datetime: 1 byte;
                        float/decimal: 1 byte; char/enum/binary/set: 2 bytes
                        (column type id: 1 byte, metadata: 1 byte))
            bit_field : 1 byte
            crc : 4 bytes
        :return:
        """
        self.read_bytes(TABLE_MAP_EVENT_FIX_LENGTH)
        database_name_length, = struct.unpack('B', self.read_bytes(1))
        database_name, _a, = struct.unpack('{}ss'.format(database_name_length),
                                           self.read_bytes(database_name_length + 1))

        table_name_length, = struct.unpack('B', self.read_bytes(1))
        table_name, _a, = struct.unpack('{}ss'.format(table_name_length),
                                        self.read_bytes(table_name_length + 1))

        self.file_handle.seek(next_position, 0)
        return database_name, table_name

    def read_bytes(self, count):
        """
        Read a fixed number of bytes from the current position.
        :param count:
        :return:
        """
        return self.file_handle.read(count)

    def main(self):
        if not self.read_bytes(4) == BINLOG_FILE_HEADER:
            print("error: not a standard binlog file format.")
            sys.exit(0)

        # per-transaction record
        temp_transaction_dict = {
            'id': None,
            'db_name': None,
            'ld_table_name': None,
            'table_set': set(),
            'start_time': None,
            'end_time': None,
            'diff_second': None,
            'event_type': set(),
            'start_position': None,
            'end_position': None
        }

        tem_id = 0
        df = list()
        print('loading.....')
        while True:
            type_code, event_length, timestamp, next_position = self.read_header()

            # stop when the end of the file is reached
            if type_code is None:
                break

            # transaction bookkeeping
            if type_code == BinlogEvent.QUERY_EVENT:
                thread_id, db_name, info = self.read_query_event(event_length)

                if info == 'BEGIN':
                    temp_transaction_dict['start_position'] = next_position - event_length
                    temp_transaction_dict['start_time'] = timestamp
                    temp_transaction_dict['db_name'] = db_name
                    # print('time:', timestamp, 'db:', db_name, 'sql:', info)
                self.file_handle.seek(next_position, 0)

            elif type_code == BinlogEvent.TABLE_MAP_EVENT:
                with_database, with_table = self.read_table_map_event(event_length, next_position)
                # remember only the first table of the transaction
                if temp_transaction_dict['ld_table_name'] is None:
                    temp_transaction_dict['ld_table_name'] = with_table.decode()
                # the set of all tables the transaction touches
                temp_transaction_dict['table_set'].add(with_table.decode())

            elif type_code in (BinlogEvent.WRITE_ROWS_EVENT, BinlogEvent.WRITE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('insert')
                self.file_handle.seek(event_length - BINLOG_EVENT_HEADER_LEN, 1)

            elif type_code in (BinlogEvent.UPDATE_ROWS_EVENT, BinlogEvent.UPDATE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('update')
                self.file_handle.seek(event_length - BINLOG_EVENT_HEADER_LEN, 1)

            elif type_code in (BinlogEvent.DELETE_ROWS_EVENT, BinlogEvent.DELETE_ROWS_EVENT_V2):
                temp_transaction_dict['event_type'].add('delete')
                self.file_handle.seek(event_length - BINLOG_EVENT_HEADER_LEN, 1)

            elif type_code == BinlogEvent.XID_EVENT:
                # fill in the end-of-transaction information
                temp_transaction_dict['id'] = tem_id
                temp_transaction_dict['end_time'] = timestamp
                temp_transaction_dict['end_position'] = next_position

                _start = datetime.strptime(temp_transaction_dict['start_time'], '%Y-%m-%d %H:%M:%S')
                _end = datetime.strptime(temp_transaction_dict['end_time'], '%Y-%m-%d %H:%M:%S')

                temp_transaction_dict['diff_second'] = (_end - _start).seconds

                df.append(temp_transaction_dict)

                # reset for the next transaction
                temp_transaction_dict = {
                    'id': None,
                    'db_name': None,
                    'ld_table_name': None,
                    'table_set': set(),
                    'start_time': None,
                    'end_time': None,
                    'diff_second': None,
                    'event_type': set(),
                    'start_position': None,
                    'end_position': None
                }
                self.file_handle.seek(event_length - BINLOG_EVENT_HEADER_LEN, 1)
                tem_id += 1
            else:
                # any other event: just skip over its body
                self.file_handle.seek(event_length - BINLOG_EVENT_HEADER_LEN, 1)

        outfile = pd.DataFrame(df)
        outfile['transaction_size_bytes'] = (outfile['end_position'] - outfile['start_position'])
        outfile['transaction_size'] = outfile['transaction_size_bytes'].map(lambda x: self.bit_conversion(x))
        outfile.to_csv(self.outfile_path, encoding='utf_8_sig')
        print('file export directory: {}'.format(self.outfile_path))
        print('complete ok!')

    def read_header(self):
        """
        BINLOG_EVENT_HEADER_LEN = 19
        timestamp : 4 bytes
        type_code : 1 byte
        server_id : 4 bytes
        event_length : 4 bytes
        next_position : 4 bytes
        flags : 2 bytes
        """
        read_byte = self.read_bytes(BINLOG_EVENT_HEADER_LEN)

        if read_byte:
            result = struct.unpack('=IBIIIH', read_byte)
            type_code, event_length, timestamp, next_position = result[1], result[3], result[0], result[4]

            return type_code, event_length, time.strftime('%Y-%m-%d %H:%M:%S',
                                                          time.localtime(timestamp)), next_position
        else:
            return None, None, None, None

    def read_query_event(self, event_length=None):
        """
        fix_part = 13:
                thread_id : 4 bytes
                execute_seconds : 4 bytes
                database_length : 1 byte
                error_code : 2 bytes
                variable_block_length : 2 bytes
            variable_part :
                variable_block_length = fix_part.variable_block_length
                database_name = fix_part.database_length
                sql_statement = event_header.event_length - 19 - 13 - variable_block_length - database_length - 4
        """
        read_byte = self.read_bytes(BINLOG_EVENT_FIX_PART)
        fix_result = struct.unpack('=IIBHH', read_byte)
        thread_id = fix_result[0]
        self.read_bytes(fix_result[4])
        read_byte = self.read_bytes(fix_result[2])
        database_name, = struct.unpack('{}s'.format(fix_result[2]), read_byte)
        statement_length = event_length - BINLOG_EVENT_FIX_PART - BINLOG_EVENT_HEADER_LEN \
                           - fix_result[4] - fix_result[2] - BINLOG_QUERY_EVENT_STERN
        read_byte = self.read_bytes(statement_length)
        _a, sql_statement, = struct.unpack('1s{}s'.format(statement_length - 1), read_byte)
        return thread_id, database_name.decode(), sql_statement.decode()

    @staticmethod
    def bit_conversion(size, dot=2):
        size = float(size)
        if 0 <= size < 1:
            human_size = str(round(size / 0.125, dot)) + ' b'
        elif 1 <= size < 1024:
            human_size = str(round(size, dot)) + ' B'
        elif math.pow(1024, 1) <= size < math.pow(1024, 2):
            human_size = str(round(size / math.pow(1024, 1), dot)) + ' KB'
        elif math.pow(1024, 2) <= size < math.pow(1024, 3):
            human_size = str(round(size / math.pow(1024, 2), dot)) + ' MB'
        elif math.pow(1024, 3) <= size < math.pow(1024, 4):
            human_size = str(round(size / math.pow(1024, 3), dot)) + ' GB'
        elif math.pow(1024, 4) <= size < math.pow(1024, 5):
            human_size = str(round(size / math.pow(1024, 4), dot)) + ' TB'
        elif math.pow(1024, 5) <= size < math.pow(1024, 6):
            human_size = str(round(size / math.pow(1024, 5), dot)) + ' PB'
        elif math.pow(1024, 6) <= size < math.pow(1024, 7):
            human_size = str(round(size / math.pow(1024, 6), dot)) + ' EB'
        elif math.pow(1024, 7) <= size < math.pow(1024, 8):
            human_size = str(round(size / math.pow(1024, 7), dot)) + ' ZB'
        elif math.pow(1024, 8) <= size < math.pow(1024, 9):
            human_size = str(round(size / math.pow(1024, 8), dot)) + ' YB'
        elif math.pow(1024, 9) <= size < math.pow(1024, 10):
            human_size = str(round(size / math.pow(1024, 9), dot)) + ' BB'
        elif math.pow(1024, 10) <= size < math.pow(1024, 11):
            human_size = str(round(size / math.pow(1024, 10), dot)) + ' NB'
        elif math.pow(1024, 11) <= size < math.pow(1024, 12):
            human_size = str(round(size / math.pow(1024, 11), dot)) + ' DB'
        elif math.pow(1024, 12) <= size:
            human_size = str(round(size / math.pow(1024, 12), dot)) + ' CB'
        else:
            raise ValueError('bit_conversion error')
        return human_size


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='A piece of binlog analysis code.')
    parser.add_argument('--binlog', type=str, help='binlog file path.', default=None)
    parser.add_argument('--outfile', type=str, help='analyze the file export directory.', default=None)
    args = parser.parse_args()

    if not args.binlog or not args.outfile:
        parser.print_help()
        sys.exit(0)

    binlog_show = BinlogEventGet(args.binlog, args.outfile)
    binlog_show.main()

➜  Desktop python3 binlogshow.py --help
usage: binlogshow.py [-h] [--binlog BINLOG] [--outfile OUTFILE]

A piece of binlog analysis code.

optional arguments:
  -h, --help         show this help message and exit
  --binlog BINLOG    binlog file path.
  --outfile OUTFILE  analyze the file export directory.
➜  Desktop

Just specify the binlog file path and the export path for the analysis file.

➜  Desktop python3 binlogshow.py --binlog=/Users/cooh/Desktop/mysql-bin.009549 --outfile=/Users/cooh/Desktop/binlogshow.csv
loading.....
file export directory: /Users/cooh/Desktop/binlogshow.csv
complete ok!

Once the run finishes, we get the parsed transaction information, and from this file we can write whatever analysis code we need.
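To give one hedged example of that follow-up analysis: the column names below (ld_table_name, diff_second, transaction_size_bytes) are the ones the script exports, while the sample rows are made up for illustration:

```python
import pandas as pd

# Sample rows standing in for the exported CSV
# (in practice: df = pd.read_csv(outfile_path)).
df = pd.DataFrame({
    'ld_table_name': ['t1', 't2', 't1'],
    'diff_second': [0, 35, 2],
    'transaction_size_bytes': [512, 2_000_000, 2048],
})

# Which table leads the most transactions?
busiest = df['ld_table_name'].value_counts().idxmax()

# Large transactions: more than 1 MB of binlog between BEGIN and XID.
big = df[df['transaction_size_bytes'] > 1024 * 1024]

# Slow-to-commit transactions: more than 30 seconds from BEGIN to XID.
slow = df[df['diff_second'] > 30]
```

A few lines like these answer the questions posed in section 1: the busiest table, the large transactions, and the ones that were slow to commit.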

Summary

The above is based on personal experience. I hope it serves as a useful reference, and I hope you will continue to support 代码网.
