# -*- coding: utf-8 -*-
"""
Created on Tue Feb 12 17:32:31 2019

@author: zhouyujiang

The net-increment calculation is split into two parts:
  * one pass computes increments against historical data
    (``func_cal_increment``),
  * the other handles newly released documents with no baseline
    (``func_cal_new_released_NI``).

Historical counters are staged in redis; writes into redis go through a
pipeline for speed (reading back through a pipeline is not suitable here).
"""
import hashlib
import datetime
import json

import redis
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

hosts = '192.168.17.11'
port = 9200
es = Elasticsearch(hosts=hosts, port=port, timeout=30)

# ES field-name prefix for each calculation type.
_PREFIX_BY_CAL_TYPE = {'M': 'monthly_', 'W': 'weekly_', 'N': ''}


def cal_id_hash(doc_id, cal_type):
    """Return the redis key for *doc_id*: '<cal_type>_<md5hex(doc_id)>'."""
    releaser_hash = ('%s_%s' % (cal_type,
                                hashlib.md5(doc_id.encode('utf-8')).hexdigest()))
    return releaser_hash


def scan_index(index, doc_type, search_body, log_file):
    """Count hits for *search_body* and return ``(total_hit, scan_resp)``.

    ``scan_resp`` is an elasticsearch scan iterator, or None when the
    query matched nothing (callers must handle the None case).
    """
    search_resp = es.search(index=index, doc_type=doc_type,
                            body=search_body, size=0)
    total_hit = search_resp['hits']['total']
    print('Index: %s total hit: %d' % (index, total_hit), file=log_file)
    if total_hit > 0:
        scan_resp = scan(client=es, query=search_body, index=index,
                         doc_type=doc_type, request_timeout=300)
    else:
        print('Zero hit.', file=log_file)
        scan_resp = None
    return (total_hit, scan_resp)


def form_bulk_body(line_data, doc_id):
    """Build one bulk-API action/source line pair for *doc_id*.

    Side effects on *line_data*: stamps a millisecond ``timestamp`` and
    removes any ``_id`` field so it is not indexed into the document body.
    """
    line_data.update(
        {'timestamp': int(datetime.datetime.now().timestamp() * 1e3)})
    action_json_line_str = '{"index": {"_id":"' + doc_id + '"}}'
    line_data.pop('_id', None)
    data_json_line_str = json.dumps(line_data, ensure_ascii=False)
    return action_json_line_str + '\n' + data_json_line_str + '\n'


def bulk_write(data_list, doc_type, index):
    """Bulk-index every ES hit dict in *data_list*, then clear the list.

    Each element carries '_id' and '_source'. Failures are printed
    (best effort) rather than raised so a long run is not aborted.
    """
    bulk_body = ''
    for data_dict in data_list:
        bulk_body += form_bulk_body(data_dict['_source'], data_dict['_id'])
    try:
        es.bulk(body=bulk_body, index=index, doc_type=doc_type,
                request_timeout=100)
    except Exception as e:
        # BUG FIX: the original printed a variable that was never assigned
        # when es.bulk raised, turning any bulk failure into a NameError.
        print(e)
    data_list.clear()


def _build_cal_dict(cal_type, with_real):
    """Map generic net-increment field names to *cal_type*-prefixed names.

    with_real -- also include the '..._real' companion fields (used by
    the historical-increment pass).
    Raises ValueError for an unknown cal_type; the original code silently
    fell through and later died with NameError.
    """
    prefix = _PREFIX_BY_CAL_TYPE.get(cal_type)
    if prefix is None:
        raise ValueError('unknown cal_type: %r' % cal_type)
    keys = ['net_inc_repost_count', 'net_inc_play_count',
            'net_inc_comment_count', 'net_inc_favorite_count']
    if with_real:
        keys = keys + [k + '_real' for k in keys]
    cal_dict = {k: prefix + k for k in keys}
    # Historical quirk preserved: the 'N' base field is 'netly_cal_base'
    # even though its other fields carry no prefix.
    cal_dict['cal_base'] = ('netly_cal_base' if cal_type == 'N'
                            else prefix + 'cal_base')
    return cal_dict


def _open_log(log_f):
    """Return (log_file, is_close); open the dated default log when log_f is None."""
    if log_f is None:
        file_path = ('/home/hanye/project_data/Python/Projects/'
                     'proj-short-videos/write-data-into-es/log/'
                     'cal_NI_{dstr}.log').format(
                         dstr=str(datetime.datetime.now())[0:10])
        return open(file_path, 'a'), True
    return log_f, False


