消费来自Google Pubsub的消息并将其发布到Kafka

问题描述

我正在尝试使用同步PULL API使用Google PubSub消息。在Apache Beam Google PubSub IO连接器库中可用。 我想使用KafkaIO将消耗的消息写到Kafka。我想使用FlinkRunner来执行作业,因为我们在GCP之外运行了该应用程序。

我面临的问题是,消费的邮件在GCP PubSub中没有得到ACK。我已经确认本地Kafka实例具有从GCP PubSub消耗的消息。 GCP DataFlow中的文档表明,当管道终止于数据接收器(在我的情况下为Kafka)时,数据束已完成。

但是,由于代码是在Apache Flink中运行的,而不是在GCP DataFlow中运行的,所以我认为与确认已提交的消息有关的某种回调不会被触发。
在这里做什么错了?

                   pipeline
                    .apply("Read  GCP PubSub Messages",PubsubIO.readStrings()
                            .fromSubscription(subscription)
                    )
                    .apply(ParseJsons.of(User.class))
                    .setCoder(SerializableCoder.of(User.class))
                    .apply("Filter-1",ParDo.of(new FilterTextFn()))
                    .apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
                    .apply("Write to Local Kafka",KafkaIO.<Void,String>write()
                                    .withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
                                    .withTopic("test-topic")
                                    .withValueSerializer((StringSerializer.class))
                                    .values()
                    );

解决方法

在光束documentation on the PubSub IO class中提到了这一点:

检查点既用于将收到的消息ACK确认回Pubsub(以便它们可以在Pubsub端退役),也应用于NACK已消耗的消息(如果需要恢复检查点)(以便Pubsub将立即重新发送这些消息)

ACK未链接到数据流,您在数据流上应具有相同的行为。确认在检查点上发送。通常,检查点是您在流中设置的窗口。

但是,您没有设置窗口!默认情况下,这些窗口是全局的,并且如果您优雅地停止工作(甚至我不确定),它只会在最后关闭。无论如何,更好的解决方案是具有固定的窗口(例如5分钟),以在每个窗口上确认消息。

,

我解决此问题的方法是使用Guillaume Blaquiere(https://stackoverflow.com/users/11372593/guillaume-blaquiere)建议的检查点。即使在管道中添加了Window.into()函数之后,源PubSub订阅终结点也没有收到ACK。
问题出在Flink服务器配置中,我未能提及检查点配置。没有这些参数,检查点将被禁用。

state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/

这些配置应该放在flink_home / conf / flink-conf.yaml中。 添加这些条目并重新启动flink之后。在GCP pubsub监视图表中,所有积压(未确认的消息)都变为0。