问题描述
我正在尝试使用python azure.storage.queue库向azure发送大量消息(数千万条),但是这样做花费了很长时间。我正在使用的代码如下:
from azure.storage.queue import (
QueueClient,BinaryBase64EncodePolicy,BinaryBase64DecodePolicy
)
messages = [example list of messages]
connectionString = "example connection string"
queueName = "example-queue-name"
queueClient = QueueClient.from_connection_string(connectionString,queueName)
for message in messages:
queueClient.send_message(message)
当前发送大约70,000条消息大约需要3个小时,考虑到需要发送的潜在消息数量,这太慢了。
我浏览了文档以尝试找到批处理选项,但是似乎不存在:https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python
我还想知道是否有人有使用asynchio库加快该过程的经验,并且可以建议如何使用它?
解决方法
尝试一下:
from azure.storage.queue import (
QueueClient,BinaryBase64EncodePolicy,BinaryBase64DecodePolicy
)
from concurrent.futures import ProcessPoolExecutor
import time
messages = []
messagesP1 = messages[:len(messages)//2]
messagesP2 = messages[len(messages)//2:]
print(len(messagesP1))
print(len(messagesP2))
connectionString = "<conn str>"
queueName = "<queue name>"
queueClient = QueueClient.from_connection_string(connectionString,queueName)
def pushThread(messages):
for message in messages:
queueClient.send_message(message)
def callback_function(future):
print('Callback with the following result',future.result())
tic = time.perf_counter()
def main():
with ProcessPoolExecutor(max_workers=2) as executor:
future = executor.submit(pushThread,messagesP1)
future.add_done_callback(callback_function)
future2 = executor.submit(pushThread,messagesP2)
while True:
if(future.running()):
print("Task 1 running")
if(future2.running()):
print("Task 2 running")
if(future.done() and future2.done()):
print(future.result(),future2.result())
break
if __name__ == '__main__':
main()
toc = time.perf_counter()
print(f"spent {toc - tic:0.4f} seconds")
如您所见,我将消息数组分为两部分,并使用2个任务将数据同时推送到队列中。根据我的测试,我大约有800条消息,花了我94秒来推送所有消息:
但是使用上面的方法,它花了我48岁: