在python azure事件中心消费事件

问题描述

我正在事件中心接收JSON数据。

每天,我想从事件中心读取此数据并将其存储在数据库中。为了从事件中心读取数据,我遵循以下文档:https://docs.microsoft.com/en-us/python/api/overview/azure/eventhub-readme?view=azure-python

我能够打印事件中心中的所有事件,但是我不知道如何获取这些事件并在此函数之外返回熊猫数据框。

我已经尝试过了:


def on_event_batch(partition_context,events):
    final_dataframe = pd.DataFrame()
    print("Received event from partition {}".format(partition_context.partition_id))
    for event in events:
        
        body = json.loads(next(event.body).decode('UTF-8'))
        event_df  = pd.DataFrame(body,index = [0])
        final_dataframe = pd.concat([final_dataframe,event_df],ignore_index= True)
    partition_context.update_checkpoint()
    client.close()
    print(final_dataframe)
    return final_dataframe
    
with client:
    final_dataframe = client.receive_batch(
        on_event_batch=on_event_batch,starting_position="-1",# "-1" is from the beginning of the partition.
    )
    
    # receive events from specified partition:
    # client.receive_batch(on_event_batch=on_event_batch,partition_id='0')

但它不起作用。

解决方法

client.receive_batch(on_event_batch=on_event_batch,partition_id='0')的返回类型为None。我不确定您是否可以通过在回调函数上执行返回操作来实现此目的。

但是,我认为下面更简单的方法

from azure.eventhub import EventHubConsumerClient
import pandas as pd
import json



def get_messages() :
    connection_str = '<YOUR CONNECTION STRING>'
    consumer_group = '<YOUR CONSUMER GROUP>'
    eventhub_name = '<YOUR EVENT HUB>'
    client = EventHubConsumerClient.from_connection_string(connection_str,consumer_group,eventhub_name=eventhub_name)

    final_df = pd.DataFrame()
    def on_event_batch(partition_context,events):
        print("Received event from partition {}".format(partition_context.partition_id))
        print(len(events))
        #Checking whether there is any event returned as we have set max_wait_time
        if(len(events) == 0):
        #closing the client if there is no event triggered.
            client.close()
            
        else:
        
            for event in events:
                #Event.body operation
                body=event.body
                event_df  = pd.DataFrame(body,index = [0])
                nonlocal final_df
                final_df = pd.concat([final_df,event_df],ignore_index= True)
                partition_context.update_checkpoint()

    with client:
        client.receive_batch(
            on_event_batch=on_event_batch,starting_position="-1",max_wait_time = 5,max_batch_size=2  # "-1" is from the beginning of the partition. 
            #Max_wait_time - no activitiy for that much - call back function is called with No events.
        )
    return final_df


df = get_messages()
df.head()

上面的代码实际上会在正常退出后将值设置为数据帧 df

enter image description here

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...