问题描述
我正在测试用于 Azure 服务总线的 Python SDK,以将消息发送到队列。基本测试工作正常,因为我向队列发送了一个字符串,如下所示:
import csv
from azure.servicebus import ServiceBusClient,ServiceBusMessage
CONNECTION_STR = "CONN_STR"
QUEUE_NAME= "queue name"
def send_single_message(sender):
message = ServiceBusMessage("Single Message")
sender.send_messages(message)
print("Sent a single message")
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR,logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_single_message(sender)
print("Done sending messages")
print("-----------------------")
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME,max_wait_time=5)
with receiver:
for msg in receiver:
print("Received: " + str(msg))
receiver.complete_message(msg)
现在我想要实现的下一步是遍历一个 csv 文件,并为每一行将其发送到队列中。
所以我尝试遍历这个 csv 文件,并将这些行发送到队列中。如下:
def send_a_list_of_messages(sender):
to_queue = []
with open('final_result.csv') as f:
reader = csv.reader(f)
for row in reader:
to_queue.append(row)
print(to_queue)
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
sender.send_messages(message)
print("Sent a single message")
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR,logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_a_list_of_messages(sender)
print("Done sending messages")
print("-----------------------")
Traceback (most recent call last):
File "servicebus.py",line 23,in <module>
send_a_list_of_messages(sender)
File "servicebus.py",line 13,in send_a_list_of_messages
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
File "servicebus.py",in <listcomp>
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py",line 110,in __init__
self._build_message(body)
File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py",line 190,in _build_message
raise TypeError(
TypeError: ServiceBusMessage body must be a string,bytes,or None. Got instead: <class 'list'>
我知道服务需要特定的正文类型,但我很确定 to_queue[]
将是字符串类型。
如果有人能帮我解决这个问题,我将不胜感激。 如果您有任何其他问题,请告诉我。
非常感谢
编辑:
我通过将 ServiceBusMessage 转换为字符串来解决正文类型问题,如下所示:
message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
但现在我收到关于大小限制的错误:
azure.servicebus.exceptions.MessageSizeExceededError: ServiceBusMessageBatch has reached its size limit: 262144
谁能帮我解决这个问题。
我尝试以不同的方式处理工作流程,该方法应为每一行触发多次。如下:
for msg in to_queue:
print(msg)
def send_a_list_of_messages(sender):
print(to_queue)
message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
sender.send_messages(message)
print("Sent a single message")
但我得到了同样的错误。
我决定实现一个 for 循环来触发 csv 文件中每一行的函数,从而暂时克服了大小问题。
for msg in to_queue:
# print(msg)
def send_a_list_of_messages(sender):
# print(to_queue)
message = [ServiceBusMessage(str(msg))]
sender.send_messages(message)
print("msg sent: " + str(msg))
解决方法
-
ServiceBusMessage 仅接受 str/bytes,但最初您以列表形式发送。如果在您转换为
str
后它仍然失败,则可能是其他地方出了问题。你能提供一个你的 csv 文件的样本吗? -
当列表/批量消息的大小超过最大限制时,会发生
MessageSizeExceededError
。您可以在此处阅读有关限制和批次的更多信息:https://docs.microsoft.com/en-us/python/api/azure-servicebus/azure.servicebus.servicebusmessagebatch?view=azure-python。 为确保您不超过限制并安全地发送您的一批邮件,您应该使用以下模式:https://github.com/Azure/azure-sdk-for-python/blob/ac60fb93ea23f32e5ed5a83532c1ed6aa49921b5/sdk/servicebus/azure-servicebus/samples/sync_samples/send_queue.py#L32
如果您有任何问题,请告诉我!