ReadFromKafka 抛出 ValueError:不支持的信号:2

问题描述

目前我尝试将 apache beam 与 apache kafka 结合起来。

Kafka 服务正在(本地)运行,我使用 kafka-console-producer 编写了一些测试消息。

首先我写了这个 Java Codesnippet 来用我知道的语言测试 apache beam。它按预期工作。

public class Main {

  public static void main(String[] args) {

    Pipeline pipeline = Pipeline.create();

    Read<Long,String> kafkaReader = KafkaIO.<Long,String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("beam-test")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class);

    kafkaReader.withoutMetadata();

    pipeline
        .apply("Kafka",kafkaReader
        ).apply(
          "Extract words",ParDo.of(new DoFn<KafkaRecord<Long,String>,String>() {
            @ProcessElement
          public void processElement(ProcessContext c){
              System.out.println("Key:" + c.element().getKV().getKey() + " | Value: " + c.element().getKV().getValue());
            }
        })
    );

    pipeline.run();
  }
}

我的目标是在 python 中编写相同的代码,这就是我目前的目标:

def run_pipe():
    
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
        | 'Kafka Unbounded' >> ReadFromKafka(consumer_config={'bootstrap.servers' : 'localhost:9092'},topics=['beam-test'])
        | 'Test Print' >> beam.Map(print)
        )

if __name__ == '__main__':
    run_pipe()

现在问题来了。当我尝试运行 python 代码时,出现以下错误:

(app) λ python ArghKafkaExample.py 
Traceback (most recent call last):
  File "ArghKafkaExample.py",line 22,in <module>
    run_pipe()
  File "ArghKafkaExample.py",line 10,in run_pipe
    (p
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py",line 1028,in __ror__
    return self.transform.__ror__(pvalueish,self.label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\ptransform.py",line 572,in __ror__
    result = p.apply(self,pvalueish,label)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py",line 648,in apply
    return self.apply(transform,pvalueish)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\pipeline.py",line 691,in apply
    pvalueish_result = self.runner.apply(transform,self._options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py",line 198,in apply
    return m(transform,input,options)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\runners\runner.py",line 228,in apply_PTransform
    return transform.expand(input)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py",line 322,in expand
    self._expanded_components = self._resolve_artifacts(
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\contextlib.py",line 120,in __exit__
    next(self.gen)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py",line 372,in _service
    yield stub
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\transforms\external.py",line 523,in __exit__
    self._service_provider.__exit__(*args)
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py",line 74,in __exit__
    self.stop()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py",line 133,in stop
    self.stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py",line 179,in stop_process
    return super(JavaJarServer,self).stop_process()
  File "C:\Users\gamef\git\BeamMeScotty\app\lib\site-packages\apache_beam\utils\subprocess_server.py",line 143,in stop_process
    self._process.send_signal(signal.SIGINT)
  File "C:\Users\gamef\AppData\Local\Programs\Python\Python38\lib\subprocess.py",line 1434,in send_signal
    raise ValueError("Unsupported signal: {}".format(sig))
ValueError: Unsupported signal: 2

通过谷歌搜索,我发现它与程序退出代码(如 Strg+C)有关,但总的来说我完全不知道问题是什么。

任何建议都会有所帮助!

问候帕斯卡

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)