"""Copy one day's follower counts for tracked releasers from Elasticsearch to MySQL.

The script reads the releasers carrying one of the configured project tags from
the ``target_releasers`` index, looks up each releaser's follower snapshot for
the target date in the ``releaser_fans`` index, and hands the resulting rows to
``func_write_into_mysql_with_unique`` for the ``releaser_daily`` table.

It runs at import time, exactly like the original flat script.
"""
import datetime

import elasticsearch
import pymysql
from elasticsearch.helpers import scan

from mysql_tool import func_write_into_mysql_with_unique

# Project tags a releaser must carry (at least one) to be included.
_PROJECT_TAGS = ("央视三农", "大客户部", "星光大道")

# Platforms queried for each system name.
_PLATFORMS_BY_SYSTEM = {
    'short_video': ["toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频",
                    "new_tudou", "haokan", "kwai"],
    'weixin': ["weixin"],
}


def _build_releaser_query(systematic):
    """Return the ``target_releasers`` query body for one system.

    Raises:
        ValueError: if ``systematic`` has no platform list configured. The
            original code would have hit a NameError or silently reused the
            previous iteration's filter in that case.
    """
    try:
        platforms = _PLATFORMS_BY_SYSTEM[systematic]
    except KeyError:
        raise ValueError('unknown system: %r' % (systematic,)) from None
    return {
        "query": {
            "bool": {
                "should": [
                    {"match": {"project_tags.keyword": tag}}
                    for tag in _PROJECT_TAGS
                ],
                "minimum_should_match": 1,
                "filter": [{"terms": {"platform.keyword": platforms}}],
            }
        }
    }


def _build_fans_query(releaser_id, year, month, day):
    """Return the ``releaser_fans`` query body for one releaser on one date."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"data_month": month}},
                    {"term": {"data_year": year}},
                    {"term": {"data_day": day}},
                    {"term": {"releaser_id_str.keyword": releaser_id}},
                ]
            }
        }
    }


def _fetch_releasers(es_client, systematic):
    """Return ``{releaser_id_str: info}`` for every tracked releaser of a system.

    ``info`` holds the ``platform``, ``releaserUrl`` and ``releaser`` fields of
    the document; a missing ``releaserUrl`` becomes ``''``. When several
    documents share a ``releaser_id_str`` the last one scanned wins.
    """
    releasers = {}
    query = _build_releaser_query(systematic)
    for hit in scan(client=es_client, index='target_releasers', query=query):
        source = hit['_source']
        releaser_id = source['releaser_id_str']
        releasers[releaser_id] = {
            'platform': source['platform'],
            'releaserUrl': source.get('releaserUrl', ''),
            'releaser': source['releaser'],
        }
    return releasers


def _total_hits(response):
    """Return the hit count of a search response as a number.

    Elasticsearch 7+ reports ``hits.total`` as ``{"value": n, "relation": ...}``
    while earlier versions report a plain integer; both are accepted.
    """
    total = response['hits']['total']
    if isinstance(total, dict):
        return total.get('value', 0)
    return total


def _collect_follower_rows(es_client, releasers, year, month, day):
    """Build one ``releaser_daily`` row per releaser that has a snapshot.

    Releasers without any ``releaser_fans`` document for the date are skipped.
    If a document exists but the follower count cannot be read from it, the
    row is still produced with a count of 0.
    """
    rows = []
    for releaser_id, info in releasers.items():
        response = es_client.search(
            index='releaser_fans',
            doc_type='doc',
            body=_build_fans_query(releaser_id, year, month, day),
        )
        if _total_hits(response) <= 0:
            continue
        try:
            fans = response['hits']['hits'][0]['_source']['releaser_followers_count']
        except (KeyError, IndexError, TypeError):
            # Best effort: keep the row, but with a zero count.
            fans = 0
        # Tabs and double quotes are stripped from the display name before
        # it is written out.
        releaser = info['releaser'].replace('\t', '').replace('"', '')
        rows.append({
            'releaser_id_str': releaser_id,
            'platform': info['platform'],
            'releaser_followers_count': fans,
            'releaserUrl': info['releaserUrl'],
            'releaser': releaser,
            'fetch_day': day,
            'fetch_month': month,
            'fetch_year': year,
        })
    return rows


# NOTE(review): hosts and credentials are hard-coded in source; consider
# loading them from configuration or the environment instead.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

sql_ip = '192.168.16.11'
sql_port = 3306
sql_user = 'data_user'
sql_passwd = 'vip@123456'
database = 'short_video'
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
                     db=database, port=sql_port, charset='utf8mb4')

# The snapshot date is two days before the (local, naive) current time.
now = datetime.datetime.now() - datetime.timedelta(2)
data_month = now.month
data_year = now.year
data_day = now.day

write_list = []
for systematic in ['short_video']:
    write_list.extend(
        _collect_follower_rows(
            es,
            _fetch_releasers(es, systematic),
            data_year,
            data_month,
            data_day,
        )
    )

func_write_into_mysql_with_unique(db=db, tablename='releaser_daily',
                                  log_file='', data_dict_list=write_list)