Apache Beam:从具有存储数据的多个主题中读取

问题描述

我需要从多个 Kafka 主题中读取按时间计算的起始偏移量,按时间戳对它们进行排序并发送到另一个 kafka 主题。所有 kafka 主题都有 1 个分区。

再举一个例子来更好地描述用例。

我们有 inputTopic1,inputTopic2outputTopic。我们需要使用过去 12 小时内来自 inputTopics 的数据并继续使用实时数据。所有消耗的数据都需要排序并发布到outputTopic

我尝试创建自定义 windowFn 来处理此用例,但我在 output 主题中获得了无序数据。

我有两个问题。

我是否选择了正确的方法解决这个问题?我是否以正确的方式实施它?

管道

Instant NowAsInstant = Instant.Now();
        Pipeline pipeline = Pipeline.create(options);
        pipeline.apply("Read from Topics",KafkaIO.<String,String>read()
                .withTopics(List.of("topic1","topic2"))
                .withBootstrapServers("localhost:9092")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withCreateTime(Duration.ZERO)
                .withStartReadTime(NowAsInstant.minus(Duration.standardHours(12)))
                .withConsumerConfigUpdates(consumerConfig)
                .commitOffsetsInFinalize())
                .apply(Window.into(new CustomWindowFn(NowAsInstant,Duration.millis(500))))
                .apply(Combine.globally(new ListCombiner()).withoutDefaults())
                .apply("Sort",MapElements.via(
                        new SimpleFunction<Iterable<KafkaRecord<String,String>>,List<KafkaRecord<String,String>>>() {
                            @Override
                            public List<KafkaRecord<String,String>> apply(Iterable<KafkaRecord<String,String>> input) {
                                return StreamSupport.stream(input.spliterator(),false)
                                        .sorted(KAFKA_RECORD_COMParaTOR)
                                        .collect(Collectors.toUnmodifiableList());
                            }
                        }
                ))
                .apply(Flatten.iterables())
                .apply("mapToProducerRecord",MapElements.<ProducerRecord<String,String>>into(new ProducerRecordCoder<>(
                        StringUtf8Coder.of(),StringUtf8Coder.of()).getEncodedTypeDescriptor())
                        .via((SerializableFunction<KafkaRecord<String,String>,ProducerRecord<String,String>>)
                                new SimpleFunction<KafkaRecord<String,String>>() {
                                    @Override
                                    public ProducerRecord<String,String> apply(KafkaRecord<String,String> input) {
                                        return new ProducerRecord<>("outputTopic",null,input.getTimestamp(),input.getKV().getKey(),input.getKV().getValue(),input.getHeaders());
                                    }
                                }))
                .setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of()))
                .apply("Write to Kafka",String>writeRecords()
                        .withBootstrapServers("localhost:9092")
                        .withKeySerializer(StringSerializer.class)
                        .withValueSerializer(StringSerializer.class));

        LOG.info("Starting pipeline...");
        pipeline.run();

CustomWindowFn

public class CustomWindowFn extends PartitioningWindowFn<KafkaRecord<String,IntervalWindow> {
    private final Instant startingTime;
    private final Duration size;
    private Instant intervalStartTime;

    public CustomWindowFn(Instant startingTime,Duration size) {
        this.startingTime = startingTime;
        this.size = size;
    }

    @Override
    public IntervalWindow assignWindow(Instant timestamp) {
        if (timestamp.isBefore(startingTime)) {
            Instant firstRecordTimestamp = getIntervalStartTime(timestamp);
            return new IntervalWindow(firstRecordTimestamp,startingTime);
        }

        Instant start =
                new Instant(timestamp.getMillis()
                        - timestamp.plus(size).getMillis() % size.getMillis());

        return new IntervalWindow(start,start.plus(size));
    }

    private Instant getIntervalStartTime(Instant timestamp) {
        if (isNull(intervalStartTime)) {
            intervalStartTime = timestamp;
        }

        return intervalStartTime;
    }

    @Override
    public boolean isCompatible(WindowFn<?,?> other) {
        return this.equals(other);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public void verifyCompatibility(WindowFn<?,?> other) throws IncompatibleWindowException {
        if (!this.isCompatible(other)) {
            throw new IncompatibleWindowException(
                    other,String.format(
                            "Only %s objects with the same size and startingTime are compatible.",CustomWindowFn.class.getSimpleName()));
        }
    }


    public Instant getStartingTime() {
        return startingTime;
    }

    public Duration getSize() {
        return size;
    }

    @Override
    public boolean equals(@Nullable Object object) {
        if (!(object instanceof CustomWindowFn)) {
            return false;
        }

        CustomWindowFn other = (CustomWindowFn) object;
        return getStartingTime().equals(other.getStartingTime())
                && getSize().equals(other.getSize());
    }

    @Override
    public int hashCode() {
        return Objects.hash(size,startingTime);
    }
}

ListCombiner

public class ListCombiner extends Combine.CombineFn<KafkaRecord<String,String>>> {

    @Override
    public List<KafkaRecord<String,String>> createAccumulator() {
        return new ArrayList<>();
    }

    @Override
    public List<KafkaRecord<String,String>> addInput(List<KafkaRecord<String,String>> mutableAccumulator,KafkaRecord<String,String> input) {
        if(input != null) {
            mutableAccumulator.add(input);
        }

        return mutableAccumulator;
    }

    @Override
    public List<KafkaRecord<String,String>> mergeAccumulators(Iterable<List<KafkaRecord<String,String>>> accumulators) {
        return StreamSupport.stream(accumulators.spliterator(),false)
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
    }

    @Override
    public List<KafkaRecord<String,String>> extractOutput(List<KafkaRecord<String,String>> accumulator) {
        return accumulator;
    }
}

我还阅读了 timelystateful 处理文章,但我不确定如何将其用于以下任务。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)