# craw_data_and_write_into_monthly_index.py
# -*- coding: utf-8 -*-
"""
Created on Thu Sep  6 09:22:24 2018

@author: fangyucheng
"""

#import time
import json
#import argparse
import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from crawler.crawler_sys.framework.platform_crawler_register import get_crawler
from func_cal_doc_id import cal_doc_id
try:
    from crawler_sys.framework.func_get_releaser_id import *
except ImportError:
    from func_get_releaser_id import *

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)

es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
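# One shared client serves everything below: the target_releasers lookup,
# the scroll reads over short-video-all-time-url, and the bulk writes.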

#parser = argparse.ArgumentParser()
#parser.add_argument('-w', '--week_str', type=str, default=None)


    
#def find_week_belongs_to(dayT, week_start_iso_weekday=1):
#    if isinstance(dayT, datetime.datetime):
#        dayD = datetime.date(dayT.year, dayT.month, dayT.day)
#    elif isinstance(dayT, datetime.date):
#        dayD = dayT
#    else:
#        print('Wrong type, input parameter must be instance of datetime.datetime '
#              'or datetime.date!')
#        return None
#    calendar_tupe = dayD.isocalendar()
#    param_iso_week_no = calendar_tupe[1]
#    param_iso_weekday = calendar_tupe[2]
#    param_iso_week_year = calendar_tupe[0]
#    if isinstance(week_start_iso_weekday, int) and week_start_iso_weekday>0:
#        if week_start_iso_weekday==1: # iso week, Mon is weekday 1, Sun is weekday 7
#            cal_week_no = param_iso_week_no
#            cal_week_year = param_iso_week_year
#            cal_weekday = param_iso_weekday
#            return (cal_week_year, cal_week_no, cal_weekday)
#    else:
#        print('Wrong parameter, must be positive int!')
#        return None

def get_target_releaser_video_info(platform,
                                   releaserUrl,
                                   log_file=None,
                                   output_to_es_raw=True,
                                   es_index=None,
                                   doc_type=None,
                                   releaser_page_num_max=100):
    if log_file is None:
        log_file = open('error.log', 'w')

    crawler = get_crawler(platform=platform)
    crawler_instance = crawler()
    # haokan's releaser_page additionally supports fetching favorite/comment
    # counts; every other platform takes the same arguments without that flag
    extra_kwargs = {}
    if platform == 'haokan':
        extra_kwargs['fetchFavoriteCommnt'] = True
    try:
        crawler_instance.releaser_page(releaserUrl=releaserUrl,
                                       releaser_page_num_max=releaser_page_num_max,
                                       output_to_es_raw=output_to_es_raw,
                                       es_index=es_index,
                                       doc_type=doc_type,
                                       **extra_kwargs)
    except Exception:
        print(releaserUrl, platform, file=log_file)
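
# Example usage (hypothetical URL; 'crawler-data-raw'/'doc' mirror the commented
# call further down in this script):
#   get_target_releaser_video_info(platform='haokan',
#                                  releaserUrl='https://haokan.baidu.com/author/123',
#                                  es_index='crawler-data-raw',
#                                  doc_type='doc',
#                                  releaser_page_num_max=200)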



def func_search_reUrl_from_target_index(platform, releaser):
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"platform.keyword": platform}},
                    {"term": {"releaser.keyword": releaser}}
                ]
            }
        }
    }
    search_re = es.search(index='target_releasers', doc_type='doc', body=search_body)
    if search_re['hits']['total'] > 0:
        return search_re['hits']['hits'][0]['_source']['releaserUrl']
    else:
        print('Cannot find:', platform, releaser)
        return None
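
# Example (hypothetical name): func_search_reUrl_from_target_index('toutiao', 'some_releaser')
# returns the releaserUrl stored in target_releasers, or None when no doc matches.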


def func_write_into_monthly_index_new_released(line_list, doc_type, index='short-video-production-2020'):
    """Copy per-video counters into monthly_* fields and bulk-write each doc,
    flushing to Elasticsearch every 500 lines."""
    count = 0
    bulk_all_body = ''
    for line in line_list:
        count += 1
        net_inc_play_count = line['play_count']
        net_inc_comment_count = line['comment_count']
        net_inc_favorite_count = line['favorite_count']
        try:
            net_inc_repost_count = line['repost_count']
        except KeyError:
            net_inc_repost_count = 0
        timestamp = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)

        line.update({
                'timestamp': timestamp,
                'monthly_cal_base': 'accumulate',
                'monthly_net_inc_favorite_count': net_inc_favorite_count,
                'monthly_net_inc_comment_count': net_inc_comment_count,
                'monthly_net_inc_play_count': net_inc_play_count,
                'monthly_net_inc_repost_count': net_inc_repost_count,
        })
        url = line['url']
        platform = line['platform']
        doc_id = cal_doc_id(platform, url=url, doc_id_type='all-time-url', data_dict=line)
        bulk_head = '{"index": {"_id":"%s"}}' % doc_id
        data_str = json.dumps(line, ensure_ascii=False)
        bulk_all_body += bulk_head + '\n' + data_str + '\n'
        if count % 500 == 0:
            error_dic = es.bulk(index=index, doc_type=doc_type,
                                body=bulk_all_body, request_timeout=200)
            # report failures before clearing the buffer, so the offending
            # body can still be printed
            if error_dic['errors']:
                print(error_dic['items'])
                print(bulk_all_body)
            bulk_all_body = ''
            print(count)

    if bulk_all_body != '':
        error_dic = es.bulk(body=bulk_all_body,
                            index=index,
                            doc_type=doc_type,
                            request_timeout=200)
        if error_dic['errors']:
            print(error_dic)
    print(count)
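
# Each bulk flush sends NDJSON: one action line naming the doc _id, then the
# document source on the next line. A sketch of one pair (the id shown is
# hypothetical; real ids come from cal_doc_id):
#   {"index": {"_id": "toutiao_<url-derived-id>"}}
#   {"platform": "toutiao", "url": "...", "monthly_net_inc_play_count": 123, ...}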
            
#def func_search_data_to_write_from_index_craw(re_s, re_e,releaser,platform):
#    search_body = {  
#            "query": {
#                "bool": {
#                  "filter": [
#                    {"term": {"platform.keyword": platform}},
#                    {"term": {"releaser.keyword": releaser}},
#                    {"range": {"release_time": {"gte": re_s,"lt":re_e}}}
#                    ]
#                }
#                  }
#                   }

# release_time window in epoch milliseconds: 2020-03-01 00:00 to 2020-04-01 00:00 (UTC+8)
re_s_t = 1582992000000
re_e_t = 1585670400000
miaopai_list = []
monthly_doc_type_name = 'daily-url-2020-03-31'
count_has = 0
# Earlier input files, kept for reference; the last uncommented path is the one used:
# file = r'D:\work_file\发布者账号\一次性需求附件\brief-7月整月数据(含融合).csv'
# file = r'D:\work_file\发布者账号\一次性需求附件\month3month2_releaser_video_num预警 2020-04-03.csv'
# file = r'D:\work_file\word_file_new\PROJECT_ORDINARY_COURSE\03_releaser_analysis\for_production_org\target_releasers - key_custom - 副本.csv'
# file = r'D:\work_file\发布者账号\一次性需求附件\month10month9_releaser_video_num预警 2019-11-04.csv'
file = r'D:\wxfile\WeChat Files\litaolemo\FileStorage\File\2020-04\少了数据的快手两个号.csv'
with open(file, 'r', encoding="gb18030") as f:
    header_Lst = f.readline().strip().split(',')
    for line in f:
        line_Lst = line.strip().split(',')
        line_dict = dict(zip(header_Lst, line_Lst))
        releaser = line_dict['releaser']
        platform = line_dict['platform']
        #_uid = line_dict['id']
        try:
            releaserUrl = line_dict['releaserUrl']
        except KeyError:
            releaserUrl = func_search_reUrl_from_target_index(platform, releaser)
        try:
            releaser_id_str = platform + "_" + get_releaser_id(platform=platform, releaserUrl=releaserUrl)
        except Exception:
            print("error_releaser_id", releaser, platform, releaserUrl)
            continue
        print(releaser, platform)
        if platform in ('miaopai', '抖音'):
            miaopai_list.append(releaser_id_str)


        if releaserUrl is not None:
            re_list = []
            # get_target_releaser_video_info(
            #                               platform=platform,
            #                               releaserUrl=releaserUrl,
            #                               releaser_page_num_max=200,
            #                               es_index='crawler-data-raw',
            #                               doc_type='doc'
            #                               )
            # only the platforms listed (uncommented) here are processed
            if platform not in [
                    # "toutiao",
                    # "haokan",
                    # "new_tudou",
                    # "腾讯视频",
                    # "网易新闻",
                    # "miaopai",
                    # "腾讯新闻",
                    "kwai",
                    # "抖音"
            ]:
                continue
            search_body = {
                "query": {
                    "bool": {
                        "filter": [
                            # {"term": {"platform.keyword": platform}},
                            # {"term": {"platform.keyword": "toutiao"}},
                            # {"term": {"data_provider.keyword": "CCR"}},
                            {"term": {"releaser_id_str": releaser_id_str}},
                            # {"term": {"releaser.keyword": releaser}},
                            {"range": {"release_time": {"gte": re_s_t, "lt": re_e_t}}},
                            # {"range": {"fetch_time": {"gte": 1570670089000}}}
                            # {"range": {"fetch_time": {"gte": re_e_t, "lt": 1585843200000}}},
                            # {"range": {"comment_count": {"gte": 1}}}
                        ]
                    }
                }
            }
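            # helpers.scan wraps the scroll API: it streams every matching hit in
            # batches, keeping each scroll context alive for 3 minutes between fetches.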
            scan_re = scan(client=es, index='short-video-all-time-url', doc_type='all-time-url',
                           query=search_body, scroll='3m')
            # scan_re = scan(client=es, index='crawler-data-raw', doc_type="doc",
            #               query=search_body, scroll='3m')
            # scan_re = scan(client=es, index='short-video-production', doc_type="daily-url",
            #                query=search_body, scroll='3m')
            for one_scan in scan_re:
                re_list.append(one_scan['_source'])
                count_has += 1
                if count_has % 500 == 0:
                    print(count_has)
                    print(len(re_list))
                    func_write_into_monthly_index_new_released(re_list, doc_type=monthly_doc_type_name)
                    re_list = []
            func_write_into_monthly_index_new_released(re_list, doc_type=monthly_doc_type_name)
            # break
# for releaser in miaopai_list:
#    re_list = []
#    search_body = {
#                "query": {
#                    "bool": {
#                      "filter": [
#                        #{"term": {"platform.keyword": 'miaopai'}},
#                        {"term": {"releaser_id_str": releaser}},
#                        {"range": {"release_time": {"gte": re_s_t,"lt":re_e_t}}},
#                      # {"range": {"fetch_time": {"gte": 1550505600000}}}
#                        ]
#                    }
#                      }
#                      }
#    scan_re = scan(client=es, index='short-video-all-time-url', doc_type='all-time-url',
#                    query=search_body, scroll='3m')
#    for one_scan in scan_re:
#        re_list.append(one_scan['_source'])
#    func_write_into_monthly_index_new_released(re_list, doc_type=monthly_doc_type_name)