backend / crawler · Commits · 56c55b5a

Commit 56c55b5a, authored 4 years ago by litaolemo
Commit message: update
Parent: b9ce66aa

Showing 5 changed files with 136 additions and 603 deletions.
Changed files:
- crawler_sys/tools/revise_table_data_to_table.py (+25, -0)
- tasks/check_high_play_count_data_source_v_qq.py (+0, -92)
- tasks/from_sparksql_to_mysql.py (+111, -0)
- tasks/update_DU_ATU_from_crawler_raw.py (+0, -297)
- tasks/write_key_releaser_to_week_doc_weekly.py (+0, -214)
crawler_sys/tools/revise_table_data_to_table.py (new file, mode 0 → 100644):
# -*- coding:UTF-8 -*-
# @Time : 2020/8/19 11:47
# @File : revise_table_data_to_table.py
# @email : litao@igengmei.com
# @author : litao

import pymysql


def con_sql(sql):
    # fetch data from a table in the database
    """
    :type sql : str
    :rtype : tuple
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                         passwd='YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result


sql = "select * from xxx"
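A minimal usage sketch for the new helper (not part of the commit). The table and column names are borrowed from the INSERT statement in tasks/from_sparksql_to_mysql.py later in this commit; the query itself is purely illustrative.

# Hypothetical usage of con_sql; the query text is illustrative, not from the commit.
from crawler_sys.tools.revise_table_data_to_table import con_sql

rows = con_sql("SELECT day_id, page_name, content_pv "
               "FROM conent_detail_page_grayscale_ctr LIMIT 10")
for row in rows:
    print(row)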
tasks/check_high_play_count_data_source_v_qq.py (deleted, mode 100644 → 0):
# -*- coding: utf-8 -*-
"""
Created on Wed Sep 5 17:52:53 2018

@author: fangyucheng
"""

from crawler_sys.site_crawler.crawler_v_qq import Crawler_v_qq
from crawler_sys.utils.output_results import output_result
from crawler_sys.utils import Metaorphosis as meta
from crawler_sys.utils.output_log import output_log

logging = output_log(page_category='video_page',
                     program_info='tencent')


def tran_input_data_to_lst(file_name, file_category='csv'):
    if file_category == 'csv':
        video_info_lst = meta.csv_to_lst_whth_headline(file_name)
        url_lst = []
        for line in video_info_lst:
            try:
                if line['data_provider'] == 'CCR':
                    url_lst.append(line['url'])
            except:
                pass
        return url_lst
    elif file_category == 'file':
        url_lst = meta.str_file_to_lst(file_name)
        return url_lst


url_lst = tran_input_data_to_lst(file_name='R:/CCR/数据需求/短期临时需求/TX',
                                 file_category='file')

crawler = Crawler_v_qq()
get_video_page = crawler.video_page


def get_data_source(url_lst=url_lst,
                    output_to_file=False,
                    filepath=None,
                    output_to_es_raw=False,
                    output_to_es_register=False,
                    push_to_redis=False,
                    output_es_index=None,
                    output_doc_type=None):
    result_lst = []
    for url in url_lst:
        video_info = get_video_page(url=url)
        result_lst.append(video_info)
        logging.info('get_data at page %s' % url)
        if len(result_lst) >= 100:
            if output_es_index is not None and output_doc_type is not None:
                output_result(result_lst,
                              platform='腾讯视频',
                              output_to_file=output_to_file,
                              output_to_es_raw=output_to_es_raw,
                              output_to_es_register=output_to_es_register,
                              push_to_redis=push_to_redis,
                              es_index=output_es_index,
                              doc_type=output_doc_type)
                result_lst.clear()
            else:
                output_result(result_lst,
                              platform='腾讯视频',
                              output_to_file=output_to_file,
                              output_to_es_raw=output_to_es_raw,
                              output_to_es_register=output_to_es_register,
                              push_to_redis=push_to_redis)
                result_lst.clear()
    if result_lst != []:
        if output_es_index is not None and output_doc_type is not None:
            output_result(result_lst,
                          platform='腾讯视频',
                          output_to_file=output_to_file,
                          output_to_es_raw=output_to_es_raw,
                          output_to_es_register=output_to_es_register,
                          push_to_redis=push_to_redis,
                          es_index=output_es_index,
                          doc_type=output_doc_type)
            result_lst.clear()
        else:
            output_result(result_lst,
                          platform='腾讯视频',
                          output_to_file=output_to_file,
                          output_to_es_raw=output_to_es_raw,
                          output_to_es_register=output_to_es_register,
                          push_to_redis=push_to_redis)
            result_lst.clear()


if __name__ == '__main__':
    get_data_source(output_to_es_raw=True,
                    output_es_index='test2',
                    output_doc_type='fyc')
tasks/from_sparksql_to_mysql.py (new file, mode 0 → 100644):
# -*- coding:UTF-8 -*-
# @Time : 2020/8/19 11:53
# @File : from_sparksql_to_mysql.py
# @email : litao@igengmei.com
# @author : litao
import hashlib
import json

import pymysql
import xlwt, datetime
import redis
# from pyhive import hive
from maintenance.func_send_email_with_file import send_file_email
from typing import Dict, List
from elasticsearch_7 import Elasticsearch
from elasticsearch_7.helpers import scan
import sys
import time
from pyspark import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit
import pytispark.pytispark as pti

db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                     passwd='YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', db='jerry_prod')
cursor = db.cursor()


def con_sql(sql):
    # fetch data from a table in the database
    db = pymysql.connect(host='172.16.40.158', port=4000, user='st_user',
                         passwd='YPEzp78HQBuhByWPpefQu6X3D6hEPfD6', db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result


startTime = time.time()
sparkConf = SparkConf()
sparkConf.set("spark.sql.crossJoin.enabled", True)
sparkConf.set("spark.debug.maxToStringFields", "100")
sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
sparkConf.set("spark.tispark.plan.allow_index_read", True)
sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
sparkConf.set("mapreduce.map.output.compress", False)
sparkConf.set("prod.gold.jdbcuri",
              "jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true")
sparkConf.set("prod.mimas.jdbcuri",
              "jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true")
sparkConf.set("prod.gaia.jdbcuri",
              "jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true")
sparkConf.set("prod.tidb.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/eagle?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.jerry.jdbcuri",
              "jdbc:mysql://172.16.40.158:4000/jerry_prod?user=st_user&password=aqpuBLYzEV7tML5RPsN1pntUzFy&rewriteBatchedStatements=true")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.158:2379")
sparkConf.set("prod.tispark.pd.addresses", "172.16.40.170:4000")
sparkConf.set("prod.tidb.database", "jerry_prod")

spark = (SparkSession
         .builder
         .config(conf=sparkConf)
         .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
         .config("spark.tispark.pd.addresses", "172.16.40.170:2379")
         .appName("LR PYSPARK TEST")
         .enableHiveSupport()
         .getOrCreate())

spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
spark.sql("CREATE TEMPORARY FUNCTION arrayMerge AS 'com.gmei.hive.common.udf.UDFArryMerge'")

select_sql = """SELECT * FROM pm.tl_pm_contentpage_ctr"""

device_df = spark.sql(select_sql)
device_df.show(1, False)
sql_res = device_df.collect()
print("-----------------------------------------------------------------------------")
for res in sql_res:
    day_id = res.day_id
    device_os_type = res.device_os_type
    active_type = res.active_type
    grey_type = res.grey_type
    page_name = res.page_name
    content_pv = res.content_pv
    content_uv = res.content_uv
    wel_exp_pv = res.wel_exp_pv
    content_exp_pv = res.content_exp_pv
    wel_click_pv = res.wel_click_pv
    content_click_pv = res.content_click_pv
    slide_wel_click_pv = res.slide_wel_click_pv
    self_wel_click_pv = res.self_wel_click_pv
    partition_day = res.partition_day
    pid = hashlib.md5((day_id + device_os_type + active_type
                       + grey_type + page_name).encode("utf-8")).hexdigest()
    sql = """INSERT INTO conent_detail_page_grayscale_ctr(day_id,device_os_type,
    active_type,grey_type,page_name,content_pv,content_uv,wel_exp_pv,content_exp_pv,
    wel_click_pv,content_click_pv,slide_wel_click_pv,self_wel_click_pv,partition_day,pid
    ) VALUES('{day_id}','{device_os_type}','{active_type}','{grey_type}','{page_name}',
    {content_pv},{content_uv},{wel_exp_pv},{content_exp_pv},{wel_click_pv},{content_click_pv},
    {slide_wel_click_pv},{self_wel_click_pv},'{partition_day}','{pid}')""".format(
        day_id=day_id, device_os_type=device_os_type, active_type=active_type,
        grey_type=grey_type, page_name=page_name, content_pv=content_pv,
        content_uv=content_uv, wel_exp_pv=wel_exp_pv, content_exp_pv=content_exp_pv,
        wel_click_pv=wel_click_pv, content_click_pv=content_click_pv,
        slide_wel_click_pv=slide_wel_click_pv, self_wel_click_pv=self_wel_click_pv,
        partition_day=partition_day, pid=pid)
    cursor.execute(sql)
    # cursor.executemany()
db.commit()
db.close()
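As a side note on the write path above, here is a hedged sketch (not from the commit) of the same row insert done with pymysql parameter binding instead of str.format, so values never need hand-quoting in the SQL string. It assumes the target table has the columns named below and that each row is available as a dict.

# Sketch under the assumption that conent_detail_page_grayscale_ctr has these columns;
# parameter binding lets the driver handle quoting and escaping.
import pymysql

def insert_ctr_row(conn, row):
    # `row` is assumed to be a dict keyed by the same field names used in the script above.
    cols = ["day_id", "device_os_type", "active_type", "grey_type", "page_name",
            "content_pv", "content_uv", "wel_exp_pv", "content_exp_pv", "wel_click_pv",
            "content_click_pv", "slide_wel_click_pv", "self_wel_click_pv",
            "partition_day", "pid"]
    placeholders = ", ".join(["%s"] * len(cols))
    sql = "INSERT INTO conent_detail_page_grayscale_ctr (%s) VALUES (%s)" % (
        ", ".join(cols), placeholders)
    with conn.cursor() as cursor:
        cursor.execute(sql, [row[c] for c in cols])
    conn.commit()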
tasks/update_DU_ATU_from_crawler_raw.py (deleted, mode 100644 → 0):
# -*- coding: utf-8 -*-
"""
Created on Thu Jun 14 16:56:45 2018

@author: hanye

Because the play count on a v_qq video page may be the album play count rather
than the video play count, we select those records and write them into a
separate new index named album-play-count instead of short-video-production.
"""
import os
import re
import datetime
import argparse
import configparser
from elasticsearch.helpers import scan
from elasticsearch import Elasticsearch
from crawler_sys.framework.es_short_video import bulk_write_short_video
from crawler_sys.framework.es_crawler import scan_crawler_raw_index
from crawler_sys.utils.output_results import bulk_write_into_es

es_framework = Elasticsearch(hosts='192.168.17.11', port=80,
                             http_auth=('crawler', 'XBcasfo8dgfs'))

parser = argparse.ArgumentParser(description='You can specify a date to process.')
parser.add_argument('-d', '--file_date',
                    help=('Must be in isoformat, similar to "2018-06-07". Other '
                          'formats will just be ignored.'))
parser.add_argument('-H', '--write_high', help=('like 1 OR 0'), default=1)
parser.add_argument('-R', '--write_day', help=('like 1 OR 0'), default=1)
parser.add_argument('-p', '--target_platform', action='append', default=None,
                    help=('Write a single platform, similar to "抖音"'))
args = parser.parse_args()

if args.file_date is not None:
    try:
        dayT = datetime.datetime.strptime(args.file_date, '%Y-%m-%d')
    except:
        print('Ill format for parameter -d: %s, should be in isoformat, '
              'similar to "2018-06-07". The input parameter is ignored, '
              'will continue to run with default parameters. Ctrl-C to '
              'interrupt or just kill -9 pid.' % args.file_date)
else:
    dayT = datetime.datetime.now()


def save_log(process):
    with open("/home/hanye/crawlersNew/crawler/crawler_log/daily_log", "a", encoding="utf-8") as f:
        f.write(process + "_at_" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + "\n")


save_log("start")
high_fre_index = "target_releasers_org"

# get legal platforms from configure file
# find the current working directory
#cwd = os.getcwd()
#if os.name == 'nt':
#    path_sep = '\\'
#    pattern = '.+\\\\crawler\\\\'
#else:
#    path_sep = '/'
#    pattern = '.+/crawler/'
#parent_pth = re.findall(pattern, cwd)[0]
config_folder_pth_relative = '/crawler_sys/framework/config'
parent_pth = '/home/hanye/crawlersNew/crawler'
config_folder_pth_abs = parent_pth + config_folder_pth_relative
legal_platforms_config_fn = 'legal_platforms.ini'
config = configparser.ConfigParser()
with open(config_folder_pth_abs + '/' + legal_platforms_config_fn,
          'r', encoding='utf-8') as conf_file:
    config.read_file(conf_file)
legal_platforms = config['legal_platforms_to_update_production']['legal_platforms'].split(',')
print(legal_platforms)

#dayT = datetime.datetime.today()
fetch_time_start_T = datetime.datetime(dayT.year, dayT.month, dayT.day) + datetime.timedelta(days=-1)
# fetch date range spreads on two days rather than one, to
# avoid missing data because task time overlap
fetch_time_end_T = fetch_time_start_T + datetime.timedelta(days=2)
fetch_time_start_ts = int(fetch_time_start_T.timestamp() * 1e3)
fetch_time_end_ts = int(fetch_time_end_T.timestamp() * 1e3)
release_time_start_T = fetch_time_start_T - datetime.timedelta(days=60)
release_time_start_ts = int(release_time_start_T.timestamp() * 1e3)

fetch_time_start_T_high = datetime.datetime(dayT.year, dayT.month, dayT.day) - datetime.timedelta(days=2)
fetch_time_end_T_high = fetch_time_start_T_high + datetime.timedelta(days=1)
fetch_time_start_ts_high = int(fetch_time_start_T_high.timestamp() * 1e3)
fetch_time_end_ts_high = int(fetch_time_end_T_high.timestamp() * 1e3)

if args.target_platform:
    legal_platforms = args.target_platform

if args.write_day == 1:
    print('start write low into production')
    for platform in legal_platforms:
        print(platform)
        find_data_from_crawler_raw_bd = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                                  "lt": fetch_time_end_ts}}},
                        {"range": {"release_time": {"gte": release_time_start_ts}}}
                    ],
                    "must_not": [
                        {"term": {"data_source": "interactioncount"}}
                    ]
                }
            }
        }
        total_hit, scan_resp = scan_crawler_raw_index(find_data_from_crawler_raw_bd)
        if total_hit > 0:
            line_counter = 0
            data_Lst = []
            for line in scan_resp:
                line_counter += 1
                line_d = line['_source']
                data_Lst.append(line_d)
                if line_counter % 500 == 0 or line_counter == total_hit:
                    print('Writing lines %d/%d into short video index, %s'
                          % (line_counter, total_hit, datetime.datetime.now()))
                    bulk_write_short_video(data_Lst,
                                           #index='test_write6', # test
                                           )
                    data_Lst.clear()
            if data_Lst != []:
                print('Writing lines %d/%d into short video index, %s'
                      % (line_counter, total_hit, datetime.datetime.now()))
                bulk_write_short_video(data_Lst,
                                       #index='test_write6', # test
                                       )
                data_Lst.clear()
            print('All done. %s' % datetime.datetime.now())
        else:
            print('Zero hit, program exits. %s' % datetime.datetime.now())

scan_high_releaser = []
save_log("end_time_write_to_alltime")

if args.write_high == 1:
    print('start write high into production')
    high_re_list = []
    high_count = 0
    search_high_releaser_body = {
        "query": {
            "bool": {
                "filter": [
                    {"range": {"frequency": {"gte": 3}}}
                ]
            }
        }
    }
    scan_high_1 = scan(client=es_framework, index=high_fre_index,
                       doc_type='doc', query=search_high_releaser_body)
    scan_high_2 = scan(client=es_framework, index="target_releasers",
                       doc_type='doc', query=search_high_releaser_body)
    print('start frequency releaser')
    for res in scan_high_1:
        scan_high_releaser.append(res)
    for res in scan_high_2:
        if res not in scan_high_releaser:
            scan_high_releaser.append(res)
    print("get %s releaser in high frequency" % len(scan_high_releaser))
    # write data crawled yesterday for high-frequency releasers
    for one_high_releaser in scan_high_releaser:
        platform = one_high_releaser['_source']['platform']
        if platform in legal_platforms:
            try:
                releaser = one_high_releaser['_source']['releaser']
                releaser_id_str = one_high_releaser['_source'].get('releaser_id_str')
                scan_high_releaser_body = {
                    "query": {
                        "bool": {
                            "filter": [
                                {"term": {"platform.keyword": platform}},
                                {"range": {"fetch_time": {"gte": fetch_time_start_ts_high,
                                                          "lt": fetch_time_end_ts_high}}},
                                {"range": {"release_time": {"gte": release_time_start_ts}}}
                            ],
                            "must_not": [
                                {"term": {"data_source": "interactioncount"}}
                            ]
                        }
                    }
                }
                if releaser_id_str:
                    scan_high_releaser_body["query"]["bool"]["filter"].append(
                        {"term": {"releaser_id_str.keyword": releaser_id_str}})
                else:
                    scan_high_releaser_body["query"]["bool"]["filter"].append(
                        {"term": {"releaser.keyword": releaser}})
                total_one_releaser, total_high_data = scan_crawler_raw_index(scan_high_releaser_body)
                if total_one_releaser != 0:
                    for one_high_data in total_high_data:
                        high_count = high_count + 1
                        high_re_list.append(one_high_data['_source'])
                        if high_count % 500 == 0:
                            print('Writing lines %d into short video index, %s %s %s'
                                  % (high_count, datetime.datetime.now(), platform, releaser))
                            bulk_write_short_video(high_re_list)
                            high_re_list.clear()
            except Exception as e:
                print(e)
                print('wrong in ', platform, releaser)
        else:
            print('platform is not allowed to write into production:', platform)
    if high_re_list != []:
        print('Writing lines %d into short video index, %s'
              % (high_count, datetime.datetime.now()))
        bulk_write_short_video(high_re_list)
        high_re_list.clear()

save_log("end_high_frequency_")

# for those video info with album play count, write them into another es index
find_album_play_count_data = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"data_source": "interactioncount"}},
                {"range": {"fetch_time": {"gte": fetch_time_start_ts,
                                          "lt": fetch_time_end_ts}}}
            ]
        }
    }
}
total_hit, scan_resp = scan_crawler_raw_index(find_album_play_count_data)
if total_hit > 0:
    line_counter = 0
    album_play_count_lst = []
    for line in scan_resp:
        line_counter += 1
        line_d = line['_source']
        album_play_count_lst.append(line_d)
        if line_counter % 500 == 0 or line_counter == total_hit:
            print('Writing lines %d/%d into index album-play-count, %s'
                  % (line_counter, total_hit, datetime.datetime.now()))
            bulk_write_into_es(dict_Lst=album_play_count_lst,
                               index='album-play-count',
                               doc_type='doc')
            album_play_count_lst.clear()
    if album_play_count_lst != []:
        print('Writing lines %d/%d index album-play-count, %s'
              % (line_counter, total_hit, datetime.datetime.now()))
        bulk_write_into_es(dict_Lst=album_play_count_lst,
                           index='album-play-count',
                           doc_type='doc')
        album_play_count_lst.clear()
    print('write album play count into another index. %s' % datetime.datetime.now())
else:
    print('Zero hit, program exits. %s' % datetime.datetime.now())
save_log("end_time_")
tasks/write_key_releaser_to_week_doc_weekly.py (deleted, mode 100644 → 0):
# -*- coding:utf-8 -*-
# @Time : 2019/8/14 18:01
# @Author : litao

import json
# import argparse
import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan
from func_find_week_num import find_week_belongs_to
from crawler.crawler_sys.framework.platform_crawler_register import get_crawler
from crawler.crawler_sys.utils import trans_format
from write_data_into_es.func_cal_doc_id import cal_doc_id

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

# parser = argparse.ArgumentParser()
# parser.add_argument('-w', '--week_str', type=str, default=None)


def week_start_day(week_year, week_no, week_day, week_day_start=1):
    year_week_start = find_first_day_for_given_start_weekday(week_year, week_day_start)
    week_start = year_week_start + datetime.timedelta(days=(week_no - 1) * 7)
    return week_start


def define_doc_type(week_year, week_no, week_day_start):
    """
    doc_type = 'daily-url-2018_w24_s2' means select Tuesday as the
    first day of each week, it's year 2018's 24th week.
    In the isocalendar definition, Monday - weekday 1, Tuesday - weekday 2,
    ..., Saturday - weekday 6, Sunday - weekday 7.
    """
    doc_type_str = 'daily-url-%d_w%02d_s%d' % (week_year, week_no, week_day_start)
    return doc_type_str


def find_first_day_for_given_start_weekday(year, start_weekday):
    i = 0
    while i < 7:
        dayDi = datetime.date(year, 1, 1) + datetime.timedelta(days=i)
        if dayDi.weekday() == start_weekday:
            cal_day1D = dayDi - datetime.timedelta(days=1)
            break
        else:
            cal_day1D = None
        i += 1
    return cal_day1D


def get_target_releaser_video_info(platform,
                                   releaserUrl,
                                   log_file=None,
                                   output_to_es_raw=True,
                                   es_index=None,
                                   doc_type=None,
                                   releaser_page_num_max=100):
    if log_file == None:
        log_file = open('error.log', 'w')
    crawler = get_crawler(platform=platform)
    crawler_initialization = crawler()
    if platform == 'haokan':
        try:
            crawler_initialization.releaser_page(releaserUrl=releaserUrl,
                                                 releaser_page_num_max=releaser_page_num_max,
                                                 output_to_es_raw=True,
                                                 es_index=es_index,
                                                 doc_type=doc_type,
                                                 fetchFavoriteCommnt=True)
        except:
            print(releaserUrl, platform, file=log_file)
    else:
        try:
            crawler_initialization.releaser_page(releaserUrl=releaserUrl,
                                                 releaser_page_num_max=releaser_page_num_max,
                                                 output_to_es_raw=True,
                                                 es_index=es_index,
                                                 doc_type=doc_type)
        except:
            print(releaserUrl, platform, file=log_file)


def func_search_reUrl_from_target_index(platform, releaser):
    search_body = {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"platform.keyword": platform}},
                    {"term": {"releaser.keyword": releaser}}
                ]
            }
        }
    }
    search_re = es.search(index='target_releasers', doc_type='doc', body=search_body)
    if search_re['hits']['total'] > 0:
        return search_re['hits']['hits'][0]['_source']['releaserUrl']
    else:
        print('Can not found:', platform, releaser)
        return None


def func_write_into_weekly_index_new_released(line_list, doc_type,
                                              index='short-video-weekly'):
    count = 0
    bulk_all_body = ''
    re_list = []
    for line in line_list:
        count = count + 1
        weekly_net_inc_play_count = line['play_count']
        weekly_net_inc_comment_count = line['comment_count']
        weekly_net_inc_favorite_count = line['favorite_count']
        weekly_cal_base = 'accumulate'
        timestamp = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
        line.update({'timestamp': timestamp,
                     'weekly_cal_base': weekly_cal_base,
                     'weekly_net_inc_favorite_count': weekly_net_inc_favorite_count,
                     'weekly_net_inc_comment_count': weekly_net_inc_comment_count,
                     'weekly_net_inc_play_count': weekly_net_inc_play_count})
        re_list.append(line)
        url = line['url']
        platform = line['platform']
        doc_id = cal_doc_id(platform, url=url, doc_id_type='all-time-url', data_dict=line)
        bulk_head = '{"index": {"_id":"%s"}}' % doc_id
        data_str = json.dumps(line, ensure_ascii=False)
        bulk_one_body = bulk_head + '\n' + data_str + '\n'
        #
        bulk_all_body += bulk_one_body
        if count % 500 == 0:
            eror_dic = es.bulk(index=index, doc_type=doc_type,
                               body=bulk_all_body, request_timeout=200)
            bulk_all_body = ''
            if eror_dic['errors'] is True:
                print(eror_dic['items'])
                print(bulk_all_body)
            print(count)
    if bulk_all_body != '':
        eror_dic = es.bulk(body=bulk_all_body,
                           index=index,
                           doc_type=doc_type,
                           request_timeout=200)
        if eror_dic['errors'] is True:
            print(eror_dic)


todayT = datetime.datetime.now()
# todayT=datetime.datetime(2019,2,5)
week_day_start = 1
# if args.week_str is None:
seven_days_ago_T = todayT - datetime.timedelta(days=7)
week_year, week_no, week_day = find_week_belongs_to(seven_days_ago_T, week_day_start)
week_start = week_start_day(week_year, week_no, week_day)
re_s = week_start - datetime.timedelta(1)
re_s_dt = datetime.datetime.strptime(str(re_s), '%Y-%m-%d')
re_s_t = int(datetime.datetime.timestamp(re_s_dt) * 1000)
re_e = week_start + datetime.timedelta(6)
re_e_dt = datetime.datetime.strptime(str(re_e), '%Y-%m-%d')
re_e_t = int(datetime.datetime.timestamp(re_e_dt) * 1000)
# nowT_feihua = week_start + datetime.timedelta(days=6)
weekly_doc_type_name = define_doc_type(week_year, week_no, week_day_start=week_day_start)

key_releaser_body = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"key_releaser.keyword": "True"}}
            ]
        }
    }
}
releaser_re = scan(client=es, index='target_releasers', doc_type='doc',
                   query=key_releaser_body, scroll='3m')
for re in releaser_re:
    releaser = re["_source"]['releaser']
    platform = re["_source"]['platform']
    if releaser != None:
        re_list = []
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"term": {"releaser.keyword": releaser}},
                        {"range": {"release_time": {"gte": re_s_t, "lt": re_e_t}}},
                        {"range": {"fetch_time": {"gte": re_s_t}}}
                    ]
                }
            }
        }
        scan_re = scan(client=es, index='short-video-all-time-url',
                       doc_type='all-time-url', query=search_body, scroll='3m')
        for one_scan in scan_re:
            re_list.append(one_scan['_source'])
        func_write_into_weekly_index_new_released(re_list, doc_type=weekly_doc_type_name)
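A small worked example (not part of the commit) of the weekly doc_type naming that the deleted script relied on; the helper is restated here so the check is self-contained and runnable on its own.

# Restated from define_doc_type above, only to show the naming scheme concretely.
def define_doc_type(week_year, week_no, week_day_start):
    return 'daily-url-%d_w%02d_s%d' % (week_year, week_no, week_day_start)

print(define_doc_type(2018, 24, 2))  # daily-url-2018_w24_s2, the example from the docstring
print(define_doc_type(2019, 33, 1))  # daily-url-2019_w33_s1; week numbers are zero-padded to two digits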