# -*- coding:utf-8 -*-
# @Time : 2020/2/19 11:36
# @Author : litao
"""Load qingbo crawler dump files and bulk-index them into Elasticsearch.

Two entry points, selected by file name in ``__main__``:

* ``write_short_video(file_name)`` -- short-video records, written to the
  ``short-video-all-time-url`` and ``short-video-time-track`` indices.
* ``write_weixin_data(file_name)`` -- weixin article records, written to
  the ``ronghe_weixin_daily`` index.
"""
import copy
import datetime
import json
import logging
from urllib.parse import parse_qs, urlparse

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

from write_data_into_es.func_get_releaser_id import get_releaser_id
from write_data_into_es.func_cal_doc_id import cal_doc_id
from write_data_into_es.func_transfer_from_ftp import transfer_from_ftp
from write_data_into_es.ReleaserMeta import ReleaseMeta

# NOTE(review): credentials are hard-coded in source; consider moving them
# to a config file or environment variables.
hosts = '192.168.17.11'
port = 80
user = 'litao'
passwd = 'lQSmSEnGZZxl'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

date_str = datetime.datetime.now().strftime("%Y-%m-%d")
loggerName = 'writeES_from_qingbo_%s' % date_str
logger = logging.getLogger(loggerName)
logger.setLevel(logging.DEBUG)

# Template document for one weixin article; deep-copied for every input line.
weixin_data_dic = {
    "releaser": None,
    "releaser_id_str": None,
    "timestamp": None,
    "platform": "weixin",
    "url": None,
    "title": None,
    "play_count": None,
    "favorite_count": None,
    "author": None,
    'release_time': 0,
    'fetch_time': 0,
    "top": None,
    "isOriginal": None,
}


def _parse_ts(time_str, fmt='%Y-%m-%d %H:%M:%S'):
    """Parse a ``YYYY-mm-dd HH:MM:SS`` string to epoch milliseconds.

    Returns 0 when ``time_str`` is missing or unparsable (the historical
    fallback value used throughout this module).
    """
    try:
        return int(datetime.datetime.strptime(time_str, fmt).timestamp() * 1e3)
    except (TypeError, ValueError):
        return 0


def formatter_dict(raw_dict):
    """Normalize one raw qingbo record into the internal ES document schema.

    Args:
        raw_dict: one decoded JSON line from a qingbo dump file.

    Returns:
        The formatted document dict, or None when the record carries no
        ``url`` field (such records cannot be identified and are dropped).
    """
    ts_every_line = int(datetime.datetime.now().timestamp() * 1e3)
    if 'url' not in raw_dict:
        return None

    fetch_time_ts = _parse_ts(raw_dict.get('fetch_time'))
    release_time_ts = _parse_ts(raw_dict.get('posttime'))

    releaser = raw_dict.get('author', '')
    comment_count = raw_dict.get('comments_count', 0)
    favourite_count = raw_dict.get('favourites_count', 0)
    play_count = raw_dict.get('play_count', 0)
    duration = raw_dict.get('play_time', 0)
    repost_count = raw_dict.get('reposts_count', 0)

    # Map the Chinese site names qingbo emits onto internal platform codes.
    platform = raw_dict.get('site_name', '')
    platform = {'今日头条': 'toutiao', '快手': 'kwai'}.get(platform, platform)

    releaserUrl = ''
    releaser_id_str = None
    if 'author_url' in raw_dict:
        releaserUrl = raw_dict['author_url']
        try:
            releaser_id = get_releaser_id(platform=platform,
                                          releaserUrl=releaserUrl)
        except Exception:
            releaser_id = None
        if releaser_id:
            releaser_id_str = platform + '_' + releaser_id
    elif platform == 'kwai':
        # kwai records carry no author_url; derive the releaser id from the
        # video URL instead.
        try:
            releaser_id = get_releaser_id(platform=platform,
                                          releaserUrl=raw_dict['url'])
        except Exception:
            releaser_id = None
        # Guard: the original code concatenated unconditionally and raised
        # TypeError whenever get_releaser_id returned None.
        if releaser_id:
            releaser_id_str = platform + '_' + releaser_id
            releaserUrl = 'https://live.kuaishou.com/profile/%s' % releaser_id

    channel = raw_dict.get('subdomain', '')
    title = raw_dict.get('title', '')
    video_id = raw_dict.get('keyword', '')
    vid = raw_dict.get('vid', '')

    video_type_str = ''
    if 'video_type' in raw_dict:
        try:
            video_type_id_num = int(raw_dict['video_type'])
        except (TypeError, ValueError):
            video_type_id_num = 0  # unparsable codes fall back to plain video
        if video_type_id_num == 1:
            video_type_str = 'live-video'
        elif video_type_id_num == 0:
            video_type_str = 'video'

    formatted_dict = {
        'data_provider': 'qingbo',
        'timestamp': ts_every_line,
        'video_id': video_id,
        'releaser': releaser,
        'comment_count': comment_count,
        'favorite_count': favourite_count,
        'play_count': play_count,
        'fetch_time': fetch_time_ts,
        'release_time': release_time_ts,
        'duration': duration,
        'repost_count': repost_count,
        'platform': platform,
        'channel': channel,
        'title': title,
        'url': raw_dict['url'],
        'vid': vid,
        'releaserUrl': releaserUrl,
        'releaser_id_str': releaser_id_str,
    }
    # Releaser statistics are only present for some platforms.
    if raw_dict.get('author_followers_count'):
        formatted_dict['releaser_followers_count'] = raw_dict.get('author_followers_count')
        formatted_dict['releaser_favourites_count'] = raw_dict.get('author_favourites_count')
    if video_type_str != '':
        formatted_dict['video_type'] = video_type_str
    return formatted_dict


