TPL .Net与Azure EventHub Producer并发问题

问题描述

我正在使用Azure Event Hub生产者客户端,并从kafka流中读取消息,然后将其传递给反序列化/映射,然后传递给Event Hub。我有一个消耗循环,为每个消耗创建一个任务,然后使用两种方法进行处理(从卡夫卡滞后的角度来看,这似乎大大提高了速度。但是,事件中心使您可以创建一个事件批处理,而我却没有一定要使用。我现在只想一次一次发送数据。为了创建一个新的批处理,我必须调用dispose()。我遇到了一个问题,即另外一个调用函数函数调用dispose()的时间,我收到一条错误消息,指出事件中心正在使用该对象。

我还尝试将重载用于eventHubProducerClient.SendAsync,该重载允许您传递IEnumerable,但与此同时我也遇到了同样的问题。

所以我认为这是一个同步问题,或者也许我需要在某个地方进行锁定?

任何帮助将不胜感激。

       public void Execute()
                {
                    using (_consumer)
                    {
                        try
                        {
                            _consumer.Subscribe(_streamConsumerSettings.Topic);
                            while (true)
                            {
                                var result = _consumer.Consume(1000);
        
                                if (result == null)
                                {
                                    continue;
                                }
                                var process = Task.Factory.StartNew(() => ProcessMessage(result?.Message?.Value));
                                var send = process.ContinueWith(t => SendMessage(process.Result));                        
                            }
        
                        }
                        catch (ConsumeException e)
                        {
                            _logger.LogError(e,e.StackTrace ?? e.Message);
                            _cancelConsume = true;
                            _consumer.Close();
                            RestartConsumer();
                        }
                    }
                }
    
         public static EquipmentJson ProcessMessage(byte[] result)
         {
                    var json = _messageProcessor.DeserializeAndMap(result);
                    return json;
         }
        
         public static void SendMessage(EquipmentJson message)
         {
                    try 
                    {   
        
                        _eventHubClient.AddToBatch(message);             
                        
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e,e.StackTrace ?? e.Message);
                    }
          }
    
     
    
        public async Task AddToBatch(EquipmentJson message)
                {
                    if 
      (!string.IsNullOrEmpty(message.EquipmentLocation))
                    {
                        try
                        {
                            var batch = await _equipmentLocclient.CreateBatchAsync();
                            batch.TryAdd(new EventData(Encoding.UTF8.GetBytes(message.EquipmentLocation)));
                            await _eventHubProducerClient.SendAsync(batch);
                            batch.dispose();
                            _logger.Loginformation($"Data sent {DateTimeOffset.UtcNow}");
                        }
                        catch (Exception e)
                        {
                            _logger.LogError(e,e.StackTrace ?? e.Message);
                        }
                    }
                }

 public class EventHubClient : IEventHubClient
    {
        private readonly ILoggerAdapter<EventHubClient> _logger;
        private readonly EventHubClientSettings _eventHubClientSettings;
        private IMapper _mapper;

        
        private static EventHubProducerClient _equipmentLocclient;


        public EventHubClient(ILoggerAdapter<EventHubClient> logger,EventHubClientSettings eventHubClientSettings,IMapper mapper)
        {
            _logger = logger;
            _eventHubClientSettings = eventHubClientSettings;
            _mapper = mapper;
            _equipmentLocclient = new EventHubProducerClient(_eventHubClientSettings.ConnectionString,_eventHubClientSettings.EquipmentLocation);

        }
    }
}

解决方法

基于我对评论的猜测,我很好奇是否重构为使用async/await而不是主循环中的显式连续可能会有所帮助。也许与以下LinqPad代码段相似:

async Task Main()
{
    while (true)
    {
        var message = await Task.Factory.StartNew(() => GetText());
        var events = new[] { new EventData(Encoding.UTF8.GetBytes(message)) };
        
        await Send(events).ConfigureAwait(false);
    }
}

public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");

public async Task Send(EventData[] events)
{
    try
    {
        await client.SendAsync(events).ConfigureAwait(false);
        "Sent".Dump();
    }
    catch (Exception ex)
    {
        ex.Dump();
    }
}

public string GetText()
{
    Thread.Sleep(250);
    return "Test";
}

如果您打算保留延续性,我想知道在延续中进行轻微的结构重构是否有帮助,既可以推动事件的创建,也可以兑现await语句。也许与以下LinqPad代码段相似:

async Task Main()
{
    while(true)
    {
        var t = Task.Factory.StartNew(() => GetText());
        var _ = t.ContinueWith(async q =>
        {
            var events = new[] { new EventData(Encoding.UTF8.GetBytes(t.Result)) };
            await Send(events).ConfigureAwait(false);
        });
        
        await Task.Yield();
    }
}

public EventHubProducerClient client = new EventHubProducerClient("<< CONNECTION STRING >>");

public async Task Send(EventData[] events)
{
    try
    {
        await client.SendAsync(events).ConfigureAwait(false);
        "Sent".Dump();
    }
    catch (Exception ex)
    {
        ex.Dump();
    }
}

public string GetText()
{
    Thread.Sleep(250);
    return "Test";
}