问题描述
受Akavache的启发,我正在尝试创建一个为我提供IObservable<IArticle>
的解决方案。该方法实质上首先尝试获取数据库中存在的所有文章,然后尝试从Web服务中获取更新的文章,然后从Web服务中获取最新文章,然后尝试将其保存回数据库中。
由于该Web服务本质上是一个冷的可观察性,并且我不想订阅两次,因此我使用Publish
连接到它。我的理解是,我使用的是Publish
方法的正确版本,但是,很多时候该方法往往会错过GetNewsArticles
的前几篇文章。通过用户界面以及在以下调用中添加的Trace
调用都可以观察到这一点。
除了解决问题外,还非常了解如何调试/测试此代码(除了引入DI注入NewsService
之外)。
public IObservable<IArticle> GetContents(string newsUrl,IScheduler scheduler)
{
var newsService = new NewsService(new HttpClient());
scheduler = scheduler ?? TaskPoolScheduler.Default;
var fetchObject = newsService
.GetNewsArticles(newsUrl,scheduler)
.Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));
return fetchObject.Publish(fetchSubject =>
{
var updateObs = fetchSubject
.Do( x =>
{
// Save to database,all sync calls
})
.Where(x => false)
.Catch(Observable.Empty<Article>());
var dbArticleObs = Observable.Create<IArticle>(o =>
{
return scheduler.ScheduleAsync(async (ctrl,ct) =>
{
using (var session = dataBase.GetSession())
{
var articles = await session.GetArticlesAsync(newsUrl,ct);
foreach (var article in articles)
{
o.OnNext(article);
}
}
o.OnCompleted();
});
});
return
dbArticleObs // First get all the articles from dataBase cache
.Concat(fetchSubject // Get the latest articles from web service
.Catch(Observable.Empty<Article>())
.Merge(updateObs)) // Update the database with latest articles
.Do(x => Trace.WriteLine($"displaying {x.Title}"));
});
}
更新-添加了GetArticles
public IObservable<IContent> GetArticles(string FeedUrl,IScheduler scheduler)
{
return Observable.Create<IContent>(o =>
{
scheduler = scheduler ?? DefaultScheduler.Instance;
scheduler.ScheduleAsync(async (ctrl,ct) =>
{
try
{
using (var inputStream = await Client.GetStreamAsync(FeedUrl))
{
var settings = new XmlReaderSettings
{
IgnoreComments = true,IgnoreProcessingInstructions = true,IgnoreWhitespace = true,Async = true
};
//var parsingState = ParsingState.Channel;
Article article = null;
Feed Feed = null;
using (var reader = XmlReader.Create(inputStream,settings))
{
while (await reader.ReadAsync())
{
ct.ThrowIfCancellationRequested();
if (reader.IsstartElement())
{
switch (reader.LocalName)
{
...
// parsing logic goes here
...
}
}
else if (reader.LocalName == "item" &&
reader.NodeType == XmlNodeType.EndElement)
{
o.OnNext(article);
}
}
}
o.OnCompleted();
}
}
catch (Exception e)
{
o.OnError(e);
}
});
return disposable.Empty;
});
}
更新2
在此处共享指向source code的链接。
解决方法
对于您的代码,我有些不满意的地方。我假设NewsService
是一个IDisposable
,因为它需要一个HttpClient
(可抛弃)。您没有进行适当的清理。
此外,您还没有提供完整的方法-因为您已尝试将其缩减为一个问题-但这使我们难以推理如何重写代码。
也就是说,Observable.Create
是让我看上去非常恐怖的一件事。您可以改用此代码,看看它是否对您有用吗?
var dbArticleObs =
Observable
.Using(
() => dataBase.GetSession(),session =>
from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl,ct))
from article in articles
select article);
现在,如果是这样,请在更新`NewService'时尝试重写fetchObject
以使用相同的Observable.Using
。
无论如何,如果您可以在问题中提供GetContents
,NewsService
和您的dataBase
代码的完整实现,那就很好了。