# craw_data_and_write_into_weekly_index.py
# -*- coding: utf-8 -*-
"""
Created on Thu Sep  6 09:22:24 2018

@author: fangyucheng
"""

#import time
import json 
#import argparse
import datetime
from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan
from func_find_week_num import find_week_belongs_to
from crawler.crawler_sys.framework.platform_crawler_register import get_crawler
from crawler.crawler_sys.utils import trans_format
from func_cal_doc_id import cal_doc_id
from write_data_into_es.func_get_releaser_id import get_releaser_id


hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)

es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
#parser = argparse.ArgumentParser()
#parser.add_argument('-w', '--week_str', type=str, default=None)


def week_start_day(week_year, week_no, week_day, week_day_start=1):
    # week_day is accepted for call-site symmetry but is not used in the calculation
    year_week_start = find_first_day_for_given_start_weekday(week_year, week_day_start)
    week_start = year_week_start + datetime.timedelta(days=(week_no - 1) * 7)
    return week_start
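
# Example: with Monday as the week start (week_day_start=1),
# week_start_day(2018, 2, week_day=1) -> datetime.date(2018, 1, 8),
# the Monday that opens 2018's second week under this scheme.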


def define_doc_type(week_year, week_no, week_day_start):
    """
    A doc_type such as 'daily-url-2018_w24_s2' means Tuesday (s2) is taken as
    the first day of each week, and this is the 24th week of year 2018.
    In the isocalendar definition, Monday is weekday 1, Tuesday is weekday 2,
    ..., Saturday is weekday 6, and Sunday is weekday 7.
    """
    doc_type_str = 'daily-url-%d_w%02d_s%d' % (week_year, week_no, week_day_start)
    return doc_type_str
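
# Example: define_doc_type(2018, 24, 2) -> 'daily-url-2018_w24_s2'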
    
def find_first_day_for_given_start_weekday(year, start_weekday):
    # date.weekday() is zero-based (Monday == 0) while week_day_start uses the
    # iso convention (Monday == 1), so the first date whose weekday() equals
    # start_weekday falls one day after the desired start day; subtracting one
    # day compensates for that offset.
    cal_day1D = None
    for i in range(7):
        dayDi = datetime.date(year, 1, 1) + datetime.timedelta(days=i)
        if dayDi.weekday() == start_weekday:
            cal_day1D = dayDi - datetime.timedelta(days=1)
            break
    return cal_day1D
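
# Example: find_first_day_for_given_start_weekday(2018, 1) returns
# datetime.date(2018, 1, 1), the day before Tue 2018-01-02, the first
# date of 2018 whose weekday() equals 1.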
    

def get_target_releaser_video_info(platform,
                                   releaserUrl,
                                   log_file=None,
                                   output_to_es_raw=True,
                                   es_index=None,
                                   doc_type=None,
                                   releaser_page_num_max=100):
    """Crawl one releaser's page on the given platform and write the results
    to ES; failed URLs are written to log_file."""
    if log_file is None:
        log_file = open('error.log', 'w')

    crawler = get_crawler(platform=platform)
    crawler_initialization = crawler()
    if platform == 'haokan':
        try:
            crawler_initialization.releaser_page(releaserUrl=releaserUrl,
                                                 releaser_page_num_max=releaser_page_num_max,
                                                 output_to_es_raw=output_to_es_raw,
                                                 es_index=es_index,
                                                 doc_type=doc_type,
                                                 fetchFavoriteCommnt=True)
        except Exception:
            print(releaserUrl, platform, file=log_file)
    else:
        try:
            crawler_initialization.releaser_page(releaserUrl=releaserUrl,
                                                 releaser_page_num_max=releaser_page_num_max,
                                                 output_to_es_raw=output_to_es_raw,
                                                 es_index=es_index,
                                                 doc_type=doc_type)
        except Exception:
            print(releaserUrl, platform, file=log_file)



def func_search_reUrl_from_target_index(platform, releaser):
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"platform.keyword": platform}},
                    {"term": {"releaser.keyword": releaser}}
                ]
            }
        }
    }
    search_re = es.search(index='target_releasers', doc_type='doc', body=search_body)
    # on Elasticsearch <= 6.x hits.total is a plain int (it becomes a dict in 7.x)
    if search_re['hits']['total'] > 0:
        return search_re['hits']['hits'][0]['_source']['releaserUrl']
    else:
        print('Cannot find:', platform, releaser)
        return None
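
# Example hit shape (an assumed minimal mapping of the target_releasers index):
# {'_source': {'platform': 'haokan', 'releaser': '...', 'releaserUrl': 'https://...'}}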


def func_write_into_weekly_index_new_released(line_list, doc_type, index='short-video-weekly'):
    count = 0
    bulk_all_body = ''
    re_list = []
    for line in line_list:
        count += 1
        weekly_net_inc_play_count = line['play_count']
        weekly_net_inc_comment_count = line['comment_count']
        weekly_net_inc_favorite_count = line['favorite_count']
        # repost_count is absent on some platforms; default to 0
        weekly_net_inc_repost_count = line.get('repost_count', 0)
        weekly_cal_base = 'accumulate'
        timestamp = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)

        line.update({
            'timestamp': timestamp,
            'weekly_cal_base': weekly_cal_base,
            'weekly_net_inc_favorite_count': weekly_net_inc_favorite_count,
            'weekly_net_inc_comment_count': weekly_net_inc_comment_count,
            'weekly_net_inc_play_count': weekly_net_inc_play_count,
            'weekly_net_inc_repost_count': weekly_net_inc_repost_count,
        })
        re_list.append(line)
        # if 'video_id' in line.keys():
        #     line.pop('video_id')
        url = line['url']
        platform = line['platform']
        doc_id = cal_doc_id(platform, url=url, doc_id_type='all-time-url', data_dict=line)
        bulk_head = '{"index": {"_id":"%s"}}' % doc_id
        data_str = json.dumps(line, ensure_ascii=False)
        bulk_one_body = bulk_head + '\n' + data_str + '\n'
        bulk_all_body += bulk_one_body
        if count % 500 == 0:
            error_dic = es.bulk(index=index, doc_type=doc_type,
                                body=bulk_all_body, request_timeout=200)
            if error_dic['errors'] is True:
                print(error_dic['items'])
                print(bulk_all_body)
            # clear the buffer only after it has been checked for errors
            bulk_all_body = ''
            print(count)

    if bulk_all_body != '':
        error_dic = es.bulk(body=bulk_all_body,
                            index=index,
                            doc_type=doc_type,
                            request_timeout=200)
        if error_dic['errors'] is True:
            print(error_dic)
        print(count)
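
# Each action appended to bulk_all_body follows the Elasticsearch bulk NDJSON
# format: an action line followed by the document source, e.g.
# {"index": {"_id": "<doc_id>"}}
# {"platform": "...", "url": "...", "weekly_net_inc_play_count": 123, ...}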
            
#def func_search_data_to_write_from_index_craw(re_s, re_e,releaser,platform):
#    search_body = {  
#            "query": {
#                "bool": {
#                  "filter": [
#                    {"term": {"platform.keyword": platform}},
#                    {"term": {"releaser.keyword": releaser}},
#                    {"range": {"release_time": {"gte": re_s,"lt":re_e}}}
#                    ]
#                }
#                  }
#                   }
    
todayT = datetime.datetime.now()
# todayT = datetime.datetime(2019, 2, 5)
week_day_start = 1
# if args.week_str is None:
# note: despite its name, this looks back 9 days, presumably so the run
# always lands inside the previous, already-finished week
seven_days_ago_T = todayT - datetime.timedelta(days=9)
week_year, week_no, week_day = find_week_belongs_to(seven_days_ago_T,
                                                    week_day_start)
week_start = week_start_day(week_year, week_no, week_day)
re_s = week_start - datetime.timedelta(1)
re_s_dt = datetime.datetime.strptime(str(re_s), '%Y-%m-%d')
re_s_t = int(datetime.datetime.timestamp(re_s_dt) * 1000)

re_e = week_start + datetime.timedelta(6)
re_e_dt = datetime.datetime.strptime(str(re_e), '%Y-%m-%d')
re_e_t = int(datetime.datetime.timestamp(re_e_dt) * 1000)
# nowT_feihua = week_start + datetime.timedelta(days=6)
weekly_doc_type_name = define_doc_type(week_year, week_no + 1,
                                       week_day_start=week_day_start)
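
# The window used against release_time below is, in epoch milliseconds,
# [week_start - 1 day, week_start + 6 days); e.g. for a Monday week_start of
# 2019-01-28 this is [2019-01-27 00:00, 2019-02-03 00:00) in local time.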
miaopai_list = []
# file = r'D:\work_file\发布者账号\anhui.csv'
# file = r'D:\work_file\发布者账号\一次性需求附件\month10month9_releaser_video_num预警 2019-11-05.csv'
file = r'D:\work_file\发布者账号\周数据账号\周.csv'
with open(file, 'r', encoding='gb18030') as f:
    header_Lst = f.readline().strip().split(',')
    for line in f:
        count_has = 0
        line_Lst = line.strip().split(',')
        line_dict = dict(zip(header_Lst, line_Lst))
        releaser = line_dict['releaser']
        platform = line_dict['platform']
        # if platform != "haokan":
        #     continue
        # prefer the releaserUrl column when the CSV provides one,
        # otherwise fall back to the target_releasers index
        releaserUrl = line_dict.get('releaserUrl')
        if not releaserUrl:
            releaserUrl = func_search_reUrl_from_target_index(platform, releaser)
        if platform == 'miaopai':
            miaopai_list.append(releaser)
        print(releaserUrl)
        line_dict["releaser_id"] = get_releaser_id(platform=platform, releaserUrl=releaserUrl)
        if line_dict["releaser_id"]:
            doc_id = line_dict['platform'] + '_' + line_dict['releaser_id']
        else:
            print("error_url ", releaserUrl)
        if releaserUrl != None:
            re_list = []
            # get_time = get_target_releaser_video_info(
            #                               platform=platform,
            #                               releaserUrl=releaserUrl,
            #                               releaser_page_num_max=200,
            #                               es_index='crawler-data-raw',
            #                               doc_type='doc'
            #                               )
            if platform != "kwai":
                continue
            search_body = {
                "query": {
                    "bool": {
                        "filter": [
                            {"term": {"platform.keyword": platform}},
                            # {"term": {"platform.keyword": "toutiao"}},
                            {"term": {"releaser_id_str.keyword": doc_id}},
                            # {"range": {"release_time": {"gte": 1563638400000}}},
                            {"range": {"release_time": {"gte": re_s_t, "lt": re_e_t}}},
                            {"range": {"fetch_time": {"gte": re_s_t}}}
                        ]
                    }
                }
            }

            scan_re = scan(client=es, index='crawler-data-raw', doc_type='doc',
                           query=search_body, scroll='3m')
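            # helpers.scan wraps the Elasticsearch scroll API and lazily yields
            # every matching hit, so result sets larger than one page are handled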
            # scan_re = scan(client=es, index='short-video-all-time-url', doc_type='all-time-url',
            #                query=search_body, scroll='3m')
            for one_scan in scan_re:
                re_list.append(one_scan['_source'])
                count_has += 1
                if count_has % 500 == 0:
                    print(count_has)
                    print(len(re_list))
                    func_write_into_weekly_index_new_released(re_list, doc_type=weekly_doc_type_name)
                    re_list = []
            func_write_into_weekly_index_new_released(re_list, doc_type=weekly_doc_type_name)
        # break
# for releaser in miaopai_list:
#    re_list = []
#    search_body = {
#                "query": {
#                    "bool": {
#                      "filter": [
#                        {"term": {"platform.keyword": 'miaopai'}},
#                        {"term": {"releaser.keyword": releaser}},
#                        {"range": {"release_time": {"gte": re_s_t,"lt":re_e_t}}},
#
#                        ]
#                    }
#                      }
#                      }
#    scan_re = scan(client=es, index='short-video-all-time-url', doc_type='all-time-url',
#                    query=search_body, scroll='3m')
#    for one_scan in scan_re:
#        re_list.append(one_scan['_source'])
#    func_write_into_weekly_index_new_released(re_list, doc_type=weekly_doc_type_name)