文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Easticsearch 数据迁移至in

2023-01-31 03:25

关注

Easticsearch 数据迁移至influxdb python


需求:将Easticsearch部分数据迁移至influxdb中。


见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。


解决方案:大批量从Easticsearch取数据,两种方案。1.from...size    2.scroll (类似于数据库的游标)  脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。


#!/usr/bin/env python
#coding=utf-8

import sys
import json
import datetime
import elasticsearch
from influxdb import InfluxDBClient

#连接Easticsearch
class ES(object):
    @classmethod
    def connect_host(cls):
        url = "http://192.168.121.33:9202/"
        es = elasticsearch.Elasticsearch(url,timeout=120)
        return es
es = ES.connect_host()

#连接influxdb
client = InfluxDBClient(host="192.168.121.33", port="8086", username='admin', password='admin', database='esl')
client.create_database('esl')

#DSL查询语法
data = {
    "query": { "match_all" : {}},
    "size": 100
}

# 设置要过滤返回的字段值,要什么字段。
    'hits.hits._source.resource_id',
    'hits.hits._source.timestamp',
    'hits.hits._source.counter_volume',
    'hits.hits._source.@timestamp',
]

# 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
res = es.search(
    index='pipefilter_meters*',
    doc_type ='canaledge.flow.bytes',
    body=data,
    search_type="scan",
    scroll="10m"
)
scroll_id = res['_scroll_id']

response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
scroll_id = response['_scroll_id']   #获取第二次scroll_id
hits = response['hits']['hits'] 
in_data = []

while len(hits) > 0:
    for i in hits:
        res_id = i['_source']['resource_id']
        r_id, r_type = res_id.split(':')
        datas = {
            "measurement": "es_net",
            "tags": {
                 "resource_id": r_id,
                 "type": r_type
             },
            "time": i['_source']['timestamp'],
            "fields": {
                "counter_volume": i['_source']['counter_volume']
            }
        }
        in_data.append(datas)
    #循环写入influxdb
    client.write_points(in_data)
    in_data = []   #每次循环完重新定义列表为空

    data = {
        "query": { "match_all" : {}},
        "size": 100
    }
    ## 设置要过滤返回的字段值,要什么字段。
        '_scroll_id',
        'hits.hits._source.resource_id',
        'hits.hits._source.timestamp',
        'hits.hits._source.counter_volume',
        'hits.hits._source.@timestamp',
    ]

    ## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
    response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
    #调试
    #if not response.get('hits'):
    #    print response
    #    sys.exit(1)
    #else:
    
    hits = response['hits']['hits']
    scroll_id = response["_scroll_id"] #获取第三次scroll_id


阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