Elasticsearch 找出用户是停止还是移动 - 可能吗?

问题描述

我想使用关于映射的 elasticsearch 配置来显示用户位置和他/她在我的 Web 应用程序中到管理员的方向。所以我在 elasticsearch 中创建了一个索引,如:

{
  "settings": {
    "index": {
      "number_of_shards": 5,"number_of_replicas": 1
    },"analysis": {
      "analyzer": {
        "analyzer-name": {
          "type": "custom","tokenizer": "keyword","filter": "lowercase"
        }
      }
    }
  },"mappings": {
        "properties": {
          "driver_id": { "type": "integer" },"email": { "type": "text" },"location": { "type": "geo_point" },"app-platform": { "type": "text" },"app-version": { "type": "text" },"created_at": { "type": "date","format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"}
        }
    }
}

并开始使用此 curl 将用户位置插入到 elasticsearch

{
    "driver_id": 357,"driver_email": "[email protected]","location": {
        "lat": 37.3,"lon": 59.52
    },"created_at": "2021-06-04 00:09:00"
}

这个结构从用户移动到我的elasticsearch,之后我编写了这些服务来为我设计的网络端部分获取数据:

module Api
  module V1
    module Drivers
      module Elastic
        class LiveLocation
          include Peafowl

          attribute :driver_id,::Integer

          def call
            @driver = ::Driver.find(driver_id) if driver_id.present?
            result = []
            options = {
                headers: {
                    'Content-Type' => 'application/json'
                },body: @driver.present? ? options_with_driver : options
            }
            begin
              response = HTTParty.get(elasticseach_url.to_s,options)
              records = JSON.parse(response.body)['hits']['hits']
              if records.present?
                records.group_by { |r| r['_source']['driver_id'] }.to_a.each do |record|
                  driver = ::Driver.where(id: record[0]).first
                  if driver.present?
                    location = record[1][0]['_source']['location']
                    app_platform = record[1][0]['_source']['app-platform']
                    app_version = record[1][0]['_source']['app-version']
                    result.push(driver_id: driver.id,driver_email: driver.profile.email,location: location,app_platform: app_platform,app_version: app_version)
                  end
                end
              end
            rescue StandardError => error
              Rails.logger.info "Error => #{error}"
              result = []
            end
            context[:response] = result
          end

          def elasticseach_url
            "#{ENV.fetch('ELASTICSEARCH_BASE_URL','http://127.0.0.1:9200')}/#{ENV.fetch('ELASTICSEARCH_DRIVER_POSITION_INDEX','live_location')}/_search"
          end

          def options
            {
                query: {
                    bool: {
                        filter: [
                            {
                                range: {
                                    created_at: {
                                        gte: (Time.Now.beginning_of_day.strftime '%Y-%m-%d %H:%M:%s')
                                    }
                                }
                            }
                        ]
                    }
                },sort: [
                    {
                        created_at: {
                            order: 'desc'
                        }
                    }
                ]
            }.to_json
          end

          def optinos_with_driver
            {
                query: {
                    bool: {
                        must: [
                            {
                                term: {
                                    driver_id: {
                                        value: @driver.id
                                    }
                                }
                            }
                        ],filter: [
                            {
                                range: {
                                    created_at: {
                                        gte: (Time.Now.beginning_of_day.strftime '%Y-%m-%d %H:%M:%s')
                                    }
                                }
                            }
                        ]
                    }
                },sort: [
                    {
                        created_at: {
                            order: 'desc'
                        }
                    }
                ]
            }.to_json
          end
        end
      end
    end
  end
end

这种结构完美运行,但即使用户在 elasticsearch 保存他的位置时停止,但我需要过滤用户数据,如果用户在原地停留一小时,elasticsearch 理解而不保存数据。是否有可能? 我使用 elticsearch 7.1 和红宝石 2.5

我知道在 kibana 中是可能的,但目前我无法使用 kibana。

解决方法

我不确定这是否可以通过单个 ES 查询来完成...

但是您可以使用 2 个查询:

  • 用于检查用户在过去一小时内的位置是否相同
  • 第二个相同然后不插入
  • 但我不建议这样做

你可以做什么:

  • 使用 REDIS 或任何内存缓存来维护用户上次的地理位置持续时间
  • 在此基础上,更新或跳过更新到 Elastic Search

PS:我不熟悉 ES 地理定位 API