没有契合的数据库迁移工具,用pymysql实现一个

2019年1月01日 11:21

版本迭代少不了数据迁移,python有自己的数据库迁移工具migrate。如果有的是其它开发语言,或者没有契合的迁移工具。
 
怎么自己做一个?
 

环境说明

项目开发语言:java
数据库: mysql
迁移脚本: python
python工具包: pymysql
 

实现思路

 
目标是将老版本的数据转为新版本的数据。
 
1. show tables 查询所有的表  ---得到所有表名
2. 注意trancate table_name 保证新表干净  -- 清环境
3. python在执行sql后可以在游标的属性cursor.description中看到表的字段信息  -- 得到所有字段名、类型
4. 查询数据生成insert语句。版本之间有变化的做特殊处理  --数据映射
5. 每生成500条commit一次到新库   -- 批量commit
6. mysqldump一份数据,在开发环境做好测试,没问题就可以用了。
 

前方有坑

 
留意数据精度问题
 

示例

 
import pymysql

FROM_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name= "db1"
)

TO_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name="db2"
)


class WebDB:
    def __init__(self, ip, username, password, db_name='xxx'):
        self.ip = ip
        self.username = username
        self.password = password
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = pymysql.connect(self.ip, self.username, self.password, self.db_name)
        self.cursor = self.conn.cursor()
        return self

    def __exit__(self, exctype, excvalue, traceback):
        if self.cursor:
            self.cursor.close()

        if self.conn:
            self.conn.close()



class MigrateDB(object):
    def start(self):
        self.clean()
        self.import_data()

    def clean(self):
        with WebDB(**TO_DB)as db:
            db.cursor.execute("show tables;")
            for tables in db.cursor.fetchall():
                table_name = tables[0]
                db.cursor.execute("truncate %s;" % table_name)

    def import_data(self):
        with WebDB(**FROM_DB) as from_db:
            with WebDB(**TO_DB) as to_db:
                #所有表
                from_db.cursor.execute("show tables;")
                for from_tables in from_db.cursor.fetchall():
                    from_table_name = from_tables[0]
                    from_db.cursor.execute("select * from %s;" % from_table_name)
                    items = self.to_dict(from_db.cursor.fetchall(), from_db.cursor.description)
                    print("import table: %s" % from_table_name)
                    to_table_name = from_table_name
                    for i, item in enumerate(items):
                        # 需要映射转换的表,生成专门的sql
                        if from_table_name == "xxxx":
                            to_table_name = "xxxxx"
                            item = self.gen_insert_sql_for_xxx(item)

                        sql = self.gen_insert_sql(to_table_name, item)
                        to_db.cursor.execute(sql, args=item)
                        # 每500条commit一次
                        if i != 0 and i % 500 == 0:
                            to_db.conn.commit()
                    to_db.conn.commit()


    def to_dict(self, rows, description):
        """
        记录转为字典
        :param rows:
        :param description:
        :return:
        """
        for row in rows:
            item = dict()
            for i, field in enumerate(description):
                field_name = field[0]
                item[field_name] = row[i]
            yield item

    def gen_insert_sql(self, table_name, item):
        sql = "insert into %s(%s) values(%s)"
        keys = item.keys()
        key_str = ",".join(keys)
        value_str = ",".join(["%%(%s)s" % k for k, v in item.items()])

        return sql % (table_name, key_str, value_str)


if __name__ == "__main__":
    m = MigrateDB()
    m.start()
 
来源
 
 
 

评论(0) 阅读(594)

mysql配置主从复制基于GTIDs

2018年11月25日 13:59

 
 
用最简单的例子,教你配置基于GTIDs全局事物id的主从配置。mysql从5.7之后,增加了基于全局事务的主从复制,能够省去,log-bin主从复制找坐标的工作。
 

环境说明

 
mysql 8.0.12
 

过程说明

 
下面是两台新装的mysql启用全局事务id主从复制的过程
 
1) 启用log-bin,mysql默认是启动的不需要配置
 
2) master上创建复制用的帐号
 
3) master/slave启用gtid
 
4) slave配置master信息
 
5) slave启动复制
 
6) 检查配置是否成功
 
7) 测试效果
 

master上创建帐号用于复制

 
create user 'repl'@'%' identified by 'password';
grant replication slave on *.* to 'repl'@'%';
 

master启用gtid

#/etc/mysql/my.conf
server-id=1
gtid_mode=ON
enforce-gtid-consistency=true
 

slave启用gtid

server-id=2  #id不能重复
gtid_mode=ON
enforce-gtid-consistency=true
 
 

