boost::asio::async_read_until 带有自定义 match_char 以仅接受 JSON 格式

问题描述

我一直在尝试更改 match_char 函数以在从套接字读取数据时仅接受 JSON 消息。

我有 2 个实现(一个不起作用,另一个有效,但我认为效率不高)。

1- 第一种方法(工作)

    typedef boost::asio::buffers_iterator<boost::asio::streambuf::const_buffers_type> buffer_iterator;

    static std::pair<buffer_iterator,bool> match_json2(const buffer_iterator begin,const buffer_iterator end) {
        buffer_iterator i = begin;
        while (i != end) {
            if ((*i == ']') || (*i == '}')) {
                return std::make_pair(i,true);
            }
            *i++;
        }
        return std::make_pair(i,false);
    }

根据这个定义,我循环读取并重建json。这是一个工作版本,但如果我收到一条与有效 json 不同的消息,我会留在循环中,无法清除 tmp_response 并且永远不会从中恢复...

        std::string read_buffer_string() {
            std::string response;
            bool keepReading = true;
            while (keepReading) {
                std::string tmp_response;
                async_read_until(s,ba::dynamic_buffer(tmp_response),match_json2,yc);
                if (!tmp_response.empty()) {
                    response += tmp_response;
                    if (nlohmann::json::accept(response)) {
                        keepReading = false;
                    }
                }
            }
            return response;
        }
  1. 第二种方法(不起作用)。理想情况下,我想要这样的东西(此实现不起作用,因为开始迭代器并不总是指向消息的开头 - 我猜一些数据已经传输到缓冲区 - 因此 match_json 返回无效值。

     static std::pair<buffer_iterator,bool> match_json(const buffer_iterator begin,const buffer_iterator end) {
         buffer_iterator i = begin;
         while (i != end) {
             if ((*i == ']') || (*i == '}')) {
                 std::string _message(begin,i);
                 std::cout << _message << std::endl;
                 if (nlohmann::json::accept(_message)) {
                     return std::make_pair(i,true);
                 }
             }
             *i++;
         }
         return std::make_pair(i,false);
     }
    

然后这样称呼它:

        std::string read_buffer_string() {
            std::string response;
            async_read_until(s,ba::dynamic_buffer(response),match_json,yc);
            return response;
        }

现在有没有人有更有效的方法来做到这一点? 提前致谢! :)

解决方法

当然,在发布我的 other answer 后,我想起 Boost 在 1.75.0 中接受了 Boost JSON。

它以更优雅的方式进行流解析:https://www.boost.org/doc/libs/1_75_0/libs/json/doc/html/json/ref/boost__json__stream_parser.html#json.ref.boost__json__stream_parser.usage

它实际上也处理尾随数据!

stream_parser p;                  // construct a parser
std::size_t n;                    // number of characters used
n = p.write_some( "[1,2" );       // parse some of a JSON
assert( n == 4 );                 // all characters consumed
n = p.write_some( ",3,4] null" ); // parse the remainder of the JSON
assert( n == 6 );                 // only some characters consumed
assert( p.done() );               // we have a complete JSON
value jv = p.release();           // take ownership of the value

我还认为这可能更适合 CompletionCondition:参见 https://www.boost.org/doc/libs/1_75_0/doc/html/boost_asio/reference/read/overload3.html

这是我测试过的实现:

template <typename Buffer,typename SyncReadStream>
static size_t read_json(SyncReadStream& s,Buffer buf,boost::json::value& message,boost::json::parse_options options = {})
{
    boost::json::stream_parser p{{},options};

    size_t total_parsed = 0;
    boost::asio::read(s,buf,[&](boost::system::error_code ec,size_t /*n*/) {
        size_t parsed = 0;

        for (auto& contiguous : buf.data()) {
            parsed += p.write_some(
                boost::asio::buffer_cast<char const*>(contiguous),contiguous.size(),ec);
        }
        buf.consume(parsed);
        total_parsed += parsed;
        return ec || p.done(); // true means done
    });

    message = p.release(); // throws if incomplete
    return total_parsed;
}

为流缓冲添加委托重载:

template <typename SyncReadStream,typename Alloc>
static size_t read_json(SyncReadStream& s,boost::asio::basic_streambuf<Alloc>& buf,boost::json::parse_options options = {})
{
    return read_json(s,boost::asio::basic_streambuf_ref<Alloc>(buf),message,options);
}

演示程序

此演示程序添加了测试用例 from earlier 以及添加了一些基准统计数据的套接字客户端。参数:

  • test 代替套接字客户端运行测试
  • streambuf 使用 streambuf 重载而不是 std::string 动态缓冲区
  • comments 允许在 JSON 中添加注释
  • trailing_commas 允许在 JSON 中使用尾随逗号
  • invalid_utf8 允许在 JSON 中使用无效的 utf8

