boost::process 如何正确读取进程 std::cout 和 std::cerr 并保留顺序

问题描述

我正在通过 boost::process 开始一个过程。该过程使用std::coutstd::cerr输出一些信息。我需要检索这些信息。在某些时候,我希望能够存储那些保留顺序和严重性的输出(来自 coutcerr输出)。

但考虑到 boost::process 重定向输出的方式,我无法做到这一点。我只能将 std::cout 重定向到特定的 ipstream,将 std::cerr 重定向到另一个。然后,在阅读它们时,我无法保留顺序。

这是一个 MCVE 隔离问题:

#include <iostream>

#include <boost/process.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

void doReadOutput( boost::process::ipstream* is,boost::process::ipstream* err,std::ostream* out )
{
    std::string line;
    
    if ( std::getline( *is,line ) ) 
        *out << "cout: " << line << std::endl;
    if ( std::getline( *err,line ) ) 
        *out << "cerr: " << line << std::endl;
}

void readOutput( boost::process::ipstream* is,std::ostream* out,std::atomic_bool* continueFlag )
{
    std::string line;
    while ( *continueFlag )
    {
        doReadOutput( is,err,out );
    }

    // get last outputs that may remain in buffers
    doReadOutput( is,out );
}

int main( int argc,char* argv[] )
{
    if ( argc == 1 )
    {
        // run this same program with "foo" as parameter,to enter "else" statement below from a different process
        try
        {
            boost::process::ipstream is_stream,err_stream;
            std::stringstream merged_output;
            std::atomic_bool continueFlag = true;

            boost::process::child child( argv[0],std::vector<std::string>{ "foo" },boost::process::std_out > is_stream,boost::process::std_err > err_stream );

            boost::thread thrd( boost::bind( readOutput,&is_stream,&err_stream,&merged_output,&continueFlag ) );

            child.wait();

            continueFlag = false;

            thrd.join();

            std::cout << "Program output was:" << std::endl;
            std::cout << merged_output.str();
        }
        catch ( const boost::process::process_error& err )
        {
            std::cerr << "Error: " << err.code() << std::endl;
        }
        catch (...)                                                                                                                                      // @NOCOVERAGE
        {                                                     
            std::cerr << "UnkNown error" << std::endl;
        }
    }
    else
    {
        // program invoked through boost::process by "if" statement above

        std::cerr << "Error1" << std::endl;
        std::cout << "Hello World1" << std::endl;
        std::cerr << "Error2" << std::endl;
        std::cerr << "Error3" << std::endl;
        std::cerr << "Error4" << std::endl;
        std::cerr << "Error5" << std::endl;
        std::cout << "Hello World2" << std::endl;
        std::cerr << "Error6" << std::endl;
        std::cout << "Hello World3" << std::endl;
    }

    return 0;
}

当我执行这个程序时(在 Windows 10 下用 Visual Studio 2019 编译),它输出

Program output was:
cout: Hello World1
cerr: Error1
cout: Hello World2
cerr: Error2
cout: Hello World3
cerr: Error3
cerr: Error4
cerr: Error5
cerr: Error6

虽然我想要:

Program output was:
cerr: Error1
cout: Hello World1
cerr: Error2
cerr: Error3
cerr: Error4
cerr: Error5
cout: Hello World2
cerr: Error6
cout: Hello World3

有什么办法可以实现吗?


编辑,按照一些程序员的建议,为每个输出流创建一个线程:

#include <iostream>

#include <boost/process.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

void doReadOutput( boost::process::ipstream* str,const std::string& prefix,boost::mutex* mutex )
{
    std::string line;

    if ( std::getline( *str,line ) )
    {
        boost::mutex::scoped_lock lock( *mutex );
        *out << prefix << ": " << line << std::endl;
    }
}

void readOutput( boost::process::ipstream* str,std::string prefix,boost::mutex* mutex,std::atomic_bool* continueFlag )
{
    while ( *continueFlag )
    {
        doReadOutput( str,out,prefix,mutex );
        boost::thread::yield();
    }

    // get last outputs that may remain in buffers
    doReadOutput( str,mutex );
}

int main( int argc,err_stream;

            std::stringstream merged_output;
            std::atomic_bool continueFlag = true;

            boost::process::child child( argv[0],boost::process::std_err > err_stream );

            boost::mutex mutex;
            boost::thread thrdis( boost::bind( readOutput,"cout",&mutex,&continueFlag ) );
            boost::thread thrderr( boost::bind( readOutput,"cerr",&continueFlag ) );

            child.wait();

            continueFlag = false;

            thrdis.join();
            thrderr.join();

            std::cout << "Program output was:" << std::endl;
            std::cout << merged_output.str();
        }
        catch ( const boost::process::process_error& err )
        {
            std::cerr << "Error: " << err.code() << std::endl;
        }
        catch (...)                                                                                                                                      // @NOCOVERAGE
        {                                                     
            std::cerr << "UnkNown error" << std::endl;
        }
    }
    else
    {
        // program invoked through boost::process by "if" statement above

        std::cerr << "Error1" << std::endl;
        std::cout << "Hello World1" << std::endl;
        std::cerr << "Error2" << std::endl;
        std::cerr << "Error3" << std::endl;
        std::cerr << "Error4" << std::endl;
        std::cerr << "Error5" << std::endl;
        std::cout << "Hello World2" << std::endl;
        std::cerr << "Error6" << std::endl;
        std::cout << "Hello World3" << std::endl;
    }

    return 0;
}

