使用Flink将POJO保存到Cassandra

问题描述

我是Flink的新手,我想将Kafka流数据存储到Cassandra中。我已经将String转换为POJO。我的POJO如下,

@Table(keyspace = "sample",name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;

我的转换发生在下面,

 stream.flatMap(new FlatMapFunction<String,Person>() {
            public void flatMap(String value,Collector<Person> out) {
                try {
                    out.collect(objectMapper.readValue(value,Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).print(); // I need to use proper method to convert to Datastream.
        env.execute();

我阅读了以下链接上的文档以供参考,

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/cassandra.html

Cassandra Sink接受DataStream实例。我需要转换我的转换并将其存储到Kafka中。

Cant create Cassandra Pojo Sink也给我一些想法。

有方法.forward()返回 DataStream<Reading> forward,并将实例传递给时,

 CassandraSink.addSink(forward)
                .setHost("localhost")
                .build();
cannot access org.apache.flink.streaming.api.scala.DataStream

如何将POJO转换为存储在Cassandra中?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...