Live On Compiler Explorer¹

#include <boost/spirit/home/x3.hpp>
#include <boost/fusion/adapted.hpp>
#include <iomanip>
#include <iostream>
namespace x3 = boost::spirit::x3;

int main() {
    std::string const s = 
        "? 8==2 : true ! false"
        "? 9==3 : 'book' ! 'library'";

    using expression = std::string;
    using ternary = std::tuple<expression,expression,expression>;
    std::vector<ternary> parsed;

    auto expr_ = x3::lexeme [+~x3::char_("?:!")];
    auto ternary_ = "?" >> expr_ >> ":" >> expr_ >> "!" >> expr_;

    std::cout << "=== parser approach:\n";
    if (x3::phrase_parse(begin(s),end(s),*x3::seek[ ternary_ ],x3::space,parsed)) {

        for (auto [cond,e1,e2] : parsed) {
            std::cout
                << " condition " << std::quoted(cond) << "\n"
                << " true expression " << std::quoted(e1) << "\n"
                << " else expression " << std::quoted(e2) << "\n"
                << "\n";
        }
    } else {
        std::cout << "non matching" << '\n';
    }
}

使用 test 打印:

 ----- valid test cases
Testing {}                     -> Success {}
Testing {"a":4,"b":5}         -> Success {"a":4,"b":5}
Testing []                     -> Success []
Testing [4,"b"]               -> Success [4,"b"]
 ----- incomplete test cases
