Commit b3d12ede authored by 段英荣

modify sync data

parent 69e37523
@@ -4,29 +4,34 @@ import os
import sys
import logging
import traceback
import os.path
import re
import json
from elasticsearch import Elasticsearch
class ESPerform(object):
def __init__(self):
self.cli_obj = None
self.cli_info_list = [
{
"host":"10.29.130.141",
"port":9200
}
]
self.index_prefix = "gm-dbmw"
def get_cli(self):
cli_obj = None
cli_info_list = [
{
"host": "10.29.130.141",
"port": 9200
}
]
index_prefix = "gm-dbmw"
@classmethod
def get_cli(cls):
try:
self.cli_obj = Elasticsearch(self.cli_info_list)
return self.cli_obj
cls.cli_obj = Elasticsearch(cls.cli_info_list)
return cls.cli_obj
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
def __get_official_index_name(self,sub_index_name,index_flag=None):
@classmethod
def get_official_index_name(cls,sub_index_name,index_flag=None):
"""
:remark:get official es index name
:param sub_index_name:
@@ -36,7 +41,7 @@ class ESPerform(object):
try:
assert (index_flag in [None,"read","write"])
official_index_name = self.index_prefix + "-" + sub_index_name
official_index_name = cls.index_prefix + "-" + sub_index_name
if index_flag:
official_index_name += "-" + index_flag
@@ -45,7 +50,25 @@
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
def create_index(self,es_cli,sub_index_name):
@classmethod
def __load_mapping(cls,doc_type):
try:
mapping_file_path = os.path.join(
os.path.dirname(__file__),
'..', 'mapping', '%s.json' % (doc_type,))
mapping = ''
with open(mapping_file_path, 'r') as f:
for line in f:
# strip //-style comments before json.loads
mapping += re.sub(r'//.*$', '', line)
mapping = json.loads(mapping)
return mapping
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
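Reviewer note: __load_mapping strips //-style comments line by line before json.loads, so mapping files can carry comments; the strip is naive, though, and any value containing "//" (a URL, say) would be truncated as well. A sketch of a mapping/diary.json it would accept; the field names are illustrative, not taken from this commit.
# Sketch: a mapping file __load_mapping can parse (fields are illustrative).
sample_diary_json = '''
{
    "properties": {
        "id": {"type": "long"},     // numeric pk
        "name": {"type": "text"}    // diary title
    }
}
'''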
@classmethod
def create_index(cls,es_cli,sub_index_name):
"""
:remark: create es index,alias index
:param sub_index_name:
@@ -54,17 +77,42 @@
try:
assert (es_cli is not None)
official_index_name = self.__get_official_index_name(sub_index_name)
official_index_name = cls.get_official_index_name(sub_index_name)
index_exist = es_cli.indices.exists(official_index_name)
if not index_exist:
es_cli.indices.create(official_index_name)
read_alias_name = self.__get_official_index_name(sub_index_name,"read")
read_alias_name = cls.get_official_index_name(sub_index_name,"read")
es_cli.indices.put_alias(official_index_name,read_alias_name)
write_alias_name = self.__get_official_index_name(sub_index_name,"write")
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
es_cli.indices.put_alias(official_index_name,write_alias_name)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc"):
"""
:remark: put index mapping
:param es_cli:
:param sub_index_name:
:param mapping_type:
:return:
"""
try:
assert (es_cli is not None)
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
index_exist = es_cli.indices.exists(write_alias_name)
if not index_exist:
return False
mapping_dict = cls.__load_mapping(sub_index_name)
es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
\ No newline at end of file
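Reviewer note: after this refactor every ESPerform helper is a classmethod, so callers no longer need an instance. A minimal end-to-end sketch, assuming the ES host in this diff is reachable and mapping/diary.json exists:
# Sketch: exercising the refactored class-level helpers.
from libs.es import ESPerform

es_cli = ESPerform.get_cli()                      # None only if client construction raises
if es_cli is not None:
    ESPerform.create_index(es_cli, "diary")       # gm-dbmw-diary plus -read/-write aliases
    ESPerform.put_index_mapping(es_cli, "diary")  # pushes mapping/diary.json to the write alias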
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import six
import random
from django.db import models
class ITableChunk(object):
def __iter__(self):
raise NotImplementedError
def get_pk_start(self):
raise NotImplementedError
def get_pk_stop(self):
raise NotImplementedError
class TableScannerChunk(ITableChunk):
def __init__(self, data_list, pk_start, pk_stop):
self._data_list = data_list
self._pk_start = pk_start
self._pk_stop = pk_stop
def __iter__(self):
return iter(self._data_list)
def get_pk_start(self):
return self._pk_start
def get_pk_stop(self):
return self._pk_stop
class TableScannerChunkIterator(object):
def __init__(self, scanner, last_pk, chunk_size):
assert isinstance(scanner, TableScanner)
self._scanner = scanner
self._last_pk = last_pk
self._chunk_size = chunk_size
def __iter__(self):
while True:
last_pk = self._last_pk
data_list, next_last_pk = self._scanner.get_next_data_list(last_pk=last_pk, chunk_size=self._chunk_size)
self._last_pk = next_last_pk
yield TableScannerChunk(data_list=data_list, pk_start=last_pk, pk_stop=next_last_pk)
class TableScannerFlattenIterator(object):
def __init__(self, scanner, last_pk):
assert isinstance(scanner, TableScanner)
self._scanner = scanner
self._last_pk = last_pk
def __iter__(self):
while True:
data_list, next_last_pk = self._scanner.get_next_data_list(last_pk=self._last_pk)
self._last_pk = next_last_pk
for data in data_list:
yield data
class TableScanner(object):
def __init__(self, queryset):
assert isinstance(queryset, models.QuerySet)
self._model = queryset.model
self._query = queryset.query
self._db_table = self._model._meta.db_table
@property
def queryset(self):
return models.QuerySet(model=self._model, query=self._query)
@property
def model_queryset(self):
return self._model.objects
def get_random_pk(self):
count = self.model_queryset.count()
if count == 0:
return None
index = random.randrange(count)
try:
return self.model_queryset.values_list('pk', flat=True)[index]
except IndexError:
return None
def get_next_data_list(self, last_pk=None, chunk_size=1):
qs = self.queryset.order_by('pk')
if last_pk is not None:
qs = qs.filter(pk__gt=last_pk)
data_list = list(qs[:chunk_size])
if len(data_list) == 0:
next_last_pk = None
else:
next_last_pk = data_list[-1].pk
return data_list, next_last_pk
def __iter__(self):
pk = self.get_random_pk()
return iter(TableScannerFlattenIterator(scanner=self, last_pk=pk))
def chunks(self, chunk_size):
pk = self.get_random_pk()
return iter(TableScannerChunkIterator(scanner=self, last_pk=pk, chunk_size=chunk_size))
class TableSlicerChunk(ITableChunk):
"""
this object can be pickled and transferred to another process.
"""
def __init__(self, model, query, pk_start, pk_stop):
self._model = model
self._query = query
self._pk_start = pk_start
self._pk_stop = pk_stop
def __iter__(self):
data_list = self.__get_range(self._model, self._query, pk_start=self._pk_start, pk_stop=self._pk_stop)
return iter(data_list)
def get_pk_start(self):
return self._pk_start
def get_pk_stop(self):
return self._pk_stop
@classmethod
def __get_range(cls, model, query, pk_start, pk_stop):
qs = models.QuerySet(model=model, query=query)
if pk_start is not None:
qs = qs.filter(pk__gte=pk_start)
if pk_stop is not None:
qs = qs.filter(pk__lt=pk_stop)
return list(qs)
class TableSlicer(object):
def __init__(self, queryset, chunk_size=None, chunk_count=None, sep_list=None):
assert isinstance(queryset, models.QuerySet)
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
assert chunk_count is None or isinstance(chunk_count, six.integer_types)
assert sep_list is None or isinstance(sep_list, list)
assert (chunk_size is not None) + (chunk_count is not None) + (sep_list is not None) == 1
if sep_list is not None:
sep_list = list(sep_list)
else:
count = queryset.count()
if chunk_size is None:
chunk_size = count // chunk_count # floor division so range() gets an int on Python 3 too
index_list = list(range(0, count, chunk_size))
sep_list = [
queryset.order_by('pk').values_list('pk', flat=True)[index]
for index in index_list
]
self._model = queryset.model
self._query = queryset.query
self._sep_list = [None] + sep_list + [None]
def chunks(self):
reversed_sep_list = list(reversed(self._sep_list))
for i in range(len(self._sep_list) - 1):
pk_start = reversed_sep_list[i+1]
pk_stop = reversed_sep_list[i]
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=pk_start, pk_stop=pk_stop)
class TableStreamingSlicer(object):
def __init__(self, queryset, chunk_size=None):
assert isinstance(queryset, models.QuerySet)
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
self._model = queryset.model
self._query = queryset.query
self._chunk_size = chunk_size
self._descend = False
def chunks(self):
last_pk = None
queryset = models.QuerySet(model=self._model, query=self._query).order_by('pk')
value_list = queryset.values_list('pk', flat=True)
while True:
current_value_list = value_list
if last_pk is not None:
current_value_list = current_value_list.filter(pk__gt=last_pk)
try:
next_last_pk = current_value_list[self._chunk_size-1]
except IndexError:
next_last_pk = None
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=last_pk, pk_stop=next_last_pk)
last_pk = next_last_pk
if last_pk is None:
break
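Reviewer note: table_scan.py ships three traversal strategies with different trade-offs. A short sketch contrasting them; Diary and the chunk sizes come from this commit, the loop bodies are placeholders:
# Sketch: the three traversal helpers side by side.
import itertools
from libs.table_scan import TableScanner, TableSlicer, TableStreamingSlicer
import trans2es.models as md

qs = md.Diary.objects.all()

# TableScanner starts at a random pk and wraps around forever, so bound it:
for instance in itertools.islice(TableScanner(qs), 10):
    pass

# TableSlicer precomputes pk boundaries up front (count + values_list) and
# yields picklable TableSlicerChunk objects, highest pk range first:
for chunk in TableSlicer(queryset=qs, chunk_size=100).chunks():
    for instance in chunk:
        pass

# TableStreamingSlicer discovers each boundary lazily, one chunk ahead:
for chunk in TableStreamingSlicer(queryset=qs, chunk_size=100).chunks():
    pass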
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import django.db.models
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
import six
import sys
from libs.es import ESPerform
import trans2es.models as md
from trans2es.utils import diary_transfer
from libs.table_scan import TableSlicer,TableSlicerChunk
from trans2es.type_info import get_type_info_map,TypeInfo
class Job(object):
__es = None
def __init__(self, index_prefix, type_name, chunk):
assert isinstance(index_prefix, six.string_types)
assert isinstance(type_name, six.string_types)
assert isinstance(chunk, TableSlicerChunk)
self._index_prefix = index_prefix
self._type_name = type_name
self._chunk = chunk
@classmethod
def get_es(cls):
if cls.__es is None:
cls.__es = ESPerform().get_cli()
return cls.__es
def __call__(self):
type_info = get_type_info_map()[self._type_name]
assert isinstance(type_info, TypeInfo)
result = type_info.insert_table_chunk(
index_prefix=self._index_prefix,
table_chunk=self._chunk,
es=self.get_es(),
)
class Command(BaseCommand):
args = ''
help = 'dump data to elasticsearch, parallel'
from optparse import make_option
option_list = BaseCommand.option_list + (
make_option('-t', '--type', dest='type_list', action='append', help='type name to dump data to elasticsearch', metavar='TYPE'),
make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'),
make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
)
def handle(self, *args, **options):
try:
index_prefix = "gm-dbmw"
#get_data_func = diary_transfer.DiaryTransfer
#query_set = django.db.models.QuerySet(model=md.Diary,query=md.Diary.objects.all().query)
type_name_list = get_type_info_map().keys()
for type_name in type_name_list:
type_info = get_type_info_map()[type_name]
query_set = type_info.queryset
slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
for chunk in slicer.chunks():
job = Job(
index_prefix=index_prefix,
type_name=type_name,
chunk=chunk,
)
job()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
@@ -15,6 +15,7 @@ class Command(BaseCommand):
es_obj = ESPerform()
es_cli = es_obj.get_cli()
es_obj.create_index(es_cli,"test")
es_obj.create_index(es_cli,"diary")
es_obj.put_index_mapping(es_cli,"diary")
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
from libs.es import ESPerform
from django.db import models
class Diary(models.Model):
class Meta:
verbose_name = u'日记'
verbose_name_plural = u'日记'
db_table = 'topic'
app_label = 'api'
group_id = models.IntegerField(verbose_name='日记ID')
create_time = models.DateTimeField(verbose_name='日记创建时间')
update_time = models.DateTimeField(verbose_name='日记更新时间')
name = models.CharField(verbose_name='日记名称')
user_id = models.IntegerField(verbose_name='用户ID')
description = models.CharField(verbose_name='日记本描述')
content = models.CharField(verbose_name='日记本内容')
share_num = models.IntegerField(verbose_name='')
vote_num = models.IntegerField(verbose_name='点赞数')
reply_num = models.IntegerField(verbose_name='回复数')
cover = models.CharField(verbose_name='')
is_online = models.BooleanField(verbose_name='是否上线')
is_deleted = models.BooleanField(verbose_name='是否删除')
content_level = models.CharField(verbose_name='内容等级')
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import time
import datetime
import logging
import traceback
import django.db.models
from django.conf import settings
import elasticsearch
import elasticsearch.helpers
import sys
import trans2es.models as md
from trans2es.utils.diary_transfer import DiaryTransfer
from libs.es import ESPerform
__es = None
def get_elasticsearch_instance():
global __es
if __es is None:
__es = ESPerform.get_cli()
return __es
def get_es_list_by_type(es_type):
return [get_elasticsearch_instance()]
class TypeInfo(object):
def __init__(
self,
name,
type,
model,
query_deferred,
get_data_func,
bulk_insert_chunk_size,
round_insert_chunk_size,
round_insert_period,
batch_get_data_func=None, # receive a list of pks, not instance
logic_database_id=None,
):
self.name = name
self.type = type
self.model = model
self.query_deferred = query_deferred
self.get_data_func = get_data_func
self.batch_get_data_func = batch_get_data_func
self.pk_blacklist = ()
self.bulk_insert_chunk_size = bulk_insert_chunk_size
self.round_insert_chunk_size = round_insert_chunk_size
self.round_insert_period = round_insert_period
self.logic_database_id = logic_database_id
@property
def query(self):
return self.query_deferred()
@property
def queryset(self):
return django.db.models.QuerySet(model=self.model, query=self.query)
@property
def pk_blacklist(self):
return self.__pk_blacklist
@pk_blacklist.setter
def pk_blacklist(self, value):
self.__pk_blacklist = frozenset(value)
def bulk_get_data(self, instance_iterable):
data_list = []
if self.batch_get_data_func:
_pk_list = [getattr(instance, 'pk', None) for instance in instance_iterable]
not_found_pk_list = []
blacklisted_pk_list = []
pk_list = []
for pk in _pk_list:
if pk is None:
not_found_pk_list.append(pk)
elif pk in self.__pk_blacklist:
blacklisted_pk_list.append(pk)
else:
pk_list.append(pk)
if not_found_pk_list:
logging.exception('those pks not found for name={}, doc_type={}, pk_list={}'.format(
self.name,
self.type,
str(not_found_pk_list),
))
if blacklisted_pk_list:
logging.info('those pks are in blacklist for name={}, doc_type={}, pk_list={}'.format(
self.name,
self.type,
str(blacklisted_pk_list),
))
try:
data_list = self.batch_get_data_func(pk_list)
except Exception:
traceback.print_exc()
logging.exception('bulk_get_data for name={}, doc_type={}, pk_list={}'.format(
self.name,
self.type,
str(pk_list),
))
else:
for instance in instance_iterable:
pk = getattr(instance, 'pk', None)
try:
if pk is None:
raise Exception('pk not found')
if pk in self.__pk_blacklist:
logging.info('bulk_get_data for name={}, doc_type={}, pk={}: ignore blacklisted pk'.format(
self.name,
self.type,
pk,
))
continue
data = self.get_data_func(instance)
except Exception:
traceback.print_exc()
logging.exception('bulk_get_data for name={}, doc_type={}, pk={}'.format(
self.name,
self.type,
pk,
))
else:
data_list.append(data)
return data_list
def elasticsearch_bulk_insert_data(self, index_prefix, data_list, es=None):
if es is None:
es = get_es_list_by_type(self.type)
if not isinstance(es, (list, tuple,)):
es = [es]
index = ESPerform.get_official_index_name(sub_index_name=index_prefix,index_flag="write")
bulk_actions = []
for data in data_list:
bulk_actions.append({
'_op_type': 'index',
'_index': index,
'_type': self.type,
'_id': data['id'],
'_source': data,
})
es_result = None
if bulk_actions:
for t in es:
try:
es_result = elasticsearch.helpers.bulk(client=t, actions=bulk_actions)
except Exception as e:
traceback.print_exc()
es_result = 'error'
return es_result
def elasticsearch_bulk_insert(self, index_prefix, instance_iterable, es=None):
data_list = self.bulk_get_data(instance_iterable)
return self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
data_list=data_list,
es=es,
)
def insert_table_by_pk_list(self, index_prefix, pk_list, es=None, use_batch_query_set=False):
if use_batch_query_set:
qs = self.queryset
else:
qs = self.model.objects.all()
instance_list = qs.filter(pk__in=pk_list)
data_list = self.bulk_get_data(instance_list)
self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
data_list=data_list,
es=es,
)
def insert_table_chunk(self, index_prefix, table_chunk, es=None):
start_clock = time.clock()
start_time = time.time()
instance_list = list(table_chunk)
stage_1_time = time.time()
data_list = self.bulk_get_data(instance_list)
stage_2_time = time.time()
es_result = self.elasticsearch_bulk_insert_data(
index_prefix=index_prefix,
data_list=data_list,
es=es,
)
stage_3_time = time.time()
end_clock = time.clock()
return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
'{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
'{response}').format(
datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
index_prefix=index_prefix,
type_name=self.name,
pk_start=repr(table_chunk.get_pk_start()),
pk_stop=repr(table_chunk.get_pk_stop()),
count=len(instance_list),
stage_1_duration=stage_1_time - start_time,
stage_2_duration=stage_2_time - stage_1_time,
stage_3_duration=stage_3_time - stage_2_time,
clock_duration=end_clock - start_clock,
response=es_result,
)
_get_type_info_map_result = None
def get_type_info_map():
global _get_type_info_map_result
if _get_type_info_map_result:
return _get_type_info_map_result
type_info_list = [
TypeInfo(
name='topic', # diary
type='topic',
model=md.Diary,
query_deferred=lambda: md.Diary.objects.all().query,
get_data_func=DiaryTransfer.get_diary_data,
bulk_insert_chunk_size=100,
round_insert_chunk_size=5,
round_insert_period=2,
)
]
try:
pk_blacklist_map = settings.DATA_SYNC.get('pk_blacklist', {})
assert isinstance(pk_blacklist_map, dict)
except (ImportError, AttributeError):
traceback.print_exc()
print('fallback to empty pk_blacklist_map', file=sys.stderr)
pk_blacklist_map = {}
for type_info in type_info_list:
type_info.pk_blacklist = pk_blacklist_map.get(type_info.name, ())
print('loaded pk_blacklist for {}: {}'.format(
type_info.name,
sorted(list(type_info.pk_blacklist)),
), file=sys.stderr)
type_info_map = {
type_info.name: type_info
for type_info in type_info_list
}
_get_type_info_map_result = type_info_map
return type_info_map
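Reviewer note: get_type_info_map() memoizes a name-to-TypeInfo registry, which makes targeted resyncs easy. One caveat spotted while reading: elasticsearch_bulk_insert_data passes index_prefix straight into get_official_index_name as the sub_index_name, so handle()'s hard-coded index_prefix="gm-dbmw" resolves to a gm-dbmw-gm-dbmw-write alias rather than the gm-dbmw-diary-write alias this commit creates; passing the sub-index name avoids that, as the sketch assumes:
# Sketch: one-off resync of a few pks through the registry (pks made up).
from trans2es.type_info import get_type_info_map

type_info = get_type_info_map()['topic']
type_info.insert_table_by_pk_list(index_prefix='diary', pk_list=[1, 2, 3])
# with index_prefix='diary' the write alias resolves to gm-dbmw-diary-write,
# matching the alias created by the create_index management command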
@@ -10,7 +10,8 @@ class DiaryTransfer(object):
def __init__(self):
pass
def get_diary_data(self,instance):
@classmethod
def get_diary_data(cls,instance):
try:
res = dict()
......