问题描述
我正在使用 Elasticsearch DSL,我正在尝试使用查询结果作为另一个查询的参数,如下所示:
{
"query": {
"bool": {
"must_not": {
"terms": {
"request_id": {
"query": {
"match": {
"processing.message": "OUT Followup Synthesis"
}
},"fields": [
"request_id"
],"_source": false
}
}
}
}
}
}
正如您在上面看到的,我正在尝试搜索其 request_id
不是 request_ids
中的 processing.message
的来源之一,其中 OUT Followup Synthesis
等于 Error loading data [x_content_parse_exception] [1:1660] [terms_lookup] unkNown field [query]
。
{{1}}
如何使用 Elasticsearch DSL 实现我的目标?
解决方法
从评论中提取的原始问题
我正在尝试获取带有 processing.message 等于“IN Followup Sythesis”的数据,而它们的 request_id 没有出现在 processing.message 等于“OUT Followup Sythesis”的数据中。在 SQL 语言中:
SELECT d FROM data d
WHERE d.processing.message = 'IN Followup Sythesis'
AND d.request_id NOT IN (SELECT request_id FROM data WHERE processing.message = 'OUT Followup Sythesis');
答案:一般来说,Elasticsearch 中既不是 application-side joins 也不是 subqueries are supported。
因此,您必须运行第一个查询,获取检索到的 ID 并将它们放入第二个查询中 — 理想情况下为 terms
query。
当然,这个限制可以通过“劫持”scripted metric aggregation来克服。
以这3个文件为例:
POST reqs/_doc
{"request_id":"abc","processing":{"message":"OUT Followup Synthesis"}}
POST reqs/_doc
{"request_id":"abc","processing":{"message":"IN Followup Sythesis"}}
POST reqs/_doc
{"request_id":"xyz","processing":{"message":"IN Followup Sythesis"}}
你可以跑
POST reqs/_search
{
"size": 0,"query": {
"match": {
"processing.message": "IN Followup Sythesis"
}
},"aggs": {
"subquery_mock": {
"scripted_metric": {
"params": {
"disallowed_msg": "OUT Followup Synthesis"
},"init_script": "state.by_request_ids = [:]; state.disallowed_request_ids = [];","map_script": """
def req_id = params._source.request_id;
def msg = params._source.processing.message;
if (msg.contains(params.disallowed_msg)) {
state.disallowed_request_ids.add(req_id);
// won't need this particular doc so continue looping
return;
}
if (state.by_request_ids.containsKey(req_id)) {
// there may be multiple docs under the same ID
// so concatenate them
state.by_request_ids[req_id].add(params._source);
} else {
// initialize an appendable arraylist
state.by_request_ids[req_id] = [params._source];
}
""","combine_script": """
state.by_request_ids.entrySet()
.removeIf(entry -> state.disallowed_request_ids.contains(entry.getKey()));
return state.by_request_ids
""","reduce_script": "return states"
}
}
}
}
它只会返回正确的请求:
"aggregations" : {
"subquery_mock" : {
"value" : [
{
"xyz" : [
{
"processing" : { "message" : "IN Followup Sythesis" },"request_id" : "xyz"
}
]
}
]
}
}
⚠️ 这几乎肯定会很慢,并且违背了 not accessing the _source
field 的建议指导。但这也表明子查询是可以“模拟”的。
? 我建议在让它针对整个索引之前在较小的文档集上测试这个脚本——也许通过日期 range
query 或类似的日期来限制它。
仅供参考 Elasticsearch 公开了一个 SQL API,尽管它仅通过付费产品 X-Pack 提供。