def _extract_doc_id(hit, target_id):
    """Doc id for a hit: the ES '_id' itself, or a field inside '_source'."""
    if target_id == "_id":
        return hit[target_id]
    return hit['_source'][target_id]


def func_cal_increment(index_last, doc_type_last, cal_type, index_now,
                       doc_type_now, search_body=None, now_body=None,
                       release_time_st_last=None, release_time_et_last=None,
                       redis_db=10, log_f=None, limit_platform=None,
                       target_id="_id"):
    """Compute net increments of counters between two ES indexes.

    Pass 1 stages the counters found in *index_last* into redis
    (db *redis_db*, flushed first), keyed by cal_id_hash(doc_id, cal_type).
    Pass 2 scans *index_now*, subtracts the staged values and bulk-writes
    the '<prefix>net_inc_*' fields back into *index_now*.

    search_body / now_body override the default release_time range query
    built from release_time_st_last / release_time_et_last (epoch ms —
    TODO confirm against index mapping). limit_platform, when given, is a
    list used in a terms filter on 'platform.keyword'. target_id selects
    where the document id lives ('_id' or a '_source' field name).
    Returns None (after logging) when no query can be built.
    """
    log_file, is_close = _open_log(log_f)
    rds = redis.StrictRedis(host='192.168.17.60', port=6379, db=redis_db)
    rds.flushdb()
    pipe = rds.pipeline()
    re_list = []
    rds_count = 0
    count = 0
    cal_dict = _build_cal_dict(cal_type, with_real=True)
    if cal_type in ('M', 'W'):
        print('will use %s' % cal_dict, file=log_file)
    if search_body is None:
        if (release_time_st_last is not None
                and release_time_et_last is not None):
            scan_body = {
                "query": {"bool": {"filter": [
                    {"range": {"release_time": {
                        "gte": release_time_st_last,
                        "lt": release_time_et_last}}}
                ]}}}
        else:
            print('GET error in release_time')
            return None
    else:
        scan_body = search_body
    print(scan_body)
    if limit_platform is not None:
        # BUG FIX: the original had a trailing comma here, so a one-element
        # tuple (not the terms clause) was appended into the filter list.
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        scan_body["query"]['bool']['filter'].append(platform_dict)
    print('last_search:', scan_body)
    total_hit, scan_resp = scan_index(index_last, doc_type_last, scan_body,
                                      log_file)
    print('%s _ %s total have %s' % (index_last, doc_type_last, total_hit),
          file=log_file)
    print('start write into redis', datetime.datetime.now(), file=log_file)
    # scan_resp is None on zero hits; the original crashed with TypeError.
    for one_scan in (scan_resp or []):
        rds_count += 1
        if rds_count % 10000 == 0 or rds_count == total_hit:
            print('write into redis {rds_cpunt}/{total_hit}'.format(
                      rds_cpunt=rds_count, total_hit=total_hit),
                  datetime.datetime.now(), file=log_file)
            pipe.execute()
        line_dict = one_scan['_source']
        doc_id_hash = cal_id_hash(_extract_doc_id(one_scan, target_id),
                                  cal_type)
        if 'repost_count' in line_dict:
            try:
                data_dict_for_redis = {
                    'play_count': line_dict.get('play_count'),
                    'favorite_count': line_dict['favorite_count'],
                    'comment_count': line_dict['comment_count'],
                    'repost_count': line_dict['repost_count']}
            except Exception as e:
                print(180, e, line_dict, file=log_file)
                continue
        else:
            try:
                data_dict_for_redis = {
                    'play_count': line_dict.get('play_count'),
                    'favorite_count': line_dict['favorite_count'],
                    'comment_count': line_dict['comment_count']}
            except Exception as e:
                print(186, e, line_dict, file=log_file)
                continue
        pipe.hmset(doc_id_hash, data_dict_for_redis)
    pipe.execute()
    print('end write into redis', datetime.datetime.now(), file=log_file)
    print('start cal IN', datetime.datetime.now(), file=log_file)
    if now_body is None:
        search_now_body = {
            "query": {"bool": {"filter": [
                {"range": {"release_time": {
                    "gte": release_time_st_last,
                    "lt": release_time_et_last}}}
            ]}}}
    else:
        search_now_body = now_body
    if limit_platform is not None:
        # Same trailing-comma fix as above.
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        search_now_body["query"]['bool']['filter'].append(platform_dict)
    print('this_search:', search_now_body)
    total_hit_now, scan_resp_now = scan_index(index_now, doc_type_now,
                                              search_now_body, log_file)
    print('%s _ %s total have %s' % (index_now, doc_type_now, total_hit_now),
          file=log_file)
    for one_scan_now in (scan_resp_now or []):
        count += 1
        line_dict_now = one_scan_now['_source']
        doc_id_hash = cal_id_hash(_extract_doc_id(one_scan_now, target_id),
                                  cal_type)
        if rds.exists(doc_id_hash):
            data_dict_in_redis = rds.hgetall(doc_id_hash)
            try:
                cal_base = 'historical_complete'
                try:
                    net_inc_play_count = (
                        line_dict_now['play_count']
                        - int(data_dict_in_redis[b'play_count']))
                except (KeyError, TypeError, ValueError):
                    # play_count may be missing or non-numeric on either side
                    net_inc_play_count = 0
                net_inc_comment_count = (
                    line_dict_now['comment_count']
                    - int(data_dict_in_redis[b'comment_count']))
                net_inc_favorite_count = (
                    line_dict_now['favorite_count']
                    - int(data_dict_in_redis[b'favorite_count']))
            except Exception as e:
                print(e, line_dict_now, file=log_file)
                continue
            # BUG FIX: default first, so a failed subtraction below cannot
            # leave net_inc_repost_count unbound (or stale from the
            # previous hit).
            net_inc_repost_count = 0
            if ('repost_count' in line_dict_now
                    and b'repost_count' in data_dict_in_redis):
                try:
                    net_inc_repost_count = (
                        line_dict_now['repost_count']
                        - int(data_dict_in_redis[b'repost_count']))
                except Exception as e:
                    print(e, line_dict_now, file=log_file)
            if (net_inc_play_count < 0 or net_inc_comment_count < 0
                    or net_inc_favorite_count < 0
                    or net_inc_repost_count < 0):
                # A negative delta means the counter shrank: keep the raw
                # (negative) value in the '_real' field and clamp the
                # reported increment to 0.
                cal_base = 'historical_complete_change_MNI'
                update_fields = {cal_dict['cal_base']: cal_base}
                for key, value in (
                        ('net_inc_repost_count', net_inc_repost_count),
                        ('net_inc_play_count', net_inc_play_count),
                        ('net_inc_comment_count', net_inc_comment_count),
                        ('net_inc_favorite_count', net_inc_favorite_count)):
                    update_fields[cal_dict[key]] = max(value, 0)
                    update_fields[cal_dict[key + '_real']] = min(value, 0)
                one_scan_now['_source'].update(update_fields)
            else:
                # All deltas non-negative: the '_real' companions are not
                # written (original behavior preserved).
                one_scan_now['_source'].update({
                    cal_dict['cal_base']: cal_base,
                    cal_dict['net_inc_play_count']: net_inc_play_count,
                    cal_dict['net_inc_comment_count']: net_inc_comment_count,
                    cal_dict['net_inc_favorite_count']: net_inc_favorite_count,
                    cal_dict['net_inc_repost_count']: net_inc_repost_count})
            re_list.append(one_scan_now)
            if count % 1000 == 0 or count == total_hit_now:
                print(str(count) + "/" + str(total_hit_now), file=log_file)
                bulk_write(data_list=re_list, doc_type=doc_type_now,
                           index=index_now)
                re_list.clear()
        elif cal_dict['net_inc_play_count'] in one_scan_now['_source']:
            # No historical record in redis: strip stale increment fields
            # so the re-indexed document does not carry over old values.
            # (.pop with default: the original raised KeyError when only
            # some of the three fields were present.)
            one_scan_now['_source'].pop(cal_dict['net_inc_play_count'], None)
            one_scan_now['_source'].pop(cal_dict['net_inc_comment_count'], None)
            one_scan_now['_source'].pop(cal_dict['net_inc_favorite_count'], None)
            re_list.append(one_scan_now)
            if count % 1000 == 0 or count == total_hit_now:
                print(str(count) + "/" + str(total_hit_now), file=log_file)
                bulk_write(data_list=re_list, doc_type=doc_type_now,
                           index=index_now)
                re_list.clear()
        # else: no baseline and no stale fields -> nothing to re-index.
    if re_list:
        bulk_write(data_list=re_list, doc_type=doc_type_now, index=index_now)
        re_list.clear()
    if is_close:
        log_file.close()


