Elasticsearch Python Interface
Published: 2019-06-14


Elasticsearch exposes many interfaces for access. This post collects a number of common ways to work with it from Python; if anything is incorrect, corrections are welcome.

# Note: written against the pre-7.x elasticsearch-py API (doc_type, body= queries).
import json
import os
import sys

from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError, NotFoundError
from elasticsearch.helpers import bulk


class ErrorHunter(object):
    """Decorators that catch common Elasticsearch errors and exit with a message."""

    @staticmethod
    def capture_connection_error(func):
        def _decorator(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except ConnectionError as e:
                sys.stderr.write("%s\n" % e)
                sys.exit(-1)
        return _decorator

    @staticmethod
    def capture_notfound_error(func):
        def _decorator(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except NotFoundError as e:
                sys.stderr.write("%s\n" % e)
                sys.exit(-1)
        return _decorator


class ElasticSearchImporter(object):
    batch_search_size = 1000

    def __init__(self, host, port=9200, username=None, password=None):
        if not username or not password:
            self.es = Elasticsearch(hosts=[host], port=port)
        else:
            self.es = Elasticsearch(hosts=[host], port=port,
                                    http_auth=(username, password))

    @ErrorHunter.capture_connection_error
    def index_is_exist(self, index_name):
        res = self.es.indices.exists(index=index_name)
        sys.stdout.write("index[%s] exist state is %s\n" % (index_name, res))
        return res

    @ErrorHunter.capture_connection_error
    @ErrorHunter.capture_notfound_error
    def search_size(self, index_name, doc_type):
        res = self.es.search(index=index_name, doc_type=doc_type,
                             body={"query": {"match_all": {}}})
        sys.stdout.write("search response is %s\n" % res)
        return res["hits"]["total"]

    @ErrorHunter.capture_connection_error
    def _create_index(self, index_name):
        # ignore=400 suppresses "index already exists" errors
        res = self.es.indices.create(index=index_name, ignore=400)
        sys.stdout.write("response of create index[%s]: %s\n" % (index_name, res))

    def insert_by_batch(self, index_name, doc_type, data_list):
        batch_size = len(data_list) // self.batch_search_size + 1
        for i in range(batch_size):
            begin_index = self.batch_search_size * i
            end_index = begin_index + self.batch_search_size
            self.insert_data_list(index_name=index_name, doc_type=doc_type,
                                  data_list=data_list[begin_index:end_index])

    @ErrorHunter.capture_connection_error
    def insert_data_list(self, index_name, doc_type, data_list):
        if not self.index_is_exist(index_name):
            self._create_index(index_name)
        actions = [
            {
                "_op_type": "index",
                "_index": index_name,
                "_type": doc_type,
                "_source": d
            }
            for d in data_list
        ]
        res = bulk(self.es, actions)
        sys.stdout.write("response of insert is: %s\n" % (res,))

    @ErrorHunter.capture_connection_error
    @ErrorHunter.capture_notfound_error
    def search_data(self, index_name, doc_type):
        if not self.index_is_exist(index_name):
            return  # end the generator; raising StopIteration is an error in Python 3.7+
        total_size = self.search_size(index_name=index_name, doc_type=doc_type)
        # fetch data page by page; note from+size paging is capped by
        # index.max_result_window (default 10000) -- use scroll for larger sets
        total_page = total_size // self.batch_search_size + 1
        for page_num in range(total_page):
            batch_result_data = self.es.search(index=index_name, doc_type=doc_type,
                                               from_=page_num * self.batch_search_size,
                                               size=self.batch_search_size)
            id_data_list = [(result["_id"], result["_source"])
                            for result in batch_result_data["hits"]["hits"]]
            for id_data in id_data_list:
                yield id_data

    @ErrorHunter.capture_connection_error
    @ErrorHunter.capture_notfound_error
    def search_by_body(self, index_name, doc_type, **kwargs):
        body = {
            "query": {"match": kwargs},
            "size": 1
        }
        res = self.es.search(index=index_name, doc_type=doc_type, body=body)
        sys.stdout.write("%s\n" % res)

    @ErrorHunter.capture_connection_error
    @ErrorHunter.capture_notfound_error
    def update_data_list(self, index_name, doc_type, id_data_dict):
        if not id_data_dict:
            return
        actions = [
            {
                # "_op_type": "update",
                "_index": index_name,
                "_type": doc_type,
                "_source": d,
                "_id": id_index
            }
            for id_index, d in id_data_dict.items()
        ]
        res = bulk(self.es, actions)
        sys.stdout.write("%s\n" % (res,))

    @ErrorHunter.capture_connection_error
    @ErrorHunter.capture_notfound_error
    def delete_index_doc(self, index_name):
        res = self.es.indices.delete(index=index_name)
        sys.stdout.write("%s\n" % res)
        return res

    def json_import(self, index_name, doc_type, json_file):
        if not os.path.exists(json_file):
            sys.stderr.write("This json file[%s] does not exist.\n" % json_file)
            sys.exit(-1)
        with open(json_file, "r") as f:
            data_list = json.load(f)
        self.insert_data_list(index_name=index_name, doc_type=doc_type,
                              data_list=data_list)
        sys.stdout.write("Successfully imported json file to ES.\n")


def test_search():
    builds_index_name = "builds_data"
    builds_doc_type = "data_stat_doc"
    es_import = ElasticSearchImporter(host="localhost")
    es_import.index_is_exist(builds_index_name)
    es_import.search_by_body(index_name=builds_index_name, doc_type=builds_doc_type,
                             data_path="/var/www/html/ec_latest_builds/DENALI_NGXTRAFFIC/CN_AXF_16Q2/DENALI_NGXTRAFFIC-CN_AXF_16Q2-TnMapDataAccess-linux-centos6x64-2.9.491956-trunk-20170510093257-RC")


def test_import():
    index_name = "sales"
    doc_type = "sales_doc"
    json_file_path = os.path.join(os.path.dirname(__file__), "sales.json")
    es_import = ElasticSearchImporter(host="localhost")
    es_import.json_import(index_name, doc_type, json_file_path)


if __name__ == '__main__':
    # import_builds_data("pbf_statistic_autonavi.json")
    # test_search()
    test_import()
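To see how the pieces fit together, here is a minimal usage sketch. The "people" index, "person_doc" type, and the documents are invented for illustration, and it assumes a node listening on localhost:9200:

# Hypothetical usage sketch: index a couple of documents, page through them,
# then drop the index. "people" / "person_doc" are made-up names.
importer = ElasticSearchImporter(host="localhost")
docs = [{"name": "alice", "age": 30}, {"name": "bob", "age": 25}]
importer.insert_data_list(index_name="people", doc_type="person_doc", data_list=docs)
# note: freshly indexed documents may not be searchable until the index refreshes
for doc_id, source in importer.search_data("people", "person_doc"):
    print(doc_id, source)
importer.delete_index_doc("people")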

 

Tips:

The query path here fetches data in paged batches and hands results back via yield. To end the generator early, simply return from the generator body; the old trick of raising StopIteration inside a generator has been a RuntimeError since Python 3.7 (PEP 479).
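As a quick sketch of that point, here is a hypothetical helper (first_n is not part of the class above; the importer and index names reuse the earlier sketch):

# Hypothetical helper: stop a generator after n items. A bare `return`
# ends iteration cleanly; raising StopIteration inside a generator is a
# RuntimeError since Python 3.7 (PEP 479).
def first_n(iterable, n):
    for i, item in enumerate(iterable):
        if i >= n:
            return
        yield item

for doc_id, source in first_n(importer.search_data("people", "person_doc"), 10):
    print(doc_id, source)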

Reposted from: https://www.cnblogs.com/dasheng-maritime/p/8521722.html
