Commit 3a61d8f4 authored by litaolemo

update

parent 4dfed1b9
# crawler
1. Deployed on BJ-PaaS-test-nvwa001 under /srv/apps/
2. Virtual environment: enter with `conda activate crawler_env`, leave with `conda deactivate`
\ No newline at end of file
@@ -6,8 +6,7 @@ Created on Thu Jun 14 17:09:09 2018
 """
 from elasticsearch import Elasticsearch
-es_framework = Elasticsearch(hosts='192.168.17.11', port=80,
-                             http_auth=('crawler', 'XBcasfo8dgfs'))
+es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
 index_target_releaser = 'target_releasers'
 doc_type_target_releaser = 'doc'
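The framework client now points at an unauthenticated cluster, so a misconfigured host only fails at first use. A minimal connectivity check, sketched with the standard elasticsearch-py `ping()`/`info()` calls and the host/port values from this commit:

```python
from elasticsearch import Elasticsearch

# Values taken from the hunk above; ping() returns False instead of raising.
es_framework = Elasticsearch(hosts='172.16.32.37', port=9200)
if not es_framework.ping():
    raise SystemExit('cannot reach ES at 172.16.32.37:9200')
print(es_framework.info()['version']['number'])  # confirm the cluster version
```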
@@ -75,18 +75,16 @@ def output_result(result_Lst, platform,
                   push_to_redis=False,
                   batch_str=None,
                   release_time_lower_bdr=None,
-                  es_index=index_site_crawler,
-                  doc_type=doc_type_site_crawler):
+                  es_index=index_site_crawler):
     # write data into es crawler-raw index
     if output_to_es_raw:
-        bulk_write_into_es(result_Lst, es_index, doc_type)
+        bulk_write_into_es(result_Lst, es_index)
     # write data into es crawler-url-register index
     if output_to_es_register:
         data_Lst_reg = form_data_Lst_for_url_register(result_Lst)
         bulk_write_into_es(data_Lst_reg,
                            index=index_url_register,
-                           doc_type=doc_type_url_register,
                            construct_id=True,
                            platform=platform
                            )
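After this change, call sites simply drop the `doc_type` keyword. A hedged sketch of a post-change call, assuming `output_to_es_raw`/`output_to_es_register` are keyword parameters as the function body suggests, with an illustrative result list:

```python
# Hypothetical call site after doc_type removal; data values are made up.
result_Lst = [{'url': 'https://example.com/video/1', 'platform': 'haokan'}]
output_result(result_Lst, 'haokan',
              output_to_es_raw=True,        # bulk-write into the default es_index
              output_to_es_register=False)  # skip the url-register index
```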
@@ -148,7 +146,6 @@ def get_ill_encoded_str_posi(UnicodeEncodeError_msg):
 def bulk_write_into_es(dict_Lst,
                        index,
-                       doc_type,
                        construct_id=False,
                        platform=None):
     bulk_write_body = ''
@@ -186,15 +183,15 @@ def bulk_write_into_es(dict_Lst,
         if construct_id and platform is not None:
             doc_id = construct_id_for_url_register(platform, line['url'])
-            action_str = ('{ "index" : { "_index" : "%s", "_type" : "%s", "_id" : "%s" } }'
-                          % (index, doc_type, doc_id))
+            action_str = ('{ "index" : { "_index" : "%s", "_id" : "%s" } }'
+                          % (index, doc_id))
         else:
-            action_str = ('{ "index" : { "_index" : "%s", "_type" : "%s" } }'
-                          % (index, doc_type))
+            action_str = ('{ "index" : { "_index" : "%s" } }' % index)
         data_str = json.dumps(line, ensure_ascii=False)
         line_body = action_str + '\n' + data_str + '\n'
         bulk_write_body += line_body
         if write_counter % 1000 == 0 or write_counter == len(dict_Lst):
-            print('Writing into es %s/%s %d/%d' % (index, doc_type,
+            print('Writing into es %s %d/%d' % (index,
                                                   write_counter,
                                                   len(dict_Lst)))
         if bulk_write_body != '':
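The action line for each document is now typeless, which is what Elasticsearch 7+ expects. A self-contained sketch of the NDJSON body this loop builds (index name and data are hypothetical):

```python
import json

docs = [{'url': 'https://example.com/a', 'platform': 'haokan'}]
index = 'crawler-raw'  # hypothetical index name

bulk_write_body = ''
for line in docs:
    # one action line plus one document line per doc, newline-terminated
    action_str = '{ "index" : { "_index" : "%s" } }' % index
    bulk_write_body += action_str + '\n' + json.dumps(line, ensure_ascii=False) + '\n'

print(bulk_write_body)
# { "index" : { "_index" : "crawler-raw" } }
# {"url": "https://example.com/a", "platform": "haokan"}
```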
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 17 10:15:01 2018
@author: hanye
"""
from es_hy_sign_in import es
import sys
import json
import datetime
def cal_monthly_net_inc_test_range(fetch_year, fetch_month, fetch_day,
fetch_hour=0,
fetch_time_seg_hours=24,
doc_type_target='daily-url',
search_body_logic_part=None,
threads_num=5,
logger_name='calculate_monthly_net_inc',
f_log=sys.stdout):
print('************In func:', file=f_log)
print('fetch_year:', fetch_year, file=f_log)
print('fetch_month:', fetch_month, file=f_log)
print('fetch_day:', fetch_day, file=f_log)
print('fetch_hour:', fetch_hour, file=f_log)
print('fetch_time_seg_hours:', fetch_time_seg_hours, file=f_log)
print('doc_type_target:', doc_type_target, file=f_log)
print('search_body_logic_part:', search_body_logic_part, file=f_log)
print('threads_num:', threads_num, file=f_log)
print('logger_name:', logger_name, file=f_log)
date_passed_in = datetime.datetime(year=fetch_year, month=fetch_month,
day=fetch_day, hour=fetch_hour)
fetch_time_start_ts = int(date_passed_in.timestamp()*1e3)
fetch_time_end_ts = int((date_passed_in
+ datetime.timedelta(seconds=fetch_time_seg_hours*3600)
).timestamp()*1e3)
fetch_time_start_iso = datetime.datetime.fromtimestamp(
int(fetch_time_start_ts/1e3)).isoformat()
fetch_time_end_iso = datetime.datetime.fromtimestamp(
int(fetch_time_end_ts/1e3)).isoformat()
print('fetch_time_start_iso:', fetch_time_start_iso, file=f_log)
print('fetch_time_end_iso:', fetch_time_end_iso, file=f_log)
release_time_start_ts = int((date_passed_in - datetime.timedelta(days=180))
.timestamp()*1000)
release_time_end_ts = int((date_passed_in + datetime.timedelta(days=365))
.timestamp()*1000)
release_time_start_T = datetime.datetime.fromtimestamp(release_time_start_ts/1e3)
release_time_end_T = datetime.datetime.fromtimestamp(release_time_end_ts/1e3)
print('release_time_start:', release_time_start_T, file=f_log)
print('release_time_end:', release_time_end_T, file=f_log)
search_body = {
"query": {
"bool": {
"filter": [
{"range": {"release_time": {
"gte": release_time_start_ts,
"lt": release_time_end_ts}}
},
{"range": {"fetch_time": {
"gte": fetch_time_start_ts,
"lt": fetch_time_end_ts}}
}
],
}
},
"size": 2,
"aggs": {
"release_time_distribution": {
"date_histogram": {
"field": "release_time",
"interval": "day",
"time_zone": "Asia/Shanghai"
}
}
}
}
if search_body_logic_part is not None:
if 'filter' in search_body_logic_part:
search_body['query']['bool']['filter'].append(search_body_logic_part['filter'])
else:
search_body['query']['bool'].update(search_body_logic_part)
else:
pass
search_body_str = json.dumps(search_body, ensure_ascii=False)  # valid JSON, unlike str() plus quote replacement
print('search_body:', search_body_str, file=f_log)
search_resp = es.search(index='short-video-production',
                        doc_type=doc_type_target,
                        body=search_body)
total_hit = search_resp['hits']['total']
print('search hits:', total_hit, file=f_log)
runday = datetime.datetime.now()
# Last day of the previous month: the 1st of the current month minus one day.
last_day_in_the_month_T = (datetime.datetime(runday.year, runday.month, 1)
                           - datetime.timedelta(days=1))
last_day_in_the_month_str = last_day_in_the_month_T.isoformat()[:10]
doc_type_monthly = 'daily-url-%s' % last_day_in_the_month_str
first_day_in_next_month_T = last_day_in_the_month_T + datetime.timedelta(days=1)
year_start = last_day_in_the_month_T.year
month_start = last_day_in_the_month_T.month
cal_month_str = datetime.datetime.strftime(last_day_in_the_month_T, '%b%Y')
cal_month_T = last_day_in_the_month_T
logger_name = 'calculate_MNI_for_missed'
thread_num = 5
search_body_for_missed_MNI_logic_part = {
"must_not": [
{"exists": {"field": "monthly_cal_base"}}
]
}
cal_day_T = datetime.datetime(year_start, month_start, 2)
step_hours = 4
log_fn = 'test_range_for_MNI_missed_cal_%s.log' % datetime.datetime.now().isoformat()[:19].replace(':', '-')
log_pth = r'D:\CSM\Docs\Projects\短视频\code\write-data-into-es\test'
f_log = open(log_pth+'/'+log_fn, 'w', encoding='utf-8')
while cal_day_T <= first_day_in_next_month_T+datetime.timedelta(days=1):
print(cal_day_T)
cal_monthly_net_inc_test_range(cal_day_T.year, cal_day_T.month, cal_day_T.day,
fetch_hour=cal_day_T.hour,
fetch_time_seg_hours=step_hours,
doc_type_target=doc_type_monthly,
search_body_logic_part=search_body_for_missed_MNI_logic_part,
threads_num=thread_num,
logger_name=logger_name,
f_log=f_log)
cal_day_T = cal_day_T + datetime.timedelta(seconds=step_hours*3600)
f_log.close()
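The search body requests a daily `date_histogram` on `release_time`, but the function only prints the hit total. If the buckets are ever needed, they come back in the standard aggregation response shape; a sketch of reading them from `search_resp` inside the function:

```python
# Sketch: the buckets for the date_histogram requested in search_body.
buckets = search_resp['aggregations']['release_time_distribution']['buckets']
for bucket in buckets:
    # key_as_string is the day (Asia/Shanghai); doc_count is hits released that day
    print(bucket['key_as_string'], bucket['doc_count'])
```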
# encoding: utf-8
'''
@author: zhangjian
@time: 2018/11/22 15:10
'''
from elasticsearch import Elasticsearch
from elasticsearch import exceptions
from elasticsearch.helpers import scan
from urllib.parse import quote
import json
import pandas as pd
import datetime
import copy
hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)
sv_index = 'short-video-production'
sv_doc = 'daily-url'
wr_index = 'short-video-production'
wr_doc = 'daily-url'
platform_list = [
    'haokan',
]
count = 0
bulk_all_body = ''
for platform in platform_list:
search_body = {
"query": {
"bool": {
"filter": [
{"term": {"platform.keyword": platfrom}},
{"term": {"data_provider.keyword": "CCR"}}
],
"must": [
{
"match_phrase": {
"url": "nid"
}
}
]
}
}
}
search_resp = scan(client=es, index=sv_index, doc_type=sv_doc, query=search_body)
for line in search_resp:
count += 1
one_dict = {}
# daily-url: rebuild the doc id from the calendar day of fetch_time
fetch_time_int = int(line['_source']['fetch_time'] / 1000)
fetch_time_H = datetime.datetime.fromtimestamp(fetch_time_int)
year = fetch_time_H.year
month = fetch_time_H.month
day = fetch_time_H.day
date_str = '_' + str(year) + '-' + str(month) + '-' + str(day)
one_dict.update(line['_source'])
vid = line['_source']['video_id']
url = 'https://sv.baidu.com/videoui/page/videoland?context=' + quote('{"nid":"sv_%s"}' % vid)
new_id = url + date_str
# print(new_id)
one_dict.update({"data_provider": "CCR_2",
"url": url})
bulk_head = '{"index": {"_id":"%s"}}' % new_id
data_str = json.dumps(one_dict, ensure_ascii=False)
bulk_one_body = bulk_head + '\n' + data_str + '\n'
bulk_all_body += bulk_one_body
if count % 1000 == 0:
    error_dic = es.bulk(index=wr_index, doc_type=wr_doc,
                        body=bulk_all_body, request_timeout=200)
    if error_dic['errors'] is True:
        print(error_dic['items'])
    bulk_all_body = ''  # reset the buffer only after the error check
    print(count)
if bulk_all_body != '':
    error_dic = es.bulk(body=bulk_all_body,
                        index=wr_index,
                        doc_type=wr_doc,
                        request_timeout=200)
    if error_dic['errors'] is True:
        print(error_dic)
    bulk_all_body = ''
print("end")