slave配置master信息

 
change master TO
master_host='192.168.50.111',
master_port=3307,
master_user='repl',
master_password='password',
master_auto_position=1;
 

启动slave

 
start slave
 

查看slave状态

 
#Slave_IO_Running/Slave_SQL_Running两个都是yes表示配置成功
show slave status;
 
 

测试

 
在master创建数据库,slave很快同步过来
 
 

参考来源

 
 

Tags: mysql 主从
评论(0) 阅读(268)

连接mysql8.0提示认证协议失败

2018年11月05日 00:06

描述

 
    用docker容器安装软件非常方便,省了太多麻烦。通过docker运行mysql8.0,再用navicat客户端连接出现client does not support authentication protocol requested by server。
 
    像似认真失败,检查mysql.user表,已经配置了用户,为什么还出现认真失败?这是因为mysql8.0改了密码加密方法。客户端与服务器端不对应就会出现该错误。
 

正确的docker-compose.yml,通过command指定加密插件

version: "3.1"
services:
    mysqlrepm:
        image: "mysql:8.0.12"
        #指定认证插件
        command: --default-authentication-plugin=mysql_native_password
        environment:
            - MYSQL_ROOT_PASSWORD=111111
        ports:
            - "3307:3306"
 
 

Tags: navicat mysql
评论(0) 阅读(249)

mysql存储引擎毫无疑问选择Innodb

2018年9月15日 12:25

网上观点

网上部分人在考虑如何选择MyISAM与InnoDB的依据是
* innodb 支持事务、行级锁. 适合写多读少
* myisam 表锁,适合读多写少
 

实际测试下查询速度

1千万(InnoDB)
select *        21秒
select id=xxx   0.026秒
count(id)       15.71秒(第一次16.71,第二次0秒,插入一条后16.476秒)
count(username) 16.956秒
avg(id)         18.41秒

1千万(Myisam)
select *        8.389秒
select id=xxx   0秒
count(*)        0秒
count(username) 1.643秒
avg(id)         3.793秒

myisam-->innodb 52.200秒
确实MyISAM比InnoDB快了3倍
 
可问题是实际中,都是通过索引分批取数据,有谁有一次取那么多的数据呢?而分批取数据性能都差不多。
 

存储引擎特性对比

下面是官方网站上mysql8.0的特性。可以看出MyISAM特性,InooDB都有,没有必须选择MyISAM的理由
| Feature                      | MyISAM       | InnoDB       |
| B-tree indexes               | Yes          | Yes          |
| Backup/point-in-time 恢复    | Yes          | Yes          |
| Cluster database support     | No           | No           |
| Clustered indexes            | No           | Yes          |
| Compressed data              | Yes (note 2) | Yes          |
| Data caches                  | No           | Yes          |
| Encrypted data (note 3)      | Yes          | Yes          |
| Foreign key support          | No           | Yes          |
| Full-text search indexes     | Yes          | Yes (note 5) |
| Geospatial data type support | Yes          | Yes          |
| Geospatial indexing support  | Yes          | Yes (note 6) |
| Hash indexes                 | No           | No (note 7)  |
| Index caches                 | Yes          | Yes          |
| Locking granularity          | Table        | Row          |
| MVCC                         | No           | Yes          |
| Replication support (note 1) | Yes          | Yes          |
| Storage limits               | 256TB        | 64TB         |
| T-tree indexes               | No           | No           |
| Transactions                 | No           | Yes          |
| Update statistics for data dictionary YesYes
 

官方网站怎么说MyISAM?

#原文
MyISAM: These tables have a small footprint. Table-level locking limits the performance in read/write workloads, 
so it is often used in read-only or read-mostly workloads in Web and data warehousing configurations.
#意思
MyISAM应用范围比较小。它是表级锁,对并发读写有限制。适合只读或者大部分是读的web应用或者配置性的数据仓库。
 

数据损坏

InnoDB会自动恢复,MyISAM需要定时检查
 

我的结论

* 从查询速度上,实际中是分批取数查询数据差不多
* 从特性上,InooDB是MyISAM的超集
* 从功能上,InooDB功能更强
 
所以网上说读多写少选择MyISAM是人云亦云。
当然存在肯定有理由,为了兼容性,还是什么?
 
此生必看的科学实验-水知道答案
精神病为什么治不好
百病之源

 

Tags: myisam
评论(0) 阅读(457)

思考大表统计的优化策略

2018年9月15日 10:02

前言

以InnoDB表的count()统计为例子
 

方法一、直接count,全表扫描

 
* 分析
#一千万条数据count()耗费几十秒,基本不用干活
select count(*) from user
 

