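Problem description
We use a Kafka stream to feed inserts into PostgreSQL, because the message rate is too high to insert directly. The consumer generally works well, but it occasionally gets stuck and I cannot find the root cause.
The consumer has been running for about 6 months and has consumed billions of records. I don't understand why it has recently started getting stuck, and I don't even know where to begin debugging.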
Here is the code used to process the records: `private static void readFromTopic(DataSource datasource,ConsumerOptions options){
KafkaConsumer<String,String> consumer = KafkaConsumerConfig.createConsumerGroup(options);
Producer<Long,String> producer = KafkaProducerConfig.createKafkaProducer(options);
if (options.isReadFromAnOffset()) {
// if want to assign particular offsets to consume from
// will work for only a single partition for a consumer
List<TopicPartition> tpartition = new ArrayList<TopicPartition>();
tpartition.add(new TopicPartition(options.getTopicName(),options.getPartition()));
consumer.assign(tpartition);
consumer.seek(tpartition.get(0),options.getoffset());
} else {
// use auto assign partition & offsets
consumer.subscribe(Arrays.asList(options.getTopicName()));
log.debug("subscribed to topic {}",options.getTopicName());
}
List<Payload> payloads = new ArrayList<>();
while (true) {
// the timeout is the maximum time poll() blocks waiting for records from the broker
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(50));
if(records.count() != 0 )
log.debug("poll size is {}",records.count());
Set<TopicPartition> partitions = records.partitions();
// reading normally as per round robin and the last committed offset
for (ConsumerRecord<String,String> r : records) {
log.debug(" Partition : {} Offset : {}",r.partition(),r.offset());
try {
JSONArray arr = new JSONArray(r.value());
for (Object o : arr) {
Payload p = JsonIterator.deserialize(((JSONObject) o).toString(),Payload.class);
payloads.add(p);
}
List<Payload> steplist = new ArrayList<>();
steplist.addAll(payloads);
// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Connection conn = datasource.getConnection();
PgInsert.insertIntoPg(steplist,conn,consumer,r,options.getTopicName(),options.getErrorTopic(),producer);
} catch (Exception e) {
log.error("error in processing future",e);
}
}
},executorService);
// used to combine all futures
allfutures.add(future);
payloads.clear();
} catch (Exception e) {
// pushing into new topic for records which have Failed
log.debug("error in kafka consumer",e);
ProducerRecord<Long,String> record = new ProducerRecord<Long,String>(options.getErrorTopic(),r.offset(),r.value());
producer.send(record);
}
}
// committing after every poll
consumer.commitSync();
if (records.count() != 0) {
Map<TopicPartition,OffsetAndMetadata> Metadata = consumer.committed(partitions);
// reading the committed offsets for each partition after polling
for (TopicPartition tpartition : partitions) {
OffsetAndMetadata offsetdata = Metadata.get(tpartition);
if (offsetdata != null && tpartition != null)
log.debug("committed offset is " + offsetdata.offset() + " for topic partition "
+ tpartition.partition());
}
}
// waiting for all threads to complete after each poll
try {
waitForFuturesToEnd();
allfutures.clear();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}`
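The loop only polls again once `waitForFuturesToEnd()` returns, and that method is not shown. A minimal sketch, assuming it waits on every entry in `allfutures` without a timeout: bounding the wait turns a silent hang into something that can be logged. The names `BoundedWait` and `awaitAll` are hypothetical and do not come from the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWait {
    // Hypothetical helper: waits for every future, but gives up after the timeout.
    // Returns true if all futures completed in time, false if the wait timed out.
    static boolean awaitAll(List<CompletableFuture<Void>> futures, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            all.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> stuck = new CompletableFuture<>(); // never completed

        // prints "true": the only future is already complete
        System.out.println(awaitAll(Arrays.asList(done), 100, TimeUnit.MILLISECONDS));
        // prints "false": one future never finishes, so the wait times out
        System.out.println(awaitAll(Arrays.asList(done, stuck), 100, TimeUnit.MILLISECONDS));
    }
}
```

If a call like this returned false in the real consumer, the offsets and partitions of the records still in flight would be the first thing worth logging.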
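I previously thought it was getting stuck because of the size of the records being consumed, so I reduced MAX_POLL_RECORDS_CONFIG to 10. This ensures that a single poll fetches no more than 200 KB, since each record is at most 20 KB.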
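The arithmetic behind a `max.poll.records` of 10 with 20 KB records, as a small sketch. The string keys `max.poll.records` and `max.poll.interval.ms` are the Kafka consumer settings behind `ConsumerConfig.MAX_POLL_RECORDS_CONFIG` and `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG`. The class name `PollSizing` is hypothetical, and the interval shown is simply Kafka's default of 300000 ms, not a value taken from the question.

```java
import java.util.Properties;

public class PollSizing {
    // Upper bound, in KB, on the payload a single poll() hands to the loop.
    static long maxKbPerPoll(int maxPollRecords, long maxRecordKb) {
        return maxPollRecords * maxRecordKb;
    }

    // Consumer overrides expressed with plain string keys (no Kafka dependency needed here).
    static Properties consumerOverrides() {
        Properties props = new Properties();
        props.put("max.poll.records", "10");         // at most 10 records per poll()
        props.put("max.poll.interval.ms", "300000"); // assumed: left at Kafka's default
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerOverrides();
        int records = Integer.parseInt(props.getProperty("max.poll.records"));
        // prints "200 KB per poll at most"
        System.out.println(maxKbPerPoll(records, 20) + " KB per poll at most");
    }
}
```

Note that `max.poll.records` caps the number of records returned, not the bytes fetched from the broker. If the work between two polls (here, waiting for all inserts) takes longer than `max.poll.interval.ms`, the group coordinator treats the consumer as failed and rebalances its partitions.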
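I am considering moving to the Spring framework to solve this, but before doing so I would like to know why the consumer gets stuck. Any insight would be helpful.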
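One possible starting point for debugging, offered as a sketch and not a diagnosis: capture what every thread is doing at the moment the consumer appears stuck. The JDK's `jstack <pid>` tool does this from outside the process, and the same information is available in-process through `ThreadMXBean`. The class name `StuckThreadReport` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class StuckThreadReport {
    // Builds a text report of every live thread that is not RUNNABLE, with its stack trace.
    static String report() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info.getThreadState() == Thread.State.RUNNABLE) {
                continue; // only blocked, waiting, or sleeping threads are of interest
            }
            sb.append(info.getThreadName())
              .append(" [").append(info.getThreadState()).append("]\n");
            for (StackTraceElement frame : info.getStackTrace()) {
                sb.append("    at ").append(frame).append('\n');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(report());
    }
}
```

In a report like this, worker threads parked inside `datasource.getConnection()` would suggest an exhausted connection pool, while the main thread parked inside `waitForFuturesToEnd()` would suggest an insert that never returns. Both readings are assumptions to verify against the actual dump.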
Solution
No working solution to this problem has been found yet.