Azure Eventhub Python SDK:在时间窗口中所有分区的所有批次进行批处理

问题描述

我目前正在尝试从具有32个分区的Azure EventHub中实现批量读取。更准确地说,我正在尝试读取包含最近60分钟内收到的所有事件的批处理。

问题在于,在使用consumer.receive_batch()方法时,它会触发对同一分区内的批次的on_event()回调方法调用。例如,分区6中的6个事件触发一个呼叫,而分区9中的7个事件触发另一个呼叫。 我想让每个分区中的所有事件触发一次on_event()回调方法调用。我目前正在使用类似的方法EventHubConsumerClient置于监听模式:


self.consumer = EventHubConsumerClient.from_connection_string(
            conn_str=conn_str,consumer_group=consumer_group,eventhub_name=eventhub_name
        )

with self.consumer as consumer:
     consumer.receive_batch(
         on_event_batch=on_event_callback_method,starting_position=timestamp_60_minutes_ago,starting_position_inclusive=True,max_batch_size=999999999999,max_wait_time=60 # to let time to the receiver to gather all the messages
         )

我目前正在考虑N个线程(这里N为32)从1-32读取分区,然后减少一个统一的事件列表,但不确定如何继续,即使我没有深入研究兔子孔。很高兴就此发表您的看法!我们当前的实现依赖于Databricks对EventHub(似乎能够做到)的支持,但我们希望与它保持一段距离,并使用官方的SDK。

解决方法

您所建议的不是在左侧字段中出现的,但是如果您愿意进行稍有不同的汇总,那么可能会使自己的工作量超出所需。

与其生成32个使用者,而不是让您潜在地管理故障模式,生命周期,线程管理和所有其他开销,何不只是按照减少为一个列表的方式执行您描述的内容,而是通过您现在拥有的单一事件中心消费者? queue.Queue()here)中的import queue应该为您提供开箱即用所需的所有同步逻辑,以让您有一个工作人员来处理从该漏斗中读取的汇总。>

我首先要指出的是,这与仅在分区分区回调中执行逻辑没有太大的语义差异,因为它们将以大致相同的顺序馈入聚合队列,但是如果您的目标是是在较大的连续窗口中进行处理,或者在一个逻辑读取中查看跨分区的扩展,以上内容将使您实现这一目标。 (作为记录,您的建议也会这样做,但这需要更多的线程+客户端管理,而不是一个生产者和一个消费者。)

如果这不能解决您的问题或需要进一步说明,请随时与我联系;全面披露我是event-hubs python SDK的维护者之一,您也可以随时向我们发送问题,随时可以通过our github与我们联系。

,

为了后代(并感谢@Kibrantn的帮助),我最终最终使用了Threading / Queue模式,如下所示。这将为每个可用分区旋转一个线程,并发接收,在线程安全队列中聚合,在close()接收者之前执行N秒,最后将所有内容聚合到一个列表中:

from queue import Queue
import threading

from azure.eventhub import EventHubConsumerClient


class ReceiverClass:

    def _callback_process_data(self,partition_context,events):
        """
        Generic method used as a processing callback for all the events batches
        captured. This follows the Transform => ML => Post downstream workflow.
        """
        # Aggregate data into the aggregation Queue
        self.events_aggregation_queue.put(events)

    def receive_data(self):

        # Initialize the consumer
        self.consumer = EventHubConsumerClient.from_connection_string(
                    conn_str=self.config.connection_string,consumer_group=self.config.consumer_group,eventhub_name=self.config.eventhub_name
                )

        # Initialize the aggregation queue to gather all the EventData together (the callback take care of that)
        self.events_aggregation_queue = Queue()

        # Create a reception thread for each partition
        for partition_id in self.consumer.get_partition_ids():
            worker = threading.Thread(target=self.consumer.receive_batch,kwargs={"on_event_batch": self._callback_process_data,"starting_position": self.config.data_window.receiving_from_time,"starting_position_inclusive": True,"partition_id": partition_id})
            worker.start()

        # Aggregating for N seconds before stopping
        time.sleep(self.config.aggregation_wait_time_in_seconds)

        # Aggregate the data from the async Queue
        events_data_nested = [self.events_aggregation_queue.get() for _ in range(self.events_aggregation_queue.qsize())]
        events_data = [event_data for sublist in events_data_nested for event_data in sublist]
        logger.info(f"Received a list of {len(events_data)} EventData...")

        return events_data