# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 16:47:12 2018

@author: zhouyujiang

For each (platform, releaser) row in a GB18030 CSV, copy matching
short-video docs from one Elasticsearch index/type into another,
skipping doc_ids already present in the target, writing in bulk
batches of 100.
"""
import datetime
import json

import elasticsearch
from elasticsearch.helpers import scan

from func_cal_doc_id import cal_doc_id

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

target_index = 'short-video-irregular'
target_type = '2018-month-1to10'
from_index = 'short-video-irregular'
from_type = 'xinjingbao_3m'
fn = r'C:\Users\zhouyujiang\cuowu3.csv'

with open(fn, 'r', encoding='gb18030') as f:
    bulk_all_body = ''
    head_list = f.readline().strip().split(',')
    for raw_line in f:
        row = dict(zip(head_list, raw_line.strip().split(',')))
        releaser = row['releaser']
        platform = row['platform']

        # Docs for this releaser with a play_count, released in
        # March 2018 (epoch-ms bounds), no longer than 10 minutes.
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"term": {"releaser.keyword": releaser}},
                        {"exists": {"field": "play_count"}},
                        {"range": {"release_time": {"gte": 1519833600000,
                                                    "lt": 1522512000000}}},
                        {"range": {"duration": {"lte": 600}}}
                    ]
                }
            }
        }

        # Collect the _ids already present in the target so we can skip
        # re-indexing them below.
        written_set = set()
        for hit in scan(client=es, query=search_body,
                        index=target_index, doc_type=target_type,
                        scroll='5m', request_timeout=100):
            written_set.add(hit['_id'])
        print(platform, releaser, 'start_have', len(written_set))

        count = 0
        added_set = set()  # doc_ids newly queued for this releaser
        for hit in scan(client=es, query=search_body,
                        index=from_index, doc_type=from_type,
                        scroll='5m', request_timeout=100):
            count += 1
            line = hit['_source']
            platform = line['platform']
            # 腾讯新闻 derives its doc_id from the whole document;
            # every other platform derives it from the url.
            if platform == '腾讯新闻':
                doc_id = cal_doc_id(platform, data_dict=line,
                                    doc_id_type='all-time-url')
            else:
                doc_id = cal_doc_id(platform, url=line['url'],
                                    doc_id_type='all-time-url')
            if doc_id in written_set:
                continue
            written_set.add(doc_id)
            added_set.add(doc_id)

            timestamp = int(datetime.datetime.now().timestamp() * 1000)
            line.update({
                'timestamp': timestamp,
                'data_provider': 'CCR',
            })
            # Drop the source-only field if present.
            line.pop('video_id', None)

            bulk_head = '{"index": {"_id":"%s"}}' % doc_id
            data_str = json.dumps(line, ensure_ascii=False)
            # BUG FIX: this accumulation was commented out in the
            # original, so the bulk payload stayed empty and nothing
            # was ever indexed.
            bulk_all_body += bulk_head + '\n' + data_str + '\n'

            # Flush every 100 scanned docs; guard against an empty
            # payload (ES rejects an empty bulk body).
            if count % 100 == 0 and bulk_all_body:
                eror_dic = es.bulk(index=target_index,
                                   doc_type=target_type,
                                   body=bulk_all_body,
                                   request_timeout=200)
                bulk_all_body = ''
                if eror_dic['errors'] is True:
                    print(eror_dic['items'])
                print(count)

        # Flush whatever is left for this releaser.
        if bulk_all_body != '':
            eror_dic = es.bulk(body=bulk_all_body,
                               index=target_index,
                               doc_type=target_type,
                               request_timeout=200)
            if eror_dic['errors'] is True:
                print(eror_dic)
            bulk_all_body = ''
        print(platform, releaser, 'end_have:', len(written_set),
              'add:', len(added_set))
print(111)