1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 16:47:12 2018
@author: zhouyujiang
"""
import csv
import datetime
import json
import os
from urllib import parse

import elasticsearch
import pandas as pd
from elasticsearch.helpers import scan

from func_cal_doc_id import cal_doc_id
# Elasticsearch connection settings. Each value may be overridden with an
# environment variable; the literals remain only as fallbacks so the script
# keeps running where no environment is configured.
# NOTE(security): a plaintext password is committed below. It should be
# rotated and removed from source so that ES_PASSWORD is the only source.
hosts = os.environ.get('ES_HOST', '192.168.17.11')
port = int(os.environ.get('ES_PORT', '80'))
user = os.environ.get('ES_USER', 'zhouyujiang')
passwd = os.environ.get('ES_PASSWORD', '8tM9JDN2LVxM')
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

# Index / doc type that the scan below reads from.
index = 'short-video-production'
doc_type = 'daily-url'

# URLs already queued for writing during this run. Shared across all
# releasers, so each URL is written at most once per run.
set_url = set()
# Total number of scanned hits, duplicates included.
count = 0
# --- Query / output parameters ----------------------------------------------
# Epoch-millisecond bounds for the scan; each value is 00:00 at UTC+8.
RELEASE_TIME_GTE = 1538323200000  # 2018-10-01
RELEASE_TIME_LT = 1541001600000   # 2018-11-01
FETCH_TIME_GTE = 1541692800000    # 2018-11-09
FETCH_TIME_LT = 1542038400000     # 2018-11-13
# Destination of the rewritten documents.
TARGET_INDEX = 'short-video-production'
TARGET_DOC_TYPE = 'daily-url-2018-10-31'
# Maximum number of documents sent in a single bulk request.
BULK_SIZE = 100


def _build_query(platform, releaser):
    """Return the filter query for one releaser on one platform.

    Restricts hits to the release_time and fetch_time windows defined by
    the module-level RELEASE_TIME_* / FETCH_TIME_* constants.
    """
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"platform.keyword": platform}},
                    {"term": {"releaser.keyword": releaser}},
                    {"range": {"release_time": {"gte": RELEASE_TIME_GTE,
                                                "lt": RELEASE_TIME_LT}}},
                    {"range": {"fetch_time": {"gte": FETCH_TIME_GTE,
                                              "lt": FETCH_TIME_LT}}},
                ]
            }
        }
    }


def _add_monthly_fields(doc):
    """Add the monthly_* fields and a write timestamp to *doc* in place.

    The monthly net-increase counters are copied verbatim from the document's
    play/comment/favorite counts, and ``monthly_cal_base`` records that they
    are accumulated values. ``timestamp`` is the current time in epoch ms.
    """
    play_count = doc['play_count']
    comment_count = doc['comment_count']
    favorite_count = doc['favorite_count']
    doc.update({
        'timestamp': int(datetime.datetime.now().timestamp() * 1000),
        'monthly_cal_base': 'accumulated_values',
        'monthly_net_inc_favorite_count': favorite_count,
        'monthly_net_inc_comment_count': comment_count,
        'monthly_net_inc_play_count': play_count,
    })


def _bulk_line(doc_id, doc):
    """Return the action line plus source line (NDJSON) indexing *doc* as *doc_id*."""
    # json.dumps escapes the id properly; the old '%s' template did not.
    head = json.dumps({"index": {"_id": str(doc_id)}})
    return head + '\n' + json.dumps(doc, ensure_ascii=False) + '\n'


def _flush_bulk(actions, scanned):
    """Send the buffered bulk lines in one request, then empty the buffer.

    If the response reports errors, print the per-item results, the request
    body that produced them, and the running scan count *scanned*.
    """
    if not actions:
        return
    body = ''.join(actions)
    resp = es.bulk(index=TARGET_INDEX, doc_type=TARGET_DOC_TYPE,
                   body=body, request_timeout=200)
    actions.clear()
    if resp['errors']:
        print(resp['items'])
        # Print the body that failed (previously printed after being cleared).
        print(body)
        print(scanned)


# For every (releaser, platform) row in buurl.csv, scan the matching documents
# and re-index each URL not seen earlier in this run into TARGET_DOC_TYPE.
# NOTE(review): the CSV is opened with the platform default encoding, as
# before; pass encoding= explicitly once the file's encoding is confirmed.
with open('buurl.csv', newline='') as f:
    # csv handles quoted fields (e.g. a releaser name containing a comma),
    # which a plain str.split(',') would split apart.
    for row in csv.DictReader(f):
        releaser = row['releaser']
        platform = row['platform']
        pending = []
        hits = scan(client=es,
                    query=_build_query(platform, releaser),
                    index=index,
                    doc_type=doc_type,
                    scroll='5m',
                    request_timeout=100)
        for hit in hits:
            count += 1
            doc = hit['_source']
            url = doc['url']
            # Skip URLs already queued earlier in this run (any releaser).
            if url in set_url:
                continue
            set_url.add(url)
            doc_platform = doc['platform']
            # For 腾讯新闻 (Tencent News) cal_doc_id receives the whole
            # document; other platforms pass only the URL.
            if doc_platform == '腾讯新闻':
                doc_id = cal_doc_id(doc_platform, data_dict=doc,
                                    doc_id_type='all-time-url')
            else:
                doc_id = cal_doc_id(doc_platform, url=url,
                                    doc_id_type='all-time-url')
            print(doc_id)
            _add_monthly_fields(doc)
            pending.append(_bulk_line(doc_id, doc))
            # Batch on buffered documents rather than on `count`: `count`
            # also includes skipped duplicates, so `count % 100 == 0` could
            # miss its flush point and let the buffer keep growing.
            if len(pending) >= BULK_SIZE:
                _flush_bulk(pending, count)
        # Send whatever remains for this releaser.
        _flush_bulk(pending, count)