问题描述
我有一个带有2个分区的事件中心,并使用以下代码使用不同的分区键(基于https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs的文档)向其发送事件。我正在使用用于.NET的Azure.Messaging.EventHubs库(带有.net核心3.1)
await using (var producer = new EventHubProducerClient(connectionString,eventHubName))
{
using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionA" });
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("First")));
eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("Second")));
await produce.SendAsync(eventBatch);
using EventDataBatch eventBatch2 = await producer.CreateBatchAsync(new CreateBatchOptions() { PartitionKey = "MyPartitionB" });
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Third")));
eventBatch2.TryAdd(new EventData(Encoding.UTF8.GetBytes("Fourth")));
await producer.SendAsync(eventBatch2);
}
如您所见,我发送了第一批包含2个事件的分区键作为MyPartitionA,发送了第二批包含2个事件的分区键作为MyPartitionB。有趣的是,来自两个分区键的事件都进入了同一分区(即事件中心上的分区0)。
在接收方,我尝试使用https://github.com/Azure/azure-sdk-for-net/tree/master/sdk/eventhub/Azure.Messaging.EventHubs.Processor#start-and-stop-processing处的代码示例,如下所示(我正在使用.NET的Azure.Messaging.EventHubs.Processor库。)
async Task processEventHandler(ProcessEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an event
await DoSomethingWithTheEvent(eventArgs.Partition,eventArgs.Data);
}
catch
{
// Handle the exception from handler code
}
}
async Task processErrorHandler(ProcessErrorEventArgs eventArgs)
{
try
{
// Perform the application-specific processing for an error
await DoSomethingWithTheError(eventArgs.Exception);
}
catch
{
// Handle the exception from handler code
}
}
private async Task ProcessUntilCanceled(CancellationToken cancellationToken)
{
var storageClient = new BlobContainerClient(storageConnectionString,blobContainerName);
var processor = new EventProcessorClient(storageClient,consumerGroup,eventHubsConnectionString,eventHubName);
processor.ProcessEventAsync += processEventHandler;
processor.ProcessErrorAsync += processErrorHandler;
await processor.StartProcessingAsync();
try
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(1));
}
await processor.StopProcessingAsync();
}
finally
{
// To prevent leaks,the handlers should be removed when processing is complete
processor.ProcessEventAsync -= processEventHandler;
processor.ProcessErrorAsync -= processErrorHandler;
}
}
但是我在上面的代码中找不到一种方法来仅接收给定分区的事件(例如MyPartitionA),而不接收其他分区的事件(例如MyPartitionB)。
- 是否可以注册处理器以基于特定分区键(而非分区ID)接收事件?
- 如果将具有分区键MyPartitionA和MyPartitionB的事件都发送到事件中心的分区0,是否仍可能仅接收单个分区键的事件(例如MyPartitionA),而不接收其他不具有相同分区的事件键,即使它们可能位于事件中心的同一分区中?
解决方法
您无法使用SDK中的任何客户端读取基于分区键的事件。
分区键是一个综合概念,在发布事件后不会保留。当您使用分区键进行发布时,该键将被散列,并且结果值将用于选择将事件路由到的分区;其目的是确保将相关事件路由到同一分区,而无需了解选择了哪个分区并且不提供任何公平分配的保证。
要完成您要进行的过滤,您需要将分区键存储为事件中的application property,然后在您的ProcessEventAsync
处理程序中将该值用作过滤器。请注意,您将从所有分区接收所有事件-这是EventProcessorClient
的主要目标。
我不认为我们对您的应用程序场景了解得足够多,无法帮助您确定最佳方法,但是基于我们所知道的知识,我建议您考虑采用其他方法。由于您似乎需要显式读取一组事件,因此使用其ID(而不是键)发布到知名分区可能会有所帮助。然后,您可以使用EventHubConsumerClient::ReadEventsFromPartitionAsync
方法从该分区中专门读取事件。当然,这将要求您还明确控制其他事件在应用程序中的发布位置,以确保将它们路由到第二个分区。
EventHubs是高吞吐量的持久流,它提供了流级别的语义(与服务总线相比)
现在问您的问题,您希望基于事件的属性(事件级操作-不是流级操作)过滤事件。
没有直接方法可以满足您的要求。
该方法是为您自己实现自定义解决方案-从EventHubs(事件流)中提取事件并通过PartitionKey进行中间处理t0过滤,并将其推送到另一个事件中心或替代机制,以便可以相应地使用它
或者,如果您正在考虑使用服务总线(如果这符合您的要求),则可以参考以下内容:
分区键属性将用于标识分区 session-id时必须在其中存储消息的队列中 邮件的属性未设置。
您可以使用 AcceptMessageSession([PartitionKey])
来接收来自特定分区键或会话ID的消息。参考:AcceptMessageSession(String)
var partitionlistener = qc.AcceptMessageSession("MyPartitionA");
var message = partitionlistener.Receive();
qc = Queue client