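# BigCustomerFansDailyTask.py: daily job that copies follower counts of
# big-customer releasers from Elasticsearch into the MySQL releaser_daily table.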
import datetime
import os

import elasticsearch
import pymysql
from elasticsearch.helpers import scan

from mysql_tool import func_write_into_mysql_with_unique

# Connection settings for the Elasticsearch cluster and the MySQL database.
# Every value can be overridden through an environment variable; the literals
# are the previous hard-coded values, kept as defaults for compatibility.
# NOTE(review): plaintext credentials are committed here -- supply them via the
# environment / a secret store and drop the defaults.
hosts = os.environ.get('ES_HOSTS', '192.168.17.11')
port = int(os.environ.get('ES_PORT', '80'))
user = os.environ.get('ES_USER', 'zhouyujiang')
passwd = os.environ.get('ES_PASSWORD', '8tM9JDN2LVxM')
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

sql_ip = os.environ.get('MYSQL_HOST', '192.168.16.11')
sql_port = int(os.environ.get('MYSQL_PORT', '3306'))
sql_user = os.environ.get('MYSQL_USER', 'data_user')
sql_passwd = os.environ.get('MYSQL_PASSWORD', 'vip@123456')
database = os.environ.get('MYSQL_DATABASE', 'short_video')
# utf8mb4 covers the full Unicode range (MySQL's legacy "utf8" is 3-byte only).
db = pymysql.connect(host=sql_ip, user=sql_user, password=sql_passwd,
                     db=database, port=sql_port, charset='utf8mb4')

# Target day of the snapshot: two days before the moment the job runs.
now = datetime.datetime.now() - datetime.timedelta(days=2)
data_year, data_month, data_day = now.year, now.month, now.day
# Buffer of row dicts destined for the MySQL table.
write_list = list()
# Platforms whose releasers are tracked for each systematic (product line).
PLATFORMS_BY_SYSTEMATIC = {
    'short_video': ["toutiao", "抖音", "腾讯新闻", "miaopai", "腾讯视频",
                    "new_tudou", "haokan", "kwai"],
    'weixin': ["weixin"],
}


def _total_hits(response):
    """Return ``hits.total`` of an Elasticsearch search response as an int.

    Elasticsearch 6.x reports ``hits.total`` as a plain number, whereas 7.x+
    reports an object ``{"value": n, "relation": ...}``; accept both shapes.
    """
    total = response['hits']['total']
    if isinstance(total, dict):
        return total.get('value', 0)
    return total


for systematic in ['short_video']:
    # Fresh batch per systematic: a list shared across iterations would make
    # every later write re-send the rows of earlier systematics.
    write_list = []

    # Releasers carrying at least one of the big-customer project tags.
    search_releaser = {
        "query": {
            "bool": {
                "should": [
                    {"match": {"project_tags.keyword": "央视三农"}},
                    {"match": {"project_tags.keyword": "大客户部"}},
                    {"match": {"project_tags.keyword": "星光大道"}}
                ],
                # Required: once a filter clause is present, should clauses
                # would otherwise become optional.
                "minimum_should_match": 1,
                "filter": []
            }
        }
    }

    platforms = PLATFORMS_BY_SYSTEMATIC.get(systematic)
    if platforms is None:
        # Fail loudly instead of running with an undefined/stale platform filter.
        raise ValueError('unknown systematic: %r' % (systematic,))
    search_releaser["query"]["bool"]["filter"].append(
        {"terms": {"platform.keyword": platforms}})

    # releaser_id_str -> descriptive fields of that releaser.
    releasers = {}
    for hit in scan(client=es, index='target_releasers', query=search_releaser):
        source = hit['_source']
        releasers[source['releaser_id_str']] = {
            'platform': source['platform'],
            # releaserUrl is optional in target_releasers documents.
            'releaserUrl': source.get('releaserUrl', ''),
            'releaser': source['releaser'],
        }

    # NOTE(review): one search per releaser (N round-trips); an msearch batch
    # would reduce this if the releaser list grows.
    for releaser_id, info in releasers.items():
        search_fans = {
            # Only the first hit is read below, so don't fetch more.
            "size": 1,
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"data_month": data_month}},
                        {"term": {"data_year": data_year}},
                        {"term": {"data_day": data_day}},
                        {"term": {"releaser_id_str.keyword": releaser_id}}
                    ]
                }
            }
        }
        response = es.search(index='releaser_fans', doc_type='doc', body=search_fans)
        if _total_hits(response) <= 0:
            # No follower snapshot for this releaser on the target day: skip it.
            continue
        try:
            fans = response['hits']['hits'][0]['_source']['releaser_followers_count']
        except (KeyError, IndexError, TypeError):
            # A snapshot exists but has no usable count; record 0.
            fans = 0

        write_list.append({
            'releaser_id_str': releaser_id,
            'platform': info['platform'],
            'releaser_followers_count': fans,
            'releaserUrl': info['releaserUrl'],
            # Strip tabs and double quotes from the releaser name.
            'releaser': info['releaser'].replace('\t', '').replace('"', ''),
            'fetch_day': data_day,
            'fetch_month': data_month,
            'fetch_year': data_year,
        })

    func_write_into_mysql_with_unique(db=db, tablename='releaser_daily',
                                      log_file='', data_dict_list=write_list)

# Release the MySQL connection; guarded in case the writer helper already
# closed it (PyMySQL raises when closing an already-closed connection).
if db.open:
    db.close()