方法二、分段count,全表扫描转变为范围扫描

加上where条件,将全部查询,拆分成多个小范围查询。
#全表扫描
select count(*) from user;

#范围扫描
select count(*) from user where id < 1000000;
select count(*) from user where id>=1000000 and id < 200000;
 
实际中需要打开多个数据库连接,实际操作不是很好。
 
当然分表也属于这类方法
 
* 原理
大问题拆成多个小问题,将压力分散。
 
* 分析
如果是同一张表,需要打开多个连接(多个进程)
如果分库了,需要连接多个数据库
虽然能减少时间,但是不能减小压力,count()次数多了,系统也得嗝屁
 

方法三、增加tinyint类型字段并建立二级索引用于统计

 
二级索引key是索引列,value是主键,一个数据页上可以存储更多数据,减少了分页于页内移动
 
#加了之后再统计,mysql解析器会自动选择二级索引进行count
select count(*) from user
 
* 原理
利用二级索引减少io
 
* 分析
能够减少时间,也能减小io压力,但是时间还是挺长的,我的机器上一千万数据从5秒编成了1秒,1秒还是比较长
假如加上where条件呢,这个方法就用不上了
 

方法四、维护一张统计表

凡事预则立,不预则废,设计时加一张tablename_stats表(两个字段key,value),平时就维护好统计数据,增加时加1,删除时减1。
 
key value 
count                   1111
hash值(表名+where条件) 1111
 
* 原理
提前准备
 
* 分析
毫无压力,效果完美
需要提前预测好可能的统计以及条件
缺点存在统计数据与实际不一致,在大数据量情况下,也不算什么问题
 

总结

最终发现还是类似myisam维护一个计数器是最好的方法。
只是我们扩展了功能,加了where条件也提前计数
 
 
《了凡四训》详解之改过之法
印光大师十念法(胡小林主讲第1集)
此生必看的科学实验-水知道答案

 

Tags: 大数据
评论(0) 阅读(370)

myisam锁表机制

2018年9月15日 06:34

 锁表机制

* 锁机制
a.加锁读(共享锁):不会阻塞读请求(select, insert),阻塞写(update, delete)请求
b.加锁写(独占锁):阻塞其它读写(select, delete, insert, update)请求
 
* 锁耗时多长?
取决于sql耗时。控制好sql耗时,锁表的问题并不大。
 

测试数据

生成1000万条测试数
表user(id, username, password, age, sex)
存储引擎MyISAM,id建立索引
 

加锁读,select,insert不阻塞,update,delete阻塞

打开两个客户端
客户端1全表扫描
select * from user
 
客户端2测试阻塞
select * from user limit 10;    #结果不阻塞

insert into user(username, password, age, sex)
values("aaaaa", "bbbb", 10, 1); #结果不阻塞

update user set username="aaa" where id =90001;  #结果阻塞

delete from user where id =10000006  #结果阻塞
 

加锁写,select,insert,update,delete都阻塞

 
客户端1全表扫描写
update set username="aaaaaaa"
where username="不存在的值让sql扫描全表"
 
客户端2测试阻塞
重复上面select, insert, update, delete结果都阻塞
 
此生必看的科学实验-水知道答案
《了凡四训》详解之改过之法
印光大师十念法(胡小林主讲第1集)

 

评论(0) 阅读(381)

flask自动重连数据库

2018年9月12日 07:43

情况描述

flask部署到服务器后,日志中经常出现错误
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (2006, "MySQL server has gone away (ConnectionResetError(104, 'Connection reset by peer'))")
 
看下mysql日志,大量的超时导致的连接中断
...
2018-09-10  6:45:41 768 [Warning] Aborted connection 768 to db: 'xxx' user: 'xxx' host: '172.22.0.2' (Got timeout reading communication packets)
 

问题原因

原因就是因为连接超时,数据库把它中断了,详细参见官方文档.
https://dev.mysql.com/doc/refman/8.0/en/communication-errors.html
 

起因

  • 查看数据库配置。不要混淆全局配置与会话配置
show global variables like '%timeout%'

connect_timeout      5
interactive_timeout  28800
wait_timeout      600
 
  • 配置文件my.cnf
connect_timeout         = 5
wait_timeout            = 600
 

错误复现

mysql错误日志两次中断间隔时间都大于600秒,有可能是该参数引起,将它设置小点看看,能否复现问题
set global wait_timeout=10
 
设置后,问题又出现了,可以缺点就是该参数引起的。
 

自动重连实现

无论wait_timeou设置多大,总会出现超时,该怎么避免呢?简单方法是定时重连接.
 
