问题描述
我想要一种将异步方法转换为可观察方法的通用方法。就我而言,我正在处理使用HttpClient
从API提取数据的方法。
假设我们有一种方法Task<string> GetSomeData()
,它需要变成一个单独的Observable<string>
,其中值是由以下各项的组合生成的:
- 重复调用
GetSomeData()
(例如,每x秒) - 在任何给定时间(例如,当用户点击刷新时)手动触发对
GetSomeData()
的呼叫。
由于有两种触发GetSomeData()
并发执行的方式可能是个问题。为了避免要求GetSomeData()
是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。结果,我需要使用某种策略来处理重叠的请求。我制作了一种大理石图,试图描述问题和想要的结果
https://docs.wildfly.org/21/wildscribe/subsystem/jgroups/stack/transport/TCP/index.html
我的直觉告诉我有一种简单的方法可以实现这一目标,所以请给我一些见解:)
这是我到目前为止的解决方案。不幸的是,它不能解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall,TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall()
{
_manualCallsSubject.OnNext(Unit.Default);
}
}
延后重复的扩展方法:
static class Extensions
{
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source,TimeSpan delay) => source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
}
包含用于生成可观察对象的方法的服务示例
class SomeService
{
private int _ticks = 0;
public async Task<string> GetSomeValueAsync()
{
//Just a hack to dermine if request was triggered manuall or by timer
var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";
//Here we have a data race! We would like to limit access to this method
var valueToReturn = $"{_ticks} ({initiatationWay})";
await Task.Delay(500);
_ticks += 1;
return valueToReturn;
}
}
这样使用(会发生数据争用):
static async Task Main(string[] args)
{
//Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
var someService = new SomeService();
var stopwatch = Stopwatch.StartNew();
var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync,TimeSpan.FromMilliseconds(2000));
observableWrapper.Stream
.Take(6)
.Subscribe(x =>
{
Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
});
await Task.Delay(4000);
observableWrapper.TriggerAdditionalCall();
observableWrapper.TriggerAdditionalCall();
Console.ReadLine();
}
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)