Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)

Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)

关于Elasticsearch
安装方式
文档
中文文档
Elasticsearch
Elasticsearch DSL操作文档


本文通过python模块对elasticsearch进行简单的增删改查。
本文测试脚本仓库地址:https://github.com/YangJunJ/studyElasticsearch.git

环境

  • Python 3.6.2
  • elasticsearch 7.13.1
  • elasticsearch-dsl 7.3.0
  • elasticsearch 7.13.1

安装下面两个python模块

sudo pip3 install elasticsearch
sudo pip3 install elasticsearch_dsl



建立连接

创建文件elasticsearch_obj.py

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, UpdateByQuery


class ElasticSearchObj(object):
    def __init__(self):
        self.user_key = "user_index"
        self.friend_key = "friend_index"
        self.test_count_key = "count_test_key"
        # 初始化一个Elasticsearch实例对象
        self.client = Elasticsearch(hosts=[{'host': "127.0.0.1", "port": 9200}])

    def search(self, index):
        # 返回索引对象结果级
        return Search(using=self.client, index=index)

    def update(self, index):
        # ubq = UpdateByQuery(index=index).using(self.client)
        # or
        ubq = UpdateByQuery(index=index, using=self.client)
        return ubq




创建或者更新文档

Elasticsearch 使用 JSON作为文档的序列化格式,用起来有点项nosql,文档的字段数量也可以像mongodb文档一样,不一致不相同的形式存储
index插入
create插入
创建文件study_insert.py

import random
from elasticsearch_obj import ElasticSearchObj


class CreateNewRecord(ElasticSearchObj):
    def index(self, index, doc_id=None, doc_type="_doc", **kwargs):
        """
        如果索引不存在,则会创建索引并写入文档
        self.client.index(index, body, doc_type=None, id=None, params=None, headers=None)
        http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
        :param index: 文档索引名
        :param doc_id: 当doc_id=None时,通过该方法会向文档中插入一条记录,doc_id!=None时,如果doc_id已经存在则会更新,不存在则插入
        :param doc_type: 文档类型,认为_doc,相同index下的记录值必须一致,如果不相等,则会触发异常
        :param kwargs: 文档内容
        :return:
        """
        return self.client.index(index=index, body=kwargs, doc_type=doc_type, id=doc_id)

    def create(self, index, doc_id=None, doc_type="_doc", **kwargs):
        """
        http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
        用法同index,不同的是create必须传递文档Id,否则会触发异常错误
        :param index:
        :param doc_id:
        :param doc_type:
        :param kwargs:
        :return:
        """
        return self.client.create(index=index, id=doc_id, doc_type=doc_type, body=kwargs)

    def test_insert(self):
        """
        插入一些记录,方便等会查询时使用
        :return:
        """
        hobby = ["唱", "跳", "rap", "唱跳rap", "游泳", "打球", "其他"]
        for i in range(100):
            response = self.index(
                index=self.user_key,
                id=i,
                name="".join(random.sample('zyxwvutsrqponmlkjihgfedcba', random.randint(3, 8))),
                age=random.randint(18, 30),
                hobby=random.choice(hobby),
                height=random.randint(155, 185),
                sex=random.choice(["男", "女", "不详"])
            )
            print(f"{self.user_key}插入{i}条数据, 返回:{response}")

        for i in range(100):
            response = self.index(
                index=self.friend_key,
                user_id=i,
                friend_id=random.randint(0, 99),
                word=random.sample(["拧螺丝", "撸代码", "CRUD", "划水", "偷偷写下bug"], random.randint(1, 3)),
                other={"work_place": random.choice(["第一排", "第二排"]), "performance": random.choice(["好", "不好"])}
            )
            print(f"{self.friend_key}插入{i}条数据, 返回:{response}")

        for i in range(20000):
            response = self.index(
                index=self.test_count_key,
                user_id=i,
                friend_id=random.randint(0, 99),
                weight=random.randint(0, 10000)
            )
            print(f"{self.test_count_key}插入{i}条数据, 返回:{response}")


if __name__ == "__main__":
    test = CreateNewRecord()
    test.test_insert()



查询

