Commit 0e23730e authored by 段英荣's avatar 段英荣

Merge branch 'dev' into 'master'

Dev

See merge request !2
parents 70480519 4e7b2961
......@@ -9,17 +9,13 @@ import re
import json
from elasticsearch import Elasticsearch
import elasticsearch.helpers
from django.conf import settings
class ESPerform(object):
cli_obj = None
cli_info_list = [
{
"host": "10.29.130.141",
"port": 9200
}
]
index_prefix = "gm-dbmw"
cli_info_list = settings.ES_INFO_LIST
index_prefix = settings.ES_INDEX_PREFIX
@classmethod
......@@ -118,6 +114,26 @@ class ESPerform(object):
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def put_indices_template(cls,es_cli,template_file_name, template_name):
"""
:remark put index template
:param es_cli:
:param template_file_name:
:param template_name:
:return:
"""
try:
assert (es_cli is not None)
mapping_dict = cls.__load_mapping(template_file_name)
es_cli.indices.put_template(name=template_name,body=mapping_dict)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def es_helpers_bulk(cls,es_cli,data_list,sub_index_name,auto_create_index=False,doc_type="_doc"):
try:
......
......@@ -30,7 +30,7 @@ ALLOWED_HOSTS = []
# Application definition
BROKER_URL = "redis://127.0.0.1:6379/0"
BROKER_URL = "redis://127.0.0.1:6379/2"
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['physical.celery_task_router.CeleryTaskRouter']
......
......@@ -26,6 +26,10 @@ def recommend_user(self_user_id,interesting_user_id,offset=0,size=10):
#获取关注用户列表
(self_attention_user_id_list,recursion_attention_user_id_list) = UserUtils.get_attention_user_list([self_user_id,interesting_user_id],self_user_id)
#去除自身及感兴趣的用户ID
self_attention_user_id_list.append(self_user_id)
self_attention_user_id_list.append(interesting_user_id)
recommend_user_list = UserUtils.get_recommend_user_list(self_attention_user_id_list,recursion_attention_user_id_list,offset,size)
return recommend_user_list
......
......@@ -58,26 +58,31 @@ class Command(BaseCommand):
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
)
def __sync_data_by_type(self, type_name):
try:
type_info = get_type_info_map()[type_name]
query_set = type_info.queryset
slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
for chunk in slicer.chunks():
job = Job(
sub_index_name=type_name,
type_name=type_name,
chunk=chunk,
)
job()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def handle(self, *args, **options):
try:
type_name_list = get_type_info_map().keys()
for type_name in type_name_list:
if len(options["type"]) and type_name!=options["type"]:
logging.warning("type_name:%s can not need to execute!" % type_name)
continue
type_info = get_type_info_map()[type_name]
query_set = type_info.queryset
slicer = TableSlicer(queryset=query_set, chunk_size=type_info.bulk_insert_chunk_size)
for chunk in slicer.chunks():
job = Job(
sub_index_name=type_name,
type_name=type_name,
chunk=chunk,
)
job()
if len(options["type"]):
if options["type"] == "all" or type_name==options["type"]:
logging.info("begin sync [%s] data to es!" % type_name)
self.__sync_data_by_type(type_name)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -6,16 +6,53 @@ from django.core.management.base import BaseCommand, CommandError
import traceback
import logging
from libs.es import ESPerform
from trans2es.type_info import get_type_info_map,TypeInfo
class Command(BaseCommand):
args = ''
help = 'dump mapping to elasticsearch'
from optparse import make_option
option_list = BaseCommand.option_list + (
make_option('-t', '--type', dest='type', help='type name to dump data to elasticsearch', metavar='TYPE',default=''),
make_option('-T', '--indices_template', dest='indices_template', help='index template name to dump data to elasticsearch', metavar='TYPE',
default=''),
make_option('-i', '--index-prefix', dest='index_prefix', help='index name to dump data to elasticsearch', metavar='INDEX_PREFIX'),
make_option('-p', '--parallel', dest='parallel', help='parallel process count', metavar='PARALLEL'),
make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
)
def handle(self, *args, **options):
try:
es_obj = ESPerform()
es_cli = es_obj.get_cli()
es_cli = ESPerform.get_cli()
type_name_list = get_type_info_map().keys()
for type_name in type_name_list:
if len(options["type"]):
if options["type"] == "all" or type_name==options["type"]:
official_index_name = ESPerform.get_official_index_name(type_name)
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
logging.info("begin create [%s] index and mapping!" % type_name)
ESPerform.create_index(es_cli,type_name)
ESPerform.put_index_mapping(es_cli,type_name)
else:
logging.warning("index:[%s] has already existing!" % type_name)
es_obj.create_index(es_cli,"topic")
es_obj.put_index_mapping(es_cli=es_cli,sub_index_name="topic")
if len(options["indices_template"]):
template_file_name = options["indices_template"]
if ESPerform.put_indices_template(es_cli=es_cli,template_file_name=template_file_name,template_name=template_file_name):
logging.info("put indices template suc!")
else:
logging.error("put indices template err!")
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
{
"index_patterns": ["*"],
"settings":{
"number_of_shards": 9,
"number_of_replicas": 3,
"index":{
"analysis":{
"filter":{
"gm_synonym_ik_smart":{
"type": "synonym",
"synonyms_path": "analysis/synonym.txt"
}
},
"analyzer":{
"gm_default_search":{
"tokenizer": "ik_smart"
},
"gm_default_index":{
"tokenizer": "ik_max_word",
"filter": ["gm_synonym_ik_smart"]
}
}
}
}
}
}
\ No newline at end of file
......@@ -8,7 +8,7 @@
"gender":{"type":"integer"},
"is_online":{"type":"boolean"},//是否上线
"tag_list":{"type":"long"},//标签属性
"city_id":{"type":"long"},
"city_id":{"type":"text"},
"country_id":{"type":"long"},
"is_recommend":{"type":"boolean"},//是否运营推荐用户
"is_shadow":{"type":"boolean"},//是否马甲用户
......
......@@ -20,7 +20,7 @@ class Celebrity(models.Model):
name = models.CharField(verbose_name=u'名称', max_length=300, default='')
portrait = models.CharField(verbose_name=u'肖像', max_length=100, default='')
gender = models.SmallIntegerField(verbose_name=u'性别')
city_id = models.IntegerField(verbose_name=u'城市id')
city_id = models.CharField(verbose_name=u'城市id')
desc = models.IntegerField(verbose_name='')
is_online = models.BooleanField(verbose_name="是否上线")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
......
......@@ -29,7 +29,7 @@ class AccountUserTag(models.Model):
id = models.IntegerField(primary_key=True,verbose_name=u"主键ID")
tag_id = models.IntegerField(verbose_name=u"标签ID")
user_id = models.IntegerField(verbose_name=u"用户ID")
user = models.IntegerField(verbose_name=u"用户ID")
is_deleted = models.BooleanField(verbose_name=u"是否删除")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
......
......@@ -27,7 +27,7 @@ class User(models.Model):
nick_name = models.CharField(verbose_name=u'昵称', max_length=255, default='')
profile_pic = models.CharField(verbose_name=u'头像', max_length=300)
gender = models.SmallIntegerField(verbose_name=u'性别')
city_id = models.IntegerField(verbose_name=u'城市id')
city_id = models.CharField(verbose_name=u'城市id')
country_id = models.IntegerField(verbose_name='国家id')
is_online = models.BooleanField(verbose_name="是否上线")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
......@@ -140,7 +140,7 @@ class User(models.Model):
try:
user_tag_id_list = list()
query_results = AccountUserTag.objects.filter(user_id=self.user_id)
query_results = AccountUserTag.objects.filter(user=self.user_id)
for item in query_results:
user_tag_id_list.append(item.tag_id)
......
......@@ -258,17 +258,17 @@ def get_type_info_map():
round_insert_chunk_size=5,
round_insert_period=2,
),
TypeInfo(
name="pick_celebrity", # 打榜明星
type="pick_celebrity",
model=pick_celebrity.PickCelebrity,
# query_deferred=lambda:user.User.objects.all().query,
query_deferred=lambda: pick_celebrity.PickCelebrity.objects.all().query,
get_data_func=PickCelebrityTransfer.get_pick_celebrity_data,
bulk_insert_chunk_size=100,
round_insert_chunk_size=5,
round_insert_period=2,
),
# TypeInfo(
# name="pick_celebrity", # 打榜明星
# type="pick_celebrity",
# model=pick_celebrity.PickCelebrity,
# # query_deferred=lambda:user.User.objects.all().query,
# query_deferred=lambda: pick_celebrity.PickCelebrity.objects.all().query,
# get_data_func=PickCelebrityTransfer.get_pick_celebrity_data,
# bulk_insert_chunk_size=100,
# round_insert_chunk_size=5,
# round_insert_period=2,
# ),
TypeInfo(
name="celebrity", # 明星
type="celebrity",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment