Service Fabric 中的事件中心使用者

问题描述

我正在尝试使用一种服务结构来始终如一地从 azure 事件中心提取消息。我似乎已连接好一切,但请注意,我的消费者只是停止拉动事件。

我有一个集线器,其中包含已推送到它的数千个事件。为集线器配置了 1 个分区,并且我的服务结构服务也只有 1 个分区以简化调试。

Service 启动,创建 EventHubClient,然后使用它创建 PartitionReceiver。接收器被传递到一个“EventLoop”,它在调用receiver.ReceiveAsync时进入一个“无限”。 EventLoop 的代码如下。

我观察到的是第一次通过循环时我几乎总是收到 1 条消息。第二次我收到了 103 到 200 条左右的消息。在那之后,我没有收到任何消息。似乎如果我重新启动服务,我会再次收到相同的消息 - 但那是因为当我重新启动服务时,我让它在流的开头重新启动。

希望它一直运行,直到我的 2000 条消息被消耗掉,然后它会等待我(偶尔轮询)。

我需要对 Azure.Messaging.EventHubs 5.3.0 包做一些特定的事情以使其继续拉取事件吗?

//Here is how I am creating the EventHubClient:
var connectionString = "something secret";
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
   EntityPath = "NameOfMyEventHub"
};
try
{
   m_eventHubClient = EventHubClient.Create(connectionStringBuilder);
}

//Here is how I am getting the partition receiver
var receiver = m_eventHubClient.CreateReceiver("$Default",m_partitionId,EventPosition.FromStart());

//The event loop which the receiver is passed to
private async Task EventLoop(PartitionReceiver receiver)
  {
     m_started = true;
     while (m_keepRunning)
     {
        var events = await receiver.ReceiveAsync(m_options.BatchSize,TimeSpan.FromSeconds(5));
        if (events != null) //First 2/3 times events aren't null. After that,always null and I kNow there are more in the partition/
        {
           var eventsArray = events as EventData[] ?? events.ToArray();
           m_state.NumProcessedSinceLastSave += eventsArray.Count();

           foreach (var evt in eventsArray)
           {
              //Process the event
              await m_options.Processor.ProcessMessageAsync(evt,null);

              string lastOffset = evt.SystemProperties.Offset;

              if (m_state.NumProcessedSinceLastSave >= m_options.BatchSize)
              {
                 m_state.Offset = lastOffset;
                 m_state.NumProcessedSinceLastSave = 0;
                 await m_state.SaveAsync();
              }
           }
        }
     }

     m_started = false;
  }

**EDIT,问了一个关于分区数量的问题。事件中心有一个分区,SF 服务也有一个分区。

打算使用服务结构状态来跟踪我在集线器中的偏移量,但这不是现在的问题。

为每个分区创建分区侦听器。我得到这样的分区:

public async Task StartAsync()
  {
     // slice the pie according to distribution
     // this partition can get one or more assigned Event Hub Partition ids
     string[] eventHubPartitionIds = (await m_eventHubClient.GetRuntimeinformationAsync()).PartitionIds;
     string[] resolvedEventHubPartitionIds = m_options.ResolveAssignedEventHubPartitions(eventHubPartitionIds);

     foreach (var resolvedPartition in resolvedEventHubPartitionIds)
     {
        var partitionReceiver = new EventHubListenerPartitionReceiver(m_eventHubClient,resolvedPartition,m_options);
        await partitionReceiver.StartAsync();
        m_partitionReceivers.Add(partitionReceiver);
     }
  }

调用partitionListener.StartAsync的时候,其实是创建了PartitionListener,像这样(其实比这个多一点,但是走的分支是这个:

m_eventHubClient.CreateReceiver(m_options.EventHubConsumerGroupName,EventPosition.FromStart());

感谢您的任何提示。 将

解决方法

你有多少个分区?我在您的代码中看不到您如何确保读取默认使用者组中的所有分区。

您使用 PartitionReceiver 而不是 EventProcessorHost 的任何具体原因?

对我来说,SF 似乎非常适合使用事件处理器主机。我看到已经有一个 SF integrated solution 使用有状态服务进行检查点。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...