问题描述
我正在尝试使用 flink 1.12 版扩展 FlinkKafkaConsumer 以在我的 Flink 作业中限制 Kafka 消费者。作为其中的一部分,我尝试遵循以下线程来实现相同的目标。但是,我在创建 AbstractFetcher 时遇到了 createFetcher 方法中的编译问题。
How to use Ratelimiter on flink?
他们的方法是否在 emitRecord 中命名,因为我找不到 KafkaFetcher 和 AbstractFetcher 类?
下面是代码片段
protected AbstractFetcher<T,?> createFetcher(
SourceContext<T> sourceContext,Map<KafkaTopicPartition,Long> partitionsWithOffsets,SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,StreamingRuntimeContext runtimeContext,OffsetCommitMode offsetCommitMode,MetricGroup consumerMetricGroup,boolean useMetrics)
throws Exception {
return new KafkaFetcher<T>(
sourceContext,partitionsWithOffsets,watermarksPeriodic,watermarksPunctuated,runtimeContext.getProcessingTimeService(),runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),runtimeContext.getUserCodeClassLoader(),runtimeContext.getTaskNameWithSubtasks(),deserializer,properties,pollTimeout,runtimeContext.getMetricGroup(),consumerMetricGroup,useMetrics) {
protected void emitRecord(T record,KafkaTopicPartitionState<TopicPartition> partitionState,long offset) throws Exception {
subtaskRateLimiter.acquire();
if (record == null) {
consumerMetricGroup.counter("invalidRecord").inc();
}
super.emitRecord(record,partitionState,offset);
}
@Override
protected void emitRecordWithTimestamp(T record,long offset,long timestamp) throws Exception {
subtaskRateLimiter.acquire();
if (record == null) {
consumerMetricGroup.counter("invalidRecord").inc();
}
super.emitRecordWithTimestamp(record,offset,timestamp);
}
};
}
解决此问题的任何建议是获取 Flink Kafka Consumer 的自定义速率限制
解决方法
emitRecord
现在由 org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter#emitRecord
实现。