Testing {                      -> (incomplete...)
Testing {"a":4,"b"            -> (incomplete...)
Testing [                      -> (incomplete...)
Testing [4,"                  -> (incomplete...)
 ----- invalid test cases
Testing }                      -> syntax error
Testing "a":4 }                -> Success "a" -- remaining `:4 }`
Testing ]                      -> syntax error
 ----- excess input test cases
Testing {}{"a":4,"b":5}       -> Success {} -- remaining `{"a":4,"b":5}`
Testing []["a","b"]           -> Success [] -- remaining `["a","b"]`
Testing {} bogus trailing data -> Success {} -- remaining `bogus trailing data`

使用套接字客户端的一些演示:

Mean packet size: 16 in 2 packets
Request: 28 bytes
Request: {"a":4,"b":"5"} bytes
Remaining data: "bye
"
took 0.000124839s,~0.213899MiB/s

具有大型 (448MiB) location_history.json:

Mean packet size: 511.999 in 917791 packets
Request: 469908167 bytes
 (large request output suppressed)
took 3.30509s,~135.59MiB/s

enter image description here


¹ 编译器资源管理器不支持仅链接非标头图书馆

,

TL/DR;

说真的,只需将框架添加到您的有线协议中即可。例如。甚至 HTTP 响应也会这样做(例如,通过内容长度标头,以及分块编码)

更新:

您可以使用我在 another answer 中添加的 Boost JSON 来代替手动操作


第一种方法有缺陷,因为您使用了“async_read_until”,但将操作视为同步操作。

第二个问题是,json::parsejson::accept 都无法报告完整/损坏解析的位置。这意味着您确实需要在有线协议中进行成帧,因为您无法检测消息边界。

本答案的其余部分将首先深入揭示 nlohmann::json 库的局限性如何使您的任务无法完成¹。

因此,即使您使用现有库是值得称道的,我们也在寻找替代方案。

让它发挥作用(?)

您可以使用 Beast 使用的方法 (http::read(s,http::message<>)。即:引用整个缓冲区。

flat_buffer buf;
http::request<http::empty_body> m;
read(s,m); // is a SyncStream like socket

这里,read 是对消息和缓冲区的组合操作。这使得检查完成标准变得容易。在我们的例子中,让我们制作一个也用作匹配条件的阅读器:

template <typename DynamicBuffer_v1>
struct JsonReader {
    DynamicBuffer_v1 _buf;
    nlohmann::json message;

    JsonReader(DynamicBuffer_v1 buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy,It) {
        using namespace nlohmann;

        auto f = buffers_begin(_buf.data());
        auto l = buffers_end(_buf.data());
        bool ok = json::accept(f,l);
        if (ok) {
            auto n = [&] {
                std::istringstream iss(std::string(f,l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            std::advance(dummy,n);
            return std::pair(dummy,ok);
        } else {
            return std::pair(dummy,ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

这是桃子,在快乐的道路上工作。但是您在边缘/错误情况下遇到了大麻烦:

  • 你无法区分不完整的数据和无效的数据,所以你必须假设未被接受的输入只是不完整的(否则你永远不会等待数据完整)
  • 如果数据无效或
  • ,您将等到无穷大数据变为“有效”
  • 更糟糕的是:无限期地继续读取,可能会耗尽内存(除非您限制缓冲区大小;这可能会导致 DoS)
  • 也许最糟糕的是,如果您读取的数据多于单个 JSON 消息(您通常无法在流套接字的上下文中阻止),原始消息将因“过多输入”而被拒绝。糟糕

测试

以下是确认分析结论预测的测试用例:

Live On Compiler Explorer

#include <boost/asio.hpp>
#include <nlohmann/json.hpp>
#include <iostream>
#include <iomanip>

template <typename Buffer>
struct JsonReader {
    static_assert(boost::asio::is_dynamic_buffer_v1<Buffer>::value);
    Buffer _buf;
    nlohmann::json message;

    JsonReader() = default;
    JsonReader(Buffer buf) : _buf(buf) {}

    template <typename It>
    auto operator()(It dummy,l));
                message = json::parse(iss);
                return iss.tellg(); // detect consumed
            }();

            _buf.consume(n);
            assert(n);
            //std::advance(dummy,ok);
        }
    }
};

namespace boost::asio {
    template <typename T>
    struct is_match_condition<JsonReader<T>> : public boost::true_type { };
}

static inline void run_tests() {
    std::vector<std::string> valid {
        R"({})",R"({"a":4,"b":5})",R"([])",R"([4,"b"])",},incomplete {
        R"({)","b")",R"([)",")",invalid {
        R"(})",R"("a":4 })",R"(])",excess {
        R"({}{"a":4,R"([]["a",R"({} bogus trailing data)",};

    auto run_tests = [&](auto& cases) {
        for (std::string buf : cases) {
            std::cout << "Testing " << std::left << std::setw(22) << buf;
            bool ok = JsonReader { boost::asio::dynamic_buffer(buf) }
                (buf.begin(),buf.end())
                .second;

            std::cout << " -> " << std::boolalpha << ok << std::endl;

            if (ok && !buf.empty()) {
                std::cout << " -- remaining buffer " << std::quoted(buf) << "\n";
            }
        }
    };

    std::cout << " ----- valid test cases \n";
    run_tests(valid);
    std::cout << " ----- incomplete test cases \n";
    run_tests(incomplete);
    std::cout << " ----- invalid test cases \n";
    run_tests(invalid);
    std::cout << " ----- excess input test cases \n";
    run_tests(excess);
}

template <typename SyncReadStream,typename Buffer>
static void read(SyncReadStream& s,Buffer bufarg,nlohmann::json& message) {
    using boost::asio::buffers_begin;
    using boost::asio::buffers_end;

    JsonReader reader{bufarg};;
    read_until(s,bufarg,reader);
    message = reader.message;
}

int main() {
    run_tests();
}

印刷品

 ----- valid test cases
Testing {}                     -> true
Testing {"a":4,"b":5}         -> true
Testing []                     -> true
Testing [4,"b"]               -> true
 ----- incomplete test cases
Testing {                      -> false
Testing {"a":4,"b"            -> false
Testing [                      -> false
Testing [4,"                  -> false
 ----- invalid test cases
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases
Testing {}{"a":4,"b":5}       -> false
Testing []["a","b"]           -> false
Testing {} bogus trailing data -> false

寻找替代品

你可以像我过去那样自己动手:

或者我们可以看看另一个库,它允许我们检测有效 JSON 片段的边界或检测并留下尾随输入。

手卷方法

以下是对 that linked answer 的更现代 Spirit X3 的简单翻译:

// Note: first iterator gets updated
// throws on known invalid input (like starting with `]' or '%')
template <typename It>
bool tryParseAsJson(It& f,It l)
{
    try {
        return detail::x3::parse(f,l,detail::json);
    } catch (detail::x3::expectation_failure<It> const& ef) {
        throw std::runtime_error("invalid JSON data");
    }
}

关键是这个*除了返回真/假将更新起始迭代器根据它消耗了多远输入。

namespace JsonDetect {
    namespace detail {
        namespace x3 = boost::spirit::x3;
        static const x3::rule<struct value_> value{"value"};

        static auto primitive_token
            = x3::lexeme[ x3::lit("false") | "null" | "true" ];

        static auto expect_value
            = x3::rule<struct expect_value_> { "expect_value" }
            // array,object,string,number or other primitive_token
            = x3::expect[&(x3::char_("[{\"0-9.+-") | primitive_token | x3::eoi)]
            >> value
            ;

        // 2.4.  Numbers
        // Note our spirit grammar takes a shortcut,as the RFC specification is more restrictive:
        //
        // However non of the above affect any structure characters (:,{}[] and double quotes) so it doesn't
        // matter for the current purpose. For full compliance,this remains TODO:
        //
        //    Numeric values that cannot be represented as sequences of digits
        //    (such as Infinity and NaN) are not permitted.
        //     number = [ minus ] int [ frac ] [ exp ]
        //     decimal-point = %x2E       ; .
        //     digit1-9 = %x31-39         ; 1-9
        //     e = %x65 / %x45            ; e E
        //     exp = e [ minus / plus ] 1*DIGIT
        //     frac = decimal-point 1*DIGIT
        //     int = zero / ( digit1-9 *DIGIT )
        //     minus = %x2D               ; -
        //     plus = %x2B                ; +
        //     zero = %x30                ; 0
        static auto number = x3::double_; // shortcut :)

        // 2.5 Strings
        static const x3::uint_parser<uint32_t,16,4,4> _4HEXDIG;

        static auto char_ = ~x3::char_("\"\\") |
               x3::char_(R"(\)") >> (       // \ (reverse solidus)
                   x3::char_(R"(")") |      // "    quotation mark  U+0022
                   x3::char_(R"(\)") |      // \    reverse solidus U+005C
                   x3::char_(R"(/)") |      // /    solidus         U+002F
                   x3::char_(R"(b)") |      // b    backspace       U+0008
                   x3::char_(R"(f)") |      // f    form feed       U+000C
                   x3::char_(R"(n)") |      // n    line feed       U+000A
                   x3::char_(R"(r)") |      // r    carriage return U+000D
                   x3::char_(R"(t)") |      // t    tab             U+0009
                   x3::char_(R"(u)") >> _4HEXDIG )  // uXXXX                U+XXXX
               ;

        static auto string = x3::lexeme [ '"' >> *char_ >> '"' ];

        // 2.2 objects
        static auto member
            = x3::expect [ &(x3::eoi | '"') ]
            >> string
            >> x3::expect [ x3::eoi | ':' ]
            >> expect_value;

        static auto object
            = '{' >> ('}' | (member % ',') >> '}');

        // 2.3 Arrays
        static auto array
            = '[' >> (']' | (expect_value % ',') >> ']');

        // 2.1 values
        static auto value_def = primitive_token | object | array | number | string;

        BOOST_SPIRIT_DEFINE(value)

        // entry point
        static auto json = x3::skip(x3::space)[expect_value];
    }  // namespace detail
}  // namespace JsonDetect

显然您将实现放在 TU 中,但在 Compiler Explorer 上我们不能:Live On Compiler Explorer,使用调整后的 JsonReader 打印:

SeheX3Detector
==============
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4,"b"]               -> true
 ----- incomplete test cases 
Testing {                      -> false
Testing {"a":4,"                  -> false
 ----- invalid test cases 
Testing }                      -> invalid JSON data
Testing "a":4 }                -> true -- remaining `:4 }`
Testing ]                      -> invalid JSON data
 ----- excess input test cases 
Testing {}{"a":4,"b":5}       -> true -- remaining `{"a":4,"b"]           -> true -- remaining `["a","b"]`
Testing {} bogus trailing data -> true -- remaining ` bogus trailing data`

NlohmannDetector
================
 ----- valid test cases 
Testing {}                     -> true
Testing {"a":4,"                  -> false
 ----- invalid test cases 
Testing }                      -> false
Testing "a":4 }                -> false
Testing ]                      -> false
 ----- excess input test cases 
Testing {}{"a":4,"b"]           -> false
Testing {} bogus trailing data -> false

注意我们现在是如何实现一些目标的。

  • 接受尾随数据 - 这样我们就不会在收到消息后破坏任何数据
  • 在某些不可能成为有效 JSON 的输入上过早失败
  • 但是,我们无法解决无限期等待/可能/不完整的有效数据的问题
  • 有趣的是,我们的一个“无效”测试用例是错误的 (!)。 (当测试用例失败时,这总是一个好兆头)。这是因为“a”本身实际上是一个有效的 JSON 值。

结论

在一般情况下,不可能在不限制缓冲区大小的情况下进行这样的“完整消息”检测。例如。一个有效的输入可以以一百万个空格开头。你不想等待。

此外,有效输入可以打开字符串、对象或数组²,并且不会在几 GB 内终止。如果您事先停止解析,您将永远不会知道它最终是否是一条有效消息。

尽管无论如何您都不可避免地要处理网络超时,但您更愿意主动了解会发生什么。例如。提前发送有效负载的大小,以便您可以使用 boost::asio::transfer_exactly 并准确验证您期望获得的内容。


¹ 实际上。如果您不关心性能,您可以在增加缓冲区长度时迭代运行 accept

² 上帝保佑,像 0000....00001 这样的数字虽然是 subject to parser implementation differences

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...