从定期异步请求创建可观察的

问题描述

我想要一种将异步方法转换为可观察方法的通用方法。就我而言,我正在处理使用HttpClient从API提取数据的方法。

假设我们有一种方法Task<string> GetSomeData(),它需要变成一个单独的Observable<string>,其中值是由以下各项的组合生成的:

  • 重复调用GetSomeData()(例如,每x秒)
  • 在任何给定时间(例如,当用户点击刷新时)手动触发对GetSomeData()的呼叫。

由于有两种触发GetSomeData()并发执行的方式可能是个问题。为了避免要求GetSomeData()是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。结果,我需要使用某种策略来处理重叠的请求。我制作了一种大理石图,试图描述问题和想要的结果

https://docs.wildfly.org/21/wildscribe/subsystem/jgroups/stack/transport/TCP/index.html

我的直觉告诉我有一种简单的方法可以实现这一目标,所以请给我一些见解:)

这是我到目前为止的解决方案。不幸的是,它不能解决并发问题。

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall,TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

延后重复的扩展方法:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source,TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

包含用于生成可观察对象的方法的服务示例

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

这样使用(会发生数据争用):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync,TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)