为什么Kafka使用者外壳无法使用.Net Core控制台中我ProducerBuilder类的消息?

问题描述

我已经在Ubuntu上安装了Kafka并设法仅使用shell文件来测试一个简单的场景:

  • 使用默认配置启动Zookeeper
  • 使用默认配置启动节点或代理
  • 创建了一个主题,为其命名,其单个分区和复制因子为1,并将其与zookeeper的默认地址相关联
  • 开设了生产者和消费者:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname

其次:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic mytopicname --from-beginning

一切顺利,一切正常。现在,我想关闭生产者并在asp.net核心应用程序中为生产者编写代码。

using System;
using System.ComponentModel;
using System.Net;
using Confluent.Kafka;

    namespace KafkaTraining
    {
        class Program
        {
            static void Main(string[] args)
            {
                var config = new ProducerConfig
                {
                    BootstrapServers = "localhost:9092"
                };
    
                using (var producer = new ProducerBuilder<string,string>(config).Build())
                {
                    producer.Produce("mytopicname",new Message<string,string> { Value = "a log message" });
                }
                Console.ReadLine();
            }
        }
    }

在这里找到灵感: // https://docs.confluent.io/current/clients/dotnet.html#

因此,上面的C#代码应该等效于shell命令,再加上我在C#中编写的应该由使用者使用的消息(由于使用shell命令,使用者仍在终端窗口中打开)。

如果我之前发布的shell命令有效,即bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic mytopicname仅包含与本地主机:9092(服务地址)和主题名称相关的其他信息,那么C#程序为什么不能成功替换它,它应该产生消息“日志消息”,并且终端应使用它,但不会。而我该如何调试呢?

PS。我已经通过以下地址在Linux Ubuntu上安装了Kafka:https://www.apache.org/dyn/closer.cgi?path=/kafka/2.6.0/kafka_2.12-2.6.0.tgz 而在我的Asp.Net Core 3.1控制台应用程序中,我已经安装了1.5.0版本,不确定该版本是否起作用,但是不知道如何开始调试... thansk查找任何指针。

<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>netcoreapp3.1</TargetFramework>
    <StartupObject>KafkaDemo.Program</StartupObject>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="1.5.0" />
  </ItemGroup>

</Project>

解决方法

您的消息未生成,因为您在传递消息之前将生产者处置。 Produce方法异步发送消息,并且不等待响应-它立即返回。为确保已发送,您可以使用Flush()-在发送所有正在进行的邮件之前将其阻止,也可以使用await ProduceAsync()

尝试一下:

using (var producer = new ProducerBuilder<string,string>(config).Build())
{
    producer.Produce("mytopicname",new Message<string,string> { Value = "a log message" });
    producer.Flush();
}

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...