Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
C
cybertron
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
alpha
cybertron
Commits
9b980863
Commit
9b980863
authored
Jul 25, 2019
by
lixiaofang
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add
parent
d6fa13cd
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
9 additions
and
2041 deletions
+9
-2041
workspace.xml
.idea/workspace.xml
+9
-103
.DS_Store
index_contrast/.DS_Store
+0
-0
pictorial.py
index_contrast/pictorial.py
+0
-424
tag.py
index_contrast/tag.py
+0
-381
topic.py
index_contrast/topic.py
+0
-680
user.py
index_contrast/user.py
+0
-453
No files found.
.idea/workspace.xml
View file @
9b980863
<?xml version="1.0" encoding="UTF-8"?>
<?xml version="1.0" encoding="UTF-8"?>
<project
version=
"4"
>
<project
version=
"4"
>
<component
name=
"ChangeListManager"
>
<component
name=
"ChangeListManager"
>
<list
default=
"true"
id=
"7b835479-ee84-40d6-9b97-0e9285d92119"
name=
"Default Changelist"
comment=
""
>
<list
default=
"true"
id=
"7b835479-ee84-40d6-9b97-0e9285d92119"
name=
"Default Changelist"
comment=
""
/>
<change
beforePath=
"$PROJECT_DIR$/.idea/workspace.xml"
beforeDir=
"false"
afterPath=
"$PROJECT_DIR$/.idea/workspace.xml"
afterDir=
"false"
/>
</list>
<option
name=
"EXCLUDED_CONVERTED_TO_IGNORED"
value=
"true"
/>
<option
name=
"EXCLUDED_CONVERTED_TO_IGNORED"
value=
"true"
/>
<option
name=
"SHOW_DIALOG"
value=
"false"
/>
<option
name=
"SHOW_DIALOG"
value=
"false"
/>
<option
name=
"HIGHLIGHT_CONFLICTS"
value=
"true"
/>
<option
name=
"HIGHLIGHT_CONFLICTS"
value=
"true"
/>
...
@@ -53,68 +51,10 @@
...
@@ -53,68 +51,10 @@
<file
pinned=
"false"
current-in-tab=
"true"
>
<file
pinned=
"false"
current-in-tab=
"true"
>
<entry
file=
"file://$PROJECT_DIR$/auto_request.py"
>
<entry
file=
"file://$PROJECT_DIR$/auto_request.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"
114
"
>
<state
relative-caret-position=
"
462
"
>
<caret
line=
"170"
column=
"61"
selection-start-line=
"170"
selection-start-column=
"
8
"
selection-end-line=
"170"
selection-end-column=
"61"
/>
<caret
line=
"170"
column=
"61"
selection-start-line=
"170"
selection-start-column=
"
61
"
selection-end-line=
"170"
selection-end-column=
"61"
/>
<folding>
<folding>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
<marker
date=
"1563777637000"
expanded=
"true"
signature=
"1533:1535"
ph=
"..."
/>
</folding>
</state>
</provider>
</entry>
</file>
<file
pinned=
"false"
current-in-tab=
"false"
>
<entry
file=
"file://$PROJECT_DIR$/answer_reply2.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"764"
>
<caret
line=
"52"
column=
"43"
selection-start-line=
"52"
selection-start-column=
"43"
selection-end-line=
"52"
selection-end-column=
"43"
/>
<folding>
<element
signature=
"e#0#14#0"
expanded=
"true"
/>
</folding>
</state>
</provider>
</entry>
</file>
<file
pinned=
"false"
current-in-tab=
"false"
>
<entry
file=
"file://$PROJECT_DIR$/reply_comment1.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"75"
>
<caret
line=
"5"
selection-start-line=
"5"
selection-end-line=
"5"
selection-end-column=
"26"
/>
</state>
</provider>
</entry>
</file>
<file
pinned=
"false"
current-in-tab=
"false"
>
<entry
file=
"file://$PROJECT_DIR$/reply_comment2.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"330"
>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<folding>
<element
signature=
"e#0#16#0"
expanded=
"true"
/>
</folding>
</state>
</provider>
</entry>
</file>
<file
pinned=
"false"
current-in-tab=
"false"
>
<entry
file=
"file://$PROJECT_DIR$/answer_reply3.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"138"
>
<caret
line=
"46"
column=
"35"
selection-start-line=
"46"
selection-start-column=
"35"
selection-end-line=
"46"
selection-end-column=
"35"
/>
<folding>
<element
signature=
"e#0#14#0"
expanded=
"true"
/>
</folding>
</state>
</provider>
</entry>
</file>
<file
pinned=
"false"
current-in-tab=
"false"
>
<entry
file=
"file://$PROJECT_DIR$/reply_comment3.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"330"
>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<folding>
<element
signature=
"e#0#16#0"
expanded=
"true"
/>
</folding>
</folding>
</state>
</state>
</provider>
</provider>
...
@@ -187,11 +127,6 @@
...
@@ -187,11 +127,6 @@
<item
name=
"cybertron"
type=
"b2602c69:ProjectViewProjectNode"
/>
<item
name=
"cybertron"
type=
"b2602c69:ProjectViewProjectNode"
/>
<item
name=
"cybertron"
type=
"462c0819:PsiDirectoryNode"
/>
<item
name=
"cybertron"
type=
"462c0819:PsiDirectoryNode"
/>
</path>
</path>
<path>
<item
name=
"cybertron"
type=
"b2602c69:ProjectViewProjectNode"
/>
<item
name=
"cybertron"
type=
"462c0819:PsiDirectoryNode"
/>
<item
name=
"index_contrast"
type=
"462c0819:PsiDirectoryNode"
/>
</path>
</expand>
</expand>
<select
/>
<select
/>
</subPane>
</subPane>
...
@@ -254,8 +189,7 @@
...
@@ -254,8 +189,7 @@
<servers
/>
<servers
/>
</component>
</component>
<component
name=
"ToolWindowManager"
>
<component
name=
"ToolWindowManager"
>
<frame
x=
"-238"
y=
"-967"
width=
"1280"
height=
"727"
extended-state=
"1"
/>
<frame
x=
"32"
y=
"-1068"
width=
"1280"
height=
"727"
extended-state=
"0"
/>
<editor
active=
"true"
/>
<layout>
<layout>
<window_info
id=
"Structure"
order=
"0"
sideWeight=
"0.21188119"
side_tool=
"true"
weight=
"0.19112115"
/>
<window_info
id=
"Structure"
order=
"0"
sideWeight=
"0.21188119"
side_tool=
"true"
weight=
"0.19112115"
/>
<window_info
id=
"Favorites"
order=
"1"
side_tool=
"true"
/>
<window_info
id=
"Favorites"
order=
"1"
side_tool=
"true"
/>
...
@@ -269,7 +203,7 @@
...
@@ -269,7 +203,7 @@
<window_info
anchor=
"bottom"
id=
"Inspection"
order=
"5"
weight=
"0.4"
/>
<window_info
anchor=
"bottom"
id=
"Inspection"
order=
"5"
weight=
"0.4"
/>
<window_info
anchor=
"bottom"
id=
"TODO"
order=
"6"
/>
<window_info
anchor=
"bottom"
id=
"TODO"
order=
"6"
/>
<window_info
anchor=
"bottom"
id=
"Version Control"
order=
"7"
show_stripe_button=
"false"
/>
<window_info
anchor=
"bottom"
id=
"Version Control"
order=
"7"
show_stripe_button=
"false"
/>
<window_info
active=
"true"
anchor=
"bottom"
id=
"Terminal"
order=
"8"
sideWeight=
"0.49960285"
visible=
"true"
weight=
"0.
0
"
/>
<window_info
active=
"true"
anchor=
"bottom"
id=
"Terminal"
order=
"8"
sideWeight=
"0.49960285"
visible=
"true"
weight=
"0.
34150326
"
/>
<window_info
anchor=
"bottom"
id=
"Event Log"
order=
"9"
sideWeight=
"0.50039715"
side_tool=
"true"
weight=
"0.32890365"
/>
<window_info
anchor=
"bottom"
id=
"Event Log"
order=
"9"
sideWeight=
"0.50039715"
side_tool=
"true"
weight=
"0.32890365"
/>
<window_info
anchor=
"bottom"
id=
"Python Console"
order=
"10"
/>
<window_info
anchor=
"bottom"
id=
"Python Console"
order=
"10"
/>
<window_info
anchor=
"bottom"
id=
"DB Execution Console"
order=
"11"
/>
<window_info
anchor=
"bottom"
id=
"DB Execution Console"
order=
"11"
/>
...
@@ -495,22 +429,10 @@
...
@@ -495,22 +429,10 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
/>
<provider
selected=
"true"
editor-type-id=
"text-editor"
/>
</entry>
</entry>
<entry
file=
"file://$PROJECT_DIR$/auto_follow.py"
>
<entry
file=
"file://$PROJECT_DIR$/auto_follow.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
/>
<state>
<folding>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
</folding>
</state>
</provider>
</entry>
</entry>
<entry
file=
"file://$PROJECT_DIR$/auto_follow2.py"
>
<entry
file=
"file://$PROJECT_DIR$/auto_follow2.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
/>
<state>
<folding>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
</folding>
</state>
</provider>
</entry>
</entry>
<entry
file=
"file://$PROJECT_DIR$/test.py"
>
<entry
file=
"file://$PROJECT_DIR$/test.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
...
@@ -554,9 +476,6 @@
...
@@ -554,9 +476,6 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"764"
>
<state
relative-caret-position=
"764"
>
<caret
line=
"52"
column=
"43"
selection-start-line=
"52"
selection-start-column=
"43"
selection-end-line=
"52"
selection-end-column=
"43"
/>
<caret
line=
"52"
column=
"43"
selection-start-line=
"52"
selection-start-column=
"43"
selection-end-line=
"52"
selection-end-column=
"43"
/>
<folding>
<element
signature=
"e#0#14#0"
expanded=
"true"
/>
</folding>
</state>
</state>
</provider>
</provider>
</entry>
</entry>
...
@@ -571,9 +490,6 @@
...
@@ -571,9 +490,6 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"330"
>
<state
relative-caret-position=
"330"
>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<folding>
<element
signature=
"e#0#16#0"
expanded=
"true"
/>
</folding>
</state>
</state>
</provider>
</provider>
</entry>
</entry>
...
@@ -581,9 +497,6 @@
...
@@ -581,9 +497,6 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"393"
>
<state
relative-caret-position=
"393"
>
<caret
line=
"79"
column=
"74"
lean-forward=
"true"
selection-start-line=
"79"
selection-start-column=
"20"
selection-end-line=
"79"
selection-end-column=
"74"
/>
<caret
line=
"79"
column=
"74"
lean-forward=
"true"
selection-start-line=
"79"
selection-start-column=
"20"
selection-end-line=
"79"
selection-end-column=
"74"
/>
<folding>
<element
signature=
"e#0#14#0"
expanded=
"true"
/>
</folding>
</state>
</state>
</provider>
</provider>
</entry>
</entry>
...
@@ -591,9 +504,6 @@
...
@@ -591,9 +504,6 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"330"
>
<state
relative-caret-position=
"330"
>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<caret
line=
"22"
selection-start-line=
"22"
selection-end-line=
"22"
/>
<folding>
<element
signature=
"e#0#16#0"
expanded=
"true"
/>
</folding>
</state>
</state>
</provider>
</provider>
</entry>
</entry>
...
@@ -601,19 +511,15 @@
...
@@ -601,19 +511,15 @@
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"138"
>
<state
relative-caret-position=
"138"
>
<caret
line=
"46"
column=
"35"
selection-start-line=
"46"
selection-start-column=
"35"
selection-end-line=
"46"
selection-end-column=
"35"
/>
<caret
line=
"46"
column=
"35"
selection-start-line=
"46"
selection-start-column=
"35"
selection-end-line=
"46"
selection-end-column=
"35"
/>
<folding>
<element
signature=
"e#0#14#0"
expanded=
"true"
/>
</folding>
</state>
</state>
</provider>
</provider>
</entry>
</entry>
<entry
file=
"file://$PROJECT_DIR$/auto_request.py"
>
<entry
file=
"file://$PROJECT_DIR$/auto_request.py"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<provider
selected=
"true"
editor-type-id=
"text-editor"
>
<state
relative-caret-position=
"
114
"
>
<state
relative-caret-position=
"
462
"
>
<caret
line=
"170"
column=
"61"
selection-start-line=
"170"
selection-start-column=
"
8
"
selection-end-line=
"170"
selection-end-column=
"61"
/>
<caret
line=
"170"
column=
"61"
selection-start-line=
"170"
selection-start-column=
"
61
"
selection-end-line=
"170"
selection-end-column=
"61"
/>
<folding>
<folding>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
<element
signature=
"e#0#15#0"
expanded=
"true"
/>
<marker
date=
"1563777637000"
expanded=
"true"
signature=
"1533:1535"
ph=
"..."
/>
</folding>
</folding>
</state>
</state>
</provider>
</provider>
...
...
index_contrast/.DS_Store
0 → 100644
View file @
9b980863
File added
index_contrast/pictorial.py
deleted
100644 → 0
View file @
d6fa13cd
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
os
import
sys
import
logging
import
traceback
import
os.path
import
re
import
json
from
elasticsearch
import
Elasticsearch
import
elasticsearch.helpers
from
django.conf
import
settings
ES_INFO_LIST
=
[
{
"host"
:
"172.21.40.14"
,
"port"
:
9200
}
]
ES_INDEX_PREFIX
=
"gm-dbmw"
class
ESPerform
(
object
):
cli_obj
=
None
cli_info_list
=
ES_INFO_LIST
index_prefix
=
ES_INDEX_PREFIX
@classmethod
def
get_cli
(
cls
,
cli_info
=
None
):
try
:
init_args
=
{
'sniff_on_start'
:
False
,
'sniff_on_connection_fail'
:
False
,
}
es_cli_info
=
cli_info
if
cli_info
else
cls
.
cli_info_list
cls
.
cli_obj
=
Elasticsearch
(
hosts
=
es_cli_info
,
**
init_args
)
return
cls
.
cli_obj
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
get_official_index_name
(
cls
,
sub_index_name
,
index_flag
=
None
):
"""
:remark:get official es index name
:param sub_index_name:
:param index_flag:
:return:
"""
try
:
index_flag
=
None
assert
(
index_flag
in
[
None
,
"read"
,
"write"
])
official_index_name
=
cls
.
index_prefix
+
"-"
+
sub_index_name
if
index_flag
:
official_index_name
+=
"-"
+
index_flag
return
official_index_name
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
__load_mapping
(
cls
,
doc_type
):
try
:
mapping_file_path
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'trans2es'
,
'mapping'
,
'
%
s.json'
%
(
doc_type
,))
mapping
=
''
with
open
(
mapping_file_path
,
'r'
)
as
f
:
for
line
in
f
:
# 去掉注释
mapping
+=
re
.
sub
(
r'//.*$'
,
''
,
line
)
mapping
=
json
.
loads
(
mapping
)
return
mapping
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
create_index
(
cls
,
es_cli
,
sub_index_name
):
"""
:remark: create es index,alias index
:param sub_index_name:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
)
index_exist
=
es_cli
.
indices
.
exists
(
official_index_name
)
if
not
index_exist
:
es_cli
.
indices
.
create
(
official_index_name
)
read_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"read"
)
es_cli
.
indices
.
put_alias
(
official_index_name
,
read_alias_name
)
write_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
es_cli
.
indices
.
put_alias
(
official_index_name
,
write_alias_name
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
put_index_mapping
(
cls
,
es_cli
,
sub_index_name
,
mapping_type
=
"_doc"
,
force_sync
=
False
):
"""
:remark: put index mapping
:param es_cli:
:param sub_index_name:
:param mapping_type:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
write_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
index_exist
=
es_cli
.
indices
.
exists
(
write_alias_name
)
if
not
index_exist
and
not
force_sync
:
return
False
mapping_dict
=
cls
.
__load_mapping
(
sub_index_name
)
es_cli
.
indices
.
put_mapping
(
index
=
write_alias_name
,
body
=
mapping_dict
,
doc_type
=
mapping_type
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
put_indices_template
(
cls
,
es_cli
,
template_file_name
,
template_name
):
"""
:remark put index template
:param es_cli:
:param template_file_name:
:param template_name:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
mapping_dict
=
cls
.
__load_mapping
(
template_file_name
)
es_cli
.
indices
.
put_template
(
name
=
template_name
,
body
=
mapping_dict
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
es_helpers_bulk
(
cls
,
es_cli
,
data_list
,
sub_index_name
,
auto_create_index
=
False
,
doc_type
=
"_doc"
):
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
sub_index_name
if
sub_index_name
!=
"mv-alpha-tag-test-190711901"
:
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
index_exists
=
es_cli
.
indices
.
exists
(
official_index_name
)
logging
.
info
(
"get index_exists:
%
s"
%
index_exists
)
if
not
index_exists
:
if
not
auto_create_index
:
logging
.
error
(
"index:
%
s is not existing,bulk data error!"
%
official_index_name
)
return
False
else
:
cls
.
create_index
(
es_cli
,
sub_index_name
)
cls
.
put_index_mapping
(
es_cli
,
sub_index_name
)
bulk_actions
=
[]
for
data
in
data_list
:
if
data
:
bulk_actions
.
append
({
'_op_type'
:
'index'
,
'_index'
:
official_index_name
,
'_type'
:
doc_type
,
'_id'
:
data
[
'id'
],
'_source'
:
data
,
})
elasticsearch
.
helpers
.
bulk
(
es_cli
,
bulk_actions
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
get_search_results
(
cls
,
es_cli
,
sub_index_name
,
query_body
,
offset
=
0
,
size
=
10
,
auto_create_index
=
False
,
doc_type
=
"_doc"
,
aggregations_query
=
False
,
is_suggest_request
=
False
,
batch_search
=
False
,
routing
=
None
):
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
sub_index_name
if
sub_index_name
!=
"mv-alpha-pictorial-test-190717904"
:
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"read"
)
index_exists
=
es_cli
.
indices
.
exists
(
official_index_name
)
if
not
index_exists
:
if
not
auto_create_index
:
logging
.
error
(
"index:
%
s is not existing,get_search_results error!"
%
official_index_name
)
return
None
else
:
cls
.
create_index
(
es_cli
,
sub_index_name
)
cls
.
put_index_mapping
(
es_cli
,
sub_index_name
)
logging
.
info
(
"duan add,query_body:
%
s"
%
str
(
query_body
)
.
encode
(
"utf-8"
))
if
not
batch_search
:
if
not
routing
:
res
=
es_cli
.
search
(
index
=
official_index_name
,
doc_type
=
doc_type
,
body
=
query_body
,
from_
=
offset
,
size
=
size
)
else
:
res
=
es_cli
.
search
(
index
=
official_index_name
,
doc_type
=
doc_type
,
body
=
query_body
,
from_
=
offset
,
size
=
size
,
routing
=
routing
)
if
is_suggest_request
:
return
res
else
:
result_dict
=
{
"total_count"
:
res
[
"hits"
][
"total"
],
"hits"
:
res
[
"hits"
][
"hits"
]
}
if
aggregations_query
:
result_dict
[
"aggregations"
]
=
res
[
"aggregations"
]
return
result_dict
else
:
res
=
es_cli
.
msearch
(
body
=
query_body
,
index
=
official_index_name
,
doc_type
=
doc_type
)
logging
.
info
(
"duan add,msearch res:
%
s"
%
str
(
res
))
return
res
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
{
"total_count"
:
0
,
"hits"
:
[]}
@classmethod
def
get_analyze_results
(
cls
,
es_cli
,
sub_index_name
,
query_body
):
try
:
assert
(
es_cli
is
not
None
)
# official_index_name = cls.get_official_index_name(sub_index_name, "read")
index_exists
=
es_cli
.
indices
.
exists
(
sub_index_name
)
if
not
index_exists
:
logging
.
error
(
"index:
%
s is not existing,get_search_results error!"
%
sub_index_name
)
return
None
res
=
es_cli
.
indices
.
analyze
(
index
=
sub_index_name
,
body
=
query_body
)
return
res
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
if_es_node_load_high
(
cls
,
es_cli
):
try
:
assert
(
es_cli
is
not
None
)
high_num
=
0
es_nodes_list
=
list
()
es_nodes_ori_info
=
es_cli
.
cat
.
nodes
()
es_nodes_info_list
=
es_nodes_ori_info
.
split
(
"
\n
"
)
for
item
in
es_nodes_info_list
:
try
:
item_list
=
item
.
split
(
" "
)
if
len
(
item_list
)
==
11
:
cpu_load
=
item_list
[
4
]
elif
len
(
item_list
)
==
10
:
cpu_load
=
item_list
[
3
]
else
:
continue
int_cpu_load
=
int
(
cpu_load
)
if
int_cpu_load
>
60
:
high_num
+=
1
es_nodes_list
.
append
(
int_cpu_load
)
except
:
logging
.
error
(
"catch exception,item:
%
s,err_msg:
%
s"
%
(
str
(
item
),
traceback
.
format_exc
()))
return
True
if
high_num
>
3
:
logging
.
info
(
"check es_nodes_load high,cpu load:
%
s,ori_cpu_info:
%
s"
%
(
str
(
es_nodes_list
),
str
(
es_nodes_info_list
)))
return
True
else
:
return
False
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
True
# 先获取一部分数据
es_cli_obj
=
ESPerform
.
get_cli
()
q
=
{}
q
[
"query"
]
=
{
"bool"
:
{
"must"
:
[
{
"term"
:
{
"is_online"
:
True
}},
{
"term"
:
{
"is_deleted"
:
False
}},
{
"range"
:
{
"update_time"
:
{
"gte"
:
"2019-05-09T00:00:00+00:00"
}}}
]
}
}
result_dict
=
ESPerform
.
get_search_results
(
es_cli_obj
,
"pictorial"
,
q
,
0
,
1000
)
result_dict_ids
=
list
()
for
old_item
in
result_dict
[
"hits"
]:
old_source
=
old_item
[
"_source"
]
old_id
=
old_source
[
"id"
]
# 获取新index
es_cli_obj
=
ESPerform
.
get_cli
()
q
=
{}
q
[
"query"
]
=
{
"bool"
:
{
"must"
:
[
{
"term"
:
{
"id"
:
old_id
}},
{
"term"
:
{
"is_online"
:
True
}}
]
}
}
result_dict_test
=
ESPerform
.
get_search_results
(
es_cli_obj
,
"mv-alpha-pictorial-test-190717904"
,
q
,
0
,
1
)
for
new_item
in
result_dict_test
[
"hits"
]:
new_source
=
new_item
[
"_source"
]
new_id
=
new_source
[
"id"
]
print
(
new_id
)
print
(
"-----id-----"
)
if
old_source
[
"is_online"
]
!=
new_source
[
"is_online"
]:
print
(
old_source
[
"is_online"
])
print
(
new_source
[
"is_online"
])
print
(
"-----is_online-----"
)
if
old_source
[
"is_deleted"
]
!=
new_source
[
"is_deleted"
]:
print
(
old_source
[
"is_deleted"
])
print
(
new_source
[
"is_deleted"
])
print
(
"-----is_deleted-----"
)
if
old_source
[
"is_recommend"
]
!=
new_source
[
"is_recommend"
]:
print
(
old_source
[
"is_recommend"
])
print
(
new_source
[
"is_recommend"
])
print
(
"-----is_recommend-----"
)
if
old_source
[
"name"
]
!=
new_source
[
"name"
]:
print
(
old_source
[
"name"
])
print
(
new_source
[
"name"
])
print
(
"-----name-----"
)
if
old_source
[
"description"
]
!=
new_source
[
"description"
]:
print
(
old_source
[
"description"
])
print
(
new_source
[
"description"
])
print
(
"-----description-----"
)
if
old_source
[
"topic_num"
]
!=
new_source
[
"topic_num"
]:
print
(
old_source
[
"topic_num"
])
print
(
new_source
[
"topic_num"
])
print
(
"-----topic_num-----"
)
if
old_source
[
"creator_id"
]
!=
new_source
[
"creator_id"
]:
print
(
old_source
[
"creator_id"
])
print
(
new_source
[
"creator_id"
])
print
(
"-----creator_id-----"
)
if
old_source
[
"icon"
]
!=
new_source
[
"icon"
]:
print
(
old_source
[
"icon"
])
print
(
new_source
[
"icon"
])
print
(
"-----icon-----"
)
if
old_source
[
"high_quality_topic_num"
]
!=
new_source
[
"high_quality_topic_num"
]:
print
(
old_source
[
"high_quality_topic_num"
])
print
(
new_source
[
"high_quality_topic_num"
])
print
(
"-----high_quality_topic_num-----"
)
if
old_source
[
"create_time"
]
!=
new_source
[
"create_time"
]:
print
(
old_source
[
"create_time"
])
print
(
new_source
[
"create_time"
])
print
(
"-----create_time-----"
)
if
old_source
[
"update_time"
]
!=
new_source
[
"update_time"
]:
print
(
old_source
[
"update_time"
])
print
(
new_source
[
"update_time"
])
print
(
"-----update_time-----"
)
if
old_source
[
"tag_id"
]
!=
new_source
[
"tag_id"
]:
print
(
old_source
[
"tag_id"
])
print
(
new_source
[
"tag_id"
])
print
(
"-----tag_id-----"
)
if
old_source
[
"tag_name"
]
!=
new_source
[
"tag_name"
]:
print
(
old_source
[
"tag_name"
])
print
(
new_source
[
"tag_name"
])
print
(
"-----tag_name-----"
)
if
old_source
[
"topic_id_list"
]
!=
new_source
[
"topic_id_list"
]:
print
(
old_source
[
"topic_id_list"
])
print
(
new_source
[
"topic_id_list"
])
print
(
"-----topic_id_list-----"
)
if
old_source
[
"effective"
]
!=
new_source
[
"effective"
]:
print
(
old_source
[
"effective"
])
print
(
new_source
[
"effective"
])
print
(
"-----effective-----"
)
if
old_source
[
"offline_score"
]
!=
new_source
[
"offline_score"
]:
print
(
old_source
[
"offline_score"
])
print
(
new_source
[
"offline_score"
])
print
(
"-----offline_score-----"
)
if
old_source
[
"is_default"
]
!=
new_source
[
"is_default"
]:
print
(
old_source
[
"is_default"
])
print
(
new_source
[
"is_default"
])
print
(
"-----is_default-----"
)
if
old_source
[
"is_cover"
]
!=
new_source
[
"is_cover"
]:
print
(
old_source
[
"is_cover"
])
print
(
new_source
[
"is_cover"
])
print
(
"-----is_cover-----"
)
if
old_source
[
"topic_vote_number"
]
!=
new_source
[
"topic_vote_number"
]:
print
(
old_source
[
"topic_vote_number"
])
print
(
new_source
[
"topic_vote_number"
])
print
(
"-----topic_vote_number-----"
)
if
old_source
[
"activity_join"
]
!=
new_source
[
"activity_join"
]:
print
(
old_source
[
"activity_join"
])
print
(
new_source
[
"activity_join"
])
print
(
"-----activity_join-----"
)
index_contrast/tag.py
deleted
100644 → 0
View file @
d6fa13cd
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
os
import
sys
import
logging
import
traceback
import
os.path
import
re
import
json
from
elasticsearch
import
Elasticsearch
import
elasticsearch.helpers
ES_INFO_LIST
=
[
{
"host"
:
"172.21.40.14"
,
"port"
:
9200
}
]
ES_INDEX_PREFIX
=
"gm-dbmw"
class
ESPerform
(
object
):
cli_obj
=
None
cli_info_list
=
ES_INFO_LIST
index_prefix
=
ES_INDEX_PREFIX
@classmethod
def
get_cli
(
cls
,
cli_info
=
None
):
try
:
init_args
=
{
'sniff_on_start'
:
False
,
'sniff_on_connection_fail'
:
False
,
}
es_cli_info
=
cli_info
if
cli_info
else
cls
.
cli_info_list
cls
.
cli_obj
=
Elasticsearch
(
hosts
=
es_cli_info
,
**
init_args
)
return
cls
.
cli_obj
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
get_official_index_name
(
cls
,
sub_index_name
,
index_flag
=
None
):
"""
:remark:get official es index name
:param sub_index_name:
:param index_flag:
:return:
"""
try
:
index_flag
=
None
assert
(
index_flag
in
[
None
,
"read"
,
"write"
])
official_index_name
=
cls
.
index_prefix
+
"-"
+
sub_index_name
if
index_flag
:
official_index_name
+=
"-"
+
index_flag
return
official_index_name
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
__load_mapping
(
cls
,
doc_type
):
try
:
mapping_file_path
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'..'
,
'trans2es'
,
'mapping'
,
'
%
s.json'
%
(
doc_type
,))
mapping
=
''
with
open
(
mapping_file_path
,
'r'
)
as
f
:
for
line
in
f
:
# 去掉注释
mapping
+=
re
.
sub
(
r'//.*$'
,
''
,
line
)
mapping
=
json
.
loads
(
mapping
)
return
mapping
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
None
@classmethod
def
create_index
(
cls
,
es_cli
,
sub_index_name
):
"""
:remark: create es index,alias index
:param sub_index_name:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
)
index_exist
=
es_cli
.
indices
.
exists
(
official_index_name
)
if
not
index_exist
:
es_cli
.
indices
.
create
(
official_index_name
)
read_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"read"
)
es_cli
.
indices
.
put_alias
(
official_index_name
,
read_alias_name
)
write_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
es_cli
.
indices
.
put_alias
(
official_index_name
,
write_alias_name
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
put_index_mapping
(
cls
,
es_cli
,
sub_index_name
,
mapping_type
=
"_doc"
,
force_sync
=
False
):
"""
:remark: put index mapping
:param es_cli:
:param sub_index_name:
:param mapping_type:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
write_alias_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
index_exist
=
es_cli
.
indices
.
exists
(
write_alias_name
)
if
not
index_exist
and
not
force_sync
:
return
False
mapping_dict
=
cls
.
__load_mapping
(
sub_index_name
)
es_cli
.
indices
.
put_mapping
(
index
=
write_alias_name
,
body
=
mapping_dict
,
doc_type
=
mapping_type
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
put_indices_template
(
cls
,
es_cli
,
template_file_name
,
template_name
):
"""
:remark put index template
:param es_cli:
:param template_file_name:
:param template_name:
:return:
"""
try
:
assert
(
es_cli
is
not
None
)
mapping_dict
=
cls
.
__load_mapping
(
template_file_name
)
es_cli
.
indices
.
put_template
(
name
=
template_name
,
body
=
mapping_dict
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
es_helpers_bulk
(
cls
,
es_cli
,
data_list
,
sub_index_name
,
auto_create_index
=
False
,
doc_type
=
"_doc"
):
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
sub_index_name
if
sub_index_name
!=
"mv-alpha-tag-test-190711901"
:
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"write"
)
index_exists
=
es_cli
.
indices
.
exists
(
official_index_name
)
logging
.
info
(
"get index_exists:
%
s"
%
index_exists
)
if
not
index_exists
:
if
not
auto_create_index
:
logging
.
error
(
"index:
%
s is not existing,bulk data error!"
%
official_index_name
)
return
False
else
:
cls
.
create_index
(
es_cli
,
sub_index_name
)
cls
.
put_index_mapping
(
es_cli
,
sub_index_name
)
bulk_actions
=
[]
for
data
in
data_list
:
if
data
:
bulk_actions
.
append
({
'_op_type'
:
'index'
,
'_index'
:
official_index_name
,
'_type'
:
doc_type
,
'_id'
:
data
[
'id'
],
'_source'
:
data
,
})
elasticsearch
.
helpers
.
bulk
(
es_cli
,
bulk_actions
)
return
True
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
False
@classmethod
def
get_search_results
(
cls
,
es_cli
,
sub_index_name
,
query_body
,
offset
=
0
,
size
=
10
,
auto_create_index
=
False
,
doc_type
=
"_doc"
,
aggregations_query
=
False
,
is_suggest_request
=
False
,
batch_search
=
False
,
routing
=
None
):
try
:
assert
(
es_cli
is
not
None
)
official_index_name
=
sub_index_name
if
sub_index_name
!=
"mv-alpha-tag-test-190718901"
:
official_index_name
=
cls
.
get_official_index_name
(
sub_index_name
,
"read"
)
index_exists
=
es_cli
.
indices
.
exists
(
official_index_name
)
if
not
index_exists
:
if
not
auto_create_index
:
logging
.
error
(
"index:
%
s is not existing,get_search_results error!"
%
official_index_name
)
return
None
else
:
cls
.
create_index
(
es_cli
,
sub_index_name
)
cls
.
put_index_mapping
(
es_cli
,
sub_index_name
)
logging
.
info
(
"duan add,query_body:
%
s"
%
str
(
query_body
)
.
encode
(
"utf-8"
))
if
not
batch_search
:
if
not
routing
:
res
=
es_cli
.
search
(
index
=
official_index_name
,
doc_type
=
doc_type
,
body
=
query_body
,
from_
=
offset
,
size
=
size
)
else
:
res
=
es_cli
.
search
(
index
=
official_index_name
,
doc_type
=
doc_type
,
body
=
query_body
,
from_
=
offset
,
size
=
size
,
routing
=
routing
)
if
is_suggest_request
:
return
res
else
:
result_dict
=
{
"total_count"
:
res
[
"hits"
][
"total"
],
"hits"
:
res
[
"hits"
][
"hits"
]
}
if
aggregations_query
:
result_dict
[
"aggregations"
]
=
res
[
"aggregations"
]
return
result_dict
else
:
res
=
es_cli
.
msearch
(
body
=
query_body
,
index
=
official_index_name
,
doc_type
=
doc_type
)
logging
.
info
(
"duan add,msearch res:
%
s"
%
str
(
res
))
return
res
except
:
logging
.
error
(
"catch exception,err_msg:
%
s"
%
traceback
.
format_exc
())
return
{
"total_count"
:
0
,
"hits"
:
[]}
@classmethod
def get_analyze_results(cls, es_cli, sub_index_name, query_body):
    """
    Run the _analyze API against sub_index_name (used verbatim, not aliased).

    :param es_cli: elasticsearch-py client instance (must not be None).
    :param sub_index_name: concrete index name to analyze against.
    :param query_body: _analyze request body (analyzer/text).
    :return: raw _analyze response, or None when the index is missing
        or any exception is raised.
    """
    try:
        assert (es_cli is not None)
        # official_index_name = cls.get_official_index_name(sub_index_name, "read")
        index_exists = es_cli.indices.exists(sub_index_name)
        if not index_exists:
            logging.error("index:%s is not existing,get_search_results error!" % sub_index_name)
            return None

        res = es_cli.indices.analyze(index=sub_index_name, body=query_body)
        return res
    # Fix: narrowed from a bare "except:" so SystemExit/KeyboardInterrupt
    # are no longer swallowed; behavior for ordinary errors is unchanged.
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return None
@classmethod
def if_es_node_load_high(cls, es_cli):
    """
    Check node CPU load via the _cat/nodes API.

    Parses each whitespace-separated node line; the CPU column position depends
    on the number of columns returned (index 4 of 11 columns, index 3 of 10).
    A node counts as "high" when its CPU value exceeds 60.

    :param es_cli: elasticsearch-py client instance (must not be None).
    :return: True when more than 3 nodes are high — and also on ANY parse or
        client error (deliberately conservative: errors are treated as
        "load high"); False otherwise.
    """
    try:
        assert (es_cli is not None)

        high_num = 0
        es_nodes_list = list()
        es_nodes_ori_info = es_cli.cat.nodes()
        es_nodes_info_list = es_nodes_ori_info.split("\n")
        for item in es_nodes_info_list:
            try:
                item_list = item.split(" ")
                if len(item_list) == 11:
                    cpu_load = item_list[4]
                elif len(item_list) == 10:
                    cpu_load = item_list[3]
                else:
                    continue

                int_cpu_load = int(cpu_load)
                if int_cpu_load > 60:
                    high_num += 1
                es_nodes_list.append(int_cpu_load)
            # Fix: narrowed from bare "except:"; a malformed node line still
            # aborts the scan and reports "high" (conservative, as before).
            except Exception:
                logging.error("catch exception,item:%s,err_msg:%s" % (str(item), traceback.format_exc()))
                return True

        if high_num > 3:
            logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (
                str(es_nodes_list), str(es_nodes_info_list)))
            return True
        else:
            return False
    # Fix: narrowed from bare "except:" so SystemExit/KeyboardInterrupt propagate.
    except Exception:
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return True
# Fetch a batch of documents from the old "tag" index first.
es_cli_obj = ESPerform.get_cli()
q = {}
q["query"] = {
    "bool": {
        "must": [
            {"term": {"is_online": True}},
            {"term": {"is_deleted": False}},
        ]
    }
}
result_dict = ESPerform.get_search_results(es_cli_obj, "tag", q, 0, 10000)
result_dict_ids = list()  # NOTE(review): never populated or used below
for old_item in result_dict["hits"]:
    old_source = old_item["_source"]
    old_id = old_source["id"]

    # Look the same document up in the new (test) index by id.
    es_cli_obj = ESPerform.get_cli()
    q = {}
    q["query"] = {
        "bool": {
            "must": [
                {"term": {"id": old_id}}
            ]
        }
    }
    result_dict_test = ESPerform.get_search_results(es_cli_obj, "mv-alpha-tag-test-190718901", q, 0, 1)
    for new_item in result_dict_test["hits"]:
        new_source = new_item["_source"]
        new_id = new_source["id"]
        print(new_id)
        print("-----id-----")
        # Field-by-field comparison: print both sides whenever they differ.
        if old_source["name"] != new_source["name"]:
            print(old_source["name"])
            print(new_source["name"])
            print("-----name-----")
        if old_source["tag_type"] != new_source["tag_type"]:
            print(old_source["tag_type"])
            print(new_source["tag_type"])
            print("-----tag_type-----")
        if old_source["collection"] != new_source["collection"]:
            print(old_source["collection"])
            print(new_source["collection"])
            print("-----collection-----")
        if old_source["is_ai"] != new_source["is_ai"]:
            print(old_source["is_ai"])
            print(new_source["is_ai"])
            print("-----is_ai-----")
        if old_source["is_own"] != new_source["is_own"]:
            print(old_source["is_own"])
            print(new_source["is_own"])
            print("-----is_own-----")
        if old_source["is_online"] != new_source["is_online"]:
            print(old_source["is_online"])
            print(new_source["is_online"])
            print("-----is_online-----")
        if old_source["suggest"] != new_source["suggest"]:
            print(old_source["suggest"])
            print(new_source["suggest"])
            print("-----suggest-----")
            # Drill into the suggest payload to locate the differing part.
            for key, values in old_source["suggest"].items():
                if key == "input":
                    for i in values:
                        # NOTE(review): "i not in values" tests the list against
                        # itself and is always False — nothing is ever printed.
                        # The intent was presumably to test membership in the NEW
                        # document's suggest input list; confirm and fix.
                        if i not in values:
                            print(i)
                else:
                    # Non-"input" entries are context dicts carrying flags.
                    is_online = values["is_online"]
                    is_deleted = values["is_deleted"]
                    if is_online != new_source["suggest"]["contexts"]["is_online"]:
                        print(is_online)
                        print("----is_online--------")
                    if is_deleted != new_source["suggest"]["contexts"]["is_deleted"]:
                        print(is_deleted)
                        print("-------is_deleted---------")
        if old_source["is_deleted"] != new_source["is_deleted"]:
            print(old_source["is_deleted"])
            print(new_source["is_deleted"])
            print("-----is_deleted-----")
        if old_source["near_new_topic_num"] != new_source["near_new_topic_num"]:
            print(old_source["near_new_topic_num"])
            print(new_source["near_new_topic_num"])
            print("-----near_new_topic_num-----")
index_contrast/topic.py
deleted
100644 → 0
View file @
d6fa13cd
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
os
import
sys
import
logging
import
traceback
import
os.path
import
re
import
json
from
elasticsearch
import
Elasticsearch
import
elasticsearch.helpers
from
django.conf
import
settings
ES_INFO_LIST
=
[
{
"host"
:
"62.234.191.183"
,
"port"
:
9200
}
]
ES_INDEX_PREFIX
=
"gm-dbmw"
class ESPerform(object):
    """Wrapper around an elasticsearch-py client for the topic indices:
    client caching, index/alias management, bulk writes, searches and
    topic-ranking queries."""

    cli_obj = None                  # cached client created by get_cli()
    cli_info_list = ES_INFO_LIST    # default host/port list for the client
    index_prefix = ES_INDEX_PREFIX  # prefix used to build official index names

    @classmethod
    def get_cli(cls, cli_info=None):
        """Create an Elasticsearch client (cached on the class).

        :param cli_info: optional hosts list; defaults to cls.cli_info_list.
        :return: client instance, or None on any error.
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            es_cli_info = cli_info if cli_info else cls.cli_info_list
            cls.cli_obj = Elasticsearch(hosts=es_cli_info, **init_args)
            return cls.cli_obj
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls, sub_index_name, index_flag=None):
        """
        :remark:get official es index name
        :param sub_index_name: logical index name, e.g. "topic"
        :param index_flag: None, "read" or "write" (alias suffix)
        :return: "<prefix>-<sub_index_name>[-<index_flag>]", or None on error
        """
        try:
            assert (index_flag in [None, "read", "write"])
            official_index_name = cls.index_prefix + "-" + sub_index_name
            if index_flag:
                official_index_name += "-" + index_flag
            return official_index_name
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def __load_mapping(cls, doc_type):
        """Load ../trans2es/mapping/<doc_type>.json, stripping '//' comments
        so the file parses as strict JSON. Returns the mapping dict or None."""
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
            mapping = ''
            with open(mapping_file_path, 'r') as f:
                for line in f:
                    # strip comments
                    mapping += re.sub(r'//.*$', '', line)
            mapping = json.loads(mapping)
            return mapping
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def create_index(cls, es_cli, sub_index_name):
        """
        :remark: create es index,alias index
        :param sub_index_name: logical index name; also gets "-read"/"-write" aliases
        :return: True on success, False on any error
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name)
            index_exist = es_cli.indices.exists(official_index_name)
            if not index_exist:
                es_cli.indices.create(official_index_name)

            # Point both read and write aliases at the concrete index.
            read_alias_name = cls.get_official_index_name(sub_index_name, "read")
            es_cli.indices.put_alias(official_index_name, read_alias_name)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            es_cli.indices.put_alias(official_index_name, write_alias_name)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc", force_sync=False):
        """
        :remark: put index mapping (loaded from the mapping JSON file)
        :param es_cli: elasticsearch client
        :param sub_index_name: logical index name
        :param mapping_type: ES doc type for the mapping
        :param force_sync: when True, push the mapping even if the write alias is missing
        :return: True on success, False when the alias is missing (and not force_sync)
            or on any error
        """
        try:
            assert (es_cli is not None)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            index_exist = es_cli.indices.exists(write_alias_name)
            if not index_exist and not force_sync:
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict,
                                       doc_type=mapping_type)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_indices_template(cls, es_cli, template_file_name, template_name):
        """
        :remark put index template
        :param es_cli: elasticsearch client
        :param template_file_name: mapping-file name to load the template body from
        :param template_name: name to register the template under
        :return: True on success, False on any error
        """
        try:
            assert (es_cli is not None)

            mapping_dict = cls.__load_mapping(template_file_name)
            es_cli.indices.put_template(name=template_name, body=mapping_dict)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"):
        """Bulk-index data_list into the write alias of sub_index_name.

        Topic indices additionally carry a per-document routing value taken from
        data["content_level"]. Returns True on success, False on error or when
        the index is missing and auto_create_index is False.
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name, "write")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,bulk data error!" % official_index_name)
                    return False
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            bulk_actions = []
            if sub_index_name == "topic" or \
                    sub_index_name == "topic-star-routing" or \
                    sub_index_name == "topic-high-star":
                # Topic documents are routed by content level.
                for data in data_list:
                    if data:
                        bulk_actions.append({
                            '_op_type': 'index',
                            '_index': official_index_name,
                            '_type': doc_type,
                            '_id': data['id'],
                            '_source': data,
                            'routing': data["content_level"]
                        })
            else:
                for data in data_list:
                    if data:
                        bulk_actions.append({
                            '_op_type': 'index',
                            '_index': official_index_name,
                            '_type': doc_type,
                            '_id': data['id'],
                            '_source': data,
                        })
            elasticsearch.helpers.bulk(es_cli, bulk_actions)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
                           auto_create_index=False, doc_type="_doc", aggregations_query=False,
                           is_suggest_request=False, batch_search=False, routing=None):
        """Search (or msearch when batch_search=True) the read alias of sub_index_name.

        Returns the raw response for suggest/msearch requests, otherwise a
        {"total_count", "hits"} dict (plus "aggregations" when requested);
        None when the index is missing, {"total_count": 0, "hits": []} on error.
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
                    return None
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))

            if not batch_search:
                if not routing:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size)
                else:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size,
                                        routing=routing)

                if is_suggest_request:
                    return res
                else:
                    result_dict = {
                        "total_count": res["hits"]["total"],
                        "hits": res["hits"]["hits"]
                    }
                    if aggregations_query:
                        result_dict["aggregations"] = res["aggregations"]
                    return result_dict
            else:
                res = es_cli.msearch(body=query_body, index=official_index_name, doc_type=doc_type)
                logging.info("duan add,msearch res:%s" % str(res))
                return res
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count": 0, "hits": []}

    @classmethod
    def get_analyze_results(cls, es_cli, sub_index_name, query_body):
        """Run the _analyze API against the read alias of sub_index_name.
        Returns the raw response, or None when the index is missing or on error."""
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
                return None

            res = es_cli.indices.analyze(index=official_index_name, body=query_body)
            return res
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def if_es_node_load_high(cls, es_cli):
        """Report whether more than 3 nodes show CPU > 60 via _cat/nodes.
        Any parse or client error conservatively returns True ("load high")."""
        try:
            assert (es_cli is not None)

            high_num = 0
            es_nodes_list = list()
            es_nodes_ori_info = es_cli.cat.nodes()
            es_nodes_info_list = es_nodes_ori_info.split("\n")
            for item in es_nodes_info_list:
                try:
                    item_list = item.split(" ")
                    # CPU column index depends on the number of columns returned.
                    if len(item_list) == 11:
                        cpu_load = item_list[4]
                    elif len(item_list) == 10:
                        cpu_load = item_list[3]
                    else:
                        continue

                    int_cpu_load = int(cpu_load)
                    if int_cpu_load > 60:
                        high_num += 1
                    es_nodes_list.append(int_cpu_load)
                except:
                    logging.error("catch exception,item:%s,err_msg:%s" % (str(item), traceback.format_exc()))
                    return True

            if high_num > 3:
                logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (
                    str(es_nodes_list), str(es_nodes_info_list)))
                return True
            else:
                return False
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return True

    @classmethod
    def get_tag_topic_list(cls, tag_id, have_read_topic_id_list, size=100):
        """Return topic ids (content_level 4-6, online, not deleted) tagged with
        any of tag_id, scored by content level (6→60, 5→50, 4→40), newest first,
        excluding already-read ids. Routed to shards "4,5,6".

        :param tag_id: list of tag ids to match against "tag_list".
        :param have_read_topic_id_list: topic ids to exclude via must_not.
        :param size: max number of ids to return.
        :return: list of topic ids, or an empty list on error.
        """
        try:
            functions_list = list()
            # per-tag weighting disabled:
            # for id in tag_id:
            #     functions_list.append(
            #         {
            #             "filter": {"term": {"tag_list": id}},
            #             "weight": 1
            #         }
            #     )
            functions_list += [
                {
                    "filter": {"constant_score": {"filter": {"term": {"content_level": 6}}}},
                    "weight": 60
                },
                {
                    "filter": {"constant_score": {"filter": {"term": {"content_level": 5}}}},
                    "weight": 50
                },
                {
                    "filter": {"constant_score": {"filter": {"term": {"content_level": 4}}}},
                    "weight": 40
                }
            ]
            q = {
                "query": {
                    "function_score": {
                        "query": {
                            "bool": {
                                "must": [
                                    {"range": {"content_level": {"gte": 4, "lte": 6}}},
                                    {"term": {"is_online": True}},
                                    {"term": {"is_deleted": False}},
                                    {"terms": {"tag_list": tag_id}}
                                ]
                            }
                        },
                        "boost_mode": "sum",
                        "score_mode": "sum",
                        "functions": functions_list
                    }
                },
                "_source": {
                    "include": ["id"]
                },
                "sort": [
                    {"_score": {"order": "desc"}},
                    {"create_time_val": {"order": "desc"}},
                    # {"language_type": {"order": "asc"}},
                ]
            }
            if len(have_read_topic_id_list) > 0:
                q["query"]["function_score"]["query"]["bool"]["must_not"] = {
                    "terms": {
                        "id": have_read_topic_id_list
                    }
                }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic",
                                                       query_body=q,
                                                       offset=0, size=size, routing="4,5,6")

            topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
            logging.info("topic_id_list:%s" % str(topic_id_list))
            return topic_id_list
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()

    @classmethod
    def get_tag_topic_list_dict(cls, tag_id, have_read_topic_id_list, size=100):
        """Return (topic_id_list, topic_id_dict) of level-6 topics tagged with
        any of tag_id, one per user (collapse on user_id), weight 1 per matching
        tag, excluding already-read ids; routed to shard "6".

        topic_id_dict maps str(topic id) -> user_id.

        NOTE(review): the error path returns a single list() while the success
        path returns a 2-tuple — callers that unpack two values will raise on
        error; confirm and align.
        """
        try:
            functions_list = list()
            for id in tag_id:
                functions_list.append(
                    {
                        "filter": {"term": {"tag_list": id}},
                        "weight": 1
                    }
                )
            # content-level weighting disabled:
            # functions_list += [
            #     {
            #         "filter": {"term": {"content_level": 6}},
            #         "weight": 6000
            #     },
            #     {
            #         "filter": {"term": {"content_level": 5}},
            #         "weight": 5000
            #     },
            #     {
            #         "filter": {"term": {"content_level": 4}},
            #         "weight": 4000
            #     }
            # ]
            q = {
                "query": {
                    "function_score": {
                        "query": {
                            "bool": {
                                "must": [
                                    {"term": {"content_level": 6}},
                                    {"term": {"is_online": True}},
                                    {"term": {"is_deleted": False}},
                                    {"terms": {"tag_list": tag_id}}
                                ]
                            }
                        },
                        "boost_mode": "sum",
                        "score_mode": "sum",
                        "functions": functions_list
                    }
                },
                "_source": {
                    "include": ["id", "user_id"]
                },
                "sort": [
                    {"_score": {"order": "desc"}},
                    {"create_time_val": {"order": "desc"}},
                    {"language_type": {"order": "asc"}},
                ],
                "collapse": {
                    "field": "user_id"
                }
            }
            if len(have_read_topic_id_list) > 0:
                q["query"]["function_score"]["query"]["bool"]["must_not"] = {
                    "terms": {
                        "id": have_read_topic_id_list
                    }
                }
            result_dict = ESPerform.get_search_results(ESPerform.get_cli(), sub_index_name="topic-high-star",
                                                       query_body=q,
                                                       offset=0, size=size, routing="6")

            topic_id_list = [item["_source"]["id"] for item in result_dict["hits"]]
            # logging.info("topic_id_list:%s" % str(topic_id_list))

            # topic_id_dict = [{str(item["_source"]["id"]):item["_source"]["user_id"]} for item in result_dict["hits"]]
            topic_id_dict = dict()
            for item in result_dict["hits"]:
                topic_id_dict[str(item["_source"]["id"])] = item["_source"]["user_id"]

            logging.info("topic_id_list:%s" % str(topic_id_dict))
            return topic_id_list, topic_id_dict
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return list()
# Fetch a batch of documents from the old "topic" index first, level by level,
# and compare each against its counterpart looked up by id.
es_cli_obj = ESPerform.get_cli()
content_level = [6, 5, 4, 3, 2, 1]
for lev in content_level:
    q = {}
    q["query"] = {
        "bool": {
            "must": [
                {"term": {"is_online": True}},
                {"term": {"content_level": lev}},
                {"range": {"update_time": {"gte": "2019-05-09T00:00:00+00:00"}}}
            ]
        }
    }
    result_dict = ESPerform.get_search_results(es_cli_obj, "topic", q, 0, 100)
    result_dict_ids = list()
    for old_item in result_dict["hits"]:
        old_source = old_item["_source"]
        old_id = old_source["id"]

        # Look the same document up by id.
        es_cli_obj = ESPerform.get_cli()
        q = {}
        # BUGFIX: a "term" query accepts exactly one field; the original
        # {"term": {"id": old_id, "is_online": True}} is rejected by ES with a
        # parsing error. Use a bool/must of single-field term queries, matching
        # the sibling tag/user comparison scripts.
        q["query"] = {
            "bool": {
                "must": [
                    {"term": {"id": old_id}},
                    {"term": {"is_online": True}}
                ]
            }
        }
        result_dict_test = ESPerform.get_search_results(es_cli_obj, "topic", q, 0, 1)

        # Fields compared 1:1 between old and new documents, in the original order.
        compare_fields = [
            "is_shadow", "content_level", "is_online", "is_deleted",
            "is_recommend", "is_complaint", "is_excellent",
            "is_operation_home_recommend", "vote_num", "reply_num", "user_id",
            "group_id", "share_num", "offline_score", "manual_score",
            "has_image", "has_video", "language_type", "virtual_content_level",
            "like_num_crawl", "comment_num_crawl", "platform", "platform_id",
            "drop_score", "sort_score", "create_time_val", "update_time_val",
            "total_vote_num", "pictorial_id", "pick_id_list", "tag_list",
            "edit_tag_list", "tag_name_list", "name", "description",
            "user_nick_name", "user_nick_name_pre",
        ]
        for new_item in result_dict_test["hits"]:
            new_source = new_item["_source"]
            new_id = new_source["id"]
            print(new_id)
            print("-----id-----")
            # Print both sides of every field that differs (same output, same
            # order as the original copy-pasted comparisons).
            for field in compare_fields:
                if old_source[field] != new_source[field]:
                    print(old_source[field])
                    print(new_source[field])
                    print("-----%s-----" % field)
index_contrast/user.py
deleted
100644 → 0
View file @
d6fa13cd
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import
os
import
sys
import
logging
import
traceback
import
os.path
import
re
import
json
from
elasticsearch
import
Elasticsearch
import
elasticsearch.helpers
from
django.conf
import
settings
ES_INFO_LIST
=
[
{
"host"
:
"172.21.40.14"
,
"port"
:
9200
}
]
ES_INDEX_PREFIX
=
"gm-dbmw"
class ESPerform(object):
    """Wrapper around an elasticsearch-py client used by the user-index
    comparison script: client caching, index/alias management, bulk writes
    and searches."""

    cli_obj = None                  # cached client created by get_cli()
    cli_info_list = ES_INFO_LIST    # default host/port list for the client
    index_prefix = ES_INDEX_PREFIX  # prefix used to build official index names

    @classmethod
    def get_cli(cls, cli_info=None):
        """Create an Elasticsearch client (cached on the class).

        :param cli_info: optional hosts list; defaults to cls.cli_info_list.
        :return: client instance, or None on any error.
        """
        try:
            init_args = {
                'sniff_on_start': False,
                'sniff_on_connection_fail': False,
            }
            es_cli_info = cli_info if cli_info else cls.cli_info_list
            cls.cli_obj = Elasticsearch(hosts=es_cli_info, **init_args)
            return cls.cli_obj
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def get_official_index_name(cls, sub_index_name, index_flag=None):
        """
        :remark:get official es index name
        :param sub_index_name: logical index name
        :param index_flag: nominally None/"read"/"write", but see note below
        :return: "<prefix>-<sub_index_name>" (flag suffix never applied), or None on error
        """
        try:
            # NOTE(review): index_flag is forcibly discarded here, so this
            # variant always returns the raw prefixed name without a
            # "-read"/"-write" alias suffix — presumably intentional for this
            # comparison script; confirm before reusing elsewhere.
            index_flag = None
            assert (index_flag in [None, "read", "write"])

            official_index_name = cls.index_prefix + "-" + sub_index_name
            if index_flag:
                official_index_name += "-" + index_flag

            return official_index_name
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def __load_mapping(cls, doc_type):
        """Load ../trans2es/mapping/<doc_type>.json, stripping '//' comments
        so the file parses as strict JSON. Returns the mapping dict or None."""
        try:
            mapping_file_path = os.path.join(
                os.path.dirname(__file__),
                '..', 'trans2es', 'mapping', '%s.json' % (doc_type,))
            mapping = ''
            with open(mapping_file_path, 'r') as f:
                for line in f:
                    # strip comments
                    mapping += re.sub(r'//.*$', '', line)
            mapping = json.loads(mapping)
            return mapping
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def create_index(cls, es_cli, sub_index_name):
        """
        :remark: create es index,alias index
        :param sub_index_name: logical index name; also gets "-read"/"-write" aliases
        :return: True on success, False on any error
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name)
            index_exist = es_cli.indices.exists(official_index_name)
            if not index_exist:
                es_cli.indices.create(official_index_name)

            # Point both read and write aliases at the concrete index.
            read_alias_name = cls.get_official_index_name(sub_index_name, "read")
            es_cli.indices.put_alias(official_index_name, read_alias_name)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            es_cli.indices.put_alias(official_index_name, write_alias_name)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_index_mapping(cls, es_cli, sub_index_name, mapping_type="_doc", force_sync=False):
        """
        :remark: put index mapping (loaded from the mapping JSON file)
        :param es_cli: elasticsearch client
        :param sub_index_name: logical index name
        :param mapping_type: ES doc type for the mapping
        :param force_sync: when True, push the mapping even if the write alias is missing
        :return: True on success, False when the alias is missing (and not force_sync)
            or on any error
        """
        try:
            assert (es_cli is not None)

            write_alias_name = cls.get_official_index_name(sub_index_name, "write")
            index_exist = es_cli.indices.exists(write_alias_name)
            if not index_exist and not force_sync:
                return False

            mapping_dict = cls.__load_mapping(sub_index_name)
            es_cli.indices.put_mapping(index=write_alias_name, body=mapping_dict,
                                       doc_type=mapping_type)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def put_indices_template(cls, es_cli, template_file_name, template_name):
        """
        :remark put index template
        :param es_cli: elasticsearch client
        :param template_file_name: mapping-file name to load the template body from
        :param template_name: name to register the template under
        :return: True on success, False on any error
        """
        try:
            assert (es_cli is not None)

            mapping_dict = cls.__load_mapping(template_file_name)
            es_cli.indices.put_template(name=template_name, body=mapping_dict)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def es_helpers_bulk(cls, es_cli, data_list, sub_index_name, auto_create_index=False, doc_type="_doc"):
        """Bulk-index data_list into sub_index_name (the raw comparison test
        index "mv-alpha-tag-test-190711901" is used verbatim; anything else is
        mapped to its write alias). Returns True on success, False otherwise."""
        try:
            assert (es_cli is not None)

            official_index_name = sub_index_name
            if sub_index_name != "mv-alpha-tag-test-190711901":
                official_index_name = cls.get_official_index_name(sub_index_name, "write")

            index_exists = es_cli.indices.exists(official_index_name)
            logging.info("get index_exists:%s" % index_exists)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,bulk data error!" % official_index_name)
                    return False
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            bulk_actions = []
            for data in data_list:
                if data:
                    bulk_actions.append({
                        '_op_type': 'index',
                        '_index': official_index_name,
                        '_type': doc_type,
                        '_id': data['id'],
                        '_source': data,
                    })
            elasticsearch.helpers.bulk(es_cli, bulk_actions)

            return True
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return False

    @classmethod
    def get_search_results(cls, es_cli, sub_index_name, query_body, offset=0, size=10,
                           auto_create_index=False, doc_type="_doc", aggregations_query=False,
                           is_suggest_request=False, batch_search=False, routing=None):
        """Search (or msearch when batch_search=True) sub_index_name (the raw
        test index "mv-alpha-user-test-190717901" is used verbatim; anything
        else is mapped to its read alias).

        Returns the raw response for suggest/msearch requests, otherwise a
        {"total_count", "hits"} dict (plus "aggregations" when requested);
        None when the index is missing, {"total_count": 0, "hits": []} on error.
        """
        try:
            assert (es_cli is not None)

            official_index_name = sub_index_name
            if sub_index_name != "mv-alpha-user-test-190717901":
                official_index_name = cls.get_official_index_name(sub_index_name, "read")

            index_exists = es_cli.indices.exists(official_index_name)
            if not index_exists:
                if not auto_create_index:
                    logging.error("index:%s is not existing,get_search_results error!" % official_index_name)
                    return None
                else:
                    cls.create_index(es_cli, sub_index_name)
                    cls.put_index_mapping(es_cli, sub_index_name)

            logging.info("duan add,query_body:%s" % str(query_body).encode("utf-8"))

            if not batch_search:
                if not routing:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size)
                else:
                    res = es_cli.search(index=official_index_name, doc_type=doc_type,
                                        body=query_body, from_=offset, size=size,
                                        routing=routing)

                if is_suggest_request:
                    return res
                else:
                    result_dict = {
                        "total_count": res["hits"]["total"],
                        "hits": res["hits"]["hits"]
                    }
                    if aggregations_query:
                        result_dict["aggregations"] = res["aggregations"]
                    return result_dict
            else:
                res = es_cli.msearch(body=query_body, index=official_index_name, doc_type=doc_type)
                logging.info("duan add,msearch res:%s" % str(res))
                return res
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return {"total_count": 0, "hits": []}

    @classmethod
    def get_analyze_results(cls, es_cli, sub_index_name, query_body):
        """Run the _analyze API against sub_index_name.

        NOTE(review): official_index_name is computed but never used — the
        existence check and the analyze call both use the raw sub_index_name;
        confirm whether the alias was meant to be used here.
        Returns the raw response, or None when the index is missing or on error.
        """
        try:
            assert (es_cli is not None)

            official_index_name = cls.get_official_index_name(sub_index_name, "read")
            index_exists = es_cli.indices.exists(sub_index_name)
            if not index_exists:
                logging.error("index:%s is not existing,get_search_results error!" % sub_index_name)
                return None

            res = es_cli.indices.analyze(index=sub_index_name, body=query_body)
            return res
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return None

    @classmethod
    def if_es_node_load_high(cls, es_cli):
        """Report whether more than 3 nodes show CPU > 60 via _cat/nodes.
        Any parse or client error conservatively returns True ("load high")."""
        try:
            assert (es_cli is not None)

            high_num = 0
            es_nodes_list = list()
            es_nodes_ori_info = es_cli.cat.nodes()
            es_nodes_info_list = es_nodes_ori_info.split("\n")
            for item in es_nodes_info_list:
                try:
                    item_list = item.split(" ")
                    # CPU column index depends on the number of columns returned.
                    if len(item_list) == 11:
                        cpu_load = item_list[4]
                    elif len(item_list) == 10:
                        cpu_load = item_list[3]
                    else:
                        continue

                    int_cpu_load = int(cpu_load)
                    if int_cpu_load > 60:
                        high_num += 1
                    es_nodes_list.append(int_cpu_load)
                except:
                    logging.error("catch exception,item:%s,err_msg:%s" % (str(item), traceback.format_exc()))
                    return True

            if high_num > 3:
                logging.info("check es_nodes_load high,cpu load:%s,ori_cpu_info:%s" % (
                    str(es_nodes_list), str(es_nodes_info_list)))
                return True
            else:
                return False
        except:
            logging.error("catch exception,err_msg:%s" % traceback.format_exc())
            return True
