澳门新浦京娱乐场网站-www.146.net-新浦京娱乐场官网
做最好的网站

澳门新浦京娱乐场网站:mysql基于binlog回滚工具

 

 

    update、delete的条件写错甚至没有写,导致数据操作错误,需要恢复被误操作的行记录。这种情形,其实时有发生,可以选择用备份文件 binlog来恢复到测试环境,然后再做数据修复,但是这样其实需要耗费一定的时间跟资源。

 

    其实,如果binlog format为row,binlog文件中是会详细记录每一个事务涉及到操作,并把每一个事务影响到行记录均存储起来,能否给予binlog 文件来反解析数据库的行记录变动情况呢?

    最近写完mysql flashback,突然发现还有有这种使用场景:有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。故也整理了下。

    业界已有不少相关的脚本及工具,但是随着MySQL版本的更新、binlog记录内容的变化以及需求不一致,大多数脚本不太适合个人目前的使用需求,所以开始着手编写 mysql的 flash back脚本。

    昨晚写的脚本,因为个人python能力有限,本来想这不发这文,后来想想,没准会有哪位园友给出优化建议。

 

 





 

    如果转载,请注明博文来源: www.cnblogs.com/xinysu/   ,版权归 博客园 苏家小萝卜 所有。望各位支持!

    如果转载,请注明博文来源: www.cnblogs.com/xinysu/   ,版权归 博客园 苏家小萝卜 所有。望各位支持!

 

 





1 实现内容 

    有些情况下,可能会统计在某个时间段内,MySQL修改了多少数据量?发生了多少事务?主要是哪些表格发生变动?变动的数量是怎么样的? 但是却不需要行记录的修改内容,只需要了解 行数据的 变动情况。

    这些情况部分可以通过监控来大致了解,但是也可以基于binlog来全盘分析,binlog的格式是row模式。

    在写flashback的时候,顺带把这个也写了个脚步,使用python编写,都差不多原理,只是这个简单些,介于个人python弱的不行,性能可能还有很大的提升空间,也希望园友能协助优化下。

    先贴python脚步的分析结果图如下,分为4个部分:事务耗时情况、事务影响行数情况、DML行数情况以及操作最频繁表格情况。

澳门新浦京娱乐场网站 1

 

2 脚本简单描述

    脚本依赖的模块中,pymysql需要自行安装。

    创建类queryanalyse,其中有5个函数定义:_get_db、create_tab、rowrecord、binlogdesc跟closeconn。

     仅在MySQL 5.6/5.7版本测试,python运行环境需要安装pymysql模块。

2.1 _get_db

    该函数用来解析输入参数值,参数值一共有7个,都是必须填写的。分别为host,user,password,port,table name for transaction,table name for records,对应的简写如下:

 

ALL options need to assign:

 

-h    : host, the database host,which database will store the results after analysis

-u    : user, the db user

-p    : password, the db user's password

-P    : port, the db port

-f    : file path, the binlog file

-tr    : table name for record , the table name to store the row record

-tt    : table name for transaction, the table name to store transactions

    比如,执行脚本:python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow,该函数负责处理各个选项的参数值情况,并存储。

1 实现内容

    根据binlog文件,对某个些事务、某段时间某些表、某段时间全库 做回滚操作,实现闪回功能。工具处理过程中,会把binlog中的事务修改的行记录存储到表格中去,通过 dml_sql 列,可以查看每一个事务内部的所有行记录变更情况,通过 undo_sql 查看回滚的SQL内容。如下图,然后再根据表格内容做回滚操作。 

    澳门新浦京娱乐场网站 2  

    那么这个脚本有哪些优点呢?

  • 回滚分为2个命令:第一个命令 分析binglog并存储进入数据库;第二个命令 执行回滚操作;
  • 回滚的时候,可以把执行脚本跟回滚脚本统一存放到数据库中,可以查看 更新内容以及回滚内容;
  • 根据存储的分析表格,方便指定事务或者指定表格来来恢复;
  • 详细的日志输出,说明分析进度跟执行进度。

    分析binlog的输出截图(分析1G的binlog文件)

    澳门新浦京娱乐场网站 3

 

   回滚数据库的输出截图:

   澳门新浦京娱乐场网站 4

2.2 create_tab

澳门新浦京娱乐场网站:mysql基于binlog回滚工具_flashback,基于binlog来分析mysql的行记录修改情况。    创建两个表格,分别用来存储 binlog file文件的分析结果。一个用来存储事务的执行开始时间跟结束时间,由选项 -tt来赋值表名;一个是用来存储每一行记录的修改情况,由选项 -tr来赋值表名。

    事务表记录内容:事务的开始时间及事务的结束时间。

    行记录表的内容:库名,表名,DML类型以及事务对应事务表的编号。

 

root@localhost:mysql3310.sock  14:42:29 [flashback]>show create table tbrow G
*************************** 1. row ***************************
       Table: tbrow
