Flink不散发要存储在Cassandra中的值

问题描述

我有以下POJO课,

import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;

@Table(keyspace = "testKey",name = "contact")
public class Person implements Serializable {

    private static final long serialVersionUID = 1L;

    @Column(name = "name")
    private String name;

    @Column(name = "timeStamp")
    private LocalDateTime timeStamp;
}

Mapper代码是

DataStream<Reading> sideOutput = stream.flatMap(new FlatMapFunction<String,Person>() {
            @Override
            public void flatMap(String value,Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value,Person.class));
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }).getSideOutput(new OutputTag<>("contact",TypeInformation.of(Person.class)));

 env.execute();
 
 CassandraSink.addSink(sideOutput)
                .setHost("localhost")
                .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
                .build();

没有.getSideOutput(new OutputTag<>("contact",TypeInformation.of(Person.class)));也无法正常工作。

sideOutput不会散发出要存储在Cassandra中的值。知道我做错了什么吗?

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)