文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

go语言怎么实现Elasticsearches批量修改查询及发送MQ

2023-06-30 05:22

关注

这篇“go语言怎么实现Elasticsearches批量修改查询及发送MQ”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“go语言怎么实现Elasticsearches批量修改查询及发送MQ”文章吧。

update_by_query批量修改

POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query{  "query": {    "bool": {      "must": [        {          "term": {            "join_field": {              "value": "post"            }          }        },        {          "term": {            "platform": {              "value": "toutiao"            }          }        },        {          "exists": {            "field": "liked_count"          }        }      ]    }  },  "script":{    "source":"ctx._source.liked_count=0",    "lang":"painless"  }}

索引添加字段

PUT user_tiktok/_doc/_mapping?include_type_name=true{  "post_signature":{    "StuClass":{      "type":"keyword"    },    "post_token":{      "type":"keyword"    }  }}PUT user_toutiao/_mapping{  "properties": {    "user_token": {      "type": "text"    }  }}

查询es发送MQ

from celery import Celeryfrom elasticsearch import Elasticsearchimport loggingimport arrowimport pytzfrom elasticsearch.helpers import scan, streaming_bulkimport redispool_16_8 = redis.ConnectionPool(host='10.0.3.100', port=6379, db=8, password='EfcHGSzKqg6cfzWq')rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)logger = logging.getLogger('elasticsearch')logger.disabled = Falselogger.setLevel(logging.INFO)es_zoo_connection = Elasticsearch('http://eswriter:e s密码@e sip:4000', dead_timeout=10,                                  retry_on_timeout=True)logger = logging.getLogger(__name__)class ES(object):    index = None    doc_type = None    id_field = '_id'    version = ''    source_id_field = ''    aliase_field = ''    separator = '-'    aliase_func = None    es = None    tz = pytz.timezone('Asia/Shanghai')    logger = logger    @classmethod    def mget(cls, ids=None, index=None, **kwargs):        index = index or cls.index        docs = cls.es.mget(body={'ids': ids}, doc_type=cls.doc_type, index=index, **kwargs)        return docs    @classmethod    def count(cls, query=None, index=None, **kwargs):        index = index or cls.index        c = cls.es.count(doc_type=cls.doc_type, body=query, index=index, **kwargs)        return c.get('count', 0)    @classmethod    def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs):        body = {            "doc": doc,        }        if doc_as_upsert:            body['doc_as_upsert'] = True        id = doc_id or cls.id_name(doc)        index = index or cls.index_name(doc)        cls.es.update(index, id, cls.doc_type, body, **kwargs)    @classmethod    def search(cls, index=None, query=None, **kwargs):        index = index or cls.index        return cls.es.search(index=index, body=query, **kwargs)    @classmethod    def scan(cls, query, index=None, **kwargs):        return scan(cls.es,                    query=query,                    index=index or cls.index,                    **kwargs)    @classmethod    def index_name(cls, doc):        if cls.aliase_field and cls.aliase_field in doc.keys():            aliase_part = doc[cls.aliase_field]            if isinstance(aliase_part, str):                aliase_part = arrow.get(aliase_part)            if isinstance(aliase_part, int):                aliase_part = arrow.get(aliase_part).astimezone(cls.tz)            if cls.version:                index = '{}{}{}{}{}'.format(cls.index, cls.separator, cls.version, cls.separator,                                            cls.aliase_func(aliase_part))            else:                index = '{}{}{}'.format(cls.index, cls.separator, cls.aliase_func(aliase_part))        else:            index = cls.index        return index    @classmethod    def id_name(cls, doc):        id = doc.get(cls.id_field) and doc.pop(cls.id_field) or doc.get(cls.source_id_field)        if not id:            print('========', doc)        assert id, 'doc _id must not be None'        return id    @classmethod    def bulk_upsert(cls, docs, **kwargs):        """        批量操作文章, 仅支持 index 和 update        """        op_type = kwargs.get('op_type') or 'update'        chunk_size = kwargs.get('chunk_size')        if op_type == 'update':            upsert = kwargs.get('upsert', True)            if upsert is None:                upsert = True        else:            upsert = False        actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type, cls.id_name, op_type, upsert=upsert)        result = streaming_bulk(cls.es, actions, chunk_size=chunk_size, raise_on_error=False, raise_on_exception=False,                                max_retries=5, request_timeout=25)        return result    @classmethod    def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type, upsert=True, **kwargs):        assert not upsert or (upsert and op_type == 'update'), 'upsert should use "update" as op_type'        for doc in docs:            # 支持 index_name 作为一个工厂函数            if callable(index_name):                index = index_name(doc)            else:                index = index_name            if op_type == 'index':                _source = doc            elif op_type == 'update' and not upsert:                _source = {'doc': doc}            elif op_type == 'update' and upsert:                _source = {'doc': doc, 'doc_as_upsert': True}            else:                continue            if callable(id_name):                id = id_name(doc)            else:                id = id_name            # 生成 Bulk 动作            action = {                "_op_type": op_type,                "_index": index,                "_type": doc_type,                "_id": id,                "_source": _source            }            yield actionclass tiktokEsUser(ES):    index = 'user_tiktok'    doc_type = '_doc'    id_field = '_id'    source_id_field = 'user_id'    es = es_zoo_connectionfrom kombu import Exchange, Queue, bindingdef data_es_route_task_spider(name, args, kwargs, options, task=None, **kw):    return {        'exchange': 'tiktok',        'exchange_type': 'topic',        'routing_key': name    }class DataEsConfig_download(object):    broker_url = 'amqp://用户:密码@ip:端口/'    task_ignore_result = True    task_serializer = 'json'    accept_content = ['json']    task_default_queue = 'default'    task_default_exchange = 'default'    task_default_routing_key = 'default'    exchange = Exchange('tiktok', type='topic')    task_queues = [        Queue(            'tiktok.user_avatar.download',            [binding(exchange, routing_key='tiktok.user_avatar.download')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.post_avatar.download',            [binding(exchange, routing_key='tiktok.post_avatar.download')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.post.spider',            [binding(exchange, routing_key='tiktok.post.spider')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.post.save',            [binding(exchange, routing_key='tiktok.post.save')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.user.save',            [binding(exchange, routing_key='tiktok.user.save')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.post_avatar.invalid',            [binding(exchange, routing_key='tiktok.post_avatar.invalid')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.user_avatar.invalid',            [binding(exchange, routing_key='tiktok.user_avatar.invalid')],            queue_arguments={'x-queue-mode': 'lazy'}        ),        Queue(            'tiktok.comment.save',            [binding(exchange, routing_key='tiktok.comment.save')],            queue_arguments={'x-queue-mode': 'lazy'}        ),    ]    task_routes = (data_es_route_task_spider,)    enable_utc = True    timezone = "Asia/Shanghai"# 下载apptiktok_app = Celery(    'tiktok',    include=[        'task.tasks',    ])tiktok_app.config_from_object(DataEsConfig_download)# 发任务生产者,更新舆情user历史信息def send_post():    query = {        "query": {            "bool": {                "must": [                    {                        "exists": {                            "field": "post_signature"                        }                    },                    {                        "range": {                            "following_num": {                                "gte": 1000                            }                        }                    }                ]            }        },        "_source": ["region", "sec_uid", "post_signature"]    }    # query = {    #     "query": {    #         "bool": {    #             "must": [    #                 {"exists": {    #                     "field": "post_signature"    #                 }},    #                 {    #                     "match": {    #                         "region": "MY"    #                     }    #                 }    #             ]    #         }    #     },    #     "_source": ["region", "sec_uid", "post_signature"]    # }    r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)    for item in map(lambda x: x['_source'], r):        tiktok_app.send_task('tiktok.post.spider', args=(item,))def send_sign_token():    query = {        "query": {            "bool": {                "must": [                    {                        "exists": {                            "field": "post_signature"                        }                    },                    {                        "range": {                            "following_num": {                                "gte": 1000                            }                        }                    },                    {                        "range": {                            "create_time": {                                "gte": "2021-01-06T00:00:00",                                "lte": "2021-01-06T01:00:00"                            }                        }                    }                ]            }        },        "_source": ["user_id", "sec_uid"]    }    r = tiktokEsUser.scan(query=query, scroll='30m', request_timeout=100)    for item in map(lambda x: x['_source'], r):        tiktok_app.send_task('tiktok.user.sign_token', args=(item,))if __name__ == '__main__':    send_post()    # send_sign_token()

以上就是关于“go语言怎么实现Elasticsearches批量修改查询及发送MQ”这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注编程网行业资讯频道。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