def func_cal_new_released_NI(cal_type, index_now, doc_type_now,
                             exter_list=None, search_body=None,
                             release_time_st_now=None,
                             release_time_et_now=None, log_f=None,
                             exter_dict=None, limit_platform=None):
    """Write accumulate-style net increments for newly released documents.

    For documents with no historical baseline the raw counters ARE the
    increments: cal_base is set to 'accumulate' and each
    '<prefix>net_inc_*' field is copied from the corresponding counter.

    exter_list (or the legacy alias exter_dict) is a list of extra filter
    clauses appended to the default release_time range query; search_body
    replaces the default query entirely. limit_platform adds a terms
    filter on 'platform.keyword'.
    """
    log_file, is_close = _open_log(log_f)
    count = 0
    re_list = []
    cal_dict = _build_cal_dict(cal_type, with_real=False)
    if cal_type in ('M', 'W'):
        print('will use %s' % cal_dict, file=log_file)
    if search_body is None:
        search_now_body = {
            "query": {"bool": {"filter": [
                {"range": {"release_time": {
                    "gte": release_time_st_now,
                    "lt": release_time_et_now}}}
            ]}}}
        # BUG FIX: the original tested exter_dict but iterated exter_list,
        # so filters passed only via exter_list (as __main__ does) were
        # silently dropped. Accept either parameter.
        extra_filters = exter_list if exter_list is not None else exter_dict
        if extra_filters is not None:
            print(extra_filters)
            for one_exter in extra_filters:
                search_now_body['query']['bool']['filter'].append(one_exter)
    else:
        search_now_body = search_body
    if limit_platform is not None:
        # Trailing-comma tuple bug fixed (see func_cal_increment).
        platform_dict = {"terms": {"platform.keyword": limit_platform}}
        search_now_body["query"]['bool']['filter'].append(platform_dict)
    print('this_search:', search_now_body)
    total_hit_now, scan_resp_now = scan_index(index_now, doc_type_now,
                                              search_now_body, log_file)
    print('%s _ %s in total have %s' % (index_now, doc_type_now,
                                        total_hit_now), file=log_file)
    for one_scan_now in (scan_resp_now or []):
        count += 1
        line_dict_now = one_scan_now['_source']
        try:
            # Missing/None/0 play_count all normalize to 0 (original used
            # a falsiness check).
            net_inc_play_count = line_dict_now.get('play_count') or 0
            net_inc_comment_count = line_dict_now['comment_count']
            net_inc_favorite_count = line_dict_now['favorite_count']
        except Exception as e:
            print(374, e, line_dict_now, file=log_file)
            continue
        net_inc_repost_count = line_dict_now.get('repost_count', 0)
        one_scan_now['_source'].update({
            cal_dict['cal_base']: 'accumulate',
            cal_dict['net_inc_play_count']: net_inc_play_count,
            cal_dict['net_inc_comment_count']: net_inc_comment_count,
            cal_dict['net_inc_favorite_count']: net_inc_favorite_count,
            cal_dict['net_inc_repost_count']: net_inc_repost_count})
        re_list.append(one_scan_now)
        if count % 1000 == 0 or count == total_hit_now:
            bulk_write(data_list=re_list, doc_type=doc_type_now,
                       index=index_now)
    if re_list:
        # Flush any leftover batch (e.g. the scan yielded fewer docs than
        # the initial hit count, so count == total_hit_now never fired).
        bulk_write(data_list=re_list, doc_type=doc_type_now, index=index_now)
    if is_close:
        log_file.close()


if __name__ == '__main__':
    cal_type = 'M'
    # Epoch-millisecond bounds; presumably 2019-01 local time — TODO confirm.
    release_time_st_now = 1546272000000
    release_time_et_now = 1548950400000
    index_now = 'short-video-production-2019'
    doc_type_now = 'daily-url-2019-01-31'
    exter_dict = [{"term": {"releaser.keyword": "看看新闻Knews"}},
                  {"term": {"platform.keyword": "haokan"}}]
    func_cal_new_released_NI(cal_type=cal_type,
                             release_time_st_now=release_time_st_now,
                             release_time_et_now=release_time_et_now,
                             index_now=index_now,
                             doc_type_now=doc_type_now,
                             exter_list=exter_dict)