页查询子句
  • 模糊查询:match, 返回与提供的文本、数字、日期或布尔值匹配的文档,包括用于模糊匹配
  • 精确查询:term, 该术语必须与字段值完全匹配,包括空格和大写
  • 范围查询:range, 返回包含提供范围内的术语的文档
    • gt: 大于(可选)
    • gte: 大于或等于(可选)
    • lt: 小于(可选)
    • lte:小于或等于(可选)
    • format:用于转换date查询中值的日期格式(可选,字符串)

查询上下文query & 过滤上下文filter 差别

用法可以查看下面脚本
创建文件study_search.py

from elasticsearch_obj import ElasticSearchObj


class Search(ElasticSearchObj):
    """
        示例 elasticsearch_dsl.Search 基本操作

        查询的字段值中如果包含中文、下划线等其他特殊字符, 字段需要需要加上 __keyword,否则无法匹配, 用法详见示例


        介绍两个查询中需要使用到的方法:
            elasticsearch_dsl.Search.scan(): 将搜索条件扫描索引,返回所有与条件匹配文档生成器
            elasticsearch_dsl.Search.to_dict(): 将hit对象系列化成python字典记录
    """
    def get_all_record(self):
        """
        示例获取self.key下的所有文档
        :return:
        """
        # all_record 是 elasticsearch_dsl.search.Search对象,可以通过scan() 方法遍历self.key索引内的所有文档
        all_record = self.search(self.user_key)
        for record in all_record.scan():
            # record 是 elasticsearch_dsl.response.hit.Hit 对象
            # 可以通过.属性获取字段值, 比如: record.name
            # 如果字段名不存在,则会触发'Hit' object has no attribute 错误
            print(f"record.id:{record.id}, record.name:{record.name}")

            # 上面的是其中一种方法,它也可以通过to_dict()方法,将记录通过python字典的形式返回
            print(record.to_dict())

    def print_all_record(self, explain, all_record, max_count=3):
        i = 1
        for record in all_record.scan():
            print(explain, record.to_dict())
            if i >= max_count:
                print(explain, f"查询数量大于{i},后面的记录不打印了")
                break
            i += 1
        self.cross_line()

    @staticmethod
    def cross_line():
        print('=================================================')

    def get_record_by_query(self):
        """
        示例query的常用用法
        :return:
        """
        all_record = self.search(self.user_key)

        # 例如要找到爱好中包含rap的用户信息
        match_rap = all_record.query("match", hobby="rap")
        self.print_all_record("查看所有包含爱好rap的用户:", match_rap)

        # 查找爱好中含有rap,并且性别为女的用户信息
        match_rap_female = all_record.query("match", hobby="rap").query("term", sex__keyword="女")
        # match_rap_female = match_rap.query("term", sex__keyword="女")
        self.print_all_record("查看爱好含有rap的所有女用户:", match_rap_female)

        # 查找身高大于175的男用户,并且爱好只有唱的用户
        man_sing_gt_175 = all_record.query("range", height={"gt": 175}).query(
            "term", sex__keyword="男").query(
            "term", hobby__keyword="唱")
        self.print_all_record("查找身高大于175的男用户,并且爱好只有唱的用户", man_sing_gt_175)

    def get_record_by_filter(self):
        """
        示例filter的用法
        filter 与 query用法一样,这里只举一个简单例子
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找年龄大于等于25的男用户,并且爱好是唱跳rap
        all_record = all_record.filter("range", age={"gte": 25}).filter(
            "term", sex__keyword="男").filter(
            "term", hobby__keyword="唱跳rap")
        self.print_all_record("查找年龄大于等于25的男用户,并且爱好是唱跳rap:", all_record)

    def get_record_by_query_and_filter(self):
        """
        query、filter可以混用
        举一个例子
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找年龄在20-25的男性用户
        all_record = all_record.query("range", age={"gte": 20, "lte": 25}).filter("term", sex__keyword="男")
        self.print_all_record("查找年龄在20-25的男性用户:", all_record)

    def get_record_by_exclude(self):
        """
        非查询
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找性别不是男生,并且爱好不是唱的用户
        records = all_record.exclude("term", sex__keyword="男").exclude("term", hobby__keyword="唱")
        self.print_all_record("查找性别不是男生,并且爱好不是唱的用户:", records, 6)

        # 查找男性用户中,爱好不是唱且身高大于175的用户
        records = all_record.filter("term", sex__keyword="男").exclude(
            "term", hobby__keyword="唱").query(
            "range", height={"gt": 175})
        self.print_all_record("查找男性用户中,爱好不是唱且身高大于175的用户:", records, 6)

    def other_search(self):
        """
        其他查询获取文档数量:.count()
            查询结果排序:sort(field)
        :return:
        """
        all_record = self.search(self.user_key)

        # 获取id>=90的文档数量
        gte_90 = all_record.filter("range", id={"gte": 90})
        print(f"id>=90的文档 存在,数量为:{gte_90.count()}")  # id>=90的文档 存在,数量为:10

        # 获取id>=300的文档数量
        gte_300 = all_record.filter("range", id={"gte": 300})
        print(f"id>=300的文档 存在,数量为:{gte_300.count()}")  # id>=300的文档 存在,数量为:0

        #  排序 获取排序结果后,不能使用san(), 比如:self.search(self.key).sort("-height").scan()
        #  使用scan()不会以任何预先确定的顺序返回结果, 详见:
            # https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
        # 不使用san() 获取记录会有数量10000条的限制,如果查询文档数量大于10000,不使用san()方法便会出现异常
        # 下面查询会触发异常:
        #     all_record = self.search(self.test_count_key)
        #     for record in all_record[0: 15000]:
        #         print(record.to_dict())
        #  因为test_count_key下数量已经达到20000条,当取15000条数据时会触发:elasticsearch.exceptions.RequestError异常
        #  查询id>90, 且按玩家身高排序从大到小排序
        gte_90_sort = all_record.sort("-height")
        # self.print_all_record("查询id>90, 且按玩家身高排序从大到小排序:", gte_90_sort, 10)
        for record in gte_90_sort:
            print(record.to_dict())
        self.cross_line()
        #  查询id>90, 且按玩家身高排序从小到大排序, 如果身高相同,则按id降序排序
        gte_90_sort = all_record.filter("range", id={"gte": 90}).sort("height", "-id")
        for record in gte_90_sort:
            print(record.to_dict())
        self.cross_line()

        # 分页
        # 查询用户用户数据,跳过前五十条后20条
        skip_50_search_20 = all_record[50: 70]
        i = 1
        for record in skip_50_search_20:
            print(f"第{i}条数据, 用户id:{record.to_dict()['id']}")
            i += 1


if __name__ == "__main__":
    search = Search()
    # search.get_all_record()
    # search.get_record_by_query()
    # search.get_record_by_filter()
    # search.get_record_by_query_and_filter()
    # search.get_record_by_exclude()
    search.other_search()



更新

按查询更新文档
编写执行脚本
刷新索引

创建文件study_search.py
详见示例:update_example

import time

from elasticsearch_obj import ElasticSearchObj


class Update(ElasticSearchObj):
    def refresh(self, index):
        self.client.indices.refresh(index=index)

    @staticmethod
    def cross_line():
        print('=================================================')

    def print_all_record(self, explain, all_record):
        for record in all_record.scan():
            print(explain, record.to_dict())
        self.cross_line()

    def update_example(self):
        # 将user_key id为90记录,name改为”我是来测试的“, height增加50
        # 查找user_key id为90更新前的记录
        update_before = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新前的记录:", update_before)

        update_obj = self.update(self.user_key).filter(
            "term", id=90).script(
            source="ctx._source.name=params.name;ctx._source.height+=params.height;",
            params={
                "name": "我是来测试的",
                "height": 50
            })

        update_response = update_obj.execute()  # 请求执行并返回执行结果
        print("user_key id为90记录,name改为”我是来测试的“, height增加50 更新返回结果:", update_response)

        update_after = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新后的记录1:", update_after)

        time.sleep(1)
        update_after = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新后的记录2:", update_after)

        # 执行结果就不沾出来了,但有执行的可以看出
        # 更新后记录1:是更新前的结果; 而更新后记录2:是更新后的结果
        # 造成这一现象是因为上一个请求更新在内部并未完全完成,所以在sleep 一秒后便能获取更新后的记录
        print(end="\n\n\n")
        # 如果需要在更新后立即获取最新文档,又不知道多久能够完成更新, 可以在执行更新后刷新文档
        # 看下面示例
        # 将user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百
        print("user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百")
        update_before = self.search(self.user_key).filter("term", id=70)
        self.print_all_record("查找user_key id为80更新前的记录:", update_before)
        update_obj = self.update(self.user_key).filter(
            "term", id=70).script(
            source="ctx._source.name=params.name;ctx._source.height+=params.height;",
            params={
                "name": "试试刷新index能否更新后获取最新数据",
                "height": 100
            })

        update_response = update_obj.execute()  # 请求执行并返回执行结果
        print("user_key id为70记录,name改为”试试刷新index能否更新后获取最新数据“, height增加100 更新返回结果:", update_response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=70)
        self.print_all_record("查找user_key id为70更新后的记录1:", update_after)

        # 认情况下,Elasticsearch 每秒都会定期刷新索引, 如果并不需要获取更新后的文档,尽量就不要手动刷新了
        # 可以通过更新响应的total跟updated数量是否一致判断记录是否更新成功
        # 查询更新会更新所有匹配的文档,查询条件跟上面介绍的查询用法一致
        # 例如:将user_key所有age增加一岁
        response = self.update(self.user_key).script(source="ctx._source.age+=1;").execute()
        print("将user_key所有age增加一岁:", response["total"], response["updated"], "response=", response)

        # 此处增加刷新,是因为上一个执行是更新整个user_key,如果还未自动刷新,执行下面示例,或造成并发异常,
        # 导致elasticsearch.exceptions.ConflictError异常
        self.refresh(self.user_key)
        print(end="\n\n\n")
        # 特别注意的是,如果script定义的字段,查询的文档存在则会更新,不存在则会在文档中插入字段
        print("将user_key下id为1的文档增加一个字段test_field,值为:[1,2,3]")
        update_before = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key id为1更新前的记录:", update_before)
        update_obj = self.update(self.user_key).filter(
            "term", id=1).script(
            source='ctx._source.test_field=params.test_field',
            params={
                "test_field": [1, 2, 3],
            })
        response = update_obj.execute()
        print("将user_key下id为1的文档增加一个字段test_field:", response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key 将user_key下id为1的文档增加一个字段test_field,更新后的记录:", update_after)

        # 删除字段
        print("将user_key下id为1的文档增加的test_field字段移除")
        update_before = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key id为1更新前的记录:", update_before)
        response = self.update(self.user_key).filter(
            "term", id=1).script(
            source='ctx._source.remove("test_field")').execute()
        print("将user_key下id为1的文档增加的test_field字段移除:", response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("将user_key下id为1的文档增加的test_field字段移除,更新后的记录:", update_after)


if __name__ == "__main__":
    update = Update()
    update.update_example()




删除

删除索引
根据查询条件删除文档
创建文件study_delete.py

from elasticsearch_obj import ElasticSearchObj


class StudyDelete(ElasticSearchObj):
    def delete_index(self, index):
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-delete-index.html
        删除整个索引
        :param index:
        :return:
        """
        try:
            self.client.indices.delete(index=index)
        except Exception:
            print(f"需要删除的索引:{index}不存在")

    def delete_by_query(self):
        """
        示例根据查询条件删除文档
        :return:
        """

        # 将user_key下id大于90的玩家删除
        all_record = self.search(self.user_key)
        print("查看删除前文档的数量:", all_record.count())
        all_record.filter("range", id={"gt": 90}).delete()
        self.client.indices.refresh(index=self.user_key)
        all_record = self.search(self.user_key)
        print(f"查看删除后文档的数量:{all_record.count()}")


if __name__ == "__main__":
    study_delete = StudyDelete()
    study_delete.delete_index(study_delete.friend_key)  # 删除索引friend_key
    study_delete.delete_by_query()


相关文章

功能概要:(目前已实现功能)公共展示部分:1.网站首页展示...
大体上把Python中的数据类型分为如下几类: Number(数字) ...
开发之前第一步,就是构造整个的项目结构。这就好比作一幅画...
源码编译方式安装Apache首先下载Apache源码压缩包,地址为ht...
前面说完了此项目的创建及数据模型设计的过程。如果未看过,...
python中常用的写爬虫的库有urllib2、requests,对于大多数比...