# Fetch a batch of documents from the old "user" index (online, not deleted,
# updated since 2019-05-09) and diff each one against the same document in the
# rebuilt test index, printing every field mismatch.
es_cli_obj = ESPerform.get_cli()
q = {}
q["query"] = {
    "bool": {
        "must": [
            {"term": {"is_online": True}},
            {"term": {"is_deleted": False}},
            {"range": {"update_time": {"gte": "2019-05-09T00:00:00+00:00"}}}
        ]
    }
}
result_dict = ESPerform.get_search_results(es_cli_obj, "user", q, 0, 1000)

# Ordered field spec: (field_name, is_list). Scalar fields are compared with
# !=; list fields are checked element-by-element for membership in the new
# document's list. Order matches the original hand-written comparisons so the
# printed output is unchanged.
FIELD_CHECKS = (
    ("user_id", False),
    ("nick_name", False),
    ("nick_pre", False),
    ("nick_name_pre", False),
    ("profile_pic", False),
    ("gender", False),
    ("is_deleted", False),
    ("is_online", False),
    ("tag_list", True),
    ("useful_tag_list", True),
    ("city_id", False),
    ("country_id", False),
    ("is_recommend", False),
    ("is_shadow", False),
    ("latest_topic_time_val", False),
    ("attention_user_id_list", True),
    ("pick_user_id_list", False),  # compared as a whole value, per original
    ("same_pictorial_user_id_list", True),
    ("attention_pictorial_id_list", True),
    ("create_time", False),
    ("update_time", False),
    ("create_time_val", False),
    ("update_time_val", False),
    ("count_topic", False),
)


