使用 C++ stdlib 从管道中非阻塞读取

问题描述

在 2021 年,有没有办法使用 C++ 标准库的高级工具从管道中进行非阻塞读取? IE。 C++17(或者C++20)?在 Linux 上开发,但需要一定程度的可移植性到其他 *NIX。 Boost 不是解决我手头问题的选项。

我非常了解 POSIX 函数,即 poll()open()O_NONBLOCK。沿着这条路线,我将实现自己的缓冲并拆分成行,而且该数据是二进制的(实际上这并不是看起来的那么大的问题,只要它是 unicode-utf8 并且基本上只是传递到一些报告,其他现有软件将在其中正确呈现字符,例如 Markdown 到 HTML)。

我只是在问自己,我真的必须(再次)实现它吗?或者是否有一些现成的解决方案可用,我只是忽略了?据我了解,std::basic_istream<..> 不太适合,因为它会尝试填充底层缓冲区,并且会在管道中没有足够数据时阻塞。

对于背景: 我正在通过两个管道从子进程中检索 STDIN 和 STDERR。我必须逐行交错这两个流。这发生在一个专用的读取器线程中——然而,这个读取器线程不能被卡住,即使子进程进入一个实时锁。

解决方法

C++ 库没有“管道”或其他操作系统特定资源的概念。

背景:我正在从子进程中检索 STDIN 和 STDERR 通过两个管道。我必须在一条线上交错这两个流 线基。

对于这种情况我会怎么做:子类化 std::streambuf 并覆盖 underflow()。重写的 underflow() 分别实现了来自两个管道的非阻塞读取,为每个管道保留了单独的缓冲区。并且 std::streambuf 的缓冲区填充了来自任何管道的已完成行,只要有一个已完成的行就可以从中读取。

有一个重载的 std::istream 构造函数,它接受一个指向自定义 std::streambuf 参数的指针。

你最终会得到一个看起来像普通的花园品种,std::istream,你可以std::getline,并让任何管道设法产生一条完整的线,最终以逐行交错的方式结束输入。

,

产生两个 std::thread,每个读数来自不同的管道。将 std::getline 读入单独的 std::string。读完一行后,将 std::string 放入受 std::vector<std::string> 保护的 std::mutex,然后通知 condition_variable。在主线程中,您可以等待新事件的 condition_variable,然后获取 mutex 并将 std::vector<std::string> 中的所有行刷新到输出。

像这样:

#include <string>
#include <thread>
#include <iostream>
#include <mutex>
#include <vector>
#include <condition_variable>
#include <fstream>
#include <assert.h>

std::mutex g_lines_mutex;
std::condition_variable g_lines_notify;
std::vector<std::string> g_lines;

void outputter() {
    while (1) {
        std::unique_lock<std::mutex> guard(g_lines_mutex);
        if (g_lines.empty()) {
             g_lines_notify.wait(guard);
        }
        for (auto&& i : g_lines) {
            std::cout << "Read line: " << i << "\n";
        }
        g_lines.clear();
    }
}

void interleaver(const char *arg) {
    std::ifstream f(arg);
    std::string line;
    while (std::getline(f,line)) {
        {
            std::lock_guard<std::mutex> guard(g_lines_mutex);
            g_lines.emplace_back(std::move(line));
        }
        g_lines_notify.notify_one();
    }
}

int main(int argc,char *argv[]) {
    assert(argc == 3);
    std::array<std::thread,3> t = {
        std::thread{ outputter },std::thread{ interleaver,argv[1] },argv[2] },};
    for (auto&& i : t) {
        i.join();
    }
}

然后这样的程序编译并运行:

$ mkfifo fifo1 fifo2; 
$ ( exec 3> fifo1 ; exec 4> fifo2; while sleep 1; do echo 1 $(date) >&3 ; echo 2 $(date) >&4; done; ) &
$ g++ -pthread ./1.cpp && ./a.out fifo1 fifo2
Read line: 1 Sun,01 Aug 2021 17:41:25 +0200
Read line: 2 Sun,01 Aug 2021 17:41:25 +0200
Read line: 1 Sun,01 Aug 2021 17:41:26 +0200
Read line: 2 Sun,01 Aug 2021 17:41:26 +0200
Read line: 1 Sun,01 Aug 2021 17:41:27 +0200
Read line: 2 Sun,01 Aug 2021 17:41:27 +0200

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...