"""Copy follower counts of selected releasers from Elasticsearch into MySQL.

For each platform group the script scans the ``target_releasers`` index for
releasers whose ``is_valid`` is "true" and that carry at least one of the
``央视三农`` / ``大客户部`` flags, looks up each releaser's "observed"
follower count for the configured year/month in the ``releaser`` index, and
finally hands all collected rows to ``func_write_into_mysql_with_unique``.
"""
import elasticsearch
import pymysql
from elasticsearch.helpers import scan

from mysql_tool import func_write_into_mysql_with_unique

# NOTE(review): connection credentials are hard-coded in source; they should
# be moved to configuration or environment variables.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)

sql_ip = '192.168.16.11'
sql_port = 3306
sql_user = 'data_user'
sql_passwd = 'vip@123456'
database = 'weibo_data'

# Reporting period used to filter documents of the ``releaser`` index.
DATA_YEAR = 2019
DATA_MONTH = 10

# Platform groups, processed in this order; each maps to the
# ``platform.keyword`` values that belong to it.
PLATFORM_GROUPS = (
    ('short_video', ["toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频",
                     "new_tudou", "haokan", "kwai"]),
    ('weixin', ["weixin"]),
)


def _build_releaser_query(platforms):
    """Return the ``target_releasers`` query body for the given platforms."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"is_valid": "true"}},
                    {"terms": {"platform.keyword": platforms}},
                ],
                "should": [
                    {"term": {"央视三农.keyword": "True"}},
                    {"term": {"大客户部.keyword": "True"}},
                ],
                "minimum_should_match": 1,
            }
        }
    }


def _collect_releasers(es_client, platforms):
    """Scan ``target_releasers`` and return ``{releaser_id_str: info}``.

    ``info`` holds the releaser's ``platform``, ``releaserUrl`` (empty string
    when the document has no such field) and ``releaser`` name. Keying by id
    de-duplicates releasers that appear in more than one document.
    """
    releasers = {}
    query = _build_releaser_query(platforms)
    for hit in scan(client=es_client, index='target_releasers', query=query):
        source = hit['_source']
        releasers[source['releaser_id_str']] = {
            'platform': source['platform'],
            'releaserUrl': source.get('releaserUrl', ''),
            'releaser': source['releaser'],
        }
    return releasers


def _lookup_followers(es_client, releaser_id):
    """Return the observed follower count of ``releaser_id``, or 0 if unknown.

    The count is taken from the first hit of the ``releaser`` index for the
    configured year/month; 0 is returned when there is no hit or the hit has
    no ``releaser_followers_count`` field.
    """
    search_fans = {
        "query": {
            "bool": {
                "filter": [
                    {"terms": {"data_month": [DATA_MONTH]}},
                    {"terms": {"data_year": [DATA_YEAR]}},
                    {"term": {"releaser_id_str.keyword": releaser_id}},
                    {"term": {"stats_type.keyword": "observed"}},
                ]
            }
        }
    }
    response = es_client.search(index='releaser', doc_type='releasers',
                                body=search_fans)
    # Check the returned hit list rather than ``hits.total``: the total is an
    # int on older servers but an object on Elasticsearch 7+, where comparing
    # it with 0 would raise TypeError.
    hits = response['hits']['hits']
    if not hits:
        return 0
    try:
        return hits[0]['_source']['releaser_followers_count']
    except KeyError:
        return 0


es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
                     db=database, port=sql_port, charset='utf8mb4')

write_list = []
try:
    for _group_name, platforms in PLATFORM_GROUPS:
        for releaser_id, info in _collect_releasers(es, platforms).items():
            write_list.append({
                'releaser_id_str': releaser_id,
                'platform': info['platform'],
                'releaser_followers_count': _lookup_followers(es, releaser_id),
                'releaserUrl': info['releaserUrl'],
                # Tabs and double quotes are stripped from the display name
                # before it is written out.
                'releaser': info['releaser'].replace('\t', '').replace('"', ''),
            })

    # Written once, after both platform groups have been collected.
    func_write_into_mysql_with_unique(db=db,
                                      tablename='short_video_weixin_releaser',
                                      log_file='',
                                      data_dict_list=write_list)
finally:
    # Release the MySQL connection even if a query or the write fails; the
    # guard avoids an "Already closed" error if the helper closed it itself.
    if db.open:
        db.close()