def write_ATU_TK(loggerName, data_Lst):
    """
    ATU stands for all-time-url, which means data in
    short-video-production/all-time-url.
    TK stands for time-track, which means data in
    short-video-time-track/time-track.
    Passed-in data_Lst should be a list of python dict.
    Field check should be performed before passing in data_Lst.
    """
    logger = logging.getLogger('%s.write_ATU_TK' % loggerName)
    # NOTE(review): this function uses its own ES client with separate
    # hard-coded credentials, distinct from the module-level client.
    es = Elasticsearch(hosts='192.168.17.11', port=80,
                       http_auth=('crawler', 'XBcasfo8dgfs'))
    index_pro = 'short-video-all-time-url'
    doc_type_all = 'all-time-url'
    index_time_track = 'short-video-time-track'
    doc_type_time_track = 'time-track'

    # Collect bulk-action lines in lists and join once: the original `+=`
    # string concatenation was quadratic in the batch size.
    bulk_all_chunks = []
    bulk_tt_chunks = []
    write_counter = 0
    for line_dict in data_Lst:
        write_counter += 1
        platform = line_dict['platform']
        fetch_time_ts = line_dict['fetch_time']
        url = line_dict['url']
        id_all = cal_doc_id(platform=platform, url=url,
                            doc_id_type='all-time-url', data_dict=line_dict)
        id_time_track = cal_doc_id(platform=platform, url=url,
                                   fetch_time_ts=fetch_time_ts,
                                   doc_id_type='time-track',
                                   data_dict=line_dict)
        data_str = json.dumps(line_dict, ensure_ascii=False)
        bulk_tt_chunks.append('{"index": {"_id":"%s"}}' % id_time_track
                              + '\n' + data_str + '\n')
        bulk_all_chunks.append('{"index": {"_id":"%s"}}' % id_all
                               + '\n' + data_str + '\n')

    # Both indices get the same documents, only the _id scheme differs.
    for index_name, doc_type, chunks in (
            (index_time_track, doc_type_time_track, bulk_tt_chunks),
            (index_pro, doc_type_all, bulk_all_chunks)):
        if chunks:
            es.bulk(body=''.join(chunks), index=index_name,
                    doc_type=doc_type, request_timeout=200)
            logger.info('From all %d input lines, write %s/%s with %d lines.'
                        % (len(data_Lst), index_name, doc_type, write_counter))
        else:
            logger.info('From all %d input lines, write %s/%s with 0 lines.'
                        % (len(data_Lst), index_name, doc_type))