def _diff_scalar_field(field, old_value, new_value):
    # Print a labelled diff when the two values differ.
    if old_value != new_value:
        print(old_value)
        print(new_value)
        print("-----%s-----" % field)


def _diff_list_field(field, old_list, new_list):
    # Print each element of old_list missing from new_list, plus both lists.
    # (Also fixes the original attention_pictorial_id_list branch, which
    # omitted printing the mismatching element itself.)
    for element in old_list:
        if element not in new_list:
            print(element)
            print(old_list)
            print(new_list)
            print("-----%s-----" % field)


for old_item in result_dict["hits"]:
    old_source = old_item["_source"]
    old_id = old_source["id"]

    # Look up the same document in the rebuilt test index by id.
    es_cli_obj = ESPerform.get_cli()
    q = {}
    q["query"] = {
        "bool": {
            "must": [
                {"term": {"id": old_id}}
            ]
        }
    }
    result_dict_test = ESPerform.get_search_results(
        es_cli_obj, "mv-alpha-user-test-190717901", q, 0, 1)

    for new_item in result_dict_test["hits"]:
        new_source = new_item["_source"]
        print(new_source["id"])
        print("-----id-----")

        for field_name, is_list in FIELD_CHECKS:
            if is_list:
                _diff_list_field(field_name, old_source[field_name], new_source[field_name])
            else:
                _diff_scalar_field(field_name, old_source[field_name], new_source[field_name])
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment