# -*- coding: utf-8 -*-
"""
Created on Wed Jun  6 18:14:32 2018

@author: hanye

Helper for bulk-writing short-video record dicts through a client's
``bulk`` method (by default the ``es`` client from ``es_hy_sign_in``).
"""
import datetime
import json
import sys

from func_cal_doc_id import cal_doc_id
from es_hy_sign_in import es


def _build_bulk_body(dict_Lst, doc_id_type, f_log):
    """Build a newline-delimited bulk "index" request body from *dict_Lst*.

    Each record contributes one action line followed by one document line.
    Records that raise while being processed are reported to *f_log* and
    skipped.

    Note: every record that gets as far as the timestamp step is mutated in
    place -- a ``'timestamp'`` key (current time, epoch milliseconds) is
    added to the caller's dict.

    Args:
        dict_Lst: iterable of record dicts; each must provide ``'url'``,
            ``'platform'`` and ``'fetch_time'`` keys.
        doc_id_type: forwarded unchanged to ``cal_doc_id``.
        f_log: text stream that receives error messages.

    Returns:
        ``(body, count)``: the request body (``''`` if no record could be
        serialized) and the number of records included in it.
    """
    chunks = []
    write_counter = 0
    for line in dict_Lst:
        try:
            url = line['url']
            platform = line['platform']
            fetch_time_ts = line['fetch_time']
            # fetch_time is divided by 1e3 before fromtimestamp(), i.e. it is
            # treated as epoch milliseconds; the day string uses local time.
            fetch_time_T = datetime.datetime.fromtimestamp(fetch_time_ts / 1e3)
            fetch_time_day_str = fetch_time_T.isoformat()[:10]
            doc_id = cal_doc_id(platform, url,
                                fetch_day_str=fetch_time_day_str,
                                fetch_time_ts=fetch_time_ts,
                                doc_id_type=doc_id_type,
                                data_dict=line)
            line['timestamp'] = int(datetime.datetime.now().timestamp() * 1e3)
            data_str = json.dumps(line, ensure_ascii=False)
            # Serialize the action line with json.dumps (rather than string
            # interpolation) so a doc_id containing quotes or backslashes
            # cannot produce invalid JSON and break the whole bulk request.
            action_str = json.dumps({'index': {'_id': str(doc_id)}},
                                    ensure_ascii=False)
        except Exception as err:
            # Best-effort: skip the bad record but record why it failed.
            print('Failed when building bulk_write body with %s' % line,
                  repr(err), file=f_log)
            continue
        chunks.append(action_str + '\n' + data_str + '\n')
        write_counter += 1
    # join() avoids the repeated-concatenation cost of `body += ...`.
    return ''.join(chunks), write_counter


def bulk_write_short_video(dict_Lst, index, doc_type, doc_id_type,
                           client=es,
                           f_log=sys.stdout):
    """Bulk-index short-video record dicts via ``client.bulk``.

    Each record gets a document id from ``cal_doc_id`` (computed from its
    platform, url, fetch day and fetch time) and a ``'timestamp'`` field set
    to the current time in epoch milliseconds; the input dicts are modified
    in place.

    Args:
        dict_Lst: iterable of record dicts with at least ``'url'``,
            ``'platform'`` and ``'fetch_time'`` keys (``fetch_time`` is
            treated as epoch milliseconds).
        index: target index name, passed to ``client.bulk``.
        doc_type: document type, passed to ``client.bulk``.
        doc_id_type: forwarded to ``cal_doc_id`` to select the id scheme.
        client: object exposing ``bulk(index=, doc_type=, body=,
            request_timeout=)``; defaults to ``es`` from ``es_hy_sign_in``
            (presumably an elasticsearch-py client -- confirm).
        f_log: text stream for progress and error messages.

    Returns:
        The value returned by ``client.bulk`` on success; ``None`` if the
        bulk call raised or if no record could be serialized.
    """
    bulk_write_bd, write_counter = _build_bulk_body(dict_Lst, doc_id_type,
                                                    f_log)
    if not bulk_write_bd:
        print('Got empty bulk_write_body.', file=f_log)
        return None

    t1 = datetime.datetime.now()
    try:
        bulk_resp = client.bulk(index=index, doc_type=doc_type,
                                body=bulk_write_bd, request_timeout=300)
    except Exception as err:
        print('Failed to perform bulk write with bulk_write_body:\n%s'
              % bulk_write_bd, file=f_log)
        print(repr(err), file=f_log)
        f_log.flush()
        return None

    # Logging is kept outside the try so a logging problem is never
    # misreported as a failed write after the data was already sent.
    td = datetime.datetime.now() - t1
    print('written %d lines into %s/%s, costs %s,'
          % (write_counter, index, doc_type, td),
          datetime.datetime.now(), file=f_log)
    return bulk_resp