import csv
import datetime
import hashlib
import json

import elasticsearch

# Elasticsearch connection settings
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)


def write_es(Lst, target_index, target_type):
    """Bulk-index CSV rows (first row is the header) into Elasticsearch."""
    count = 0
    bulk_all_body = ''
    header_Lst = Lst[0]
    for line in Lst[1:]:  # skip the header row
        dic = dict(zip(header_Lst, line))
        dic['timestamp'] = int(datetime.datetime.now().timestamp() * 1e3)  # epoch milliseconds
        dic['channel_id'] = dic['channel_id'].zfill(4)
        # The document _id is the SHA-1 hash of the concatenation of these fields:
        # tv_station + platform + channel + program + releaser + releaser_platform + releaserUrl
        eid = (dic['tv_station'] + dic['platform'] + dic['channel'] + dic['program']
               + dic.get('releaser', '') + dic.get('releaser_platform', '') + dic['releaserUrl'])
        print(eid)
        sha1 = hashlib.sha1()
        sha1.update(eid.encode('utf8'))
        bulk_head = '{"index": {"_id":"%s"}}' % sha1.hexdigest()
        data_str = json.dumps(dic, ensure_ascii=False)
        bulk_all_body += bulk_head + '\n' + data_str + '\n'
        count += 1
        # Flush to Elasticsearch every 1000 documents
        if count % 1000 == 0:
            error_dic = es.bulk(index=target_index, doc_type=target_type,
                                body=bulk_all_body, request_timeout=200)
            if error_dic['errors'] is True:
                print(error_dic['items'])
            bulk_all_body = ''
            print(count)
    # Flush any remaining documents
    if bulk_all_body != '':
        error_dic = es.bulk(index=target_index, doc_type=target_type,
                            body=bulk_all_body, request_timeout=200)
        if error_dic['errors'] is True:
            print(error_dic['items'])
    print(count)


if __name__ == '__main__':
    target_index = 'ronghe_target_releaser'
    target_type = 'doc'
    with open(r"D:\work_file\发布者账号\融媒账号列表\电视新闻媒体栏目、账号与APP汇总(含中央) 对照表(技术版)0718-2.csv",
              "r", encoding="gb18030") as m3:
        data = list(csv.reader(m3))
    write_es(data, target_index, target_type)