Create Table: CREATE TABLE `tbrow` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
  `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
  `dbname` varchar(50) NOT NULL,
  `tbname` varchar(50) NOT NULL,
  PRIMARY KEY (`auto_id`),
  KEY `sqltype` (`sqltype`),
  KEY `dbname` (`dbname`),
  KEY `tbname` (`tbname`)
) ENGINE=InnoDB AUTO_INCREMENT=295151 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

root@localhost:mysql3310.sock  14:42:31 [flashback]>SHOW CREATE TABLE TBTRAN G
*************************** 1. row ***************************
       Table: TBTRAN
Create Table: CREATE TABLE `tbtran` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `begin_time` datetime NOT NULL,
  `end_time` datetime NOT NULL,
  PRIMARY KEY (`auto_id`)
) ENGINE=InnoDB AUTO_INCREMENT=6390 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

 

2 原理

    前提:实例启动了binlog并且格式为ROW。

    

    使用python对mysqlbinlog后的log文件进行文本分析处理,在整个处理过程中,有以下6个疑难点需要处理:

  1. 判断一个事务的开始跟结束
  2. 同一个事务的执行顺序需要反序执行
  3. 解析回滚SQL
  4. 同一个事务操作不同表格处理
  5. 转义字符处理,比如换行符、tab符等等
  6. timestamp数据类型参数值转换
  7. 负数处理
  8. 单个事务涉及到行修改SQL操作了 max_allow
  9. 针对某个表格做回滚,而不是全库回滚

    

2.3 rowrecord

    重点函数,分析binlog文件内容。这里有几个规律:

  1. 每个事务的结束点,是以 'Xid = ' 来查找
    1. 事务的开始时间,是事务内的第一个 'Table_map' 行里边的时间
    2. 事务的结束时间,是以 'Xid = '所在行的 里边的时间
  2. 每个行数据是属于哪个表格,是以 'Table_map'来查找
  3. DML的类型是按照 行记录开头的情况是否为:'### INSERT INTO'  、'### UPDATE' 、'### DELETE FROM' 
  4. 注意,单个事务可以包含多个表格多种DML多行数据修改的情况。

2.1 事务的开始与结束

    按照Xid出现的位置来判断,从binlog文件的最开始开始读取,遇到SQL语句则提取出来,直到遇到Xid,统一把之前提取出来的SQL汇总为一个事务,然后继续提取SQL语句,直到遇到下一个Xid,再把这个事务的SQL汇总成一个事务,一直这样循环,直至文件顺序遍历结束。

    澳门新浦京娱乐场网站 5

 

2.4 binlogdesc

    描述分析结果,简单4个SQL分析。

  1. 分析修改行数据的 事务耗时情况
  2. 分析修改行数据的 事务影响行数情况
  3. 分析DML分布情况
  4. 分析 最多DML操作的表格 ,取前十个分析

2.2 事务内部反序处理

    同一个事务中,如果有多个表格多行记录发生变更,在回滚的时候,应该反序回滚SQL,那么,如何将提取出来的SQL反序存储呢?思路如下:

  • 每行记录的修改SQL独立出来
  • 将独立出来的SQL反序存储

   假设正序的事务SQL语句存储在变量 dml_sql 中,反序后的可以回滚的SQL存储在变量 undo_sql中。按顺序把行记录修改的SQL抽取出来 存储到变量 record_sql 中去,然后 赋值 undo_sql =record_sql undo_sql ,再置空 record_sql 变量,如此,便可实现反序事务内部的执行SQL。

2.5 closeconn

    关闭数据库连接。

2.3 解析回滚SQL

    首先,查看binlog的日志内容,发现行修改的SQL情形如下,提取过程中需要注意这几个问题:

  • 行记录的列名配对,binlog file存储的列序号,不能直接使用
  • WHERE部分 跟 SET部分 之间并无关键字或者符号,需要添加 AND 或者 逗号
  • DELETE SQL 需要反转为 INSERT
  • UPDATE SQL 需要把WHERE 跟 SET的部分进行替换
  • INSERT SQL需要反转为 DELETE
  • 澳门新浦京娱乐场网站 6

3 使用说明

    首先,确保python安装了pymysql模块,把python脚本拷贝到文件 queryanalyse.py。

    然后,把要分析的binlog文件先用 mysqlbinlog 指令分析存储,具体binlog的文件说明,可以查看之前的博文:关于binary log那些事——认真码了好长一篇。mysqlbinlog的指令使用方法,可以详细查看文档: 。

    比较常用通过指定开始时间跟结束时间来分析 binlog文件。

mysqlbinlog --start-datetime='2017-04-23 00:00:03' --stop-datetime='2017-04-23 00:30:00' --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log

     

    分析后,可以把这个 binlog_test.log文件拷贝到其他空闲服务器执行分析,只需要有个空闲的DB来存储分析记录即可。

    假设这个时候,拷贝 binlog_test.log到测试服务器上,测试服务器上的数据库可以用来存储分析内容,则可以执行python脚本了,注意要进入到python脚本的目录中,或者指定python脚本路径。

 

python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f= /tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow

 

    没了,就等待输出吧。

    性能是硬伤,在虚拟机上测试,大概500M的binlog文件需要分析2-3min,有待提高!

2.4 同事务不同表格处理

    同一个事务中,允许对不同表格进行数据修改,这点在列名替换列序号的时候,需要留意处理。

    每一个的行记录前有一行记录,含有 'Table_map' 标识,会说明这一行当行记录是修改哪个表格,可以根据这个提示,来替换binlog里边的列序号为列名。

4 python脚本

  1 import pymysql
  2 from pymysql.cursors import DictCursor
  3 import re
  4 import os
  5 import sys
  6 import datetime
  7 import time
  8 import logging
  9 import importlib
 10 importlib.reload(logging)
 11 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')
 12 
 13 
 14 usage=''' usage: python [script's path] [option]
 15 ALL options need to assign:
 16 
 17 -h     : host, the database host,which database will store the results after analysis 
 18 -u     : user, the db user
 19 -p     : password, the db user's password
 20 -P     : port, the db port
 21 -f     : file path, the binlog file
 22 -tr    : table name for record , the table name to store the row record
 23 -tt    : table name for transaction, the table name to store transactions
 24 Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow
 25 
 26 '''
 27 
 28 class queryanalyse:
 29     def __init__(self):
 30         #初始化
 31         self.host=''
 32         self.user=''
 33         self.password=''
 34         self.port='3306'
 35         self.fpath=''
 36         self.tbrow=''
 37         self.tbtran=''
 38 
 39         self._get_db()
 40         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))
 41 
 42         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
 43         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
 44         logging.info('MySQL which userd to store binlog event connection is ok')
 45 
 46         self.begin_time=''
 47         self.end_time=''
 48         self.db_name=''
 49         self.tb_name=''
 50 
 51     def _get_db(self):
 52         #解析用户输入的选项参数值,这里对password的处理是明文输入,可以自行处理成是input格式,
 53         #由于可以拷贝binlog文件到非线上环境分析,所以password这块,没有特殊处理
 54         logging.info('begin to assign values to parameters')
 55         if len(sys.argv) == 1:
 56             print(usage)
 57             sys.exit(1)
 58         elif sys.argv[1] == '--help':
 59             print(usage)
 60             sys.exit()
 61         elif len(sys.argv) > 2:
 62             for i in sys.argv[1:]:
 63                 _argv = i.split('=')
 64                 if _argv[0] == '-h':
 65                     self.host = _argv[1]
 66                 elif _argv[0] == '-u':
 67                     self.user = _argv[1]
 68                 elif _argv[0] == '-P':
 69                     self.port = int(_argv[1])
 70                 elif _argv[0] == '-f':
 71                     self.fpath = _argv[1]
 72                 elif _argv[0] == '-tr':
 73                     self.tbrow = _argv[1]
 74                 elif _argv[0] == '-tt':
 75                     self.tbtran = _argv[1]
 76                 elif _argv[0] == '-p':
 77                     self.password = _argv[1]
 78                 else:
 79                     print(usage)
 80 
 81     def create_tab(self):
 82         #创建两个表格:一个用户存储事务情况,一个用户存储每一行数据修改的情况
 83         #注意,一个事务可以存储多行数据修改的情况
 84         logging.info('creating table ...')
 85         create_tb_sql ='''CREATE TABLE IF NOT EXISTS  {} (
 86                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
 87                           `begin_time` datetime NOT NULL,
 88                           `end_time` datetime NOT NULL,
 89                           PRIMARY KEY (`auto_id`)
 90                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
 91                         CREATE TABLE IF NOT EXISTS  {} (
 92                           `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
 93                           `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
 94                           `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
 95                           `dbname` varchar(50) NOT NULL,
 96                           `tbname` varchar(50) NOT NULL,
 97                           PRIMARY KEY (`auto_id`),
 98                           KEY `sqltype` (`sqltype`),
 99                           KEY `dbname` (`dbname`),
