终止在从 UNIX 命名管道打开输入流时阻塞的线程

问题描述

我正在使用 java IO 流从 Unix 命名管道读取命令。该程序有一个线程,它使用 mkfifo 创建一个命名管道,然后监听它,允许用户使用 echo "command" > pipe 向它发送命令。
大多数情况下它都可以正常工作,除非我的程序必须在尚未收到任何命令的情况下终止(例如发生不可恢复的异常)。
如此处所述:Not able to read from named pipe in Java FileInputStream 构造函数会阻塞,直到其他进程打开管道进行写入。这让线程实现没有机会处理中断并正确终止。

我制作了一个简单的程序来展示我的问题。它会监听输入命令 3 秒,然后尝试(但失败)自行停止。

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

/**
 * Read the input named FIFO to dispatch commands to a command processor.
 */
public final class InputReaderThread {

    private static final long LOOP_PERIOD_MS = 100;

    private final Path fifoPath;

    private final Consumer<String[]> processor;

    private final Thread thread;

    /**
     * Constructor.
     *
     * @param fifoPath path to the named FIFO
     * @param processor the command processor
     */
    public InputReaderThread(final Path fifoPath,final Consumer<String[]> processor) {
        this.fifoPath = fifoPath;
        this.processor = processor;

        this.thread = new Thread(this::run);
        this.thread.setName(InputReaderThread.class.getSimpleName());
    }

    /**
     * Start the input reader thread.<br>
     * It is never legal to start a thread more than once. In particular,a thread may not be restarted once it has completed execution.
     *
     * @see Thread#start()
     */
    public final void start() {
        System.out.println("Requesting input reader thread start");
        this.thread.start();
    }

    /**
     * Request the input reader thread to stop.<br>
     * This method does not wait for the thread to terminate,see {@link #join(long)}.
     */
    public final void stop() {
        System.out.println("Requesting input reader thread stop");
        this.thread.interrupt();
    }

    /**
     * Wait for the internal reader thread to terminate.<br>
     * Throws an {@link InterruptedException} on timeout.
     *
     * @param timeout the maximum time to wait,in milliseconds
     * @return {@code true} if the input thread terminated within timeout
     * @throws InterruptedException the current thread was interrupted while waiting,or the timeout was reached
     * @see Thread#join(long)
     */
    public final boolean join(final long timeout) throws InterruptedException {
        System.out.println("Awaiting input reader thread stop");
        this.thread.join(timeout);
        return !this.thread.isAlive();
    }

    private final void run() {
        System.out.println("Input reader thread started");

        try (final FileInputStream is = new FileInputStream(createFifoPipe(this.fifoPath))) {
            final StringBuilder commandBuilder = new StringBuilder();

            System.out.println("Listening to input FIFO");
            while (!Thread.interrupted()) {
                // Avoid reading when there is no data available
                if (is.available() > 0) {
                    final int b = is.read();

                    if (b == '\n') {
                        // The command is complete: process it
                        final String command = commandBuilder.toString().trim();
                        System.out.println("Received command: " + command);
                        this.processor.accept(command.split(" "));

                        // Reset the command builder
                        commandBuilder.setLength(0);
                    } else {
                        // Append the character to the command
                        commandBuilder.append((char) b);
                    }
                } else {
                    // Poll the input FIFO periodically
                    Thread.sleep(LOOP_PERIOD_MS);
                }
            }
        } catch (final IOException e) {
            throw new RuntimeException("An IO exception occurred on agent FIFO",e);
        } catch (@SuppressWarnings("unused") final InterruptedException e) {
            // Handle interruption by terminating the thread
        }

        System.out.println("Input reader thread terminated");
    }

    /**
     * Helper method to create a Unix named FIFO.
     *
     * @param fifoPath the FIFO path
     * @return the File handler to the created FIFO
     * @throws IOException an I/O error occurs
     * @throws InterruptedException the thread was interrupted while waiting for the FIFO creation
     */
    private static final File createFifoPipe(final Path fifoPath) throws IOException,InterruptedException {
        System.out.println("Creating fifo: " + fifoPath);

        final File fifo = fifoPath.toFile();
        if (fifo.exists()) {
            System.err.println("Deleting existing fifo");
            Files.delete(fifoPath);
        }

        final String[] command = new String[] { "mkfifo",fifo.getAbsolutePath() };
        final Process process = new ProcessBuilder(command).start();

        final int returnStatus = process.waitFor();
        if (returnStatus != 0) {
            throw new IOException("Failed to create fifo: " + returnStatus);
        } else {
            System.out.println("Created fifo: " + fifoPath);
        }

        return fifo;
    }
}

NamedPipeTest.java

import java.nio.file.Paths;

/**
 * Dummy program for {@link InputReaderThread}.
 */
public final class NamedPipeTest {

    /**
     * Entry point.
     *
     * @param args one argument (path to named pipe)
     */
    public static void main(final String[] args) {
        final NamedPipeTest fifoTest = new NamedPipeTest(args[0]);
        fifoTest.start();

        try {
            Thread.sleep(3000);
            fifoTest.stop();
        } catch (final InterruptedException e) {
            System.err.println("Main thread interrupted");
            e.printstacktrace(System.err);
        }
    }

    private final InputReaderThread inputReader;

    /**
     * Constructor.
     *
     * @param fifoPath the named pipe path
     */
    public NamedPipeTest(final String fifoPath) {
        this.inputReader = new InputReaderThread(Paths.get(fifoPath),this::process);
    }

    /**
     * Start the program.
     */
    public void start() {
        this.inputReader.start();
    }

    /**
     * Stop the program.
     */
    public void stop() {
        try {
            this.inputReader.stop();
            if (!this.inputReader.join(5000)) {
                System.err.println("Failed to terminate input reader thread");
                System.exit(-1);
            }
        } catch (@SuppressWarnings("unused") final InterruptedException e) {
            System.err.println("Interrupted while stopping");
            System.exit(-1);
        }
        System.out.println("Stopped successfully");
    }

    /**
     * Process a command.
     *
     * @param command the command
     */
    public void process(final String[] command) {
        if (command[0].equals("stop")) {
            new Thread(this::stop).start();
        }
    }
}

运行这个程序在命名管道中写入“停止”显示输入读取器线程在处理中断时正确终止(不确定为什么线程终止被记录两次,但这将是另一个问题) :

$ (java -jar NamedPipeTest.jar fifo &) && sleep 2s && echo "stop" > fifo
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Listening to input FIFO
Received command: stop
Requesting input reader thread stop
Awaiting input reader thread stop
Input reader thread terminated
Stopped successfully
Requesting input reader thread stop
Awaiting input reader thread stop
Stopped successfully

然而,运行它而不写“停止”命令显示FileInputStream在打开管道时无法处理中断:

$ java -jar NamedPipeTest.jar fifo &
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Requesting input reader thread stop
Awaiting input reader thread stop
Failed to terminate input reader thread

我想到的事情:

  • 线程不应该成为守护进程,因为它正在执行 IO(混合这两者似乎是个坏主意)
  • Java NIO API 依赖于seek(),这对于命名管道来说似乎是不可取的

我不知道如何解决这个问题,我错过了什么吗?感谢您的帮助!

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...