backend / crawler · Commits

Commit 38846fb6, authored 4 years ago by litaolemo
Commit message: update
Parent: e1343672

Showing 3 changed files, with 108 additions and 154 deletions:
- locust_test/es_locust.py (+1, -1)
- write_data_into_es/revise_data_from_es.py (+107, -0)
- write_data_into_es/从爬虫索引写数据到切片.py (+0, -153)
locust_test/es_locust.py (view file @ 38846fb6)

@@ -399,7 +399,7 @@ import random
 print(random.choice(query_Str_list))

 class QuickstartUser(FastHttpUser):
-    @task(500)
+    @task
     def hello_world(self):
         query = random.choice(query_Str_list)
         data = {
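In Locust, @task(500) assigns the task a relative weight of 500, while a bare @task uses the default weight of 1; with only one task in the class the weight has no effect, which may be why this commit drops it. A minimal sketch of the surrounding pattern, assuming placeholder values for the host, endpoint, and query list (the real ones are in the collapsed parts of the diff):

import random
from locust import task
from locust.contrib.fasthttp import FastHttpUser

query_Str_list = ["example query"]  # assumed placeholder; real list is collapsed in the diff

class QuickstartUser(FastHttpUser):
    host = "http://127.0.0.1:9200"  # assumed target host

    @task  # after this commit: default weight 1 (was @task(500))
    def hello_world(self):
        # Pick a random query string and POST it as a search body.
        query = random.choice(query_Str_list)
        data = {"query": {"match": {"title": query}}}  # assumed body shape
        self.client.post("/_search", json=data)        # assumed endpoint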
write_data_into_es/revise_data_from_es.py (new file, 0 → 100644; view file @ 38846fb6)
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 16:47:12 2018

@author: zhouyujiang
"""
import elasticsearch
import datetime
from elasticsearch_7.helpers import scan
# import pandas as pd
import json
# from func_cal_doc_id import cal_doc_id
# from urllib import parse

hosts = '172.16.31.17'
port = 9000
# user = 'zhouyujiang'
# passwd = '8tM9JDN2LVxM'
# http_auth = (user, passwd)
es2 = elasticsearch.Elasticsearch(hosts=hosts, port=port)
es7 = elasticsearch.Elasticsearch(hosts="172.16.52.27", port=9200, http_auth=())

target_index = 'gm-dbmw-doctor-read'
target_type = 'doctor'
from_index = 'gm-dbmw-doctor-read'
from_type = 'doc'
fn = r'C:\Users\zhouyujiang\cuowu3.csv'
bulk_all_body = ''

search_body = {
    "query": {
        "bool": {
            "filter": [
                {"term": {"platform.keyword": platform}},
                {"term": {"releaser.keyword": releaser}},
                {"exists": {"field": "play_count"}},
                {"range": {"release_time": {"gte": 1519833600000,
                                            "lt": 1522512000000}}},
                {"range": {"duration": {"lte": 600}}}
            ]
        }
    }
}
q3_re = es.search(index=target_index, doc_type=target_type, body=search_body)
q3_total = q3_re['hits']['total']
write_into_scan = scan(client=es, query=search_body, index=target_index,
                       doc_type=target_type, scroll='5m', request_timeout=100)
for one_scan in write_into_scan:
    have_id = one_scan['_id']
    wirte_set.add(have_id)
print(platform, releaser, 'start_have', len(wirte_set))
# search_body['query']['bool']['filter'].append({"range": {"fetch_time":
#     {"gte": 1547539200000}}})
scan_re = scan(client=es, query=search_body, index=from_index,
               doc_type=from_type, scroll='5m', request_timeout=100)
count = 0
set_url = set()
for one_scan in scan_re:
    # print(one_scan)
    count = count + 1
    line = one_scan['_source']
    bulk_head = '{"index": {"_id":"%s"}}' % doc_id
    data_str = json.dumps(line, ensure_ascii=False)
    bulk_one_body = bulk_head + '\n' + data_str + '\n'
    #
    bulk_all_body += bulk_one_body
    if count % 100 == 0:
        eror_dic = es.bulk(index=target_index, doc_type=target_type,
                           body=bulk_all_body, request_timeout=200)
        bulk_all_body = ''
        if eror_dic['errors'] is True:
            print(eror_dic['items'])
            print(bulk_all_body)
        print(count)

if bulk_all_body != '':
    eror_dic = es.bulk(body=bulk_all_body, index=target_index,
                       doc_type=target_type, request_timeout=200)
    if eror_dic['errors'] is True:
        print(eror_dic)
    bulk_all_body = ''
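As committed, the new script is not runnable on its own: it still references names that only the old CSV-driven version defined (es, platform, releaser, wirte_set, doc_id), and http_auth=() passes an empty tuple. A minimal self-contained sketch of the scan-and-bulk copy pattern it follows, assuming placeholder host, index names, and a match_all query:

import json
import elasticsearch
from elasticsearch.helpers import scan

es = elasticsearch.Elasticsearch(hosts='localhost', port=9200)  # assumed host
from_index, target_index = 'source-index', 'target-index'       # assumed names

bulk_all_body = ''
count = 0
for hit in scan(client=es, query={"query": {"match_all": {}}},
                index=from_index, scroll='5m', request_timeout=100):
    count += 1
    # One bulk action is an action line plus a source line, newline-terminated.
    bulk_all_body += '{"index": {"_id": "%s"}}' % hit['_id'] + '\n'
    bulk_all_body += json.dumps(hit['_source'], ensure_ascii=False) + '\n'
    if count % 100 == 0:  # flush every 100 documents, as the script does
        resp = es.bulk(index=target_index, body=bulk_all_body, request_timeout=200)
        bulk_all_body = ''
        if resp['errors']:
            print(resp['items'])

if bulk_all_body:  # flush the final partial batch
    resp = es.bulk(index=target_index, body=bulk_all_body, request_timeout=200)
    if resp['errors']:
        print(resp['items'])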
write_data_into_es/从爬虫索引写数据到切片.py ("write data from the crawler index into the slice index"; deleted, 100644 → 0; view file @ e1343672)
# -*- coding: utf-8 -*-
"""
Created on Thu Aug 9 16:47:12 2018

@author: zhouyujiang
"""
import elasticsearch
import datetime
from elasticsearch.helpers import scan
# import pandas as pd
import json
from func_cal_doc_id import cal_doc_id
# from urllib import parse

hosts = '192.168.17.11'
port = 80
user = 'zhouyujiang'
passwd = '8tM9JDN2LVxM'
http_auth = (user, passwd)
es = elasticsearch.Elasticsearch(hosts=hosts, port=port, http_auth=http_auth)

target_index = 'short-video-irregular'
target_type = '2018-month-1to10'
from_index = 'short-video-irregular'
from_type = 'xinjingbao_3m'
fn = r'C:\Users\zhouyujiang\cuowu3.csv'
bulk_all_body = ''
#target_date_list = target_type.split('-')
#target_date_start = datetime.datetime(int(target_date_list[-3]), int(target_date_list[-2]), 1)
#target_date_end = datetime.datetime(int(target_date_list[-3]), int(target_date_list[-2]) + 1, 1)
#target_ts_start = int(target_date_start.timestamp()) * 1000
#target_ts_end = int(target_date_end.timestamp()) * 1000
#print(target_ts_start)
#print(target_ts_end)

with open(fn, 'r', encoding='gb18030') as f:
    bulk_all_body = ''
    head = f.readline()
    head_list = head.strip().split(',')
    for i in f:
        ll_list = []
        line_list = i.strip().split(',')
        test_dict = dict(zip(head_list, line_list))
        releaser = test_dict['releaser']
        platform = test_dict['platform']
        wirte_set = set()
        search_body = {
            "query": {
                "bool": {
                    "filter": [
                        {"term": {"platform.keyword": platform}},
                        {"term": {"releaser.keyword": releaser}},
                        {"exists": {"field": "play_count"}},
                        {"range": {"release_time": {"gte": 1519833600000,
                                                    "lt": 1522512000000}}},
                        {"range": {"duration": {"lte": 600}}}
                    ]
                }
            }
        }
        q3_re = es.search(index=target_index, doc_type=target_type,
                          body=search_body)
        q3_total = q3_re['hits']['total']
        write_into_scan = scan(client=es, query=search_body, index=target_index,
                               doc_type=target_type, scroll='5m',
                               request_timeout=100)
        for one_scan in write_into_scan:
            have_id = one_scan['_id']
            wirte_set.add(have_id)
        print(platform, releaser, 'start_have', len(wirte_set))
        # search_body['query']['bool']['filter'].append({"range": {"fetch_time":
        #     {"gte": 1547539200000}}})
        scan_re = scan(client=es, query=search_body, index=from_index,
                       doc_type=from_type, scroll='5m', request_timeout=100)
        count = 0
        set_url = set()
        for one_scan in scan_re:
            # print(one_scan)
            count = count + 1
            line = one_scan['_source']
            url = line['url']
            platform = line['platform']
            if platform == '腾讯新闻':  # '腾讯新闻' = Tencent News
                doc_id = cal_doc_id(platform, data_dict=line,
                                    doc_id_type='all-time-url')
            else:
                doc_id = cal_doc_id(platform, url=url,
                                    doc_id_type='all-time-url')
            # print(doc_id)
            if doc_id not in wirte_set:
                wirte_set.add(doc_id)
                if doc_id not in set_url:
                    set_url.add(doc_id)
                platform = line['platform']
                data_provider = 'CCR'
                weekly_net_inc_play_count = line['play_count']
                weekly_net_inc_comment_count = line['comment_count']
                weekly_net_inc_favorite_count = line['favorite_count']
                weekly_cal_base = 'accumulate'
                timestamp = int(datetime.datetime.timestamp(
                    datetime.datetime.now()) * 1000)
                line.update({'timestamp': timestamp,
                             # 'weekly_cal_base': weekly_cal_base,
                             # 'weekly_net_inc_favorite_count': weekly_net_inc_favorite_count,
                             # 'weekly_net_inc_comment_count': weekly_net_inc_comment_count,
                             # 'weekly_net_inc_play_count': weekly_net_inc_play_count,
                             'data_provider': data_provider})
                if 'video_id' in line.keys():
                    line.pop('video_id')
                bulk_head = '{"index": {"_id":"%s"}}' % doc_id
                data_str = json.dumps(line, ensure_ascii=False)
                bulk_one_body = bulk_head + '\n' + data_str + '\n'
                #
                bulk_all_body += bulk_one_body
                if count % 100 == 0:
                    eror_dic = es.bulk(index=target_index, doc_type=target_type,
                                       body=bulk_all_body, request_timeout=200)
                    bulk_all_body = ''
                    if eror_dic['errors'] is True:
                        print(eror_dic['items'])
                        print(bulk_all_body)
                    print(count)
        if bulk_all_body != '':
            eror_dic = es.bulk(body=bulk_all_body, index=target_index,
                               doc_type=target_type, request_timeout=200)
            if eror_dic['errors'] is True:
                print(eror_dic)
            bulk_all_body = ''
        print(platform, releaser, 'end_have:', len(wirte_set), 'add:', len(set_url))
print(111)
\ No newline at end of file
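Both versions assemble the bulk payload by hand as NDJSON: one action line ({"index": {"_id": ...}}) followed by one source line per document, flushed every 100 documents. The elasticsearch.helpers.bulk helper performs the same batching and error collection; a hedged sketch of the equivalent call, with the client and index names assumed as in the sketch above:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan, bulk

es = Elasticsearch(hosts='localhost', port=9200)  # assumed host

def actions():
    # Re-emit every scanned document as an index action for the target index.
    for hit in scan(client=es, query={"query": {"match_all": {}}},
                    index='source-index', scroll='5m'):
        yield {"_index": 'target-index', "_id": hit['_id'],
               "_source": hit['_source']}

# chunk_size=100 mirrors the manual flush-every-100 behaviour above.
success, errors = bulk(es, actions(), chunk_size=100, raise_on_error=False)
print(success, errors)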