Problem description
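I wrote a Python script that processes tables in my database using SQLAlchemy. Table 1 has 2,500,000 entries and Table 2 has 20,000 entries. What I am trying to do is compare the source IP and destination IP in Table 1 with the IPs in Table 2. If there is a match, I replace the source IP and destination IP from Table 1 with the matching data from Table 2 and add the entry to Table 3. My code also checks whether the entry is already in the new table. If it is, it skips it and goes on to the next row. My problem is that it is extremely slow. I launched my script yesterday, and in 24 hours it only got through 47,000 of the 2,500,000 entries. I am wondering whether there is any way to speed up the process. It is a Postgres database, and I cannot tell whether it is reasonable for the script to take this long or whether something is wrong. If anyone has experience with something similar, how long did it take to complete? Many thanks.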
session = Session()
i = 0
start_id = 1
flows = session.query(Table1).filter(Table1.id >= start_id).all()
result_number = len(flows)
vlan_list = {"['0050']", "['0130']", "['0120']", "['0011']", "['0110']"}
while i < result_number:
    for flow in flows:
        if flow.vlan_destination in vlan_list:
            usage = session.query(Table2).filter(Table2.ip ==
                                                 str(flow.ip_destination)).all()
            if len(usage) > 0:
                usage = usage[0].usage
            else:
                usage = str(flow.ip_destination)
            usage_ip_src = session.query(Table2).filter(Table2.ip ==
                                                        str(flow.ip_source)).all()
            if len(usage_ip_src) > 0:
                usage_ip_src = usage_ip_src[0].usage
            else:
                usage_ip_src = str(flow.ip_source)
            if flow.protocol == "17":
                protocol = func.REPLACE(flow.protocol, "17", 'UDP')
            elif flow.protocol == "1":
                protocol = func.REPLACE(flow.protocol, "1", 'ICMP')
            elif flow.protocol == "6":
                protocol = func.REPLACE(flow.protocol, "6", 'TCP')
            else:
                protocol = flow.protocol
            is_in_db = session.query(Table3).filter(Table3.protocol ==
                                                    protocol)\
                .filter(Table3.application == flow.application)\
                .filter(Table3.destination_port == flow.destination_port)\
                .filter(Table3.vlan_destination == flow.vlan_destination)\
                .filter(Table3.usage_source == usage_ip_src)\
                .filter(Table3.state == flow.state)\
                .filter(Table3.usage_destination == usage).count()
            if is_in_db == 0:
                to_add = Table3(usage_ip_src, usage, protocol, flow.application,
                                flow.destination_port, flow.vlan_destination, flow.state)
                session.add(to_add)
                session.flush()
                session.commit()
                print("added " + str(i))
            else:
                print("usage already in DB")
        i = i + 1
session.close()
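EDIT As requested, here are more details: Table 1 has 11 columns; the two we are interested in are the source IP and the destination IP. Then there is Table 2, which has an IP and a usage. What my script does is take the source IP and destination IP from Table 1 and look for a match in Table 2. If there is one, it replaces the IP address with the usage and adds it, together with some columns from Table 1, to Table 3. While doing so, when it fills the protocol column of Table 3, it writes the protocol name instead of the number, just to make it more readable.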
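EDIT 2 I tried to think about this differently, so I drew a diagram of my problem (the X problem). What I am trying to figure out is whether my code (the Y solution) works as intended. I have only been coding in Python for a month, and I feel like I messed something up. My code is supposed to take each row from Table 1, compare it with Table 2, and then add the data to Table 3. Table 1 has over 2 million entries, so it is understandable that this takes a while, but it is far too slow. For example, when I had to load the data from the API into the database, that was faster than the comparison I am now trying to run over data that is already in the database. I am running my code on a virtual machine with plenty of memory, so I am sure it is my code that is lacking, and I need pointers on what to improve. Screenshots of my tables: Table 2, Table 3, Table 1.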
EDIT 3: PostgreSQL query
SELECT
    coalesce(table2_1.usage, table1.ip_source) AS coalesce_1,
    coalesce(table2_2.usage, table1.ip_destination) AS coalesce_2,
    CASE table1.protocol
        WHEN %(param_1)s THEN %(param_2)s
        WHEN %(param_3)s THEN %(param_4)s
        WHEN %(param_5)s THEN %(param_6)s
        ELSE table1.protocol
    END AS anon_1,
    table1.application AS table1_application,
    table1.destination_port AS table1_destination_port,
    table1.vlan_destination AS table1_vlan_destination,
    table1.state AS table1_state
FROM
    table1
    LEFT OUTER JOIN table2 AS table2_2 ON table2_2.ip = table1.ip_destination
    LEFT OUTER JOIN table2 AS table2_1 ON table2_1.ip = table1.ip_source
WHERE
    table1.vlan_destination IN (
        %(vlan_destination_1)s, %(vlan_destination_2)s, %(vlan_destination_3)s,
        %(vlan_destination_4)s, %(vlan_destination_5)s
    )
    AND NOT (
        EXISTS (
            SELECT
                1
            FROM
                table3
            WHERE
                table3.usage_source = coalesce(table2_1.usage, table1.ip_source)
                AND table3.usage_destination = coalesce(table2_2.usage, table1.ip_destination)
                AND table3.protocol = CASE table1.protocol
                        WHEN %(param_1)s THEN %(param_2)s
                        WHEN %(param_3)s THEN %(param_4)s
                        WHEN %(param_5)s THEN %(param_6)s
                        ELSE table1.protocol
                    END
                AND table3.application = table1.application
                AND table3.destination_port = table1.destination_port
                AND table3.vlan_destination = table1.vlan_destination
                AND table3.state = table1.state
        )
    )
Solution
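Given the question as it currently stands, I think this at least comes close to what you are after. The idea is to perform the entire operation in the database, instead of fetching everything (all 2,500,000 rows) into Python and filtering there.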
from sqlalchemy import func, case, insert
from sqlalchemy.orm import aliased

def newhotness(session, vlan_list):
    # The query needs to join Table2 twice, so it has to be aliased
    dst = aliased(Table2)
    src = aliased(Table2)

    # Prepare required SQL expressions
    usage = func.coalesce(dst.usage, Table1.ip_destination)
    usage_ip_src = func.coalesce(src.usage, Table1.ip_source)
    protocol = case({"17": "UDP", "1": "ICMP", "6": "TCP"},
                    value=Table1.protocol,
                    else_=Table1.protocol)

    # Form a query producing the data to insert to Table3
    flows = session.query(
            usage_ip_src, usage, protocol, Table1.application,
            Table1.destination_port, Table1.vlan_destination, Table1.state).\
        outerjoin(dst, dst.ip == Table1.ip_destination).\
        outerjoin(src, src.ip == Table1.ip_source).\
        filter(Table1.vlan_destination.in_(vlan_list),
               # Skip rows that already have a matching entry in Table3
               ~session.query(Table3).
                   filter_by(usage_source=usage_ip_src,
                             usage_destination=usage,
                             protocol=protocol,
                             application=Table1.application,
                             destination_port=Table1.destination_port,
                             vlan_destination=Table1.vlan_destination,
                             state=Table1.state).
                   exists())

    # INSERT INTO table3 (...) SELECT ..., executed entirely in the database
    stmt = insert(Table3).from_select(
        ["usage_source", "usage_destination", "protocol", "application",
         "destination_port", "vlan_destination", "state"],
        flows)

    return session.execute(stmt)
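To see what the statement built by the function above amounts to, here is a minimal sketch of the same set-based idea in plain SQL. It uses the standard-library sqlite3 module as a stand-in for PostgreSQL so that it runs without a server. The reduced column set, the sample rows, and the names build_demo_db and copy_flows are assumptions made for illustration, not part of the question's schema. It also applies DISTINCT to the SELECT so that identical source rows produce a single insert.

```python
import sqlite3

# Same protocol translation as the CASE expression in the question's SQL.
PROTOCOL_CASE = """CASE t1.protocol WHEN '17' THEN 'UDP' WHEN '1' THEN 'ICMP'
                   WHEN '6' THEN 'TCP' ELSE t1.protocol END"""


def build_demo_db():
    """Create an in-memory database with hypothetical, simplified tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE table1 (ip_source TEXT, ip_destination TEXT,
                             protocol TEXT, vlan_destination TEXT);
        CREATE TABLE table2 (ip TEXT, usage TEXT);
        CREATE TABLE table3 (usage_source TEXT, usage_destination TEXT,
                             protocol TEXT, vlan_destination TEXT);
    """)
    conn.executemany("INSERT INTO table1 VALUES (?, ?, ?, ?)", [
        ("10.0.0.1", "10.0.0.2", "6", "['0050']"),
        ("10.0.0.1", "10.0.0.2", "6", "['0050']"),   # duplicate flow
        ("10.0.0.3", "10.0.0.2", "17", "['0130']"),  # source IP has no usage
        ("10.0.0.1", "10.0.0.9", "1", "['9999']"),   # VLAN not in the list
    ])
    conn.executemany("INSERT INTO table2 VALUES (?, ?)",
                     [("10.0.0.1", "web"), ("10.0.0.2", "db")])
    return conn


def copy_flows(conn, vlan_list):
    """Insert translated table1 rows that table3 does not contain yet."""
    placeholders = ", ".join("?" for _ in vlan_list)
    conn.execute(f"""
        INSERT INTO table3 (usage_source, usage_destination,
                            protocol, vlan_destination)
        SELECT DISTINCT
               coalesce(src.usage, t1.ip_source),
               coalesce(dst.usage, t1.ip_destination),
               {PROTOCOL_CASE},
               t1.vlan_destination
        FROM table1 AS t1
        LEFT JOIN table2 AS dst ON dst.ip = t1.ip_destination
        LEFT JOIN table2 AS src ON src.ip = t1.ip_source
        WHERE t1.vlan_destination IN ({placeholders})
          AND NOT EXISTS (
              SELECT 1 FROM table3 AS t3
              WHERE t3.usage_source = coalesce(src.usage, t1.ip_source)
                AND t3.usage_destination = coalesce(dst.usage, t1.ip_destination)
                AND t3.protocol = {PROTOCOL_CASE}
                AND t3.vlan_destination = t1.vlan_destination)
    """, list(vlan_list))


if __name__ == "__main__":
    conn = build_demo_db()
    copy_flows(conn, ["['0050']", "['0130']"])
    copy_flows(conn, ["['0050']", "['0130']"])  # second run adds nothing
    for row in conn.execute("SELECT * FROM table3 ORDER BY usage_source"):
        print(row)
```

The whole comparison is one statement, so the database never sends the 2,500,000 rows to Python and no per-row queries are issued.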
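If vlan_list is selective, in other words filters out most rows, this will perform far fewer operations in the database. Depending on the size of Table2, you may benefit from indexing Table2.ip, but test first. If it is relatively small, I would guess that PostgreSQL will perform a hash or nested loop join there. If some of the columns used to filter out duplicates in Table3 are unique, you could perform an INSERT ... ON CONFLICT ... DO NOTHING instead of removing duplicates in the SELECT with the NOT EXISTS subquery expression (which PostgreSQL will execute as an anti-join). If the flows query may produce duplicates, add a call to Query.distinct() to it.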
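As a rough sketch of the INSERT ... ON CONFLICT ... DO NOTHING variant, the example below again uses the standard-library sqlite3 module as a stand-in for PostgreSQL (SQLite 3.24 or newer accepts the same clause). The UNIQUE constraint on table3, the reduced column set, the sample rows, and the names build_demo_db and copy_flows_ignoring_conflicts are assumptions for illustration. Whether such a constraint fits the real Table3 depends on its data.

```python
import sqlite3


def build_demo_db():
    """Create an in-memory database with hypothetical, simplified tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE table1 (ip_source TEXT, ip_destination TEXT,
                             vlan_destination TEXT);
        CREATE TABLE table2 (ip TEXT PRIMARY KEY, usage TEXT);
        -- Assumed constraint: these columns together identify a duplicate.
        CREATE TABLE table3 (
            usage_source TEXT, usage_destination TEXT, vlan_destination TEXT,
            UNIQUE (usage_source, usage_destination, vlan_destination));
    """)
    conn.executemany("INSERT INTO table1 VALUES (?, ?, ?)", [
        ("10.0.0.1", "10.0.0.2", "['0050']"),
        ("10.0.0.1", "10.0.0.2", "['0050']"),  # duplicate flow
        ("10.0.0.3", "10.0.0.2", "['0130']"),  # source IP has no usage
        ("10.0.0.1", "10.0.0.9", "['9999']"),  # VLAN not in the list
    ])
    conn.executemany("INSERT INTO table2 VALUES (?, ?)",
                     [("10.0.0.1", "web"), ("10.0.0.2", "db")])
    return conn


def copy_flows_ignoring_conflicts(conn, vlan_list):
    """Insert every candidate row and let the unique constraint drop repeats."""
    placeholders = ", ".join("?" for _ in vlan_list)
    # No NOT EXISTS subquery here: rows violating the constraint are skipped.
    conn.execute(f"""
        INSERT INTO table3 (usage_source, usage_destination, vlan_destination)
        SELECT coalesce(src.usage, t1.ip_source),
               coalesce(dst.usage, t1.ip_destination),
               t1.vlan_destination
        FROM table1 AS t1
        LEFT JOIN table2 AS dst ON dst.ip = t1.ip_destination
        LEFT JOIN table2 AS src ON src.ip = t1.ip_source
        WHERE t1.vlan_destination IN ({placeholders})
        ON CONFLICT DO NOTHING
    """, list(vlan_list))


if __name__ == "__main__":
    conn = build_demo_db()
    copy_flows_ignoring_conflicts(conn, ["['0050']", "['0130']"])
    copy_flows_ignoring_conflicts(conn, ["['0050']", "['0130']"])  # no new rows
    for row in conn.execute("SELECT * FROM table3 ORDER BY usage_source"):
        print(row)
```

With SQLAlchemy on PostgreSQL, the corresponding construct is the insert() from sqlalchemy.dialects.postgresql, which provides on_conflict_do_nothing().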