Easticsearch 数据迁移至influxdb python
需求:将Easticsearch部分数据迁移至influxdb中。
见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。
解决方案:大批量从Easticsearch取数据,两种方案。1.from...size 2.scroll (类似于数据库的游标) 脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。
#!/usr/bin/env python
#coding=utf-8
import sys
import json
import datetime
import elasticsearch
from influxdb import InfluxDBClient
#连接Easticsearch
class ES(object):
@classmethod
def connect_host(cls):
url = "http://192.168.121.33:9202/"
es = elasticsearch.Elasticsearch(url,timeout=120)
return es
es = ES.connect_host()
#连接influxdb
client = InfluxDBClient(host="192.168.121.33", port="8086", username='admin', password='admin', database='esl')
client.create_database('esl')
#DSL查询语法
data = {
"query": { "match_all" : {}},
"size": 100
}
# 设置要过滤返回的字段值,要什么字段。
'hits.hits._source.resource_id',
'hits.hits._source.timestamp',
'hits.hits._source.counter_volume',
'hits.hits._source.@timestamp',
]
# 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
res = es.search(
index='pipefilter_meters*',
doc_type ='canaledge.flow.bytes',
body=data,
search_type="scan",
scroll="10m"
)
scroll_id = res['_scroll_id']
response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
scroll_id = response['_scroll_id'] #获取第二次scroll_id
hits = response['hits']['hits']
in_data = []
while len(hits) > 0:
for i in hits:
res_id = i['_source']['resource_id']
r_id, r_type = res_id.split(':')
datas = {
"measurement": "es_net",
"tags": {
"resource_id": r_id,
"type": r_type
},
"time": i['_source']['timestamp'],
"fields": {
"counter_volume": i['_source']['counter_volume']
}
}
in_data.append(datas)
#循环写入influxdb
client.write_points(in_data)
in_data = [] #每次循环完重新定义列表为空
data = {
"query": { "match_all" : {}},
"size": 100
}
## 设置要过滤返回的字段值,要什么字段。
'_scroll_id',
'hits.hits._source.resource_id',
'hits.hits._source.timestamp',
'hits.hits._source.counter_volume',
'hits.hits._source.@timestamp',
]
## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用
response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,)
#调试
#if not response.get('hits'):
# print response
# sys.exit(1)
#else:
hits = response['hits']['hits']
scroll_id = response["_scroll_id"] #获取第三次scroll_id