带有 Bindy 的 Apache Camel 反应流 - 仅读取第一行

问题描述

我正在尝试将 Apache Camel(2.25.3 版)反应性流与 Spring Boot 结合使用来读取大型 csv 文件并使用 Bindy 解组这些行。这是“工作”,因为应用程序运行并在文件出现时检测它们,但我只能在我的流中看到文件的第一行。它似乎与 Bindy 相关,因为如果我从等式中取出解组,我会在我的流中恢复 csv 文件的所有行。我已经简化了在 SO 上演示的问题。我正在使用 Spring Webflux 来公开结果发布者。

所以我的骆驼路线如下:

import lombok.requiredArgsConstructor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

@requiredArgsConstructor
@Component
public class TransactionLineCsvRoute extends RouteBuilder {
    private final CamelReactiveStreamsService camelrs;

    @Override
    public void configure() {
        var bindy = new BindyCsvDataFormat(LineItem.class);

        from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
                .unmarshal(bindy)
                .to("reactive-streams:lineItems");
    }

    public Flux<LineItem> getLineItemFlux() {
        Publisher<LineItem> lineItems = camelrs.fromStream("lineItems",LineItem.class);

        return Flux.from(lineItems);
    }
}

Bindy 类:

@ToString
@Getter
@CsvRecord(separator = ";",skipFirstLine = true,skipField =true)
public class LineItem {
    @datafield(pos = 2)
    private String description;
}

以及暴露 Flux 的端点:

@GetMapping(value = "/lineItems",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return lineItemFlux;
}

所以当我现在做卷发时:

curl localhost:8080/lineItems

我只取回第一行,而当我删除“.unmarshal(bind)”行(并将流重构为 String 而不是 LineItem 类型)时,我取回了 csv 文件的所有元素。>

所以我想我没有在反应流上下文中正确使用 Bindy。我跟着这个 Camel documentation 并试图重写我的路线如下:

from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
        .to("reactive-streams:rawLines");

from("reactive-streams:rawLines")
        .unmarshal(bindy)
        .to("reactive-streams:lineItems");

显示路由已正确启动:

2021-01-04 10:13:26.798  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route1 started and consuming from: file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport
2021-01-04 10:13:26.800  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Route: route2 started and consuming from: reactive-streams://rawLines
2021-01-04 10:13:26.801  INFO 26438 --- [           main] o.a.camel.spring.SpringCamelContext      : Total 2 routes,of which 2 are started

但后来我收到一个异常,指出“该流没有活动订阅”:

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId              ProcessorId          Processor                                                                        Elapsed (ms)
[route1            ] [route1            ] [file://input/?include=.*%5C.csv&move=successImport&moveFailed=failImport      ] [         9]
[route1            ] [to1               ] [reactive-streams:rawLines                                                     ] [         5]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------

java.lang.IllegalStateException: The stream has no active subscriptions
    at org.apache.camel.component.reactive.streams.engine.CamelPublisher.publish(CamelPublisher.java:108) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.engine.DefaultCamelReactiveStreamsService.sendCamelExchange(DefaultCamelReactiveStreamsService.java:144) ~[camel-reactive-streams-2.25.3.jar:2.25.3]
    at org.apache.camel.component.reactive.streams.ReactiveStreamsProducer.process(ReactiveStreamsProducer.java:52) ~[camel-reactive-streams-2.25.3.jar:2.25.3]

有没有人有任何指示我可以如何将 Bindy 与反应流结合使用?谢谢!

编辑

在 burki 的非常有用的帖子之后,我能够修复我的代码。因此路由定义更改为以下内容。如您所见,我删除了解组步骤,因此它只是在文件到达时从文件系统中提取文件并将它们放入反应流中:

@Override
public void configure() {
    from("file:input/?include=.*\\.csv&move=successImport&moveFailed=failImport")
            .to("reactive-streams:extractedFile");
}

然后将文件流公开为 Flux:

public Flux<File> getFileFlux() {
    return Flux.from(camelrs.fromStream("extractedFile",File.class));
}

解析 CSV 的代码如下(使用 burki 建议的 OpenCSV,但使用 API 的不同部分):

private Flux<LineItem> readLineItems() {
    return fileFlux
            .flatMap(message -> Flux.using(
                    () -> new CsvToBeanBuilder<LineItem>(createFileReader(message)).withSkipLines(1)
                            .withSeparator(';')
                            .withType(LineItem.class)
                            .build()
                            .stream(),Flux::fromStream,BaseStream::close)
            );
}

private FileReader createFileReader(File file) {
    System.out.println("Reading file from: " + file.getAbsolutePath());
    try {
        return new FileReader(file);
    } catch (FileNotFoundException e) {
        throw new RuntimeException(e);
    }
}

您现在可以将此结果 Flux 作为端点公开:

@GetMapping(value = "/lineItems",produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<LineItem> lineItems() {
    return readLineItems();
}

现在当你像我上面那样做卷曲时,你会从 csv 中获得完整的未编组的 LineItem。

我仍然有一个待办事项,无论这是否真的将整个文件加载到内存中。我不这么认为,我想我只得到一个指向文件的指针,然后我将其流式传输到 OpenCSV bean,但我需要验证这一点,可能是我现在首先将整个文件读入内存,然后将其流式传输这会破坏目的。

解决方法

我猜文件使用者只是将整个文件传递给解组步骤。

因此,如果您将文件使用者的结果解组为 LineItem,您将整个文件内容“减少”到第一行

相反,如果您删除解组,您将获得整个文件内容。但可能文件使用者在传递之前将整个文件加载到内存中

但是阅读整个文件并不是您想要的。要逐行读取 CSV 文件,您需要在流模式下拆分文件。

from("file:...")
    .split(body().tokenize(LINE_FEED)).streaming()
    .to("direct:processLine") 

像这样,Splitter 将每一行发送到路由 direct:processLine 以进行进一步处理。

我在这种情况下面临的问题是解析单个 CSV 行。大多数 CSV 库旨在读取和解析整个文件,而不是单行。

然而,相当古老的 OpenCSV 库有一个带有 parseLine(String csvLine) 方法的 CSVParser。所以我用它来解析“完全分离”的单个 CSV 行。