问题描述
我想知道如何以并行方式将流数据写入不同的 MysqL 表?
我有以下代码:其中 GetStreaming()
返回调用时可用的元组 [(tbName,data1,data2),(tbName,...]
列表。
import MysqL.connector
from pathos.multiprocessing import ProcessingPool as Pool
def foo(tb1,d1,d2):
global cursor
stmt = lambda tb1: "INSERT INTO {:} (col1,col2) VALUES (%s,%s)".format(tb1)
cursor.execute(stmt(tb1),(d1,d2))
if __name=="__main__":
cnx = MysqL.connector.connect(**config)
cursor = cnx.cursor()
for _ in range(int(1e5)):
data = GetStreaming()
with Pool() as p:
p.map(foo,data)
cnx.commit()
cursor.close()
cnx.close()
如果我用普通的 for 循环替换 with Pool() as p: p.map(foo,data)
for each in data:
foo(each)
代码可以工作。但是,使用多处理方式,我不能再这样做了。我尝试了多种方法,但出现了 MysqL 断开连接或不可picklable 之类的错误。
解决方法
每个“并行”插入过程都需要自己的连接器和光标。您无法在任何类型的线程中共享它们。
您可以使用 connection pooling 加快连接的分配和释放。
在 MySQL(或任何成本低于小国 GDP 的 DBMS)中,没有任何魔法可以让它扩展以同时处理大约 100 个连接上的大规模数据插入。矛盾的是,由于它们之间存在争用,因此更多连接的吞吐量可能低于更少连接的吞吐量。您可能需要重新考虑您的系统架构,以便通过一些连接使其正常工作。
换句话说:大表越少,性能越好。
最后,阅读有关加速批量插入的方法。例如这种多行插入
INSERT INTO tbl (a,b) VALUES (
(1,2),(3,4),(5,6) );
运行速度几乎是连续插入的三倍
INSERT INTO tbl (a,b) VALUES (1,2);
INSERT INTO tbl (a,b) VALUES (3,4);
INSERT INTO tbl (a,b) VALUES (5,6);
除非你这样做:
START TRANSACTION;
INSERT INTO tbl (a,6);
COMMIT;
因为插入的艰苦工作发生在 COMMIT 时间。对于许多实际用例,我的经验表明,大约 100 行的数据块效果很好。
插入速度的黄金标准是 LOAD DATA INFILE。不过,为此您需要将数据保存在 csv 或类似 csv 的文件中。