如何配置EventProcessorClient仅读取特定分区键而不是分区ID的事件?

问题描述

我有一个带有2个分区的事件中心,并使用以下代码使用不同的分区键(基于https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs的文档)向其发送事件。我正在使用用于.NET的Azure.Messaging.EventHubs库(带有.net核心3.1)

await using (var producer = new EventHubProducerClient(connectionString,eventHubName))
{
    using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });

    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
    eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
    await produce.SendAsync(eventBatch);

    using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });

    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
    eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));

    await producer.SendAsync(eventBatch2);
}

如您所见,我发送了第一批包含2个事件的分区键作为MyPartitionA,发送了第二批包含2个事件的分区键作为MyPartitionB。有趣的是,来自两个分区键的事件都进入了同一分区(即事件中心上的分区0)。

在接收方,我尝试使用https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing处的代码示例,如下所示(我正在使用.NET的Azure.Messaging.EventHubs.Processor库。)

async Task processEventHandler(ProcessEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an event
        await DoSomethingWithTheEvent(eventArgs.Partition,eventArgs.Data);
    }
    catch
    {
        // Handle the exception from handler code
    }
}

async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
    try
    {
        // Perform the application-specific processing for an error
        await DoSomethingWithTheError(eventArgs.Exception);
    }
    catch
    {
        // Handle the exception from handler code
    }   
}

private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
    var storageClient = new BlobContainerClient(storageConnectionString,blobContainerName);
    var processor = new EventProcessorClient(storageClient,consumerGroup,eventHubsConnectionString,eventHubName);

    processor.ProcessEventAsync += processEventHandler;
    processor.ProcessErrorAsync += processErrorHandler;
    
    await processor.StartProcessingAsync();
    
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1));
        }
        
        await processor.StopProcessingAsync();
    }
    finally
    {
        // To prevent leaks,the handlers should be removed when processing is complete
        processor.ProcessEventAsync -= processEventHandler;
        processor.ProcessErrorAsync -= processErrorHandler;
    }
}

但是我在上面的代码中找不到一种方法来仅接收给定分区的事件(例如MyPartitionA),而不接收其他分区的事件(例如MyPartitionB)。

  1. 是否可以注册处理器以基于特定分区键(而非分区ID)接收事件?
  2. 如果将具有分区键MyPartitionA和MyPartitionB的事件都发送到事件中心的分区0,是否仍可能仅接收单个分区键的事件(例如MyPartitionA),而不接收其他不具有相同分区的事件键,即使它们可能位于事件中心的同一分区中?

解决方法

您无法使用SDK中的任何客户端读取基于分区键的事件。

分区键是一个综合概念,在发布事件后不会保留。当您使用分区键进行发布时,该键将被散列,并且结果值将用于选择将事件路由到的分区;其目的是确保将相关事件路由到同一分区,而无需了解选择了哪个分区并且不提供任何公平分配的保证。

要完成您要进行的过滤,您需要将分区键存储为事件中的application property,然后在您的ProcessEventAsync处理程序中将该值用作过滤器。请注意,您将从所有分区接收所有事件-这是EventProcessorClient的主要目标。

我不认为我们对您的应用程序场景了解得足够多,无法帮助您确定最佳方法,但是基于我们所知道的知识,我建议您考虑采用其他方法。由于您似乎需要显式读取一组事件,因此使用其ID(而不是键)发布到知名分区可能会有所帮助。然后,您可以使用EventHubConsumerClient::ReadEventsFromPartitionAsync方法从该分区中专门读取事件。当然,这将要求您还明确控制其他事件在应用程序中的发布位置,以确保将它们路由到第二个分区。

,

EventHubs是高吞吐量的持久流,它提供了流级别的语义(与服务总线相比)

现在问您的问题,您希望基于事件的属性(事件级操作-不是流级操作)过滤事件。

没有直接方法可以满足您的要求。

该方法是为您自己实现自定义解决方案-从EventHubs(事件流)中提取事件并通过PartitionKey进行中间处理t0过滤,并将其推送到另一个事件中心或替代机制,以便可以相应地使用它

或者,如果您正在考虑使用服务总线(如果这符合您的要求),则可以参考以下内容:

分区键属性将用于标识分区 session-id时必须在其中存储消息的队列中 邮件的属性未设置。

您可以使用 AcceptMessageSession([PartitionKey])

来接收来自特定分区键或会话ID的消息。

参考:AcceptMessageSession(String)

var partitionlistener = qc.AcceptMessageSession("MyPartitionA");
var message = partitionlistener.Receive();

qc = Queue client

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...