没有契合的数据库迁移工具,用pymysql实现一个
2019年1月01日 11:21
版本迭代少不了数据迁移,python有自己的数据库迁移工具migrate。如果有的是其它开发语言,或者没有契合的迁移工具。
怎么自己做一个?
环境说明
项目开发语言:java 数据库: mysql 迁移脚本: python python工具包: pymysql
实现思路
目标是将老版本的数据转为新版本的数据。
1. show tables 查询所有的表 ---得到所有表名 2. 注意trancate table_name 保证新表干净 -- 清环境 3. python在执行sql后可以在游标的属性cursor.description中看到表的字段信息 -- 得到所有字段名、类型 4. 查询数据生成insert语句。版本之间有变化的做特殊处理 --数据映射 5. 每生成500条commit一次到新库 -- 批量commit 6. mysqldump一份数据,在开发环境做好测试,没问题就可以用了。
前方有坑
留意数据精度问题
示例
import pymysql FROM_DB = dict( ip="xxxxxxxxxx", username="xxxx", password="xxxx", db_name= "db1" ) TO_DB = dict( ip="xxxxxxxxxx", username="xxxx", password="xxxx", db_name="db2" ) class WebDB: def __init__(self, ip, username, password, db_name='xxx'): self.ip = ip self.username = username self.password = password self.db_name = db_name self.conn = None self.cursor = None def __enter__(self): self.conn = pymysql.connect(self.ip, self.username, self.password, self.db_name) self.cursor = self.conn.cursor() return self def __exit__(self, exctype, excvalue, traceback): if self.cursor: self.cursor.close() if self.conn: self.conn.close() class MigrateDB(object): def start(self): self.clean() self.import_data() def clean(self): with WebDB(**TO_DB)as db: db.cursor.execute("show tables;") for tables in db.cursor.fetchall(): table_name = tables[0] db.cursor.execute("truncate %s;" % table_name) def import_data(self): with WebDB(**FROM_DB) as from_db: with WebDB(**TO_DB) as to_db: #所有表 from_db.cursor.execute("show tables;") for from_tables in from_db.cursor.fetchall(): from_table_name = from_tables[0] from_db.cursor.execute("select * from %s;" % from_table_name) items = self.to_dict(from_db.cursor.fetchall(), from_db.cursor.description) print("import table: %s" % from_table_name) to_table_name = from_table_name for i, item in enumerate(items): # 需要映射转换的表,生成专门的sql if from_table_name == "xxxx": to_table_name = "xxxxx" item = self.gen_insert_sql_for_xxx(item) sql = self.gen_insert_sql(to_table_name, item) to_db.cursor.execute(sql, args=item) # 每500条commit一次 if i != 0 and i % 500 == 0: to_db.conn.commit() to_db.conn.commit() def to_dict(self, rows, description): """ 记录转为字典 :param rows: :param description: :return: """ for row in rows: item = dict() for i, field in enumerate(description): field_name = field[0] item[field_name] = row[i] yield item def gen_insert_sql(self, table_name, item): sql = "insert into %s(%s) values(%s)" keys = item.keys() key_str = ",".join(keys) value_str = ",".join(["%%(%s)s" % k for k, v in item.items()]) return sql % (table_name, key_str, value_str) if __name__ == "__main__": m = MigrateDB() m.start()
来源