1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import elasticsearch
import pymysql
import datetime
from elasticsearch.helpers import scan
from mysql_tool import func_write_into_mysql_with_unique
# --- Elasticsearch connection -------------------------------------------
# NOTE(review): the user name and password below are committed in plain
# text; consider loading them from the environment or a secrets store.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
# Basic-auth credentials handed to the Elasticsearch client as a tuple.
http_auth = (user, passwd)
# Client object used for every Elasticsearch query in this script.
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
# --- MySQL connection ---------------------------------------------------
# NOTE(review): same concern as above -- the password is hard-coded.
sql_ip = '192.168.16.11'
sql_port = 3306
sql_user = 'data_user'
sql_passwd = 'vip@123456'
database = 'short_video'
# The connection is opened as soon as this module runs; no matching close()
# appears in the visible part of the file.
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
db=database, port=sql_port, charset='utf8mb4')
# Reference timestamp for the follower-count lookup: local wall-clock time
# two days before this script runs (despite the name, not the current time).
now = datetime.datetime.now() - datetime.timedelta(days=2)
# Calendar components of the reference date, used both as query filters and
# as the fetch_* columns of every output row.
data_year, data_month, data_day = now.year, now.month, now.day
# Accumulates one dict per releaser for the final database write.
write_list = []
# For every tracked system: fetch the target releasers from Elasticsearch,
# then look up each releaser's follower count for the reference date
# (data_year / data_month / data_day) and append one row per match to
# write_list. Releasers without a follower record for that date are skipped.
for systematic in ['short_video']:
    # Releasers must carry at least one of the tracked project tags; the
    # platform restriction is appended to "filter" below.
    search_releaser = {
        "query": {
            "bool": {
                "should": [
                    {"match": {"project_tags.keyword": "央视三农"}},
                    {"match": {"project_tags.keyword": "大客户部"}},
                    {"match": {"project_tags.keyword": "星光大道"}}
                ],
                "minimum_should_match": 1,
                "filter": []
            }
        }
    }
    # Pick the platform filter for this system. An unknown system fails
    # loudly here instead of raising NameError later or silently reusing the
    # filter left over from a previous iteration.
    if systematic == 'short_video':
        platform_dict = {"terms": {"platform.keyword": [
            "toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频", "new_tudou",
            "haokan", "kwai"]}}
    elif systematic == 'weixin':
        platform_dict = {"terms": {"platform.keyword": ["weixin"]}}
    else:
        raise ValueError('unsupported systematic: %r' % (systematic,))
    search_releaser["query"]["bool"]["filter"].append(platform_dict)
    scan_re = scan(client=es, index='target_releasers', query=search_releaser)

    # Map releaser_id_str -> the releaser fields needed for the output rows.
    # A later document with the same id overwrites an earlier one.
    releaser_id_list = {}
    for one in scan_re:
        line = one['_source']
        releaser_id_str = line['releaser_id_str']
        platform = line['platform']
        # releaserUrl may be absent from a document; fall back to ''.
        releaserUrl = line.get('releaserUrl', '')
        releaser_name = line['releaser']
        releaser_id_list[releaser_id_str] = {
            'platform': platform,
            'releaserUrl': releaserUrl,
            'releaser': releaser_name,
        }
    releaser_id_list_re = releaser_id_list

    # One follower-count query per releaser, pinned to the reference date.
    for one, releaser_info in releaser_id_list_re.items():
        search_fans = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"data_month": data_month}},
                        {"term": {"data_year": data_year}},
                        {"term": {"data_day": data_day}},
                        {"term": {"releaser_id_str.keyword": one}}
                    ]
                }
            }
        }
        search_fans_re = es.search(index='releaser_fans', doc_type='doc', body=search_fans)
        total = search_fans_re['hits']['total']
        # Elasticsearch 7+ reports hits.total as {"value": n, "relation": ...}
        # rather than a plain integer; accept both shapes.
        if isinstance(total, dict):
            total = total.get('value', 0)
        if total <= 0:
            # No follower record for this releaser on the reference date.
            continue
        try:
            fans = search_fans_re['hits']['hits'][0]['_source']['releaser_followers_count']
        except (KeyError, IndexError, TypeError):
            # Record exists but carries no usable follower count.
            fans = 0
        # Strip tabs and double quotes from the display name before storing.
        releaser = releaser_info['releaser'].replace('\t', '').replace('"', '')
        fans_re = {
            'releaser_id_str': one,
            'platform': releaser_info['platform'],
            'releaser_followers_count': fans,
            'releaserUrl': releaser_info['releaserUrl'],
            'releaser': releaser,
            'fetch_day': data_day,
            'fetch_month': data_month,
            'fetch_year': data_year,
        }
        write_list.append(fans_re)
# Hand every collected row to the project's MySQL writer in a single call,
# targeting the releaser_daily table; no log file path is supplied.
func_write_into_mysql_with_unique(
    db=db,
    tablename='releaser_daily',
    log_file='',
    data_dict_list=write_list,
)