Problem description
I'm running an Elasticsearch cluster on AWS without access to x-packs, but I still want to perform a cumulative cardinality aggregation to determine the daily count of new users on my site.
For example, how do I convert:
GET /user_hits/_search
{
  "size": 0,
  "aggs": {
    "users_per_day": {
      "date_histogram": {
        "field": "timestamp",
        "calendar_interval": "day"
      },
      "aggs": {
        "distinct_users": {
          "cardinality": {
            "field": "user_id"
          }
        },
        "total_new_users": {
          "cumulative_cardinality": {
            "buckets_path": "distinct_users"
          }
        }
      }
    }
  }
}
to produce the same result without cumulative_cardinality?
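For context, cumulative_cardinality keeps a running distinct count across the histogram's ordered date buckets. A minimal Python sketch of that idea (just an illustration, not Elasticsearch code):

```python
def cumulative_cardinality(daily_user_lists):
    """Running distinct count across ordered date buckets."""
    seen = set()       # users observed in any bucket so far
    totals = []
    for users in daily_user_lists:
        seen.update(users)
        totals.append(len(seen))
    return totals

# Day 1 sees user 1; day 2 sees users 1 and 3 (only 3 is new).
print(cumulative_cardinality([[1], [1, 3]]))  # [1, 2]
```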
Workaround
Cumulative cardinality was added for exactly this reason: it wasn't easy to calculate before. Still, as with almost everything in Elasticsearch, there's a script that can do it. Here's my take.
- Set up the index
PUT user_hits
{
  "mappings": {
    "properties": {
      "timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd"
      },
      "user_id": {
        "type": "keyword"
      }
    }
  }
}
- Add 1 new user on one day, then 2 more users the next day, one of whom is not strictly "new".
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-01"}
POST user_hits/_doc
{"user_id":1,"timestamp":"2020-10-02"}
POST user_hits/_doc
{"user_id":3,"timestamp":"2020-10-02"}
- Simulate a date histogram using a parameterized start date plus a number of days, group the users by day, then compare each day's results with the previous days'.
GET /user_hits/_search
{
  "size": 0,
  "query": {
    "range": {
      "timestamp": {
        "gte": "2020-10-01"
      }
    }
  },
  "aggs": {
    "new_users_count_vs_prev_day": {
      "scripted_metric": {
        "init_script": """
          state.by_day_map = [:];
          state.start_millis = new SimpleDateFormat("yyyy-MM-dd").parse(params.start_date).getTime();
          state.day_millis = 24 * 60 * 60 * 1000;
          state.dt_formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
        """,
        "map_script": """
          for (def step = 1; step < params.num_of_days + 1; step++) {
            def timestamp = doc.timestamp.value.millis;
            def user_id = doc['user_id'].value;
            def anchor = state.start_millis + (step * state.day_millis);
            // add a `n__` prefix to more easily sort the resulting map later on
            def anchor_pretty = step + '__' + state.dt_formatter.format(Instant.ofEpochMilli(anchor));
            if (timestamp <= anchor) {
              if (state.by_day_map.containsKey(anchor_pretty)) {
                state.by_day_map[anchor_pretty].add(user_id);
              } else {
                state.by_day_map[anchor_pretty] = [user_id];
              }
            }
          }
        """,
        "combine_script": """
          List keys = new ArrayList(state.by_day_map.keySet());
          Collections.sort(keys);
          def unique_sorted_map = new TreeMap();
          def unique_from_prev_day = [];
          for (def key : keys) {
            def unique_users_per_day = new HashSet(state.by_day_map.get(key));
            unique_users_per_day.removeIf(user -> unique_from_prev_day.contains(user));
            // remove the `n__` prefix
            unique_sorted_map.put(key.substring(3), unique_users_per_day.size());
            unique_from_prev_day.addAll(unique_users_per_day);
          }
          return unique_sorted_map
        """,
        "reduce_script": "return states",
        "params": {
          "start_date": "2020-10-01",
          "num_of_days": 5
        }
      }
    }
  }
}
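In plain terms, the combine_script sorts the step-prefixed keys, subtracts users already counted on earlier days, and emits the per-day count of genuinely new users. A rough Python analogue of that step (the `"step__date"` keys and the sample values mirror what the map_script builds for the documents above):

```python
def combine(by_day_map):
    """Count, per day, the users not seen on any earlier day."""
    unique_from_prev_days = set()
    out = {}
    # lexicographic sort works here because the step prefix stays single-digit
    for key in sorted(by_day_map):
        new_users = set(by_day_map[key]) - unique_from_prev_days
        out[key.split("__", 1)[1]] = len(new_users)  # strip the "n__" prefix
        unique_from_prev_days |= new_users
    return out

print(combine({"1__2020-10-01": [1], "2__2020-10-02": [1, 1, 3]}))
# {'2020-10-01': 1, '2020-10-02': 1}
```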
yielding
"aggregations" : {
"new_users_count_vs_prev_day" : {
"value" : [
{
"2020-10-01" : 1,<-- 1 new unique user
"2020-10-02" : 1,<-- another new unique user
"2020-10-03" : 0,"2020-10-04" : 0,"2020-10-05" : 0
}
]
}
}
This script is bound to be slow, but it has one potentially very useful advantage: you can tweak it to return the full lists of new user IDs, not just their counts. That's something you cannot get out of cumulative cardinality which, according to its implementation's author, works by design only in a sequential, cumulative fashion.
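As a hypothetical sketch of that tweak, the combine_script's counting logic could keep the IDs themselves instead of their sizes (Python analogue, with plain date keys for brevity):

```python
def new_user_ids_per_day(by_day_map):
    """Like the counting version, but keep the IDs of the genuinely new users."""
    seen = set()
    out = {}
    for day in sorted(by_day_map):  # plain "yyyy-MM-dd" keys sort chronologically
        new_ids = sorted(set(by_day_map[day]) - seen)
        out[day] = new_ids
        seen.update(new_ids)
    return out

print(new_user_ids_per_day({"2020-10-01": [1], "2020-10-02": [1, 3]}))
# {'2020-10-01': [1], '2020-10-02': [3]}
```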