mysql配置主从复制基于GTIDs

没有契合的数据库迁移工具,用pymysql实现一个

妙音 posted @ 2019年1月01日 11:21 in mysql , 598 阅读
版本迭代少不了数据迁移,python有自己的数据库迁移工具migrate。如果有的是其它开发语言,或者没有契合的迁移工具。
 
怎么自己做一个?
 

环境说明

项目开发语言:java
数据库: mysql
迁移脚本: python
python工具包: pymysql
 

实现思路

 
目标是将老版本的数据转为新版本的数据。
 
1. show tables 查询所有的表  ---得到所有表名
2. 注意trancate table_name 保证新表干净  -- 清环境
3. python在执行sql后可以在游标的属性cursor.description中看到表的字段信息  -- 得到所有字段名、类型
4. 查询数据生成insert语句。版本之间有变化的做特殊处理  --数据映射
5. 每生成500条commit一次到新库   -- 批量commit
6. mysqldump一份数据,在开发环境做好测试,没问题就可以用了。
 

前方有坑

 
留意数据精度问题
 

示例

 
import pymysql

FROM_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name= "db1"
)

TO_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name="db2"
)


class WebDB:
    def __init__(self, ip, username, password, db_name='xxx'):
        self.ip = ip
        self.username = username
        self.password = password
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = pymysql.connect(self.ip, self.username, self.password, self.db_name)
        self.cursor = self.conn.cursor()
        return self

    def __exit__(self, exctype, excvalue, traceback):
        if self.cursor:
            self.cursor.close()

        if self.conn:
            self.conn.close()



class MigrateDB(object):
    def start(self):
        self.clean()
        self.import_data()

    def clean(self):
        with WebDB(**TO_DB)as db:
            db.cursor.execute("show tables;")
            for tables in db.cursor.fetchall():
                table_name = tables[0]
                db.cursor.execute("truncate %s;" % table_name)

    def import_data(self):
        with WebDB(**FROM_DB) as from_db:
            with WebDB(**TO_DB) as to_db:
                #所有表
                from_db.cursor.execute("show tables;")
                for from_tables in from_db.cursor.fetchall():
                    from_table_name = from_tables[0]
                    from_db.cursor.execute("select * from %s;" % from_table_name)
                    items = self.to_dict(from_db.cursor.fetchall(), from_db.cursor.description)
                    print("import table: %s" % from_table_name)
                    to_table_name = from_table_name
                    for i, item in enumerate(items):
                        # 需要映射转换的表,生成专门的sql
                        if from_table_name == "xxxx":
                            to_table_name = "xxxxx"
                            item = self.gen_insert_sql_for_xxx(item)

                        sql = self.gen_insert_sql(to_table_name, item)
                        to_db.cursor.execute(sql, args=item)
                        # 每500条commit一次
                        if i != 0 and i % 500 == 0:
                            to_db.conn.commit()
                    to_db.conn.commit()


    def to_dict(self, rows, description):
        """
        记录转为字典
        :param rows:
        :param description:
        :return:
        """
        for row in rows:
            item = dict()
            for i, field in enumerate(description):
                field_name = field[0]
                item[field_name] = row[i]
            yield item

    def gen_insert_sql(self, table_name, item):
        sql = "insert into %s(%s) values(%s)"
        keys = item.keys()
        key_str = ",".join(keys)
        value_str = ",".join(["%%(%s)s" % k for k, v in item.items()])

        return sql % (table_name, key_str, value_str)


if __name__ == "__main__":
    m = MigrateDB()
    m.start()
 
来源
 
 
 

登录 *


loading captcha image...
(输入验证码)
or Ctrl+Enter