那么输出为:

Program output was:
cerr: Error1
cout: Hello World1
cerr: Error2
cout: Hello World2
cerr: Error3
cout: Hello World3
cerr: Error4
cerr: Error5
cerr: Error6

还是没想到...

解决方法

您将需要非阻塞 IO。库中支持的方式是使用异步管道。

你会为 stderr/stdout 运行一个循环

  • async_read 到缓冲区中,直到你得到一行或更多行
  • 在行可用时将其从输入缓冲区复制到输出缓冲区

因为你最终会在管道/缓冲区状态上有两次非常相同的循环,所以将它封装成一个类型是有意义的,例如

    struct IoPump {
        IoPump(io_context& io,std::string& merged) : _pipe(io),_merged(merged) {}

        boost::asio::streambuf _buf;
        bp::async_pipe         _pipe;
        std::string&           _merged;

        void do_loop();
    };

    io_context io;

    std::string merged;
    IoPump outp{io,merged},errp{io,merged};

    bp::child child(program,std::vector<std::string> { "foo" },bp::std_out > outp._pipe,bp::std_err > errp._pipe);

    outp.do_loop(); // prime the pump
    errp.do_loop(); // prime the pump
    io.run();

仅此而已。嗯,当然,除了 IoPump::do_loop() 的实际作用:

void do_loop() {
    boost::asio::async_read_until(_pipe,_buf,"\n",[this,out = boost::asio::dynamic_buffer(_merged)](
            error_code ec,size_t xfer) mutable {
            if (!ec) {
                out.commit(buffer_copy(
                    out.prepare(xfer),_buf.data(),xfer));
                _buf.consume(xfer);

                do_loop(); // chain
            } else {
                std::cerr << "IoPump: " << ec.message() << "\n";
            }
        });
}

注意

  • 您的主应用程序完全是单线程的
  • 意味着异步完成处理程序永远不会同时运行
  • 意味着直接访问 std::string merged; 输出缓冲区是安全的,无需担心同步

现场演示

Live On Coliru

static void main_program(char const* program);
static void child_program();

int main(int argc,char** argv) {
    if (argc == 1)
        main_program(argv[0]);
    else
        child_program();
}

#include <iostream>
static void child_program() {
    std::cerr << "Error1"       << std::endl;
    std::cout << "Hello World1" << std::endl;
    std::cerr << "Error2"       << std::endl;
    std::cerr << "Error3"       << std::endl;
    std::cerr << "Error4"       << std::endl;
    std::cerr << "Error5"       << std::endl;
    std::cout << "Hello World2" << std::endl;
    std::cerr << "Error6"       << std::endl;
    std::cout << "Hello World3" << std::endl;
}

#include <boost/process.hpp>
#include <boost/asio.hpp>

static void main_program(char const* program) {
    namespace bp = boost::process;
    try {
        using boost::system::error_code;
        using boost::asio::io_context;

        struct IoPump {
            IoPump(io_context& io,_merged(merged) {}

            boost::asio::streambuf _buf;
            bp::async_pipe         _pipe;
            std::string&           _merged;

            void do_loop() {
                boost::asio::async_read_until(_pipe,out = boost::asio::dynamic_buffer(_merged)](
                        error_code ec,size_t xfer) mutable {
                        if (!ec) {
                            out.commit(buffer_copy(
                                out.prepare(xfer),xfer));
                            _buf.consume(xfer);

                            do_loop(); // chain
                        } else {
                            std::cerr << "IoPump: " << ec.message() << "\n";
                        }
                    });
            }
        };

        io_context io;

        std::string merged;
        IoPump outp{io,merged};

        bp::child child(program,bp::std_err > errp._pipe);

        outp.do_loop(); // prime the pump
        errp.do_loop(); // prime the pump
        io.run();

        std::cout << "Program output was:" << std::endl;
        std::cout << merged;
    } catch (const bp::process_error& err) {
        std::cerr << "Error: " << err.code().message() << std::endl;
    } catch (...) { // @NOCOVERAGE
        std::cerr << "Unknown error" << std::endl;
    }
}

印刷品

IoPump: End of file
IoPump: End of file

和标准输出:

Program output was:
Error1
Error2
Hello World1
Error3
Hello World2
Error4
Hello World3
Error5
Error6

其他示例

我在这个网站上已经有很多例子了。只需寻找async_pipe

开箱即用

您可以简单地在描述符级别将 stderr 重定向到 stdout 并完成!例如

Live On Coliru

    boost::asio::io_context io;
    std::future<std::string> merged;

    bp::child child(program,bp::std_out > merged,bp::posix::fd.bind(2,1),io);

    io.run();

    std::cout << "Program output was:" << std::quoted(merged.get()) << "\n";

或者使用逐行阅读循环:

Live On Coliru

    bp::ipstream merged;

    bp::child child(program,1));

    child.wait();

    std::cout << "Program output was:" << std::endl;
    for (std::string line; getline(merged,line);)
        std::cout << "merged: " << std::quoted(line) << "\n";

打印

Program output was:
merged: "Error1"
merged: "Hello World1"
merged: "Error2"
merged: "Error3"
merged: "Error4"
merged: "Error5"
merged: "Hello World2"
merged: "Error6"
merged: "Hello World3"