Problem description
I have a local cluster running in Minikube. My pipeline job is written in Python and is a basic Kafka consumer.
The Flink runner shows no records passing through under "Records received".
Am I missing something basic?
Solution
--environment_type=EXTERNAL
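means that you are starting the workers manually, and it is intended mainly for internal testing. Does it work if you don't specify environment_type/config at all?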
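One way to try that suggestion without editing the launch command by hand is to filter the environment flags out of the argument list before it reaches the pipeline options. The following is only a sketch using the standard library: the helper name strip_environment_flags and the --environment_config value are hypothetical and not taken from the question.

```python
import argparse

def strip_environment_flags(argv):
    """Return argv without --environment_type / --environment_config (hypothetical helper)."""
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument('--environment_type')
    parser.add_argument('--environment_config')
    # parse_known_args consumes the two flags above and returns everything else untouched.
    _, remaining = parser.parse_known_args(argv)
    return remaining

args = [
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',  # assumed value, for illustration only
]
print(strip_environment_flags(args))
```

The remaining flags can then be passed on as pipeline arguments, so the runner falls back to whatever environment it uses by default.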
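import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

def run(bootstrap_servers, topic, pipeline_args):
    # Hard-coded for local testing; these override the arguments passed in.
    bootstrap_servers = 'localhost:9092'
    topic = 'wordcount'
    # list.append() returns None, so build a new list instead of reassigning its result.
    pipeline_args = pipeline_args + ['--flink_submit_uber_jar']
    # Concatenate the fixed flags with the user flags; nesting the list would not work.
    pipeline_options = PipelineOptions(
        ['--runner=FlinkRunner', '--flink_master=localhost:8081', '--flink_version=1.12']
        + pipeline_args,
        save_main_session=True, streaming=True)
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | ReadFromKafka(
                consumer_config={'bootstrap.servers': bootstrap_servers},
                topics=[topic])
            # log_ride is defined elsewhere in the script (not shown here).
            | beam.FlatMap(lambda kv: log_ride(kv[1])))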
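The log_ride function is not shown in the post. As a rough sketch of what the FlatMap step does with each record, the version below assumes that ReadFromKafka yields (key, value) pairs of bytes (its default deserializers) and that each value is UTF-8 encoded JSON. The body of log_ride and the passenger_count field are hypothetical.

```python
import json
import logging

def log_ride(ride_bytes):
    """Decode one Kafka record value, log it, and emit nothing downstream."""
    ride = json.loads(ride_bytes.decode('utf-8'))
    logging.info('Received ride: %r', ride)
    return []  # an empty iterable, so FlatMap produces no output elements

# kv stands in for one element from ReadFromKafka: a (key, value) pair of bytes.
kv = (b'ride-1', b'{"passenger_count": 2}')  # made-up record for illustration
result = log_ride(kv[1])
```

Because the step only logs, a pipeline built this way has no output of its own; whether records arrive has to be checked in the worker logs or the Flink UI.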
I ran into another problem when using the latest Apache Beam 2.30.0 with Flink 1.12.4:
2021/06/10 17:39:42 Initializing python harness: /opt/apache/beam/boot --id=1-2 --provision_endpoint=localhost:42353
2021/06/10 17:39:50 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc =
2021-06-10 17:39:53,076 WARN org.apache.flink.runtime.taskmanager.Task [] - [3]ReadFromKafka(beam:external:java:kafka:read:v1)/{KafkaIO.Read,Remove Kafka Metadata} -> [1]FlatMap(<lambda at kafka-taxi.py:88>) (1/1)#0 (9d941b13ae9f28fd1460bc242b7f6cc9) switched from RUNNING to FAILED.
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id d727ca3c0690d949f9ed1da9c3435b3ab3af70b6b422dc82905eed2f74ec7a15