1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 21 15:45:44 2018
reindex 程序
换索引 short-video-production 到 short-video-irregular
@author: zhouyujiang
"""
import datetime
import elasticsearch
from elasticsearch import helpers
hosts = '192.168.17.11'
port = 9200
from_index = 'test2'
re_index = 'test5'
es = elasticsearch.Elasticsearch(hosts=hosts, port=port)
es_indices = elasticsearch.client.IndicesClient(es)
log_path = '/home/hanye/project_data/Python/Projects/proj-short-videos/write-data-into-es/log/'
log_fn = ('reindex_' + 'from_[' + from_index + ']_to_[' + re_index + ']_'
+ datetime.datetime.now().isoformat().replace(':', '-'))
f_log = open(log_path+log_fn, 'a', encoding='gb18030')
f_log = open(log_path+log_fn, 'a', encoding='gb18030')
print('log starts at', datetime.datetime.now(), file=f_log)
type_list = ['fyc1025']
for one_type in type_list:
print( datetime.datetime.now(),'start_reindex_[{_type}]'.format(_type=one_type), file=f_log)
resp_get_mapping = es_indices.get_mapping(index='test2',
doc_type='fyc1025')
mapping_propertity_dict = resp_get_mapping['test2']['mappings']['fyc1025']
put_mapping_resp = es_indices.put_mapping(doc_type=one_type,
body=mapping_propertity_dict,
index=re_index)
print('mapping result\n', put_mapping_resp, file=f_log)
seach_total = {
"query": {
"match_all": {}
}
}
total_re = es.search(index=from_index, doc_type=one_type, body=seach_total)
total = total_re['hits']['total']
seach_body = {
"query": {
"bool": {
"filter": {"term": {"_type": one_type}
}
}
}
}
re = helpers.reindex(client=es,source_index=from_index,target_index=re_index,query=seach_body)
if re[0] != total:
print(datetime.datetime.now(), 'reindex_error_in_[', one_type, ']', file=f_log)
print('re:',re)
print('total:', total)
else:
print(datetime.datetime.now(),'end_reindex_[{_type}]'.format(_type=one_type), file=f_log)
print(datetime.datetime.now(),'start_del_[',from_index, '][{_type}]'.format(_type=one_type), file=f_log)
re_del = es.delete_by_query(index=from_index, doc_type=one_type, body=seach_total)
del_total = re_del['total']
if del_total == re[0]:
print(datetime.datetime.now(), 'end_del_[',from_index, '][{_type}]'.format(_type=one_type), file=f_log)
print( datetime.datetime.now(),'the_reindex_[{_type}] ALL done'.format(_type=one_type), file=f_log)
else:
print('error in del[',from_index, '][{_type}]'.format(_type=one_type), file=f_log)