# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 16:47:12 2018

@author: zhouyujiang

For every (releaser, platform) row in ``buurl.csv``, scan the ``daily-url``
documents of the ``short-video-production`` index that match the fixed
release_time / fetch_time windows below, keep the first document seen for
each URL, add the ``monthly_*`` fields and bulk-index the result into the
``daily-url-2018-10-31`` doc type of the same index.
"""

import datetime
import json
from urllib import parse  # not used in this script; kept as a file-level import

import elasticsearch
from elasticsearch.helpers import scan
import pandas as pd  # not used in this script; kept as a file-level import

from func_cal_doc_id import cal_doc_id

# --- Elasticsearch connection ------------------------------------------------
# NOTE(review): credentials are hard-coded in source. Consider loading them
# from the environment or a secrets store and rotating this password.
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

# Source of the scan.
index = 'short-video-production'
doc_type = 'daily-url'

# Destination of the bulk writes.
TARGET_INDEX = 'short-video-production'
TARGET_DOC_TYPE = 'daily-url-2018-10-31'

# Half-open [gte, lt) windows in epoch milliseconds.
# 1538323200000 is 2018-09-30T16:00:00Z and 1541001600000 is
# 2018-10-31T16:00:00Z -- presumably October 2018 in UTC+8; confirm.
RELEASE_TIME_RANGE = {"gte": 1538323200000, "lt": 1541001600000}
# 1541692800000 is 2018-11-08T16:00:00Z, 1542038400000 is 2018-11-12T16:00:00Z.
FETCH_TIME_RANGE = {"gte": 1541692800000, "lt": 1542038400000}

# The flush is attempted every time this many documents have been scanned.
FLUSH_EVERY = 100


def _flush_bulk(pending_actions):
    """Send the queued bulk lines to Elasticsearch and empty the queue.

    ``pending_actions`` is a list of newline-terminated bulk fragments
    (action line + source line). Nothing is sent when the list is empty,
    because a bulk request without any action is rejected by Elasticsearch.
    On a response with ``errors`` set, the per-item results and the payload
    that was sent are printed; the queue is cleared in either case.
    """
    if not pending_actions:
        return
    body = ''.join(pending_actions)
    eror_dic = es.bulk(index=TARGET_INDEX,
                       doc_type=TARGET_DOC_TYPE,
                       body=body,
                       request_timeout=200)
    if eror_dic['errors'] is True:
        # Print the payload *before* discarding it so failures can be traced.
        print(eror_dic['items'])
        print(body)
    del pending_actions[:]


set_url = set()  # URLs already queued; shared across every releaser
count = 0        # total documents scanned; drives the flush cadence

with open('buurl.csv') as f:
    header_Lst = f.readline().strip().split(',')
    for csv_line in f:
        csv_line = csv_line.strip()
        if not csv_line:
            # Skip blank lines (e.g. a trailing newline at end of file).
            continue
        line_dict = dict(zip(header_Lst, csv_line.split(',')))
        releaser = line_dict['releaser']
        platform = line_dict['platform']

        pending_actions = []
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"term": {"releaser.keyword": releaser}},
                        {"range": {"release_time": RELEASE_TIME_RANGE}},
                        {"range": {"fetch_time": FETCH_TIME_RANGE}},
                    ]
                }
            }
        }
        scan_re = scan(client=es,
                       query=search_body,
                       index=index,
                       doc_type=doc_type,
                       scroll='5m',
                       request_timeout=100)

        for one_scan in scan_re:
            count += 1
            doc = one_scan['_source']
            url = doc['url']
            doc_platform = doc['platform']

            if url not in set_url:
                set_url.add(url)

                # '腾讯新闻' (Tencent News) ids are computed from the whole
                # document; every other platform uses the URL.
                if doc_platform == '腾讯新闻':
                    doc_id = cal_doc_id(doc_platform,
                                        data_dict=doc,
                                        doc_id_type='all-time-url')
                else:
                    doc_id = cal_doc_id(doc_platform,
                                        url=url,
                                        doc_id_type='all-time-url')
                print(doc_id)

                # The accumulated counters are copied as the monthly net
                # increase; 'monthly_cal_base' records that choice.
                doc.update({
                    'timestamp': int(datetime.datetime.now().timestamp() * 1000),
                    'monthly_cal_base': 'accumulated_values',
                    'monthly_net_inc_favorite_count': doc['favorite_count'],
                    'monthly_net_inc_comment_count': doc['comment_count'],
                    'monthly_net_inc_play_count': doc['play_count'],
                })

                # Serialise the action line with json so an id containing a
                # quote or backslash still yields valid JSON.
                bulk_head = json.dumps({"index": {"_id": str(doc_id)}},
                                       ensure_ascii=False)
                data_str = json.dumps(doc, ensure_ascii=False)
                pending_actions.append(bulk_head + '\n' + data_str + '\n')

            if count % FLUSH_EVERY == 0:
                _flush_bulk(pending_actions)
                print(count)

        # Send whatever is left for this releaser before moving on.
        _flush_bulk(pending_actions)