Rx.Net-订阅Cold Observable时,发布方法缺少前几项

问题描述

Akavache的启发,我正在尝试创建一个为我提供IObservable<IArticle>解决方案。该方法实质上首先尝试获取数据库中存在的所有文章,然后尝试从Web服务中获取更新的文章,然后从Web服务中获取最新文章,然后尝试将其保存回数据库中。

由于该Web服务本质上是一个冷的可观察性,并且我不想订阅两次,因此我使用Publish连接到它。我的理解是,我使用的是Publish方法的正确版本,但是,很多时候该方法往往会错过GetNewsArticles的前几篇文章。通过用户界面以及在以下调用添加Trace调用都可以观察到这一点。

除了解决问题外,还非常了解如何调试/测试此代码(除了引入DI注入NewsService之外)。

public IObservable<IArticle> GetContents(string newsUrl,IScheduler scheduler)
{
    var newsService = new NewsService(new HttpClient());
    scheduler = scheduler ?? TaskPoolScheduler.Default;

    var fetchObject = newsService
        .GetNewsArticles(newsUrl,scheduler)
        .Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));

    return fetchObject.Publish(fetchSubject =>
    {
        var updateObs = fetchSubject
            .Do( x =>                         
            {
                // Save to database,all sync calls
            })
            .Where(x => false)
            .Catch(Observable.Empty<Article>());

        var dbArticleObs = Observable.Create<IArticle>(o =>
        {
            return scheduler.ScheduleAsync(async (ctrl,ct) =>
            {
                using (var session = dataBase.GetSession())
                {
                    var articles = await session.GetArticlesAsync(newsUrl,ct);
                    foreach (var article in articles)
                    {
                        o.OnNext(article);
                    }
                }
                o.OnCompleted();
            });
        });

        return
            dbArticleObs                // First get all the articles from dataBase cache
                .Concat(fetchSubject    // Get the latest articles from web service 
                    .Catch(Observable.Empty<Article>())
                    .Merge(updateObs))  // Update the database with latest articles
                .Do(x => Trace.WriteLine($"displaying {x.Title}"));
    });
}

更新-添加了GetArticles

public IObservable<IContent> GetArticles(string FeedUrl,IScheduler scheduler)
{
    return Observable.Create<IContent>(o =>
    {
        scheduler = scheduler ?? DefaultScheduler.Instance;
        scheduler.ScheduleAsync(async (ctrl,ct) =>
        {
            try
            {
                using (var inputStream = await Client.GetStreamAsync(FeedUrl))
                {
                    var settings = new XmlReaderSettings
                    {
                        IgnoreComments = true,IgnoreProcessingInstructions = true,IgnoreWhitespace = true,Async = true
                    };

                    //var parsingState = ParsingState.Channel;
                    Article article = null;
                    Feed Feed = null;

                    using (var reader = XmlReader.Create(inputStream,settings))
                    {
                        while (await reader.ReadAsync())
                        {
                            ct.ThrowIfCancellationRequested();
                            if (reader.IsstartElement())
                            {
                                switch (reader.LocalName)
                                {
                                    ...
                                    // parsing logic goes here
                                    ...
                                }
                            }
                            else if (reader.LocalName == "item" &&
                                     reader.NodeType == XmlNodeType.EndElement)
                            {
                                o.OnNext(article);
                            }
                        }
                    }

                    o.OnCompleted();
                }
            }
            catch (Exception e)
            {
                o.OnError(e);
            }

        });
        return disposable.Empty;
    });
}
更新2

在此处共享指向source code链接

解决方法

对于您的代码,我有些不满意的地方。我假设NewsService是一个IDisposable,因为它需要一个HttpClient(可抛弃)。您没有进行适当的清理。

此外,您还没有提供完整的方法-因为您已尝试将其缩减为一个问题-但这使我们难以推理如何重写代码。

也就是说,Observable.Create是让我看上去非常恐怖的一件事。您可以改用此代码,看看它是否对您有用吗?

    var dbArticleObs =
        Observable
            .Using(
                () => dataBase.GetSession(),session =>
                    from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl,ct))
                    from article in articles
                    select article);

现在,如果是这样,请在更新`NewService'时尝试重写fetchObject以使用相同的Observable.Using

无论如何,如果您可以在问题中提供GetContentsNewsService和您的dataBase代码的完整实现,那就很好了。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...