RxJS 按字段分组并返回新的 observable

问题描述

我遵循接口和 Observable<Machine[]>,我想要实现的是在 symbol 中按机器 Observable<Machine[]> 属性分组并返回映射的可观察 Observable<Order[]>

export interface Machine {
    symbol: string;
    price: number;
    amount: number;
    id: number;
}

export interface Order {
    symbol: string;
    machines: OrderMachine[];
}

export interface OrderMachine {
    price: number;
    amount: number;
    id: number;
}

我尝试使用 RxJS 的 gropBy 运算符,但它似乎一一返回分组数组。

machines: Machine[] = [
        { amount: 1,id: 1,symbol: "A",price: 1 },{ amount: 1,id: 2,price: 2 }
    ];


of(machines).pipe(
        takeuntil(this.unsubscribe),mergeMap(res => res),groupBy(m => m.symbol),mergeMap(group => zip(of(group.key),group.pipe(toArray()))),map(x => { // here I have probably wrong model [string,Machine[]]
            const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price,amount: y.amount,id: y.id }})
            return <Order>{ symbol: x[0],machines: orderMachines }  })
        );

结果我有Observable<Order>而不是Observable<Order[]>

预期结果模型:

orders: Order[] = [
        {   
            symbol: "A",machines: [
                { amount: 1,price: 1,id: 1 },price: 2,id: 2 }
            ]
        }
    ];

解决方法

这里有一个基于您的方法的可能解决方案,但有一些变化:

const machines = [
  { amount: 1,id: 1,symbol: "A",price: 1 },{ amount: 1,id: 2,price: 2 },id: 3,symbol: "B",price: 3 }
];

from(machines) // (1)
  .pipe(
    // (2)
    groupBy((m) => m.symbol),mergeMap((group) => group.pipe(toArray())),map((arr) => ({
      symbol: arr[0].symbol,// every group has at least one element
      machines: arr.map(({ price,amount,id }) => ({
        price,id
      }))
    })),toArray(),// (3)
  )
  .subscribe(console.log);

(1) 我将 of(machines) 更改为 from(machines),以便将 machines 中的对象一一发送到流中。在此更改之前,整个数组立即发出,因此流被破坏。

(2) 我从管道中删除了 takeUntil(this.unsubscribe)mergeMap(res => res),因为没有理由在您的示例中使用它们。 takeUntil 不会有任何影响,因为流是有限且同步的。与 res => res 一起应用的标识函数 (mergeMap) 在流的流中有意义,而在您的示例中并非如此。或者您的项目实际上是否需要这些操作符,因为您有无限的可观察数据流?

(3) toArray() 是将 Observable<Order> 转换为 Observable<Order[]>。它一直等到流结束,然后以数组的形式一次性发出所有流过的值。

编辑:

操作员提到他更需要一个与无限流兼容的解决方案,但由于 toArray 仅适用于有限流,因此上面提供的答案在这种情况下永远不会发出任何内容。

为了解决这个问题,我会避免使用 rxjs 中的 groupBy。在其他情况下,它是一个非常强大的工具,您需要将一个流分成几组流,但在您的情况下,您只想对一个数组进行分组,并且有更简单的方法。

this.store.pipe(
    select(fromOrder.getMachines)
    map((arr) =>
        // (*) group by symbol
        arr.reduce((acc,{ symbol,price,id }) => {
            acc[symbol] = {
                symbol,machines: (acc[symbol] ? acc[symbol].machines : [])
                    .concat({ price,id })
            };
            return acc;
        },{})
    ),)
.subscribe((result) => 
    // (**)
    console.log(Object.values(result))
);

(*) 您可以使用普通的 groupBy implementation 返回形状为 {[symbol: string]: Order} 的对象。

(**) result 在这里是一个对象,但您可以轻松地将其转换为数组,但应用 Object.values(result)

,

@kruschid 非常感谢您的回复,它可以正常工作但不幸的是,当我想在我的商店 (ngrx) 中使用它时它不起作用,类型正常但它在 {{1 之后停止显示日志}} 方法:

mergeMap