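Problem description
I am reading from Kafka and using Flink to write the data to Cassandra. I ran into a LocalDateTime
codec problem, and to address it I added @Column(name = "timeStamp", codec = LocalDateTimeCodec.class)
to my POJO class, as shown below: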
import com.datastax.driver.extras.codecs.jdk8.LocalDateTimeCodec;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
@NoArgsConstructor
@Table(keyspace = "personKey", name = "Person")
public class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    @Column(name = "id")
    private UUID id;

    @Column(name = "name")
    private String name;

    // If I remove the codec attribute, there is no linkage error.
    @Column(name = "timeStamp", codec = LocalDateTimeCodec.class)
    private LocalDateTime timeStamp;
}
Part of the code is:
FlinkKafkaConsumer<String> myConsumer =
        new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), prop);
myConsumer.setStartFromEarliest();

try (CqlSession session = CqlSession.builder()
        .addContactPoint(new InetSocketAddress("localhost", 9042))
        .withLocalDatacenter("datacenter1")
        .build()) {
    CreateKeyspace createKs = SchemaBuilder.createKeyspace("personKey")
            .withSimpleStrategy(1);
    session.execute(createKs.build());

    CreateTable createTable = SchemaBuilder.createTable("personKey", "Person")
            .ifNotExists()
            .withPartitionKey("id", DataTypes.UUID)
            .withColumn("name", DataTypes.TEXT)
            .withColumn("timeStamp", DataTypes.TIMESTAMP);
    session.execute(createTable.build());
}

DataStream<String> stream = env.addSource(myConsumer);
DataStream<Person> sideOutput = stream.flatMap(new FlatMapFunction<String, Person>() {
    @Override
    public void flatMap(String value, Collector<Person> out) throws Exception {
        try {
            out.collect(objectMapper.readValue(value, Person.class));
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
});

CassandraSink.addSink(sideOutput)
        .setHost("localhost")
        .setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
        .build();
env.execute();
The Maven dependencies are:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.12</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.module</groupId>
        <artifactId>jackson-module-parameter-names</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jdk8</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-core</artifactId>
        <version>4.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.codahale.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>3.0.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-query-builder</artifactId>
        <version>4.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>java-driver-mapper-runtime</artifactId>
        <version>4.7.2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-mapping</artifactId>
        <version>3.10.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-extras</artifactId>
        <version>3.10.0</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-cassandra_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
I get the following linkage error:
Caused by: java.lang.IncompatibleClassChangeError:
Class com.datastax.driver.core.DefaultResultSetFuture does not implement the requested interface org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.ListenableFuture
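The interface named in the message lives under org.apache.flink.cassandra.shaded, which suggests (though the trace alone does not prove it) that the driver class being loaded comes from a different jar than the one the Flink connector expects. A minimal diagnostic sketch for checking which jar a class is actually loaded from is shown here; the ClassOrigin helper and its describe method are hypothetical names introduced for illustration, not part of Flink or the DataStax driver.

```java
import java.security.CodeSource;

// Hypothetical helper: reports the jar or directory a class is loaded from.
final class ClassOrigin {
    private ClassOrigin() {}

    static String describe(String className) {
        try {
            // Load without initializing, so static initializers do not run.
            Class<?> cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null) {
                // Typical for classes loaded by the bootstrap class loader.
                return className + " <- (bootstrap or unknown)";
            }
            return className + " <- " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " <- not loadable: " + e;
        }
    }
}
```

Calling ClassOrigin.describe("com.datastax.driver.core.DefaultResultSetFuture") from inside the job (for example, logging the result before the sink is built) would show whether that class resolves to the connector's jar or to a separately declared driver artifact.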
I can save data to Cassandra as long as I do not use LocalDateTime. I also tried the following configuration, and it produces the same error:
cluster.getConfiguration().getCodecRegistry()
        .register(LocalDateTimeCodec.instance);
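Since saving works when LocalDateTime is not involved, one possible workaround is to keep the mapped field as a java.util.Date, which the 3.x driver maps to the CQL timestamp type without an extra codec, and convert at the edges. The sketch below is only an illustration under stated assumptions: the TimestampConversion class is a hypothetical helper, and treating the LocalDateTime values as UTC is an assumption that may not match the actual data.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;

// Hypothetical helper for converting between LocalDateTime and java.util.Date.
final class TimestampConversion {
    private TimestampConversion() {}

    // Assumption: the LocalDateTime represents a UTC wall-clock time.
    static Date toDate(LocalDateTime value) {
        return value == null ? null : Date.from(value.toInstant(ZoneOffset.UTC));
    }

    // Inverse conversion, again interpreting the instant in UTC.
    static LocalDateTime toLocalDateTime(Date value) {
        return value == null ? null : LocalDateTime.ofInstant(value.toInstant(), ZoneOffset.UTC);
    }
}
```

With this approach the timeStamp field in Person would be declared as Date with a plain @Column(name = "timeStamp"), and the flatMap step would call TimestampConversion.toDate on the parsed value. Note that Date carries millisecond precision, so any finer-grained fraction of a second is dropped.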