How can I quickly send messages to Azure Queue Storage using Python?

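Problem description

I am trying to send a large number of messages (tens of millions) to Azure using the Python azure.storage.queue library, but it takes a very long time. The code I am using is below: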

from azure.storage.queue import (
    QueueClient,BinaryBase64EncodePolicy,BinaryBase64DecodePolicy
)

messages = ["example message 1", "example message 2"]  # example list of messages
connectionString = "example connection string"
queueName = "example-queue-name"

queueClient = QueueClient.from_connection_string(connectionString,queueName)
for message in messages:
    queueClient.send_message(message)

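Currently it takes about 3 hours to send roughly 70,000 messages, which is far too slow given the number of messages that may need to be sent.

I looked through the documentation for a batching option, but there does not appear to be one: https://docs.microsoft.com/en-us/python/api/azure-storage-queue/azure.storage.queue.queueclient?view=azure-python

I would also like to know whether anyone has experience using the asyncio library to speed this up, and could suggest how to use it.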

Solution

Try this:

from concurrent.futures import ProcessPoolExecutor
import time

from azure.storage.queue import QueueClient

connectionString = "<conn str>"
queueName = "<queue name>"


def pushThread(messages):
    # Create the client inside the worker so each process has its own connection.
    queueClient = QueueClient.from_connection_string(connectionString, queueName)
    for message in messages:
        queueClient.send_message(message)
    # Return the number of messages sent so the callback has something to report.
    return len(messages)


def callback_function(future):
    print('Callback with the following result', future.result())


def main():
    messages = []  # fill in the messages to send

    # Split the messages into two halves, one per worker process.
    messagesP1 = messages[:len(messages)//2]
    messagesP2 = messages[len(messages)//2:]
    print(len(messagesP1))
    print(len(messagesP2))

    tic = time.perf_counter()
    with ProcessPoolExecutor(max_workers=2) as executor:
        future = executor.submit(pushThread, messagesP1)
        future.add_done_callback(callback_function)
        future2 = executor.submit(pushThread, messagesP2)
        # result() blocks until each task finishes, so no polling loop is needed.
        print(future.result(), future2.result())
    toc = time.perf_counter()

    # Timing lives inside main() so that worker processes do not run it on import.
    print(f"spent {toc - tic:0.4f} seconds")


if __name__ == '__main__':
    main()

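As you can see, I split the message array into two parts and use 2 tasks to push the data to the queue at the same time. In my test, I had about 800 messages, and it took me 94 seconds to push all of them.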

But using the approach above, it took me 48 seconds.
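
The two-way split can be extended to more workers. Here is a minimal sketch, assuming the sending is network-bound so that threads can stand in for processes. `split_evenly`, `send_in_parallel` and `make_sender` are hypothetical names that are not part of the Azure SDK, and the default of 8 workers is an arbitrary starting point that I have not measured against a real queue.

```python
from concurrent.futures import ThreadPoolExecutor


def split_evenly(items, parts):
    """Split items into `parts` contiguous chunks whose sizes differ by at most one."""
    size, extra = divmod(len(items), parts)
    chunks, start = [], 0
    for i in range(parts):
        end = start + size + (1 if i < extra else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def send_in_parallel(make_sender, messages, workers=8):
    """Send messages from `workers` threads and return how many were sent.

    make_sender is called once per worker and must return a callable that
    sends a single message (hypothetical interface, for illustration only).
    """

    def push(chunk):
        send = make_sender()  # one sender (and so one client) per worker
        for message in chunk:
            send(message)
        return len(chunk)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        return sum(executor.map(push, split_evenly(messages, workers)))
```

With the queue client from the code above, `make_sender` could be `lambda: QueueClient.from_connection_string(connectionString, queueName).send_message`, which gives every worker its own client rather than sharing one between threads.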

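
On the asyncio part of the question: the SDK also provides an async client in `azure.storage.queue.aio`, which needs an async transport such as aiohttp installed. Below is a rough sketch rather than a tested solution. `send_all` is a hypothetical helper, the figure of 50 concurrent workers is an arbitrary assumption, and I have not benchmarked it against a real queue.

```python
import asyncio


async def send_all(send, messages, workers=50):
    """Await send(message) for every message using `workers` concurrent coroutines."""
    iterator = iter(messages)  # shared, so each message is taken by exactly one worker
    sent = 0

    async def worker():
        nonlocal sent
        for message in iterator:
            await send(message)
            sent += 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return sent


async def main(connectionString, queueName, messages):
    # Imported here so send_all can be tried without the SDK installed.
    from azure.storage.queue.aio import QueueClient

    # Assumption: the async client is used as an async context manager so that
    # its connections are closed when the block exits.
    async with QueueClient.from_connection_string(connectionString, queueName) as queueClient:
        return await send_all(queueClient.send_message, messages)
```

It would be started with `asyncio.run(main(connectionString, queueName, messages))`. Because the workers pull from a shared iterator, only `workers` requests are in flight at any moment, which keeps memory use flat even for a very long message list.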