ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。
安装API
pip3 install elasticsearch
建立es连接
无用户名密码状态
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
默认的超时时间是10秒,如果数据量很大,时间设置更长一些。如果端口是9200,直接写IP即可。代码如下:
es = Elasticsearch(['10.10.13.12'], timeout=3600)
用户名密码状态
如果Elasticsearch开启了验证,需要用户名和密码
es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)
数据检索功能
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用参数
index - 索引名
q - 查询指定匹配 使用Lucene查询语法
from_ - 查询起始点 默认0
doc_type - 文档类型
size - 指定查询条数 默认10
field - 指定字段 逗号分隔
sort - 排序 字段:asc/desc
body - 使用Query DSL
scroll - 滚动查询
统计查询功能
语法同search大致一样,但只输出统计值
es.count(index='logstash-2015.08.21', q='http_status_code:500')
输出:
{'_shards':{'failed':0, 'successful':5, 'total':5}, 'count':17042}
17042 就是统计值!
知识扩展
滚动demo
# Initialize the scroll
page = es.search(
index ='yourIndex',
doc_type ='yourType',
scroll ='2m',
search_type ='scan',
size =1000,
body ={
# Your query's body
})
sid = page['_scroll_id']
scroll_size = page['hits']['total']
# Start scrolling
while(scroll_size >0):
print "Scrolling..."
page = es.scroll(scroll_id = sid, scroll ='2m')
# Update the scroll ID
sid = page['_scroll_id']
# Get the number of results that we returned in the last scroll
scroll_size = len(page['hits']['hits'])
print "scroll size: "+ str(scroll_size)
# Do something with the obtained page
以上demo实现了一次取若干数据,数据取完之后结束,不会获取到最新更新的数据。我们滚动完之后想获取最新数据怎么办?滚动的时候会有一个统计值,如total: 5。跳出循环之后,我们可以用_from参数定位到5开始滚动之后的数据。
但是我用的不是这个,用的是以下方法,链接如下:
https://www.cnblogs.com/blue163/p/8126156.html
在下面的内容中,我会详细介绍此代码如何使用!
range过滤器查询范围
gt: > 大于
lt: < 小于
gte: >= 大于或等于
lte: <= 小于或等于
示例代码1
"range":{
"money":{
"gt":20,
"lt":40
}
}
时间范围
最近时间段
比如我要查询最近1分钟的
"range": {
'@timestamp': {'gt': 'now-1m'}
}
最新1小时
"range": {
'@timestamp': {'gt': 'now-1h'}
}
最新1天的
"range": {
'@timestamp': {'gt': 'now-1d'}
}
指定时间段
那么问题来了,它是根据当前时间来计算最近的时间。但是有些情况下,我需要制定时间范围,精确到分钟
假设需要查询早上8点到9点的数据,可以这样
"range": {
'@timestamp': {
"gt" : "{}T{}:00:00".format("2018-12-17","08"),
"lt": "{}T{}:59:59".format("2018-12-17","09"),
"time_zone": "Asia/Shanghai"
}
}
注意:日期和小时之间,有一个字母T来间隔。不能用空格!
time_zone 表示时区,如果默认的时区不会,可能会影响查询结果!
bool组合过滤器
must:所有分句都必须匹配,与 AND 相同。
must_not:所有分句都必须不匹配,与 NOT 相同。
should:至少有一个分句匹配,与 OR 相同。
示例代码
{
"bool":{
"must":[],
"should":[],
"must_not":[],
}
}
term过滤器
term单过滤
{ "terms":{
"money":20
}
}
表示money包含20的记录
terms复数版本
允许多个匹配条件
{ "terms":{
"money":20
}
}
表示money包含20或者30的记录
结合bool+term来举一个实际的例子:
查询path字段中包含applogs最近1分钟的记录
"bool": {
"must": [
{
"terms": {
"path": [
"applogs",
]
}
},
{
"range": {
'@timestamp': {'gt': 'now-1m'}
}
}
]
}
这里使用了terms复数版本,可以随时添加多个条件!
正则查询
{
"regexp": {
"http_status_code": "5.*"
}
}
match查询
match 精确匹配
{
"match":{
"email":"123456@qq.com"
}
}
multi_match 多字段搜索
{
"multi_match":{
"query":"11",
"fields":["Tr","Tq"]
}
}
demo
获取最近一小时的数据
{'query':
{'filtered':
{'filter':
{'range':
{'@timestamp':{'gt':'now-1h'}}
}
}
}
}
条件过滤查询
{
"query":{
"filtered":{
"query":{"match":{"http_status_code":500}},
"filter":{"term":{"server_name":"vip03"}}
}
}
}
Terms Facet 单字段统计
{'facets':
{'stat':
{'terms':
{'field':'http_status_code',
'order':'count',
'size':50}
}
},
'size':0
}
一次统计多个字段
{'facets':
{'cip':
{'terms':
{'fields':['client_ip']}},
'status_facets':{'terms':{'fields':['http_status_code'],
'order':'term',
'size':50}}},
'query':{'query_string':{'query':'*'}},
'size':0
}
多个字段一起统计
{'facets':
{'tag':
{'terms':
{'fields':['http_status_code','client_ip'],
'size':10
}
}
},
'query':
{'match_all':{}},
'size':0
}
数据组装
以下是kibana首页的demo,用来统计一段时间内的日志数量
{
"facets": {
"0": {
"date_histogram": {
"field": "@timestamp",
"interval": "5m"
},
"facet_filter": {
"fquery": {
"query": {
"filtered": {
"query": {
"query_string": {
"query": "*"
}
},
"filter": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
'gt': 'now-1h'
}
}
},
{
"exists": {
"field": "http_status_code.raw"
}
},
# --------------- -------
# 此处加匹配条件
]
}
}
}
}
}
}
}
},
"size": 0
}
如果想添加匹配条件,在以上代码标识部分加上过滤条件,按照以下代码格式即可
{
"query": {
"query_string": {"query": "backend_name:baidu.com"}
}
},
先介绍到这里,后续会有Query DSL API介绍。
需求
下面是kibana展示的日志
需要统计某一天的日志,统计每一个小时用户数,要求用户id不能重复。一个用户id就是一个用户,也称之为一个PV。
看一段message字段信息
2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user] [Class=o.s.u.c.AccountController:1189] [Method=findCustomerByLoid]- Operation=find customer by loid,Params=loid:001,Content=start
其中有一个[r=9538381535],这个9538381535就是用户id。那么用户登录手机APP操作,都会带着这个id,产生一条日志。
比如user项目,那么最终要的数据格式如下:
"user":{
"00":1,
"01":0,
...
"22":3245,
"23":765
}
这里使用24小时制来表示每一个时间段,有多个个用户访问了。注意:已经去重了用户id,统计用户数!
在放出最终代码之前,先来介绍相关技术点,便于理解代码。按照代码从上到下原则,分别来介绍!
项目列表
project_list = ['user',...]
实际的项目是user,但是存储到elasticsearch中,是userlogs,加了一个logs后缀。这个是java后端代码定义的,便于识别!
判断日期是否合法
def isVaildDate(self, date):
try:
if ":" in date:
time.strptime(date, "%Y-%m-%d %H:%M:%S")
else:
time.strptime(date, "%Y-%m-%d")
return True
except:
return False
因为需要统计一周的数据,所以脚本执行时,需要传一个日期参数。那么日期参数,传给程序是否合法呢?需要有一个函数来判断!
记录日志
def write_log(self, content):
"""
写入日志文件
:param path:
:param content:
:return:
"""
path = "print.log"
with open(path, mode='a+', encoding='utf-8') as f:
content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
print(content)
f.write(content)
为啥不用Python的日志模块呢?因为测试发现,它写入一些,我不想要的信息,太占用磁盘空间了。所以,我单独写了一个记录日志方法。
获取elasticsearch数据
def Get_Data_By_Body(self, project, fixed_date, hour):
"""
获取数据
:param project: 项目名
:param fixed_date: 指定日期
:param hour: 24小时制中的某一个小时
:return: object
"""
# 查询条件,查询项目最近1小时的数据。
doc = {
"query": {
"bool": {
"must": [
{
"terms": {
"path": [
project + "logs",
]
}
},
{
"range": {
'@timestamp': {
"gt": "{}T{}:00:00".format(fixed_date, hour),
"lt": "{}T{}:59:59".format(fixed_date, hour),
"time_zone": "Asia/Shanghai"
}
}
}
]
}
}
}
由于线上数据量过大,因此直接查询一天的数据,会卡死。所以是切分为每一个小时查询!
上面的query表示查询语句,大概就是查询指定项目(项目名+logs),1小时范围内的数据
scroll获取数据
由于1小时内的数据量,也很大。不能直接返回!默认不指定size,是返回10条数据!
size = 1000 # 指定返回1000条
queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
参数解释:
size 指定返回的条数,默认返回10条
index 指定索引名
body 查询语句
scroll 告诉 Elasticsearch 把搜索上下文再保持一分钟。1m表示1分钟
返回结果
mdata = queryData.get("hits").get("hits") # 返回数据,它是一个列表类型
if not mdata:
self.write_log('%s mdata is empty!' % project)
queryData 返回一个字典,那么真正的查询结果在queryData['hits']['hits']中,如果这个值没有,表示没有查询到数据!
注意:它并不是返回所有的结果,而是一页的数据,是一个列表类型。因为我们使用了scroll获取数据,只返回一页!
分页数据
上面只是返回了1页,我要所有数据,怎么办?需要使用分页,先来看一下分页公式
divmod(总条数, 每页大小)
注意:divmod返回一个元祖,第一个元素,就是要分页数
总条数,使用
total = queryData['hits']['total'] # 返回数据的总条数
每页大小,就是上面指定的size
size = 1000 # 指定返回1000条
那么遍历每一页数据,需要这样
scroll_id = queryData['_scroll_id'] # 获取scrollID
total = queryData['hits']['total'] # 返回数据的总条数
# 使用divmod设置分页查询
# divmod(total,1000)[0]+1 表示总条数除以1000,结果取整数加1
for i in range(divmod(total, size)[0] + 1):
res = self.es.scroll(scroll_id=scroll_id, scroll='1m') # scroll参数必须指定否则会报错
mdata += res["hits"]["hits"] # 扩展列表
scroll_id给es.scroll获取数据使用,这个参数必须要有。
由于Python中的range是顾头不顾尾,所以需要加1。使用for循环,就可以遍历每一个分页数
es.scroll(scroll_id=scroll_id, scroll='1m') 才是真正查询每一页的数据,必须要指定这2个参数。它的返回结果,就是查询结果!返回一个列表
上面的mdata是一个列表,res也是列表。因此使用+=就可以扩展列表,得到所有数据!
创建年月日目录
def create_folder(self, fixed_date):
"""
创建年/月/日 文件夹
:return: path
"""
# 系统当前时间年份
# year = time.strftime('%Y', time.localtime(time.time()))
# # 月份
# month = time.strftime('%m', time.localtime(time.time()))
# # 日期
# day = time.strftime('%d', time.localtime(time.time()))
# 年月日
year, month, day = fixed_date.split("-")
# 具体时间 小时分钟毫秒
# mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))
# 判断基础目录是否存在
if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))
# 年月日
fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
fileMonth = os.path.join(fileYear, month)
fileDay = os.path.join(fileMonth, day)
# 判断目录是否存在,否则创建
try:
if not os.path.exists(fileYear):
os.mkdir(fileYear)
os.mkdir(fileMonth)
os.mkdir(fileDay)
else:
if not os.path.exists(fileMonth):
os.mkdir(fileMonth)
os.mkdir(fileDay)
else:
if not os.path.exists(fileDay):
os.mkdir(fileDay)
return fileDay
except Exception as e:
print(e)
return False
统计结果是最终写入到一个txt里面,那么如何存储呢?使用年月日目录在区分,可以知道这个txt文件,是属于哪一天的。到了一定时间后,可以定期清理,非常方便!
这里使用的传参方式,传入一个日期。所以使用"-"就可以切割出年月日
# 年月日year, month, day = fixed_date.split("-")
输出24小时
使用以下代码就可以实现
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
输出:
['00', '01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23']
项目统计字典
需要统计每一个项目的每一个小时的用户id,用户id必须去重。既然要去重,我们首先会想到用集合。
但是还有一个办法,使用字典,也可以去重。因为字典的key是唯一的。
构造24小时字典
先来构造项目user的数据,格式如下:
"basebusiness": {
"00": {},
"01": {},
"02": {},
"03": {},
"04": {},
"05": {},
"06": {},
"07": {},
"08": {},
"09": {},
"10": {},
"11": {},
"12": {},
"13": {},
"14": {},
"15": {},
"16": {},
"17": {},
"18": {},
"19": {},
"20": {},
"21": {},
"22": {},
"23": {},
}
这只是一个项目,实际有很多项目。所以每一个字典,都有这样的24小时数据。相关代码如下:
project_dic = {} # 项目统计字典
# 24小时
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
for hour in hour_list: # 遍历24小时
# print("查询{}点的数据###############".format(hour))
self.write_log("查询{}点的数据###############".format(hour))
for project in project_list: # 遍历项目列表
if not project_dic.get(project):
project_dic[project] = {} # 初始化项目字典
if not project_dic[project].get(hour):
project_dic[project][hour] = {} # 初始化项目小时字典
这里的每一个小时,都是空字典。还没有添加数据,需要添加用户id,下面会讲到!
正则匹配用户id
看这一点字符串
2018-12-17 12:00:00,533 l=INFO [r=9538381535][s=2] [t=http-xxx-543] [APP=user]
需要提取出9538381535,思路就是:匹配中括号内容-->提取以r=开头的内容-->使用等号切割,获取用户id
匹配中括号内容
p1 = re.compile(r'[[](.*?)[]]', re.S) # 最小匹配,匹配中括号的内容
注意:这里要使用最小匹配,不能使用贪婪匹配。这一段正则,我是用网上找的,测试ok
提取和切割,就比较简单了。使用startswith和split方法,就可以了!
使用字典去重
接下来,需要将用户id写入到字典中,需要去重,否则字典添加时,会报错!
那么如何使用字典去重呢?只需要遵循一个原则即可! 有则忽略,无则添加
# 判断字典中rid不存在时,避免字典键值重复
if not project_dic[project][hour].get(rid):
project_dic[project][hour][rid] = True # 添加值
生成器
这里主要在2个方法中,使用了生成器。生成器的优点,就是节省内容。
一处在是Get_Data_By_Body方法中,它需要返回所有查询的数据。数据量非常大,因此必须要生成器,否则服务器内容就溢出!
还有一处,就main方法。它是返回项目的统计结果。注意,它不是最终结果。它里面保存了每一个项目,每一个小时中的用户id,是已经去重的用户id。
数据量也是比较大,当然,没有Get_Data_By_Body方法返回的结果大。
统计每一个小时用户数
main方法,返回的字典格式如下:
"user":{
"00":{
"242412":True,
}
"01":{
"":True,
},
...
"22":{
"457577":True,
"546583":True,
},
"23":{
"457577":True,
"546583":True,
"765743":True,
}
}
我需要知道,每一个小时的用户数。怎么统计呢?用2个方法
1. 遍历字典的每一个小时,使用计数器
2. 使用len方法(推荐)
最简单的方法,就是使用len方法,就可以知道每一个小时有多少个key
for i in dic: # 遍历数据
if not final_dic.get(i):
final_dic[i] = {} # 初始化字典
for h in sorted(dic[i]): # 遍历项目的每一个小时
# 统计字典的长度
final_dic[i][h] = len(dic[i][h])
有序字典
看下面的数据
可以发现,24小时,排序是乱的。这样给领导看时,不太美观。所以需要对24小时进行排序!
在Python 3.6之前,字典的key是无序的。因此,需要定义一个有序字典,在写入之前,要对字典的key做一次排序。
这样顺序写入到有序字典之后,之后再次调用,依然是有序的!
order_dic = OrderedDict() # 实例化一个有序字典
final_dic = {} # 最终统计结果
for dic in data: # 遍历生成器
for i in dic: # 遍历数据
if not final_dic.get(i):
final_dic[i] = order_dic # 初始化字典
# 有序字典必须先对普通字典key做排序
for h in sorted(dic[i]): # 遍历项目的每一个小时
# 统计字典的长度
final_dic[i][h] = len(dic[i][h])
完整代码
#!/usr/bin/env python3
# coding: utf-8
import re
import os
import sys
import json
import time
from collections import OrderedDict
from elasticsearch import Elasticsearch
# 项目列表
project_list = ['usercenter', ['login']]
# yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
# today = datetime.datetime.now().strftime("%Y-%m-%d")
class ElasticObj:
def __init__(self, index_name, ip, fixed_date, timeout=3600):
'''
:param index_name: 索引名称
:param ip: elasticsearch地址
:param timeout: 设置超时间,默认是10秒的,如果数据量很大,时间要设置更长一些
'''
self.index_name = index_name
self.ip = ip
self.timeout = timeout
# 无用户名密码状态
# self.es = Elasticsearch([self.ip], timeout=self.timeout)
# 用户名密码状态
# self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
self.es = Elasticsearch([self.ip], http_auth=('elastic', '123456'), timeout=self.timeout)
self.fixed_date = fixed_date # 指定日期
# 当前py文件所在的文件夹
self.BASE_DIR = os.path.dirname(os.path.abspath(__file__))
self.fileDay = self.create_folder() # 创建日志和数据目录
@staticmethod
def isVaildDate(date):
"""
判断日期是否合法
:param date: 日期,比如: 2018-03-30
:return: bool
"""
try:
if ":" in date:
time.strptime(date, "%Y-%m-%d %H:%M:%S")
else:
time.strptime(date, "%Y-%m-%d")
return True
except:
return False
def write_log(self, content):
"""
写入日志文件
:param content: 写入内容
:return:
"""
path = os.path.join(self.fileDay,"output_%s.log" %self.fixed_date)
# path = "output_{}.log".format(self.fixed_date)
with open(path, mode='a+', encoding='utf-8') as f:
content = time.strftime('%Y-%m-%d %H:%M:%S') + ' ' + content + "\n"
print(content)
f.write(content)
def Get_Data_By_Body(self, project, hour):
"""
获取数据
:param project: 项目名
:param hour: 24小时制中的某一个小时
:return: 生成器
"""
# doc = {'query': {'match_all': {}}}
# 查询条件,查询项目最近1小时的数据。now-1h表示最近1小时
# print(type(fixed_date))
# print("{date}T00:00:00".format(date=fixed_date))
# 24小时
doc = {
"query": {
"bool": {
"must": [
{
"terms": {
"path": [
project + "logs",
]
}
},
{
# "range": {
# '@timestamp': {'gt': 'now-1m'}
# }
"range": {
'@timestamp': {
"gt": "{}T{}:00:00".format(self.fixed_date, hour),
"lt": "{}T{}:59:59".format(self.fixed_date, hour),
"time_zone": "Asia/Shanghai"
}
}
}
]
}
}
}
# queryData = self.es.search(index=self.index_name, body=doc)
# scroll 参数告诉 Elasticsearch 把搜索上下文再保持一分钟,1m表示1分钟
# size 参数允许我们配置没匹配结果返回的最大命中数。每次调用 scroll API 都会返回下一批结果,直到不再有可以返回的结果,即命中数组为空。
size = 1000 # 指定返回1000条
queryData = self.es.search(index=self.index_name, body=doc, size=size, scroll='1m', )
# print(queryData['hits']['total'])
mdata = queryData.get("hits").get("hits") # 返回查询的数据,不是所有数据,而是一页的数据,它是一个列表类型
if not mdata:
self.write_log('%s mdata is empty!' % project)
# scroll_id 的值就是上一个请求中返回的 _scroll_id 的值
scroll_id = queryData['_scroll_id'] # 获取scrollID
total = queryData['hits']['total'] # 返回数据的总条数
# print("查询项目{} {}点的数据,总共有{}条".format(project,hour,total))
self.write_log("查询项目{} {}点的数据,总共有{}条".format(project, hour, total))
# 使用divmod设置分页查询
# divmod(total,1000)[0]+1 表示总条数除以1000,结果取整数加1
for i in range(divmod(total, size)[0] + 1):
res = self.es.scroll(scroll_id=scroll_id, scroll='1m') # scroll参数必须指定否则会报错
mdata += res["hits"]["hits"] # 扩展列表
# yield mdata
# print(mdata)
# return mdata
yield mdata
def create_folder(self):
"""
创建年/月/日 文件夹
:return: path
"""
# 系统当前时间年份
# year = time.strftime('%Y', time.localtime(time.time()))
# # 月份
# month = time.strftime('%m', time.localtime(time.time()))
# # 日期
# day = time.strftime('%d', time.localtime(time.time()))
# 年月日
year, month, day = self.fixed_date.split("-")
# 具体时间 小时分钟毫秒
# mdhms = time.strftime('%m%d%H%M%S', time.localtime(time.time()))
# 判断基础目录是否存在
if not os.path.exists(os.path.join(self.BASE_DIR, 'data_files')):
os.mkdir(os.path.join(self.BASE_DIR, 'data_files'))
# 年月日
fileYear = os.path.join(self.BASE_DIR, 'data_files', year)
fileMonth = os.path.join(fileYear, month)
fileDay = os.path.join(fileMonth, day)
# 判断目录是否存在,否则创建
try:
if not os.path.exists(fileYear):
os.mkdir(fileYear)
os.mkdir(fileMonth)
os.mkdir(fileDay)
else:
if not os.path.exists(fileMonth):
os.mkdir(fileMonth)
os.mkdir(fileDay)
else:
if not os.path.exists(fileDay):
os.mkdir(fileDay)
return fileDay
except Exception as e:
print(e)
return False
def main(self):
"""
主要处理逻辑
:return: 生成器
"""
project_dic = {} # 项目统计字典
# fixed_date = datetime.datetime.strptime(fixed_date, "%Y-%m-%d")
# strftime("%Y-%m-%d")
# conv_date = fixed_date.strftime("%Y-%m-%d")
# print(conv_date, type(conv_date))
# exit()
# now_hour = fixed_date.strftime('%H') # 当前时间的小时
# print(now_hour)
# 24小时
hour_list = ['{num:02d}'.format(num=i) for i in range(24)]
# hour_list = ['{num:02d}'.format(num=i) for i in range(2)]
# project="usercenter"
# project_dic[project] = {now_hour: {}} # 初始化字典
for hour in hour_list: # 遍历24小时
# print("查询{}点的数据###############".format(hour))
self.write_log("查询{}点的数据###############".format(hour))
for project in project_list: # 遍历项目列表
if not project_dic.get(project):
project_dic[project] = {} # 初始化项目字典
if not project_dic[project].get(hour):
project_dic[project][hour] = {} # 初始化项目小时字典
mdata = self.Get_Data_By_Body(project, hour) # 获取数据
for item in mdata: # 遍历生成器
for hit in item: # 遍历返回数据
# hit是一个字典
str1 = hit['_source']['message'] # 查询指定字典
p1 = re.compile(r'[[](.*?)[]]', re.S) # 最小匹配,匹配中括号的内容
for i in re.findall(p1, str1): # 遍历结果
if i.startswith('r='): # 判断以r=开头的
rid = i.split("=")[1] # 获取rid
# print("rid",rid)
# 判断字典中rid不存在时,避免字典键值重复
if not project_dic[project][hour].get(rid):
project_dic[project][hour][rid] = True # 添加值
time.sleep(1) # 休眠1秒钟
# return project_dic
yield project_dic
if __name__ == '__main__':
# fixed_date = "2018-12-16"
fixed_date = sys.argv[1] # 日期参数
if not ElasticObj.isVaildDate(fixed_date):
print("日期不合法!")
exit()
startime = time.time() # 开始时间
index_name = "common-*"
es_server = "192.168.92.131"
obj = ElasticObj(index_name, es_server, fixed_date) # 连接elasticsearch
print("正在查询日期%s这一天的数据" % fixed_date)
obj.write_log("###########################################")
obj.write_log("正在查询日期%s这一天的数据" % fixed_date)
data = obj.main()
# print("初步结果",data)
# fileDay = obj.create_folder() # 创建目录
# if not fileDay:
# # print("创建目录失败!")
# obj.write_log("创建目录失败!")
# exit()
order_dic = OrderedDict() # 实例化一个有序字典
final_dic = {} # 最终统计结果
for dic in data: # 遍历生成器
for i in dic: # 遍历数据
if not final_dic.get(i):
final_dic[i] = order_dic # 初始化字典
# 有序字典必须先对普通字典key做排序
for h in sorted(dic[i]): # 遍历项目的每一个小时
# 统计字典的长度
final_dic[i][h] = len(dic[i][h])
# print("最终结果",final_dic) # 统计结果
obj.write_log("最终结果执行完毕!")
# 写入文件
with open(os.path.join(obj.fileDay, 'access_data.txt'), encoding='utf-8', mode='a') as f:
f.write(json.dumps(final_dic) + "\n")
endtime = time.time()
take_time = endtime - startime
if take_time < 1: # 判断不足1秒时
take_time = 1 # 设置为1秒
# 计算花费时间
m, s = divmod(take_time, 60)
h, m = divmod(m, 60)
# print("本次花费时间 %02d:%02d:%02d" % (h, m, s))
obj.write_log("统计日期%s这一天的数据完成!请查阅data_files目录的日志和数据文件" % fixed_date)
obj.write_log("本次花费时间 %02d:%02d:%02d" % (h, m, s))
日志文件和数据文件,都在年月日目录里面!
本文参考链接:
http://www.cnblogs.com/letong/p/4749234.html
http://www.linuxyw.com/790.html
https://www.cnblogs.com/blue163/p/8126156.html