def write_short_video(file_name):
    """Stream a qingbo short-video dump file into ES in batches of 1000.

    Lines without a ``url`` field are skipped with a console notice.
    """
    data_Lst = []
    total_lines = 0
    with open(file_name, 'r', encoding='utf-8') as f_raw:
        for line in f_raw:
            total_lines += 1
            formatted_dict = formatter_dict(json.loads(line))
            if formatted_dict is not None:
                data_Lst.append(formatted_dict)
            else:
                print('ignore one line who is lack of url field')
            # Flush on buffer size rather than on line index: the original
            # `line_counter % 1000 == 0` fired on the very first line and
            # sent a one-document bulk request.
            if len(data_Lst) >= 1000:
                print('Writing line: %d' % total_lines)
                write_ATU_TK(file_name, data_Lst)
                data_Lst.clear()
        if data_Lst:
            print('Writing line: %d' % total_lines)
            write_ATU_TK(file_name, data_Lst)
    print('Processing file %s done, %s.' % (file_name, datetime.datetime.now()))
    print('Total lines in file %s: %d.' % (file_name, total_lines))


def write_weixin_data(file_name):
    """Index weixin article records into ronghe_weixin_daily, 1000 per bulk.

    The ES document _id is the ``sn`` query parameter of the article URL
    (presumably unique per article -- TODO confirm against the crawler).

    Returns:
        A success message string when no bulk call reported errors,
        False otherwise.
    """
    bulk_buffer = []
    error_flag = 0
    line_no = 0

    def _flush():
        """Send buffered bulk actions; record whether ES reported errors."""
        nonlocal error_flag
        if not bulk_buffer:
            return
        resp = es.bulk(index="ronghe_weixin_daily", doc_type="doc",
                       body=''.join(bulk_buffer), request_timeout=200)
        # Check the response BEFORE clearing the buffer: the original code
        # cleared first and then printed the (already empty) bulk body.
        if resp.get('errors') is True:
            error_flag = 1
            print(resp['items'])
        bulk_buffer.clear()

    with open(file_name, 'r', encoding='utf-8') as f_raw:
        for line_no, line in enumerate(f_raw):
            r = json.loads(line)
            body = copy.deepcopy(weixin_data_dic)
            body["releaser"] = r.get("wx_nickname")
            body["releaser_id_str"] = get_releaser_id(platform="weixin",
                                                      releaserUrl=r.get("url"))
            body["wx_id"] = r.get("wx_name")
            body["timestamp"] = int(datetime.datetime.now().timestamp() * 1e3)
            body["url"] = r.get("url")
            body["title"] = r.get("wx_title")
            # `or 0` guards against missing keys: int(None) raises TypeError.
            body["play_count"] = int(r.get("readnum") or 0)
            body["favorite_count"] = int(r.get("likenum") or 0)
            body["author"] = r.get("author")
            body["release_time"] = _parse_ts(r.get("posttime"))
            # Prefer the crawl time; fall back to the record's add_time.
            body["fetch_time"] = (_parse_ts(r.get("get_time"))
                                  or _parse_ts(r.get("add_time")))
            body["top"] = r.get("top")
            # parse_qs returns {key: [values]}; take the first value of 'sn'.
            params = parse_qs(urlparse(body["url"] or "").query)
            _id = params["sn"][0] if params.get("sn") else None
            bulk_buffer.append('{"index": {"_id":"%s"}}' % _id + '\n'
                               + json.dumps(body, ensure_ascii=False) + '\n')
            if len(bulk_buffer) >= 1000:
                _flush()
                print(line_no)
        _flush()
    if error_flag == 0:
        return "%s条数据写入成功" % line_no
    return False


if __name__ == "__main__":
    file_name = "wx_2020050618.log"
    # Pull the dump file down from the crawler FTP drop directory first.
    transfer_from_ftp(file_name, "/qingbo/qingbo/_reback_data", "./")
    if "wx_" in file_name:
        print(write_weixin_data(file_name))
    else:
        write_short_video(file_name)