100                           KEY `tbname` (`tbname`)
101                         ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
102                         truncate table {};
103                         truncate table {};
104                         '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)
105 
106         self.cur.execute(create_tb_sql)
107         logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))
108 
109     def rowrecord(self):
110         #处理每一行binlog
111         #事务的结束采用 'Xid =' 来划分
112         #分析结果,按照一个事务为单位存储提交一次到db
113         try:
114             tran_num=1    #事务数
115             record_sql='' #行记录的insert sql
116             tran_sql=''   #事务的insert sql
117 
118             self.create_tab()
119 
120             with open(self.fpath,'r') as binlog_file:
121                 logging.info('begining to analyze the binlog file ,this may be take a long time !!!')
122                 logging.info('analyzing...')
123 
124                 for bline in binlog_file:
125 
126                     if bline.find('Table_map:') != -1:
127                         l = bline.index('server')
128                         n = bline.index('Table_map')
129                         begin_time = bline[:l:].rstrip(' ').replace('#', '20')
130 
131                         if record_sql=='':
132                             self.begin_time = begin_time[0:4]   '-'   begin_time[4:6]   '-'   begin_time[6:]
133 
134                         self.db_name = bline[n::].split(' ')[1].replace('`', '').split('.')[0]
135                         self.tb_name = bline[n::].split(' ')[1].replace('`', '').split('.')[1]
136                         bline=''
137 
138                     elif bline.startswith('### INSERT INTO'):
139                        record_sql=record_sql "insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
140 
141                     elif bline.startswith('### UPDATE'):
142                        record_sql=record_sql "insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
143 
144                     elif bline.startswith('### DELETE FROM'):
145                        record_sql=record_sql "insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)
146 
147                     elif bline.find('Xid =') != -1:
148 
149                         l = bline.index('server')
150                         end_time = bline[:l:].rstrip(' ').replace('#', '20')
151                         self.end_time = end_time[0:4]   '-'   end_time[4:6]   '-'   end_time[6:]
152                         tran_sql=record_sql "insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)
153 
154                         self.cur.execute(tran_sql)
155                         self.mysqlconn.commit()
156                         record_sql = ''
157                         tran_num  = 1
158 
159         except Exception:
160             return 'funtion rowrecord error'
161 
162     def binlogdesc(self):
163         sql=''
164         t_num=0
165         r_num=0
166         logging.info('Analysed result printing...n')
167         #分析总的事务数跟行修改数量
168         sql="select 'tbtran' name,count(*) nums from {}  union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
169         self.cur.execute(sql)
170         rows=self.cur.fetchall()
171         for row in rows:
172             if row['name']=='tbtran':
173                 t_num = row['nums']
174             else:
175                 r_num = row['nums']
176         print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))
177 
178         # 计算 最耗时 的单个事务
179         # 分析每个事务的耗时情况,分为5个时间段来描述
180         # 这里正常应该是 以毫秒来分析的,但是binlog中,只精确时间到second
181         sql='''select 
182                       count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
183                       count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
184                       count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
185                       count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
186                       count(case when cost_sec >30.1 then 1 end ) cos_more,
187                       max(cost_sec) cos_max
188                 from 
189                 (
190                         select 
191                             auto_id,timestampdiff(second,begin_time,end_time) cost_sec
192                         from {}
193                 ) a;'''.format(self.tbtran)
194         self.cur.execute(sql)
195         rows=self.cur.fetchall()
196 
197         for row in rows:
198             print('The most cost time : {} '.format(row['cos_max']))
199             print('The distribution map of each transaction costed time: ')
200             print('Cost time between    0 and  1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
201             print('Cost time between  1.1 and  5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
202             print('Cost time between  5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
203             print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
204             print('Cost time                     > 30.1 : {} , {}%n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))
205 
206         # 计算 单个事务影响行数最多 的行数量
207         # 分析每个事务 影响行数 情况,分为5个梯度来描述
208         sql='''select 
209                     count(case when nums between 0 and 10 then 1 end ) row_1,
210                     count(case when nums between 11 and 100 then 1 end ) row_2,
211                     count(case when nums between 101 and 1000 then 1 end ) row_3,
212                     count(case when nums between 1001 and 10000 then 1 end ) row_4,
213                     count(case when nums >10001 then 1 end ) row_5,
214                     max(nums) row_max
215                from 
216                   (
217                     select 
218                              count(*) nums
219                     from {} group by tran_num
220                    ) a;'''.format(self.tbrow)
221         self.cur.execute(sql)
222         rows=self.cur.fetchall()
223 
224         for row in rows:
225             print('The most changed rows for each row: {} '.format(row['row_max']))
226             print('The distribution map of each transaction changed rows : ')
227             print('Changed rows between    1 and    10 second : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
228             print('Changed rows between   11 and   100 second : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
229             print('Changed rows between  101 and  1000 second : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
230             print('Changed rows between 1001 and 10000 second : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
231             print('Changed rows                       > 10001 : {} , {}%n'.format(row['row_5'], int(row['row_5'] * 100 / t_num)))
232 
233         # 分析 各个行数 DML的类型情况
234         # 描述 delete,insert,update的分布情况
235         sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
236         self.cur.execute(sql)
237         rows=self.cur.fetchall()
238 
239         print('The distribution map of the {} changed rows : '.format(r_num))
240         for row in rows:
241 
242             if row['sqltype']==1:
243                 print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
244             if row['sqltype']==2:
245                 print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
246             if row['sqltype']==3:
247                 print('DELETE rows :{} , {}%n '.format(row['nums'],int(row['nums']*100/r_num)))
248 
249         # 描述 影响行数 最多的表格
250         # 可以分析是哪些表格频繁操作,这里显示前10个table name
251         sql = '''select 
252                       dbname,tbname ,
253                       count(*) ALL_rows,
254                       count(*)*100/{} per,
255                       count(case when sqltype=1 then 1 end) INSERT_rows,
256                       count(case when sqltype=2 then 1 end) UPDATE_rows,
257                       count(case when sqltype=3 then 1 end) DELETE_rows
258                 from {} 
259                 group by dbname,tbname 
260                 order by ALL_rows desc 
261                 limit 10;'''.format(r_num,self.tbrow)
262         self.cur.execute(sql)
263         rows = self.cur.fetchall()
264 
265         print('The distribution map of the {} changed rows : '.format(r_num))
266         print('tablename'.ljust(50),
267               '|','changed_rows'.center(15),
268               '|','percent'.center(10),
269               '|','insert_rows'.center(18),
270               '|','update_rows'.center(18),
271               '|','delete_rows'.center(18)
272               )
273         print('-------------------------------------------------------------------------------------------------------------------------------------------------')
274         for row in rows:
275             print((row['dbname'] '.' row['tbname']).ljust(50),
276                   '|',str(row['ALL_rows']).rjust(15),
277                   '|',(str(int(row['per'])) '%').rjust(10),
278                   '|',str(row['INSERT_rows']).rjust(10) ' , ' (str(int(row['INSERT_rows']*100/row['ALL_rows'])) '%').ljust(5),
279                   '|',str(row['UPDATE_rows']).rjust(10) ' , ' (str(int(row['UPDATE_rows']*100/row['ALL_rows'])) '%').ljust(5),
280                   '|',str(row['DELETE_rows']).rjust(10) ' , ' (str(int(row['DELETE_rows']*100/row['ALL_rows'])) '%').ljust(5),
281                   )
282         print('n')
283 
284         logging.info('Finished to analyse the binlog file !!!')
285 
286     def closeconn(self):
287         self.cur.close()
288         logging.info('release db connectionsn')
289 
290 def main():
291     p = queryanalyse()
292     p.rowrecord()
293     p.binlogdesc()
294     p.closeconn()
295 
296 if __name__ == "__main__":
297     main()

 

2.5 转义字符处理

    binlog文件在对非空格的空白字符处理,采用转义字符字符串存储,比如,在表格insert一列记录含换行符,而实际上在binlog文件中,是使用了 x0a 替换了 换行操作,所以在回滚数据的过程中,需要对转义字符做处理。

    澳门新浦京娱乐场网站 7

    澳门新浦京娱乐场网站 8

 

    

    这里注意一个地方,039的转义字符是没有在函数 esc_code 中统一处理,而是单独做另外处理。

    澳门新浦京娱乐场网站 9

 

    转移字符表相见下图:

    澳门新浦京娱乐场网站 10

 

2.6 timestamp数据类型处理

    timestamp实际在数据库中的存储值是 INT类型,需要使用 函数 from_unixtime转换。

    建立测试表格tbtest,只有一列timestamp的列,存储值后查看binlog的内容,具体截图如下:

    澳门新浦京娱乐场网站 11

 

    在处理行记录的时候,要对timestamp的value做处理,添加from_unixtime函数转换。

2.7 负数值处理

    这个一开始写代码的时候,并没有考虑到。大量测试的过程中发现,所有整型的数据类型,在存储负数的时候,都会存入一个最大范围值。binlog在处理这块的机制有些不是很了解。测试如下:

    澳门新浦京娱乐场网站 12

 

   所以当遇到INT的各种数据类型并且VALUE为负数的时候,需要把 这个范围值去除,才能执行执行undo_sql。

2.8 单个事务行记录总SQL超过max_allowed_package处理

    分析binlog后存储两种sql类型,一种是行记录的修改SQL,即 dml_sql;一种是 行记录的回滚sql,即 undo_sql。从代码可知,存储这两个sql的列是longtext,最大可存储4G的内容。但是 MySQL中单个会话的包大小是有限制的,限制的参数为 max_allowed_packet,默认大小为 4Mb,最大为1G,所以这个脚本使用前,请手动设置 存储binlog file的数据库实例以及线上的数据库实例这个参数:

set global max_allowed_packet = 1073741824; #记得后续修改回来

    

    万一操作了呢?那么回滚只能分段来回滚,先回滚到 这个大事务,然后单独执行这个大事务,紧接着继续回滚,这部分不能使用pymysql嗲用source 文件执行,所以只能手动做这个操作。 求高能人士修改这个逻辑代码!!!

2.9 针对性回滚

    假设误操作的没有明确的时间点,只有一个区间,而这个区间还有其他的表格操作,那么这个时候,需要在分析binlog文件的时候,添加--database选项,先帅选到同一个数据库中binlog文件中。

    这里的处理是将这段区间的dml_sql跟undo_sql都存储到数据库表格中,然后再删除不需要回滚的事务,剩余需要回滚的事务。再执行回滚操作。

3 使用说明

3.1 参数说明

    这个脚本的参数稍微多些,可以 --help 查看具体说明。

    澳门新浦京娱乐场网站 13

 

    本人喜欢用各种颜色来分类参数(blingbling五颜六色,看着多有趣多精神),所以,按颜色来说明这些参数。

  • 黄色区域:这6个参数,提供的是 分析并存储binlog file的相关值,说明存储分析结果的数据库的链接方式、binlog文件的位置以及存储结果的表格名字;
  • 蓝色区域:这4个参数,提供 与线上数据库表结构一致的DB实例连接方式,仅需跟线上一模一样的表结构,不一定需要是主从库;
  • 绿色区域:最最重要的选项 -a,0代表仅分析binlog文件,1代表仅执行回滚操作,必须先执行0才可以执行1;
  • 紫色区域:举例说明。

3.2 应用场景说明

全库回滚某段时间

  • 需要回滚某个时间段的所有SQL操作,回滚到某一个时间点
  • 这种情况下呢,大多数是使用备份文件 binlog解决
  • 但是这个脚本也可以满足,但请勿直接在线上操作,先 -a=0,看下分析结果,是否符合,符合的话,停掉某个从库,再在从库上执行,最后开发业务接入检查是否恢复到指定时间点,数据是否正常。

某段时间某些表格回滚某些操作

比如,开发提交了一个批量更新脚本,各个测试层面验证没有问题,提交线上执行,但是执行后,发现有个业务漏测试,导致某些字段更新后影响到其他业务,现在需要紧急 把被批量更新的表格回滚到原先的行记录

这个并不能单纯从技术角度来处理,要综合考虑

  • 澳门新浦京娱乐场网站 14

     

  • 这种情况下,如何回顾tab A表格的修改操作呢?

  • 个人觉得,这种方式比较行得通,dump tabA表格的数据到测试环境,然后再分析 binlog file 从11点-12点的undo sql,接着在测试环境回滚该表格到11点这个时刻,紧接着,由开发跟业务对比测试环境11点的数据跟线上现有的数据中,看下是哪些行哪些列需要在线上进行回滚,哪些是不需要的,然后开发再提交SQL脚本,再在线上执行。其实,这里边,DBA仅提供一个角色,就是把 表格 tab A 在一个新的环境上,回滚到某个时间点,但是不提供直接线上回滚SQL的处理。

回滚某个/些SQL

  • 这种情况比较常见,某个update某个delete缺少where条件或者where条件执行错误
  • 这种情况下,找到对应的事务,执行回滚即可,回滚流程请参考上面一说,对的,我就是这么胆小怕事 澳门新浦京娱乐场网站 15 

3.3 测试案例

3.3.1 全库回滚某段时间

假设需要回滚9点10分到9点15分间数据库的所有操作:

  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 线上测试环境修改set global max_allowed_packet = 1073741824
  • 回滚数据,action=1
  • 线上测试环境修改set global max_allowed_packet = 4194304

    1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 -------------------- ---------- 7 | Variable_name | Value | 8 -------------------- ---------- 9 | max_allowed_packet | 16777216 | 10 -------------------- ---------- 11 1 row in set (0.00 sec) 12 13 mysql> set global max_allowed_packet = 1073741824; 14 Query OK, 0 rows affected (0.00 sec) 15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log 17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p= -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op= -a=0 19 2017-06-19 10:59:39,041 INFO begin to assign values to parameters 20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent 21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok 22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=,port=3310 23 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok 24 2017-06-19 10:59:39,054 INFO MySQL connection is ok 25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event 26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!! 28 2017-06-19 10:59:39,061 INFO analyzing... 29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!! 30 2017-06-19 11:49:53,782 INFO release all db connections 31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p= -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op= -a=1 35 2017-06-19 16:30:20,633 INFO begin to assign values to parameters 36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent 37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok 38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=,port=3310 39 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok 40 2017-06-19 16:30:20,871 INFO MySQL connection is ok 41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图-------------- 46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections 49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 分析帅选需要的事务,rename表格
  • dump 对应的表格到测试环境
  • 回滚数据,action=1
  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 
  • 测试环境修改set global max_allowed_packet = 1073741824
  • mysqlbinlog分析binlog文件
  • python脚本分析文件,action=0
  • 分析帅选需要的事务,rename表格
  • dump 对应的表格到测试环境
  • 回滚数据,action=1
  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-
  2 __author__ = 'xinysu'
  3 __date__ = '2017/6/15 10:30'
  4 
  5 
  6 
  7 import re
  8 import os
  9 import sys
 10 import datetime
 11 import time
 12 import logging
 13 import importlib
 14 importlib.reload(logging)
 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')
 16 
 17 import pymysql
 18 from pymysql.cursors import DictCursor
 19 
 20 usage='''nusage: python [script's path] [option]
 21 ALL options need to assign:
 22 33[1;33;40m
 23 -h    : host, the database host,which database will store the results after analysis
 24 -u    : user, the db user
 25 -p    : password, the db user's password
 26 -P    : port, the db port
 27 
 28 -f    : file path, the binlog file
 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname},
 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent
 32 33[1;34;40m
 33 -oh   : online host, the database host,which database have the online table schema
 34 -ou   : online user, the db user
 35 -op   : online password, the db user's password
 36 -oP   : online port, the db port
 37 33[1;32;40m
 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table
 41 33[0m  
 42 --help: help document
 43 33[1;35;40m
 44 Example:
 45 analysize binlog:
 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0
 49 
 50 flash back:
 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1
 54 33[0m                        
 55 '''
 56 
 57 class flashback:
 58     def __init__(self):
 59         self.host=''
 60         self.user=''
 61         self.password=''
 62         self.port='3306'
 63         self.fpath=''
 64         self.tbevent=''
 65 
 66         self.on_host=''
 67         self.on_user=''
 68         self.on_password=''
 69         self.on_port='3306'
 70 
 71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table
 72 
 73         self._get_db() # 从输入参数获取连接数据库的相关参数值
 74 
 75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容
 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent))
 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8')
 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor)
 79         logging.info('MySQL which userd to store binlog event connection is ok')
 80 
 81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致
 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析
 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port))
 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8')
 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor)
 86         logging.info('MySQL which userd to analyse online table schema connection is ok')
 87 
 88         logging.info('33[33mMySQL connection is ok33[0m')
 89 
 90         self.dml_sql=''
 91         self.undo_sql=''
 92 
 93         self.tbfield_where = []
 94         self.tbfield_set = []
 95 
 96         self.begin_time=''
 97         self.db_name=''
 98         self.tb_name=''
 99         self.end_time=''
100         self.end_pos=''
101         self.sqltype=0
102 
103     #_get_db用于获取执行命令的输入参数
104     def _get_db(self):
105         logging.info('begin to assign values to parameters')
106         if len(sys.argv) == 1:
107             print(usage)
108             sys.exit(1)
109         elif sys.argv[1] == '--help':
110             print(usage)
111             sys.exit()
112         elif len(sys.argv) > 2:
113             for i in sys.argv[1:]:
114                 _argv = i.split('=')
115                 if _argv[0] == '-h':
116                     self.host = _argv[1]
117                 elif _argv[0] == '-u':
118                     self.user = _argv[1]
119                 elif _argv[0] == '-P':
120                     self.port = int(_argv[1])
121                 elif _argv[0] == '-f':
122                     self.fpath = _argv[1]
123                 elif _argv[0] == '-t':
124                     self.tbevent = _argv[1]
125                 elif _argv[0] == '-p':
126                     self.password = _argv[1]
127 
128                 elif _argv[0] == '-oh':
129                     self.on_host = _argv[1]
130                 elif _argv[0] == '-ou':
131                     self.on_user = _argv[1]
132                 elif _argv[0] == '-oP':
133                     self.on_port = int(_argv[1])
134                 elif _argv[0] == '-op':
135                     self.on_password = _argv[1]
136 
137                 elif _argv[0] == '-a':
138                     self.action = _argv[1]
139 
140                 else:
141                     print(usage)
142 
143     #创建表格,用于存储分析后的BINLOG内容
144     def create_tab(self):
145         logging.info('creating table {} to store binlog event'.format(self.tbevent))
146         create_tb_sql ='''
147         CREATE TABLE IF NOT EXISTS {}(
148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,
149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',
150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',
151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',
152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',
153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',
154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',
155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',
157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',
158             PRIMARY KEY (auto_id),
159             INDEX sqltype(sqltype),
160             INDEX dml_start_time (dml_start_time),
161             INDEX dml_end_time (dml_end_time),
162             INDEX end_log_pos (end_log_pos),
163             INDEX db_name (db_name),
164             INDEX table_name (table_name)
165         )
166         COLLATE='utf8_general_ci' ENGINE=InnoDB;
167         TRUNCATE TABLE {};
168 
169         '''.format(self.tbevent,self.tbevent)
170         self.cur.execute(create_tb_sql)
171         logging.info('created table {} '.format(self.tbevent))
172 
173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and
174     def tbschema(self,dbname,tbname):
175         self.tbfield_where = []
176         self.tbfield_set = []
177 
178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)
179 
180         self.on_cur.execute(sql_tb)
181         tbcol=self.on_cur.fetchall()
182 
183         i = 0
184         for l in tbcol:
185             #self.tbfield.append(l['Field'])
186             if i==0:
187                 self.tbfield_where.append('`' l['Field'] '`')
188                 self.tbfield_set.append('`' l['Field'] '`')
189                 i =1
190             else:
191                 self.tbfield_where.append('/*where*/ and /*where*/'   '`' l['Field'] '`')
192                 self.tbfield_set.append( '/*set*/ , /*set*/' '`' l['Field'] '`' )
193 
194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录
195     def rowrecord(self,bl_line):
196         try:
197             if bl_line.find('Table_map:') != -1:
198                 l = bl_line.index('server')
199                 m = bl_line.index('end_log_pos')
200                 n = bl_line.index('Table_map')
201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')
202 
203                 self.begin_time = begin_time[0:4]   '-'   begin_time[4:6]   '-'   begin_time[6:]
204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]
205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]
206 
207                 self.tbschema(self.db_name,self.tb_name)
208         except Exception:
209             return 'funtion rowrecord error'
210 
211     def dml_tran(self,bl_line):
212         try:
213 
214 
215             if bl_line.find('Xid =') != -1:
216 
217                 l = bl_line.index('server')
218                 m = bl_line.index('end_log_pos')
219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')
220                 self.end_time = end_time[0:4]   '-'   end_time[4:6]   '-'   end_time[6:]
221                 self.end_pos = int(bl_line[m::].split(' ')[1])
222 
223 
224 
225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')
226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')
227 
228                 if self.dml_sql.startswith(' INSERT INTO '):
229                     self.sqltype=1
230                 elif self.dml_sql.startswith(' UPDATE '):
231                     self.sqltype=2
232                 elif self.dml_sql.startswith(' DELETE '):
233                     self.sqltype=3
234 
235                 record_sql = ''
236                 undosql_desc = ''
237 
238                 #同个事务内部的行记录修改SQL,反序存储
239                 for l in self.undo_sql.splitlines():
240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):
241                         undosql_desc = record_sql   undosql_desc
242                         record_sql = ''
243                         record_sql = record_sql   l
244                     else:
245                         record_sql = record_sql   l
246 
247                 self.undo_sql = record_sql   undosql_desc
248                 self.undo_sql = self.undo_sql.lstrip()[1:] ';'
249 
250                 #处理非空格的空白特殊字符
251                 self.dml_sql = self.esc_code(self.dml_sql)
252                 self.undo_sql = self.esc_code(self.undo_sql)
253 
254                 #单独处理 转移字符: '
255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\x27',"''''")  #   ';'
256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\x27',"''''")  #   ';'
257 
258                 if len(self.dml_sql)>500000000:
259                     with open('/tmp/flashback_undosql/' str(self.end_pos) '.sql', 'w') as w_f:
260                         w_f.write('begin;'   'n')
261                         w_f.write(self.undo_sql)
262                         w_f.write('commit;'   'n')
263                     self.dml_sql=''
264                     self.undo_sql='/tmp/flashback_undosql/' str(self.end_pos) '.sql'
265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))
266 
267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(
268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,
269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)
270 
271                 self.cur.execute(insert_sql)
272                 self.mysqlconn.commit()
273 
274                 self.dml_sql = ''
275                 self.undo_sql = ''
276         except Exception:
277             print( 'funtion dml_tran error')
278 
279 
280     def analyse_binlog(self):
281         try:
282             sqlcomma=0
283             self.create_tab()
284 
285             with open(self.fpath,'r') as binlog_file:
286                 logging.info('33[36mbegining to analyze the binlog file ,this may be take a long time !!!33[0m')
287                 logging.info('33[36manalyzing...33[0m')
288                 for bline in binlog_file:
289                     if bline.find('Table_map:') != -1:
290                         self.rowrecord(bline)
291                         bline=''
292                     elif bline.rstrip()=='### SET':
293                         bline = bline[3:]
294                         sqlcomma=1
295                     elif bline.rstrip()=='### WHERE':
296                         bline = bline[3:]
297                         sqlcomma = 2
298                     elif bline.startswith('###   @'):
299                         len_f=len('###   @')
300                         i=bline[len_f:].split('=')[0]
301 
302                         #处理timestamp类型
303                         if bline[8 len(i):].split(' ')[2] == 'TIMESTAMP(0)':
304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')
305                             bline = bline.split('=')[0]   '=from_unixtime('   bline[:stop_pos].split('=')[1]   ')'
306 
307                         #处理负数存储方式
308                         if bline.split('=')[1].startswith('-'):
309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')
310                             bline = bline.split('=')[0]   '='   bline.split('=')[1].split(' ')[0] 'n'
311 
312                         if sqlcomma==1:
313                             bline = self.tbfield_set[int(i) - 1] bline[(len_f len(i)):]
314                         elif sqlcomma==2:
315                             bline = self.tbfield_where[int(i) - 1]   bline[(len_f len(i)):]
316 
317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):
318                         bline = bline[3:]
319 
320                     elif bline.find('Xid =') != -1:
321                         self.dml_tran(bline)
322                         bline=''
323                     else:
324                         bline = ''
325 
326                     if bline.rstrip('n') != '':
327                         self.dml_sql = self.dml_sql   bline   ' '
328         except Exception:
329             return 'function do error'
330 
331     def esc_code(self,sql):
332         esc={
333              '\x07':'a','\x08':'b','\x0c':'f','\x0a':'n','\x0d':'r','\x09':'t','\x0b':'v','\x5c':'\',
334             #'\x27':''',
335             '\x22':'"','\x3f':'?','\x00':''
336              }
337 
338         for k,v in esc.items():
339             sql=sql.replace(k,v)
340         return sql
341 
342     def binlogdesc(self):
343 
344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)
345         print(countsql)
346         self.cur.execute(countsql)
347         count_row=self.cur.fetchall()
348 
349         update_count=0
350         insert_couont=0
351         delete_count=0
352         for row in count_row:
353             if row['sqltype']==1:
354                 insert_couont=row['numbers']
355             elif row['sqltype']==2:
356                 update_count=row['numbers']
357             elif row['sqltype']==3:
358                 delete_count=row['numbers']
359         logging.info('33[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !33[0m(all number is accurate, the other is approximate value) 33[0m'.format(insert_couont update_count delete_count,insert_couont,update_count,delete_count))
360 
361     def undosql(self,number):
362         #这里会有几个问题:
363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;
364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;
365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理
366 
367         tran_num=1
368         id=0
369 
370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)
371 
372         self.cur.execute(tran_num_sql)
373         tran_rows=self.cur.fetchall()
374 
375         for num in tran_rows:
376             tran_num=num['table_rows']
377 
378         logging.info('33[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions 33[0m'.format(tran_num,int(tran_num/number) 1,number))
379 
380         while id<=tran_num:
381             logging.info('doing batch : {} '.format(int(id/number) 1))
382             undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id number),tran_num-id)
383             self.cur.execute(undo_sql)
384 
385             undo_rows=self.cur.fetchall()
386             f_sql=''
387 
388             for u_row in undo_rows:
389                 try:
390                     self.on_cur.execute(u_row['undo_sql'])
391                     self.on_mysqlconn.commit()
392                 except Exception:
393                     print('auto_id:',u_row['auto_id'])
394             id =number
395 
396 
397     def undo_file(self,number):
398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source
399 
400         tran_num=1
401         id=0
402 
403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)
404 
405         self.cur.execute(tran_num_sql)
406         tran_rows=self.cur.fetchall()
407 
408         for num in tran_rows:
409             tran_num=num['table_rows']
410 
411         logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')
412         logging.info('33[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions 33[0m'.format(tran_num,int(tran_num/number) 1,number))
413 
414         with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:
415             while id<=tran_num:
416                 logging.info('doing batch : {} '.format(int(id/number) 1))
417                 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id number),tran_num-id)
418                 self.cur.execute(undo_sql)
419 
420                 undo_rows=self.cur.fetchall()
421                 for u_row in undo_rows:
422                     try:
423                         w_f.write('begin;'   'n')
424                         w_f.write('# auto_id' str(u_row['auto_id'])   'n')
425                         w_f.write(u_row['undo_sql']   'n')
426                         w_f.write('commit;'   'n')
427                     except Exception:
428                         print('auto_id',u_row['auto_id'])
429                     #time.sleep(2)
430                 id =number
431 
432     def do(self):
433         if self.action=='0':
434             self.analyse_binlog()
435             logging.info('33[36mfinished to analyze the binlog file !!!33[0m')
436             #self.binlogdesc()
437         elif self.action=='1':
438             self.undosql(10000)
439 
440     def closeconn(self):
441         self.cur.close()
442         self.on_cur.close()
443         logging.info('release all db connections')
444         logging.info('33[33mAll done,check the {} which stored binlog event on host {} , port {} 33[0m'.format(self.tbevent,self.host,self.port))
445 
446 def main():
447     p = flashback()
448     p.do()
449     p.closeconn()
450 
451 if __name__ == "__main__":
452     main()

 

本文由澳门新浦京娱乐场网站发布于数据库,转载请注明出处:澳门新浦京娱乐场网站:mysql基于binlog回滚工具