如何避免 Java ExecutorService 中的上下文切换

问题描述

我使用软件 (AnyLogic) 导出可运行的 jar 文件,这些文件本身会重复重新运行一组具有不同参数的模拟(所谓的参数变化实验)。我正在运行的模拟非常占用 RAM,因此我必须限制 jar 文件可用的内核数。在 AnyLogic 中,可用内核的数量很容易设置,但是从服务器上的 Linux 命令行,我知道如何做到这一点的唯一方法是使用 taskset 命令手动指定要使用的可用内核(使用 cpu 亲和性“掩码”)。到目前为止,这种方法运行良好,但由于您必须指定要使用的单个内核,因此我了解到,根据您选择的内核,性能可能存在相当大的差异。例如,您可能希望最大限度地利用 cpu 缓存级别,因此如果您选择共享过多缓存的内核,您的性能会大大降低。

由于 AnyLogic 是用 Java 编写的,所以我可以使用 Java 代码来指定模拟的运行。我正在考虑使用 Java ExecutorService 来构建一个单独运行的池,这样我就可以将池的大小指定为与我正在使用的机器的 RAM 匹配的内核数量。我认为这会带来许多好处,最重要的是,也许计算机的调度程序可以更好地选择内核以最大程度地缩短运行时间。

在我的测试中,我构建了一个运行时间大约为 10 秒的小型 AnyLogic 模型(它只是在 2 个状态图状态之间反复切换)。然后我用这个简单的代码创建了一个自定义实验。

ExecutorService service = Executors.newFixedThreadPool(2);

for (int i=0; i<10; i++)
{
    Simulation experiment = new Simulation();
    experiment.variable = i;
    service.execute( () -> experiment.run() );
}

我希望看到的是一次只有 2 个 Simulation 对象启动,因为这是线程池的大小。但是我看到所有 10 个启动并在 2 个线程上并行运行。这让我认为正在发生上下文切换,我认为这非常低效。

当我没有调用 AnyLogic Simulation,而是在 service.execute 函数调用自定义 Java 类(如下)时,它似乎工作正常,仅显示 2 个 Tasks 正在运行一次。

public class Task implements Runnable,Serializable {

    public void run() {
        traceln("Starting task on thread " + Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printstacktrace();
        }
        traceln("Ending task on thread " + Thread.currentThread().getName());
    }
}

有谁知道为什么 AnyLogic 功能似乎同时设置所有模拟?

解决方法

我猜 Simulation 是从 ExperimentParamVariation 扩展而来的。实现您想要的目标的关键是确定实验何时结束。

文档显示了一些有趣的方法,例如 getProgress()getState(),但您必须轮询这些方法,直到进度为 1 或状态为 FINISHED 或 { {1}}。还有一些方法 onAfterExperiment()onError() 应该由引擎调用以指示实验已经结束或出现错误。我认为您可以使用带有 Semaphore 的最后两种方法来控制一次运行的实验数量:

ERROR

然后,您必须配置一个具有所需许可数量的信号量,并将其传递给 import java.util.concurrent.Semaphore; import com.anylogic.engine.ExperimentParamVariation; public class Simulation extends ExperimentParamVariation</* Agent */> { private final Semaphore semaphore; public Simulation(Semaphore semaphore) { this.semaphore = semaphore; } public void onAfterExperiment() { this.semaphore.release(); super.onAfterExperiment(); } public void onError(Throwable error) { this.semaphore.release(); super.onError(error); } // run() cannot be overriden because it is final // You could create another run method or acquire a permit from the semaphore elsewhere public void runWithSemaphore() throws InterruptedException { // This acquire() will block until a permit is available or the thread is interrupted this.semaphore.acquire(); this.run(); } } 实例:

Simulation
,

首先,我认为是 AnyLogic 功能的一个相对较新的补充,这整个问题已经被否定了。您可以指定具有指定数量的“并行工作器”的 ini 文件。

https://help.anylogic.com/index.jsp?topic=%2Fcom.anylogic.help%2Fhtml%2Frunning%2Fexport-java-application.html&cp=0_3_9&anchor=customize-settings

但是在找到这个(更好的)选项之前,我已经设法找到了一个可行的解决方案。 Hernan 的回答几乎就足够了。我认为它受到了 AnyLogic 引擎的一些变幻莫测的阻碍(正如我在评论中详述的那样)。

我能想到的最好的版本是使用 ExecuterService。在自定义实验中,我放置了以下代码:

ExecutorService service = Executors.newFixedThreadPool(2);

List<Callable<Integer>> tasks = new ArrayList<>();

for (int i=0; i<10; i++)
{
    int t = i;
    tasks.add( () -> simulate(t) );
}

try{
    traceln("starting setting up service");
    List<Future<Integer>> futureResults = service.invokeAll(tasks);
    
    traceln("finished setting up service");
    
    List<Integer> res = futureResults.stream().parallel().map(
        f -> {
            try {
                return f.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }).collect(Collectors.toList());
    System.out.println("----- Future Results are ready -------");
    

    System.out.println("----- Finished -------");
    
} catch (InterruptedException e) {
    e.printStackTrace();
}

service.shutdown();

这里的关键是使用 Java Future。此外,为了使用 invokeAll 函数,我在附加类代码块中创建了一个函数:

public int simulate(int variable){
    // Create Engine,initialize random number generator:
    Engine engine = createEngine();
    // Set stop time
    engine.setStopTime( 100000 );
    // Create new root object:
    Main root = new Main( engine,null,null );
    root.parameter = variable;
    // Prepare Engine for simulation:
    engine.start( root );
    // Start simulation in fast mode:
    //traceln("attempting to acquire 1 permit on run "+variable);
    //s.acquireUninterruptibly(1);
    traceln("starting run "+variable);
    engine.runFast();
    traceln("ending run "+variable);
    //s.release();
    // Destroy the model:
    engine.stop();
    
    traceln( "Finished,run "+variable);
    return 1;
}

我可以看到这种方法的唯一限制是我没有每隔几分钟输出一次等待循环。但是我没有找到解决方案,而是必须放弃这项工作,以便在顶部链接中找到更好的设置文件解决方案。