# ronghe_releaser_csv_to_es.py (2.47 KB)
import requests
import json, hashlib
import datetime
import elasticsearch
from write_data_into_es.func_cal_doc_id import *

# Connection settings for the Elasticsearch client used by this script.
# NOTE(review): host, user name and password are hard-coded in source;
# consider loading them from the environment or an untracked config file.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
# (user, password) pair passed to the client as its http_auth argument.
http_auth = (user, passwd)
# Module-level client, created when this module is loaded; write_es() uses it
# for its bulk requests.
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
import csv


def _send_bulk(bulk_lines):
    """Send accumulated bulk lines to Elasticsearch and print any item errors."""
    # Every line, including the last one, is newline-terminated, exactly as
    # the bulk body was assembled before.
    body = '\n'.join(bulk_lines) + '\n'
    response = es.bulk(index=target_index, doc_type=target_type,
                       body=body, request_timeout=200)
    if response['errors'] is True:
        # Print the per-item results together with the payload that caused
        # them, so a failed batch can actually be inspected.
        print(response['items'])
        print(body)


def write_es(Lst):
    """Bulk-index CSV rows into Elasticsearch.

    The first item of ``Lst`` is taken as the header row; every following row
    is zipped with it into one document. Each document gets a millisecond
    ``timestamp``, has its ``channel_id`` left-padded with zeros to 4
    characters, and is indexed under an ``_id`` equal to the SHA-1 hex digest
    of its identifying fields. Documents are sent in batches of 1000, followed
    by one final partial batch if any documents remain.

    This function reads the module-level names ``es``, ``target_index`` and
    ``target_type``; they must be defined before it is called with data rows.

    Args:
        Lst: iterable of rows (sequences of strings), header row first. An
            empty iterable, or one holding only the header, sends nothing.
            Empty rows (e.g. blank CSV lines) are skipped.

    Raises:
        KeyError: if a row lacks one of the columns accessed by subscript
            (``channel_id``, ``tv_station``, ``platform``, ``channel``,
            ``program``, ``releaserUrl``).
        TypeError: if ``releaser`` or ``releaser_platform`` is missing, since
            ``None`` cannot be concatenated into the ID string.
    """
    rows = iter(Lst)
    header = next(rows, None)
    if header is None:
        # No header row at all: nothing to index.
        return

    count = 0
    pending = []  # alternating bulk action lines and document lines
    for row in rows:
        if not row:
            # A blank line yields an empty row; it carries no document.
            continue

        doc = dict(zip(header, row))
        doc["timestamp"] = int(datetime.datetime.now().timestamp() * 1e3)
        doc["channel_id"] = doc["channel_id"].zfill(4)
        # The document ID is the SHA-1 hash of:
        # tv_station + platform + channel + program + releaser
        # + releaser_platform + releaserUrl
        eid = (doc["tv_station"] + doc["platform"] + doc["channel"]
               + doc["program"] + doc.get("releaser")
               + doc.get("releaser_platform") + doc["releaserUrl"])
        print(eid)
        doc_id = hashlib.sha1(eid.encode("utf8")).hexdigest()
        pending.append('{"index": {"_id":"%s"}}' % doc_id)
        pending.append(json.dumps(doc, ensure_ascii=False))
        count += 1

        if count % 1000 == 0:
            _send_bulk(pending)
            pending = []
            print(count)

    if pending:
        # Flush the final partial batch.
        _send_bulk(pending)
        print(count)


if __name__ == '__main__':
    # Target index and document type; write_es() reads these as module globals.
    target_index = 'ronghe_target_releaser'
    target_type = 'doc'

    # The source CSV is GB18030-encoded. newline='' lets the csv module do its
    # own newline handling, and the with-block guarantees the file is closed.
    csv_path = r"D:\work_file\发布者账号\融媒账号列表\电视新闻媒体栏目、账号与APP汇总(含中央) 对照表(技术版)0718-2.csv"
    with open(csv_path, "r", encoding="gb18030", newline="") as m3:
        data = list(csv.reader(m3))
    write_es(data)