如何制作一个只能订阅一次的轻量级 `Replay` 操作符?

问题描述

在各种情况下,我都希望有一个 Rx Replay 运算符来缓冲传入的通知,在第一次订阅时同步重放其缓冲区,然后停止缓冲。这个轻量级的 Replay 运算符应该只能为一个订阅者提供服务。可以找到此类运算符的一个用例 here,其中在第一次订阅后继续缓冲只是浪费资源。出于演示目的,我将在此处展示一个我希望可以避免的问题行为的人为示例:

var observable = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .SelectMany(x => Enumerable.Range((int)x * 100_000 + 1,100_000))
    .Take(800_000)
    .Do(x =>
    {
        if (x % 100_000 == 0) Console.WriteLine(
            $"{DateTime.Now:HH:mm:ss.fff} > " +
            $"Produced: {x:#,0},TotalMemory: {GC.GetTotalMemory(true):#,0} bytes");
    })
    .Replay()
    .AutoConnect(0);

await Task.Delay(2200);

Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Subscribing...");

// First subscription
await observable.Do(x =>
{
    if (x % 100_000 == 0)
        Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} > Emitted: {x:#,0}");
});

// Second subscription
Console.WriteLine($"Count: {await observable.Count():#,0}");

observable 总共生成 800,000 个值。 Replay 机制立即连接到源,并在完成前中途订阅。

输出:

16:54:19.893 > Produced: 100,000,TotalMemory: 635,784 bytes
16:54:20.341 > Produced: 200,TotalMemory: 1,164,376 bytes
16:54:20.840 > Produced: 300,TotalMemory: 2,212,992 bytes
16:54:21.354 > Produced: 400,992 bytes
16:54:21.543 > Subscribing...
16:54:21.616 > Emitted: 100,000
16:54:21.624 > Emitted: 200,000
16:54:21.633 > Emitted: 300,000
16:54:21.641 > Emitted: 400,000
16:54:21.895 > Produced: 500,TotalMemory: 4,313,344 bytes
16:54:21.897 > Emitted: 500,000
16:54:22.380 > Produced: 600,TotalMemory: 6,411,208 bytes
16:54:22.381 > Emitted: 600,000
16:54:22.868 > Produced: 700,600 bytes
16:54:22.869 > Emitted: 700,000
16:54:23.375 > Produced: 800,413,400 bytes
16:54:23.376 > Emitted: 800,000
Count: 800,000

订阅后内存使用量不断增长。这是意料之中的,因为所有值都被缓冲,并且在重放的 observable 的整个生命周期内都被缓冲。理想的行为是订阅后内存使用量急剧下降。传播缓冲值后应丢弃缓冲区,因为订阅后它没有用。此外,第二个订阅(await observable.Count())应该失败并显示 InvalidOperationException。在它失去 Replay 功能后,我不希望能够再次订阅 observable。

这是我正在尝试实现的自定义 ReplayOnce 运算符的存根。有没有人知道如何实施它?

public static IConnectableObservable<T> ReplayOnce<T>(this IObservable<T> source)
{
    return source.Replay(); // TODO: enforce the subscribe-once policy
}

顺便说一下,有一个相关的问题 here,关于如何使用可以根据需要偶尔清空的缓冲区制作 Replay 运算符。我的问题有所不同,我希望在订阅后完全禁用缓冲区,并且不再开始增长。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)