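Problem description
I am reading commands from a Unix named pipe using Java IO streams. The program has a thread that creates a named pipe with mkfifo and then listens on it, so that users can send it commands with echo "command" > pipe.
Most of the time this works fine, except when my program has to terminate before any command has been received (for example after an unrecoverable exception).
As described in "Not able to read from named pipe in Java", the FileInputStream constructor blocks until another process opens the pipe for writing. This leaves the thread implementation no chance to handle the interrupt and terminate properly.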
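The blocking behaviour described above can be isolated in a few lines. This is a minimal sketch rather than part of the original program: the class name `BlockingOpenDemo`, its helper method, and the temporary FIFO path are hypothetical, and it assumes a Unix-like system where the `mkfifo` command is available.

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Minimal reproduction: interrupting a thread that is blocked in the FileInputStream constructor. */
final class BlockingOpenDemo {

    /** Returns true if the reader thread is still alive one second after being interrupted. */
    static boolean readerStillBlockedAfterInterrupt(final Path fifo) throws IOException, InterruptedException {
        Files.deleteIfExists(fifo);
        // Create the FIFO with the mkfifo command (assumed to be on the PATH)
        final int status = new ProcessBuilder("mkfifo", fifo.toAbsolutePath().toString()).start().waitFor();
        if (status != 0) {
            throw new IOException("Failed to create fifo: " + status);
        }
        final Thread reader = new Thread(() -> {
            // The constructor does not return until the FIFO is opened for writing
            try (FileInputStream is = new FileInputStream(fifo.toFile())) {
                System.out.println("FIFO opened for reading");
            } catch (final IOException e) {
                System.err.println("Open failed: " + e);
            }
        });
        reader.start();
        Thread.sleep(500);   // give the reader time to reach the constructor
        reader.interrupt();  // only sets the interrupt flag; the open call keeps waiting
        reader.join(1000);
        final boolean stillBlocked = reader.isAlive();
        if (stillBlocked) {
            // Clean up: opening the write end lets the reader's open call return
            try (FileOutputStream os = new FileOutputStream(fifo.toFile())) {
                // nothing to write
            }
        }
        reader.join();
        Files.deleteIfExists(fifo);
        return stillBlocked;
    }

    public static void main(final String[] args) throws Exception {
        final Path fifo = Files.createTempDirectory("fifo-demo").resolve("demo.fifo");
        System.out.println("Still blocked after interrupt: " + readerStillBlockedAfterInterrupt(fifo));
    }
}
```

The sketch only unblocks the reader at the end by opening the FIFO for writing, which is the same thing `echo "command" > pipe` does from a shell.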
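I wrote a simple program that demonstrates the problem. It listens for input commands for 3 seconds and then tries (and fails) to stop itself.
InputReaderThread.java: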
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.function.Consumer;

    /**
     * Read the input named FIFO to dispatch commands to a command processor.
     */
    public final class InputReaderThread {

        private static final long LOOP_PERIOD_MS = 100;

        private final Path fifoPath;
        private final Consumer<String[]> processor;
        private final Thread thread;

        /**
         * Constructor.
         *
         * @param fifoPath path to the named FIFO
         * @param processor the command processor
         */
        public InputReaderThread(final Path fifoPath, final Consumer<String[]> processor) {
            this.fifoPath = fifoPath;
            this.processor = processor;
            this.thread = new Thread(this::run);
            this.thread.setName(InputReaderThread.class.getSimpleName());
        }

        /**
         * Start the input reader thread.<br>
         * It is never legal to start a thread more than once. In particular, a thread may not be restarted once it has completed execution.
         *
         * @see Thread#start()
         */
        public final void start() {
            System.out.println("Requesting input reader thread start");
            this.thread.start();
        }

        /**
         * Request the input reader thread to stop.<br>
         * This method does not wait for the thread to terminate, see {@link #join(long)}.
         */
        public final void stop() {
            System.out.println("Requesting input reader thread stop");
            this.thread.interrupt();
        }

        /**
         * Wait for the internal reader thread to terminate.<br>
         * Returns {@code false} if the timeout elapses first.
         *
         * @param timeout the maximum time to wait, in milliseconds
         * @return {@code true} if the input thread terminated within timeout
         * @throws InterruptedException the current thread was interrupted while waiting
         * @see Thread#join(long)
         */
        public final boolean join(final long timeout) throws InterruptedException {
            System.out.println("Awaiting input reader thread stop");
            this.thread.join(timeout);
            return !this.thread.isAlive();
        }

        private final void run() {
            System.out.println("Input reader thread started");
            try (final FileInputStream is = new FileInputStream(createFifoPipe(this.fifoPath))) {
                final StringBuilder commandBuilder = new StringBuilder();
                System.out.println("Listening to input FIFO");
                while (!Thread.interrupted()) {
                    // Avoid reading when there is no data available
                    if (is.available() > 0) {
                        final int b = is.read();
                        if (b == '\n') {
                            // The command is complete: process it
                            final String command = commandBuilder.toString().trim();
                            System.out.println("Received command: " + command);
                            this.processor.accept(command.split(" "));
                            // Reset the command builder
                            commandBuilder.setLength(0);
                        } else {
                            // Append the character to the command
                            commandBuilder.append((char) b);
                        }
                    } else {
                        // Poll the input FIFO periodically
                        Thread.sleep(LOOP_PERIOD_MS);
                    }
                }
            } catch (final IOException e) {
                throw new RuntimeException("An IO exception occurred on agent FIFO", e);
            } catch (@SuppressWarnings("unused") final InterruptedException e) {
                // Handle interruption by terminating the thread
            }
            System.out.println("Input reader thread terminated");
        }

        /**
         * Helper method to create a Unix named FIFO.
         *
         * @param fifoPath the FIFO path
         * @return the File handler to the created FIFO
         * @throws IOException an I/O error occurs
         * @throws InterruptedException the thread was interrupted while waiting for the FIFO creation
         */
        private static final File createFifoPipe(final Path fifoPath) throws IOException, InterruptedException {
            System.out.println("Creating fifo: " + fifoPath);
            final File fifo = fifoPath.toFile();
            if (fifo.exists()) {
                System.err.println("Deleting existing fifo");
                Files.delete(fifoPath);
            }
            final String[] command = new String[] { "mkfifo", fifo.getAbsolutePath() };
            final Process process = new ProcessBuilder(command).start();
            final int returnStatus = process.waitFor();
            if (returnStatus != 0) {
                throw new IOException("Failed to create fifo: " + returnStatus);
            } else {
                System.out.println("Created fifo: " + fifoPath);
            }
            return fifo;
        }
    }
NamedPipeTest.java:
    import java.nio.file.Paths;

    /**
     * Dummy program for {@link InputReaderThread}.
     */
    public final class NamedPipeTest {

        /**
         * Entry point.
         *
         * @param args one argument (path to named pipe)
         */
        public static void main(final String[] args) {
            final NamedPipeTest fifoTest = new NamedPipeTest(args[0]);
            fifoTest.start();
            try {
                Thread.sleep(3000);
                fifoTest.stop();
            } catch (final InterruptedException e) {
                System.err.println("Main thread interrupted");
                e.printStackTrace(System.err);
            }
        }

        private final InputReaderThread inputReader;

        /**
         * Constructor.
         *
         * @param fifoPath the named pipe path
         */
        public NamedPipeTest(final String fifoPath) {
            this.inputReader = new InputReaderThread(Paths.get(fifoPath), this::process);
        }

        /**
         * Start the program.
         */
        public void start() {
            this.inputReader.start();
        }

        /**
         * Stop the program.
         */
        public void stop() {
            try {
                this.inputReader.stop();
                if (!this.inputReader.join(5000)) {
                    System.err.println("Failed to terminate input reader thread");
                    System.exit(-1);
                }
            } catch (@SuppressWarnings("unused") final InterruptedException e) {
                System.err.println("Interrupted while stopping");
                System.exit(-1);
            }
            System.out.println("Stopped successfully");
        }

        /**
         * Process a command.
         *
         * @param command the command
         */
        public void process(final String[] command) {
            if (command[0].equals("stop")) {
                new Thread(this::stop).start();
            }
        }
    }
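Running this program and writing "stop" to the named pipe shows that the input reader thread terminates properly when it handles the interrupt (I am not sure why the thread termination is logged twice, but that would be another question):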
$ (java -jar NamedPipeTest.jar fifo &) && sleep 2s && echo "stop" > fifo
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Listening to input FIFO
Received command: stop
Requesting input reader thread stop
Awaiting input reader thread stop
Input reader thread terminated
Stopped successfully
Requesting input reader thread stop
Awaiting input reader thread stop
Stopped successfully
However, running it without writing the "stop" command shows that FileInputStream cannot handle the interrupt while it is opening the pipe:
$ java -jar NamedPipeTest.jar fifo &
Requesting input reader thread start
Input reader thread started
Creating fifo: fifo
Deleting existing fifo
Created fifo: fifo
Requesting input reader thread stop
Awaiting input reader thread stop
Failed to terminate input reader thread
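Things I have considered:
- The thread should not be made a daemon, since it performs IO (mixing the two seems like a bad idea)
- The Java NIO API relies on seek(), which seems inadvisable for named pipes
I don't know how to solve this. Am I missing something? Thanks for your help!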
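One direction that might be worth trying, offered only as a sketch and not as a confirmed fix, is to open the FIFO in read-write mode so that the open call itself does not wait for a writer. The class name `NonBlockingFifoReader` is hypothetical, and the sketch assumes that the FIFO already exists and that the platform lets a read-write open of a FIFO return immediately (Linux does; POSIX leaves this case undefined). The polling loop mirrors the one in the program above.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.function.Consumer;

/** Sketch: poll a FIFO that was opened read-write, so the thread can always see an interrupt. */
final class NonBlockingFifoReader implements Runnable {

    private static final long LOOP_PERIOD_MS = 100;

    private final Path fifoPath;
    private final Consumer<String> processor;

    NonBlockingFifoReader(final Path fifoPath, final Consumer<String> processor) {
        this.fifoPath = fifoPath;
        this.processor = processor;
    }

    @Override
    public void run() {
        // Assumption: "rw" opens the FIFO for reading and writing, which on Linux
        // returns at once because this process counts as its own writer.
        try (RandomAccessFile raf = new RandomAccessFile(this.fifoPath.toFile(), "rw");
                FileInputStream is = new FileInputStream(raf.getFD())) {
            final StringBuilder line = new StringBuilder();
            while (!Thread.interrupted()) {
                if (is.available() > 0) {
                    final int b = is.read();
                    if (b == '\n') {
                        // A complete line was received: hand it to the processor
                        this.processor.accept(line.toString().trim());
                        line.setLength(0);
                    } else {
                        line.append((char) b);
                    }
                } else {
                    // No data yet: sleep, which also makes the loop interruptible
                    Thread.sleep(LOOP_PERIOD_MS);
                }
            }
        } catch (final IOException e) {
            throw new RuntimeException("An IO exception occurred on FIFO", e);
        } catch (final InterruptedException e) {
            // Interrupted while sleeping: fall through and let the thread end
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        // args[0]: path to an existing FIFO
        final Thread thread = new Thread(new NonBlockingFifoReader(Paths.get(args[0]),
                command -> System.out.println("Received command: " + command)));
        thread.start();
        Thread.sleep(3000);
        thread.interrupt();
        thread.join(5000);
        System.out.println(thread.isAlive() ? "Failed to terminate" : "Stopped successfully");
    }
}
```

A side effect of this design is that the reader never observes end-of-file, because it keeps a write end open itself. That suits a polling loop based on available(), but it would not suit code that relies on read() returning -1 when the last writer closes.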