如何让 Rx 回调在 ThreadPool 上运行?

问题描述

您是否希望下面的程序打印 False

using System;
using System.Threading;
using System.Reactive.Linq;
using System.Reactive.Concurrency;

public static class Program
{
    public static void Main()
    {
        Observable
            .Return(1)
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Do(x => Console.WriteLine(Thread.CurrentThread.IsThreadPoolThread))
            .Wait();
    }
}

输出

False

我不明白的是,ThreadPoolScheduler 用于安排 ThreadPool 上的工作,但显然这不是正在发生的事情。它的名称可能指的是 Rx 内部的其他一些线程池,而不是 System.Threading 命名空间中的实际 ThreadPool 类。

我进行了各种尝试以强制回调在 ThreadPool 上运行,但没有成功。我尝试过的一些事情:

.ObserveOn(Scheduler.Default)
.ObserveOn(DefaultScheduler.Instance)
.ObserveOn(TaskPoolScheduler.Default)
.ObserveOn(new TaskPoolScheduler(Task.Factory))
.ObserveOn(new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)))
ThreadPoolScheduler.Instance.disableOptimizations(); // At the start of the program

上述最后一次尝试是在阅读了 Microsoft 论坛中的 this 问题之后。无论我尝试过什么,Thread.IsThreadPoolThread 属性都会不断返回 false

我是否应该编写自己的 IScheduler 实现以确保我的代码ThreadPool 上运行?对于这样一个微不足道的目标来说,这听起来是一项非常重要的任务。

.NET 5.0.1、System.Reactive 5.0.0、C# 9

解决方法

你分享的链接其实有解决办法:

var modScheduler = ThreadPoolScheduler.Instance.DisableOptimizations(new[] { typeof(ISchedulerLongRunning) });
Observable
    .Return(1)
    .ObserveOn(modScheduler)
    .Select(_ => Thread.CurrentThread)
    .Subscribe(t => Console.WriteLine(t.IsThreadPoolThread));

如果您查看 ThreadPoolScheduler 的源代码,您会看到所有工作都发送到了 ThreadPool,除了 ScheduleLongRunning。如果禁用该优化,则所有工作都将发送到 ThreadPool。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...