WSO2流集成器-Siddhi.io将多个事件聚合为一个事件

问题描述

我需要将多个事件收集为一个事件,并将其视为一个事件。输出流应将输入事件包括在列表中。

基本上,当以下事件通过时

{InvoiceNumber:1,LineItem: 1,Value: 100}
{InvoiceNumber:1,LineItem: 2,Value: 200}
{InvoiceNumber:1,LineItem: 3,Value: 300}

我需要输出如下所示

[InvoiceNumber:1,lineItems: [{LineItem: 1,Value: 100},{LineItem: 2,Value: 200},{LineItem: 3,Value: 300}]

如何使用WSO2流集成器实现此目标?或Siddhi.io。

我尝试了以下操作,但仍将每个流插入输出流中

partition with (InvoiceNo of CSVInputStream)
begin

    from every e1=CSVInputStream
        within 1 min
    select e1.InvoiceNo,list:collect(e1.LineItem) as lineItems
    group by e1.InvoiceNo
    insert into AggregatedStream;
end;

解决方法

请勿使用分区,因为这是一个简单的用例,请尝试使用Windows。您的情况下的时间批处理窗口https://siddhi.io/en/v5.1/docs/api/latest/#timebatch-window

from CSVInputStream#window.timeBatch(1 min)
select e1.InvoiceNo,list:collect(e1.LineItem) as lineItems
group by e1.InvoiceNo
insert into AggregatedStream;