1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import elasticsearch
import pymysql
from elasticsearch.helpers import scan
from mysql_tool import func_write_into_mysql_with_unique
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
sql_ip = '192.168.16.11'
sql_port = 3306
sql_user = 'data_user'
sql_passwd = 'vip@123456'
database = 'weibo_data'
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
db=database, port=sql_port, charset='utf8mb4')
write_list = []
for systematic in ['short_video', 'weixin']:
releaser_id_list_re = []
search_releaser = {
"query": {
"bool": {
"filter": [
{"term": {"is_valid": "true"}}
],
"should": [
{"term": {"央视三农.keyword": "True"}},
{"term": {"大客户部.keyword": "True"}}
],
"minimum_should_match": 1
}
}
}
if systematic == 'short_video':
platform_dict = {"terms": {"platform.keyword": ["toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频", "new_tudou",
"haokan", "kwai"]}}
if systematic == 'weixin':
platform_dict = {"terms": {"platform.keyword": ["weixin"]}}
search_releaser["query"]["bool"]["filter"].append(platform_dict)
scan_re = scan(client=es, index='target_releasers', query=search_releaser)
releaser_id_list = {}
for one in scan_re:
tmp_dict = {}
line = one['_source']
releaser_id_str = line['releaser_id_str']
platform = line['platform']
try:
releaserUrl = line['releaserUrl']
except:
releaserUrl = ''
releaser_name = line['releaser']
releaser_id_list[releaser_id_str] = {'platform': platform, 'releaserUrl': releaserUrl,
'releaser': releaser_name}
releaser_id_list_re = releaser_id_list
for one in releaser_id_list_re:
search_fans = {
"query": {
"bool": {
"filter": [
{"terms": {"data_month": [10]}},
{"terms": {"data_year": [2019]}},
{"term": {"releaser_id_str.keyword": one}},
{"term": {"stats_type.keyword": "observed"}}
]
}
}
}
search_fans_re = es.search(index='releaser', doc_type='releasers', body=search_fans)
if search_fans_re['hits']['total'] > 0:
try:
fans = search_fans_re['hits']['hits'][0]['_source']['releaser_followers_count']
except:
fans = 0
else:
fans = 0
releaser = releaser_id_list_re[one]['releaser'].replace('\t', '').replace('"', '')
fans_re = {'releaser_id_str': one,
'platform': releaser_id_list_re[one]['platform'],
'releaser_followers_count': fans,
'releaserUrl': releaser_id_list_re[one]['releaserUrl'],
'releaser': releaser}
write_list.append(fans_re)
func_write_into_mysql_with_unique(db=db, tablename='short_video_weixin_releaser',
log_file='', data_dict_list=write_list)