flask已经提供了重连参数。连接超过一定时间就将它回收。
SQLALCHEMY_POOL_RECYCLE=7200   # 默认2小时。该值一定要比数据库wait_timeout小,否则它不起作用,上面就是这个原因导致的
 
当然这个方法不会帮我们处理网络异常导致的连接失效时重连
 
《了凡四训》详解之改过之法
印光大师十念法(胡小林主讲第1集)
此生必看的科学实验-水知道答案

 

Tags: flask
评论(0) 阅读(903)

sysbench测试mysql性能瓶颈

2018年9月06日 05:56

前言

任何系统,业务越来越繁忙,最终都会遇到性能瓶颈。能够提前知道系统大概的瓶颈,是很重要的。下面介绍一个测试工具sysbench
 

安装

yum install sysbench
#下面测试中版本是1.0.9
 

github

https://github.com/akopytov/sysbench
 

测试mysql

sysbench --test=/usr/share/sysbench/oltp_insert.lua
--mysql-host=127.0.0.1
--mysql-port=3306
--mysql-user=root
--mysql-password=111111
--mysql-db=mytest
--db-driver=mysql  prepare
 
  • * 参数选项
--test 指定测试所用lua脚本
--mysql-host
--mysql-port
--mysql-user
--mysql-password
--mysql-db
--db-driver=mysql  #指定db类型
prepare 测试前准备工作
run 正式测试
cleanup 测试后删掉测试数据
 
  • * lua脚本的参数选项
详细参数看lua脚本/usr/share/sysbench/oltp_common.lua
下面是常用参数
--mysql_storage_engine=innodb   #表引擎
--table_size=10000              #表大小
--tables=10                     #表数量
 
《了凡四训》详解之改过之法
印光大师十念法(胡小林主讲第1集)
此生必看的科学实验-水知道答案
 

Tags: sysbench
评论(0) 阅读(527)

快速生成千万条mysql数据

2018年9月06日 05:33

目的

学习、测试mysql大数据场景,需要先生成大量数据。
 

思路

mysql官方文档说得很清楚。"load data infile"导入数据比insert要快20倍。所以我们先生成一千万条数据的文件。
然后将数据导入表中。
 
## 生成数据
假如有个用户表(id, username,password, age, sex),id是自动增长,我们现在需要生成username等信息
生成一千万条数据,速度还能接受,耗时236秒,文件大小315M。
 
import string
import random

def random_str(length=1):
    template = string.letters + string.digits
    chars = random.sample(template, length)
    return "".join(chars)

def generate_record():
    """
    username/password/age/sex
    """
    length = random.randint(6, 20)
    username = random_str(length)

    length = random.randint(6, 20)
    password = random_str(length)

    age = random.randint(10, 100)
    sex = random.choice([0, 1])
    return [username, password, age, sex]

def create_file(num=10000000):
    with open("user_data.txt", "w") as f:
        for i in range(num):
            row = generate_record()
            f.write(",".join(map(str, row))+"\n")

if __name__ == '__main__':
    import datetime
    start = datetime.datetime.now()
    create_file()
    end = datetime.datetime.now()
    cost = (end -start).total_seconds()
    print("cost: %s" % cost)
#一千万条,耗时236s,文件315M
 

导入

load data infile命令有安全限制,最好是把数据拿到mysql server端,再通过mysql -uxxx -pxxx进入命令,再导入。
我的虚拟机导入耗时57秒
 
load data infile "/user_data.txt" into table user
fields terminated by ','
lines terminated by '\n'
(username, password, age, sex);
 
 

其它方式

  • 测试工具sysbench
sysbench是批量insert,性能比不上导入。但是它更接近实际场景
 
  • 存储过程
速度很快,但是不如用熟悉的脚本方便
 
此生必看的科学实验-水知道答案
《了凡四训》详解之改过之法
印光大师十念法(胡小林主讲第1集)
 

Tags: python
评论(0) 阅读(485)

Host '10.0.2.2' is not allowed to connect to this MariaDB server

2015年2月27日 16:01

Host '10.0.2.2' is not allowed to connect to this MariaDB server
mysql的root用户,默认只允许本地访问。当使用客户端连接远程mysql服务器,出现上面错误。只要在mysql.user表添加远程用户即可.
  • 进入mysql命令行界面
➜  ~  mysql -u root -p
Enter password: 
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 13
Server version: 5.5.41-MariaDB MariaDB Server

Copyright (c) 2000, 2014, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
  • 执行下面insert
insert into mysql.user(Host,User,Password) values("%","root",password("root"));insert into mysql.user(Host,User,Password) values("%","root",password("root"));
 

评论(0) 阅读(4913)