IObservable ObserveOn 正在锁定线程,这是可以预防的吗?

问题描述

我正在设计一个服务器,它将客户端请求转移到一个专用于处理数据的线程。我这样做是为了防止正在处理的数据出现任何竞争条件或并发问题。因为服务器被设计为反应式的,所以每当服务器收到请求时,我都会使用 Observables 将请求通知给程序的其余部分。现在因为服务器套接字正在侦听和发射来自多个线程的信号,我想确保无论服务器在哪个线程发射,都将始终观察专用数据处理线程。我选择使用 ObserveOn 方法,这立即适得其反。我立即注意到,在一次可观察到的射击时,其他人都没有射击。 不仅如此,发送到专用线程的其他操作也没有触发。

本质上,可观察对象似乎在为自己“声明”线程。该线程完全被 observable 阻塞,除了该 observable 的排放之外,不能用于任何其他事情。我不希望这种情况发生,因为该线程专用于所有数据处理操作,这阻止了我将该线程用于任何其他可观察对象或未来的数据处理任务。那么,我有什么选择可以防止 observable 将线程锁定到自身,或者强制将 observable 观察到我的专用线程而不阻塞其他 observable。

此示例代码演示了该问题。在这里,我们使用单线程任务调度程序,并注意到它运行得很好,直到第一个主题(已设置为 ObserveOn 调度程序)发出它的字符串。发生这种情况后,不会再触发任何主题或动作。第一个主题有效地为自己锁定了线程。

public static class Program
{
    static void Main(string[] args)
    {
        //Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
        var _t = new Tester();

        string _string = "Hello World";

        //These three will print their string to the console
        _t.PrintDirectlyWithAction(_string);//Succeeds
        _t.PrintDirectlyWithAction(_string);//Succeeds
        _t.PrintDirectlyWithAction(_string);//Succeeds

        //Only subject 1 will emit and print it's string,the other two fail
        _t.PrintThroughSubject1(_string);//Succeeds
        _t.PrintThroughSubject2(_string);//Fails
        _t.PrintThroughSubject3(_string);//Fails

        _t.PrintDirectlyWithAction(_string);//Fails
        _t.PrintDirectlyWithAction(_string);//Fails
        _t.PrintDirectlyWithAction(_string);//Fails

        //We essentially can't do anything with the thread after subject 1 observed on it

        Console.ReadLine();
    }

    public class Tester
    {
        TaskFactory tf;
        TaskPoolScheduler pool;
        int _actionCount = 0;
        Subject<string> s1 = new Subject<string>();
        Subject<string> s2 = new Subject<string>();
        Subject<string> s3 = new Subject<string>();

        public Tester()
        {
            //We're create a task pool that uses a single threaded concurrent task scheduler
            var _scheduler = new ConcurrentExclusiveSchedulerPair();
            tf = new TaskFactory(_scheduler.ExclusiveScheduler);
            pool = new TaskPoolScheduler(tf);

            //And then we set the subjects to each be observed on the single threaded scheduler
            s1.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
                $"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
            s2.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
                $"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
            s3.ObserveOn(pool).Subscribe(_s => Console.WriteLine(
                $"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
        }

        public void PrintThroughSubject1(string _string)
        {
            s1.OnNext(_string);
        }

        public void PrintThroughSubject2(string _string)
        {
            s2.OnNext(_string);
        }

        public void PrintThroughSubject3(string _string)
        {
            s3.OnNext(_string);
        }

        public void PrintDirectlyWithAction(string _string)
        {
            //This is here to demonstrate that the single threaded task scheduler accepts actions just fine
            //and can handle them in sequence
            tf.StartNew(() =>
            {
                Console.WriteLine(
                    $"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
            });
        }

    }
}

TL;DR:我需要能够强制在特定线程上观察多个可观察的发射,但 RxNet 似乎只允许在一个线程上观察单个主题,而没有其他任何东西可以。我如何规避这一点以在同一线程上观察多个 observable?

解决方法

我可能把它复杂化了。 EventLoopScheduler 可能正是您所需要的。

试试这个:

public static class Program
{
    static void Main(string[] args)
    {
        //Within the Tester class we setup a single threaded task scheduler that will be handling all of these methods
        var _t = new Tester();

        string _string = "Hello World";

        //These three will print their string to the console
        _t.PrintDirectlyWithAction(_string);//Succeeds
        _t.PrintDirectlyWithAction(_string);//Succeeds
        _t.PrintDirectlyWithAction(_string);//Succeeds

        //Only subject 1 will emit and print it's string,the other two fail
        _t.PrintThroughSubject1(_string);//Succeeds
        _t.PrintThroughSubject2(_string);//Fails
        _t.PrintThroughSubject3(_string);//Fails

        _t.PrintDirectlyWithAction(_string);//Fails
        _t.PrintDirectlyWithAction(_string);//Fails
        _t.PrintDirectlyWithAction(_string);//Fails

        //We essentially can't do anything with the thread after subject 1 observed on it

        Console.ReadLine();
    }

    public class Tester
    {
        private EventLoopScheduler els = new EventLoopScheduler();
        int _actionCount = 0;
        Subject<string> s1 = new Subject<string>();
        Subject<string> s2 = new Subject<string>();
        Subject<string> s3 = new Subject<string>();

        public Tester()
        {
            //We're create a task pool that uses a single threaded concurrent task scheduler


            //And then we set the subjects to each be observed on the single threaded scheduler
            s1.ObserveOn(els).Subscribe(_s => Console.WriteLine(
                $"Subject (1) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
            s2.ObserveOn(els).Subscribe(_s => Console.WriteLine(
                $"Subject (2) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
            s3.ObserveOn(els).Subscribe(_s => Console.WriteLine(
                $"Subject (3) says \"{_s}\" - on thread {Thread.CurrentThread.ManagedThreadId}"));
        }

        public void PrintThroughSubject1(string _string)
        {
            s1.OnNext(_string);
        }

        public void PrintThroughSubject2(string _string)
        {
            s2.OnNext(_string);
        }

        public void PrintThroughSubject3(string _string)
        {
            s3.OnNext(_string);
        }

        public void PrintDirectlyWithAction(string _string)
        {
            //This is here to demonstrate that the single threaded task scheduler accepts actions just fine
            //and can handle them in sequence
            els.Schedule(() =>
            {
                Console.WriteLine(
                    $"Direct action ({_actionCount++}) says \"{_string}\" - on thread {Thread.CurrentThread.ManagedThreadId}");
            });
        }

    }
}

我得到了这个结果:

Direct action (0) says "Hello World" - on thread 17
Direct action (1) says "Hello World" - on thread 17
Direct action (2) says "Hello World" - on thread 17
Subject (1) says "Hello World" - on thread 17
Subject (2) says "Hello World" - on thread 17
Subject (3) says "Hello World" - on thread 17
Direct action (3) says "Hello World" - on thread 17
Direct action (4) says "Hello World" - on thread 17
Direct action (5) says "Hello World" - on thread 17

完成后不要忘记.Dispose()你的EventLoopScheduler

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...