用于速率限制的 Flink Kafka 自定义使用者

问题描述

我正在尝试使用 flink 1.12 版扩展 FlinkKafkaConsumer 以在我的 Flink 作业中限制 Kafka 消费者。作为其中的一部分,我尝试遵循以下线程来实现相同的目标。但是,我在创建 AbstractFetcher 时遇到了 createFetcher 方法中的编译问题。

How to use Ratelimiter on flink?

他们的方法是否在 emitRecord 中命名,因为我找不到 KafkaFetcherAbstractFetcher 类?

下面是代码片段

protected AbstractFetcher<T,?> createFetcher(
        SourceContext<T> sourceContext,Map<KafkaTopicPartition,Long> partitionsWithOffsets,SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,StreamingRuntimeContext runtimeContext,OffsetCommitMode offsetCommitMode,MetricGroup consumerMetricGroup,boolean useMetrics)
        throws Exception {

    return new KafkaFetcher<T>(
            sourceContext,partitionsWithOffsets,watermarksPeriodic,watermarksPunctuated,runtimeContext.getProcessingTimeService(),runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),runtimeContext.getUserCodeClassLoader(),runtimeContext.getTaskNameWithSubtasks(),deserializer,properties,pollTimeout,runtimeContext.getMetricGroup(),consumerMetricGroup,useMetrics) {
        protected void emitRecord(T record,KafkaTopicPartitionState<TopicPartition> partitionState,long offset) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecord(record,partitionState,offset);
        }

        @Override
        protected void emitRecordWithTimestamp(T record,long offset,long timestamp) throws Exception {
            subtaskRateLimiter.acquire();
            if (record == null) {
                consumerMetricGroup.counter("invalidRecord").inc();
            }
            super.emitRecordWithTimestamp(record,offset,timestamp);
        }
    };

}

解决此问题的任何建议是获取 Flink Kafka Consumer 的自定义速率限制

解决方法

emitRecord 现在由 org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter#emitRecord 实现。