Commit 165eded0 authored by lixiaofang

add

parents 79122e95 34a89a76
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<DBN-PSQL>
<case-options enabled="false">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false" />
</DBN-PSQL>
<DBN-SQL>
<case-options enabled="false">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false">
<option name="STATEMENT_SPACING" value="one_line" />
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
</code_scheme>
</component>
\ No newline at end of file
......@@ -9,11 +9,13 @@
<config name="initializer_list">
<element value="physical.django_init"/>
<element value="search.views.topic"/>
<element value="search.views.business_topic"/>
<element value="search.views.pick"/>
<element value="search.views.group"/>
<element value="search.views.user"/>
<element value="search.views.tag"/>
<element value="search.views.contrast_similar"/>
<element value="injection.data_sync.tasks"/>
<element value="search.views.contrast_similar"/>
</config>
</gm_rpcd_config>
......@@ -4,14 +4,15 @@ from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
#from rpc.all import get_rpc_remote_invoker
# from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
import logging
import traceback
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar,UserSimilarScore
import json
@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
try:
......@@ -20,6 +21,7 @@ def write_to_es(es_type, pk_list, use_batch_query_set=False):
type_info = type_info_map[es_type]
logging.info("duan add,es_type:%s" % str(es_type))
logging.info("get es_type:%s"%es_type)
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
......@@ -33,7 +35,8 @@ def write_to_es(es_type, pk_list, use_batch_query_set=False):
@shared_task
def sync_face_similar_data_to_redis():
try:
result_items = FaceUserContrastSimilar.objects.filter(is_online=True,is_deleted=False).distinct().values("participant_user_id").values_list("participant_user_id",flat=True)
result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False).distinct().values(
"participant_user_id").values_list("participant_user_id", flat=True)
logging.info("duan add,begin sync_face_similar_data_to_redis!")
......@@ -41,19 +44,45 @@ def sync_face_similar_data_to_redis():
for participant_user_id in result_items:
redis_key = redis_key_prefix + str(participant_user_id)
similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True,is_deleted=False,participant_user_id=participant_user_id,similarity__gt=0.4).order_by("-similarity").limit(100)
similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False,
participant_user_id=participant_user_id,
similarity__gt=0.3).order_by(
"-similarity")
item_list = list()
for item in similar_result_items:
item_list.append(
{
"contrast_user_id":item.contrast_user_id,
"similarity":item.similarity
"contrast_user_id": item.contrast_user_id,
"similarity": item.similarity
}
)
redis_client.set(redis_key,json.dumps(item_list))
redis_client.set(redis_key, json.dumps(item_list))
logging.info("duan add,participant_user_id:%d set data done!" % participant_user_id)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
@shared_task
def sync_user_similar_score():
try:
results_items = UserSimilarScore.objects.filter(is_deleted=False).distinct().values("user_id").values_list("user_id",flat=True)
redis_key_prefix = "physical:user_similar_score:user_id:"
logging.info("duan add,begin sync user_similar_score!")
for user_id in results_items:
redis_key = redis_key_prefix + str(user_id)
similar_results_items = UserSimilarScore.objects.filter(is_deleted=False,user_id=user_id).order_by("-score")
item_list = list()
for item in similar_results_items:
contrast_user_id = item.contrast_user_id
score = item.score
item_list.append(
[contrast_user_id,score]
)
redis_client.set(redis_key, json.dumps(item_list))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
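For reference, a downstream reader of this cache might look like the following minimal sketch (the key prefix and the [[contrast_user_id, score], ...] value layout are the ones written above; the helper name is illustrative, and the face-similarity cache written earlier follows the same pattern):
import json
from libs.cache import redis_client
def get_user_similar_scores(user_id):
    # value layout written by sync_user_similar_score: [[contrast_user_id, score], ...]
    cached = redis_client.get("physical:user_similar_score:user_id:" + str(user_id))
    return json.loads(cached) if cached else []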
......@@ -94,7 +94,7 @@ class ESPerform(object):
return False
@classmethod
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc"):
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc",force_sync=False):
"""
:remark: put index mapping
:param es_cli:
......@@ -107,10 +107,14 @@ class ESPerform(object):
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
index_exist = es_cli.indices.exists(write_alias_name)
if not index_exist:
if not index_exist and not force_sync:
return False
mapping_dict = cls.__load_mapping(sub_index_name)
logging.info("get write_alias_name:%s"%write_alias_name)
logging.info("get mapping_dict:%s"%mapping_dict)
logging.info("get mapping_type:%s"%mapping_type)
es_cli.indices.put_mapping(index=write_alias_name,body=mapping_dict,doc_type=mapping_type)
return True
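A hedged usage sketch of the new force_sync flag (the index name is illustrative): it lets a caller push a mapping even before the write alias exists, which the index-creation command further down relies on.
from libs.es import ESPerform
es_cli = ESPerform.get_cli()
# without force_sync=True this call bails out when the write alias is missing
ESPerform.put_index_mapping(es_cli, "topic", force_sync=True)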
......@@ -143,7 +147,7 @@ class ESPerform(object):
try:
assert (es_cli is not None)
official_index_name = cls.get_official_index_name(sub_index_name)
official_index_name = cls.get_official_index_name(sub_index_name, "write")
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
if not auto_create_index:
......@@ -208,3 +212,34 @@ class ESPerform(object):
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"total_count":0,"hits":[]}
@classmethod
def if_es_node_load_high(cls, es_cli):
try:
assert (es_cli is not None)
high_num = 0
es_nodes_list = list()
es_nodes_ori_info = es_cli.cat.nodes()
es_nodes_info_list = es_nodes_ori_info.split("\n")
for item in es_nodes_info_list:
try:
item_list = item.split(" ")
if len(item_list)>4:
cpu_load = item_list[3]
if int(cpu_load) > 60:
high_num += 1
es_nodes_list.append(int(cpu_load))
except:
logging.error("catch exception,item:%s,err_msg:%s" % (str(item),traceback.format_exc()))
return True
if high_num > 3:
logging.info("check es_nodes_load high,cpu load:%s" % str(es_nodes_info_list))
return True
else:
return False
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True
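A plausible way for callers to consume the new load check (a sketch; the gating policy is an assumption, not part of this commit) is to skip a sync round while the cluster is hot, keeping in mind that the method deliberately reports "high" on any error:
import logging
from libs.es import ESPerform
from injection.data_sync.tasks import write_to_es
pk_list = [1, 2, 3]  # illustrative primary keys
es_cli = ESPerform.get_cli()
if ESPerform.if_es_node_load_high(es_cli):
    logging.info("es nodes busy, skipping this sync round")
else:
    write_to_es.delay("topic", pk_list)  # enqueue the celery task as usual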
......@@ -5,6 +5,8 @@ from __future__ import unicode_literals, print_function, absolute_import
import six
import random
from django.db import models
import logging
import traceback
class ITableChunk(object):
......@@ -147,36 +149,47 @@ class TableSlicerChunk(ITableChunk):
class TableSlicer(object):
def __init__(self, queryset, chunk_size=None, chunk_count=None, sep_list=None):
assert isinstance(queryset, models.QuerySet)
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
assert chunk_count is None or isinstance(chunk_count, six.integer_types)
assert sep_list is None or isinstance(sep_list, list)
try:
assert isinstance(queryset, models.QuerySet)
assert (chunk_size is not None) + (chunk_count is not None) + (sep_list is not None) == 1
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
if sep_list is not None:
sep_list = list(sep_list)
else:
count = queryset.count()
if chunk_size is None:
chunk_size = count // chunk_count
index_list = list(range(0, count, chunk_size))
sep_list = [
queryset.order_by('pk').values_list('pk', flat=True)[index]
for index in index_list
]
assert chunk_count is None or isinstance(chunk_count, six.integer_types)
self._model = queryset.model
self._query = queryset.query
self._sep_list = [None] + sep_list + [None]
assert sep_list is None or isinstance(sep_list, list)
assert (chunk_size is not None) + (chunk_count is not None) + (sep_list is not None) == 1
if sep_list is not None:
sep_list = list(sep_list)
else:
count = queryset.count()
if chunk_size is None:
chunk_size = count // chunk_count  # integer division: range() needs an int step
index_list = list(range(0, count, chunk_size))
sep_list = [
queryset.order_by('pk').values_list('pk', flat=True)[index]
for index in index_list
]
self._model = queryset.model
self._query = queryset.query
self._sep_list = [None] + sep_list + [None]
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def chunks(self):
reversed_sep_list = list(reversed(self._sep_list))
for i in range(len(self._sep_list) - 1):
pk_start = reversed_sep_list[i+1]
pk_stop = reversed_sep_list[i]
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=pk_start, pk_stop=pk_stop)
try:
reversed_sep_list = list(reversed(self._sep_list))
logging.info("duan add,reversed_sep_list:%d" % (len(self._sep_list) - 1))
for i in range(len(self._sep_list) - 1):
pk_start = reversed_sep_list[i + 1]
pk_stop = reversed_sep_list[i]
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=pk_start, pk_stop=pk_stop)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
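For context, a TableSlicer is typically driven like this minimal sketch (its own module path is not visible in this hunk, so that import is elided; Topic is one of the models synced elsewhere in this commit):
from trans2es.models.topic import Topic
# TableSlicer import elided: its module path is not shown in this diff
slicer = TableSlicer(queryset=Topic.objects.all(), chunk_size=1000)
chunks = list(slicer.chunks())  # each TableSlicerChunk covers one [pk_start, pk_stop) pk window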
class TableStreamingSlicer(object):
......
from django.contrib import admin
# Register your models here.
from django.db import models
# Create your models here.
from django.test import TestCase
# Create your tests here.
from django.shortcuts import render
# Create your views here.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from kafka import KafkaConsumer
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag
import traceback
from django.conf import settings
class KafkaManager(object):
consumser_obj = None
@classmethod
def get_kafka_consumer_ins(cls, topic_name=None):
if not cls.consumser_obj:
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=settings.KAFKA_BROKER_LIST)
# cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj
class CollectData(object):
def __init__(self):
self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
# default user feature
self.user_feature = [0,1]
def _get_user_linucb_info(self, device_id):
try:
redis_key = self.linucb_matrix_redis_prefix + str(device_id)
# the dict key is the tag ID; the value packs the four LinUCB matrices
redis_linucb_tag_data_dict = redis_client.hgetall(redis_key)
return redis_linucb_tag_data_dict
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict()
def update_recommend_tag_list(self, device_id,user_feature=None):
try:
recommend_tag_list = list()
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = LinUCB.get_default_tag_list()
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
recommend_tag_list = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
logging.info("duan add,device_id:%s,recommend_tag_list:%s" % (str(device_id), str(recommend_tag_list)))
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
# TODO: set an expiry; check whether set() supports it directly
redis_client.expire(tag_recommend_redis_key, 7*24*60*60)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
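On the TODO above: redis-py's set() does accept an expiry argument, so the set/expire pair could be collapsed into one call (a sketch using the same names as the code above):
# ex= applies the TTL in seconds in the same round trip as the write
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list), ex=7 * 24 * 60 * 60)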
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
try:
user_feature = user_feature if user_feature else self.user_feature
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,self.linucb_matrix_redis_prefix,redis_client)
except:
logging.error("update_user_linucb_tag_info error!")
return False
def consume_data_from_kafka(self,topic_name=None):
try:
user_feature = [1,1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
while True:
msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg:
try:
logging.info(ori_msg)
raw_val_dict = json.loads(ori_msg.value)
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"]
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(topic_id), str(tag_list)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# refresh this user's recommended tags, after the per-tag behavior info above has been updated
self.update_recommend_tag_list(device_id, user_feature)
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
elif isinstance(raw_val_dict["params"]["exposure_cards"],list):
exposure_cards_list = raw_val_dict["params"]["exposure_cards"]
else:
exposure_cards_list = list()
device_id = raw_val_dict["device"]["device_id"]
exposure_topic_id_list = list()
for item in exposure_cards_list:
if "card_id" not in item:
continue
exposure_topic_id = item["card_id"]
logging.info(
"consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
exposure_topic_id_list.append(exposure_topic_id)
topic_tag_id_dict = dict()
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
if sql_item.topic_id not in topic_tag_id_dict:
topic_tag_id_dict[sql_item.topic_id] = list()
topic_tag_id_dict[sql_item.topic_id].append(sql_item.tag_id)
is_click = 0
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
str(device_id), str(topic_tag_id_dict)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# refresh this user's recommended tags, after the per-tag behavior info above has been updated
self.update_recommend_tag_list(device_id, user_feature)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
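Putting it together, this consumer loop is launched from the management command added further down (sync_type == "linucb"); in isolation it amounts to this sketch:
collect_obj = CollectData()
# blocks indefinitely: polls Kafka every 100ms and updates LinUCB state in redis
collect_obj.consume_data_from_kafka()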
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import numpy as np
import redis
from libs.cache import redis_client
from trans2es.models.tag import Tag
import logging
import traceback
import json
import pickle
class LinUCB:
d = 2
alpha = 0.25
r1 = 1
r0 = -0.5
default_tag_list = list()
@classmethod
def get_default_tag_list(cls):
try:
if len(cls.default_tag_list) == 0:
query_item_results = Tag.objects.filter(is_online=True)
for item in query_item_results:
cls.default_tag_list.append(item.id)
return cls.default_tag_list[:20]
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
@classmethod
def linucb_recommend_tag(cls,device_id,redis_linucb_tag_data_dict,user_features_list,tag_list):
"""
:remark: pick recommended tags for a device
:param redis_linucb_tag_data_dict:
:param user_features_list:
:param tag_list:
:return:
"""
try:
Aa_list = list()
theta_list = list()
for tag_id in tag_list:
tag_dict = pickle.loads(redis_linucb_tag_data_dict[tag_id])
Aa_list.append(tag_dict["Aa"])
theta_list.append(tag_dict["theta"])
xaT = np.array([user_features_list])
xa = np.transpose(xaT)
art_max = -1
old_pa = 0
AaI_tmp = np.array(Aa_list)
theta_tmp = np.array(theta_list)
np_array = np.dot(xaT, theta_tmp) + cls.alpha * np.sqrt(np.dot(np.dot(xaT, AaI_tmp), xa))
# top_tag_list_len = int(np_array.size/2)
# top_np_ind = np.argpartition(np_array, -top_tag_list_len)[-top_tag_list_len:]
#
# top_tag_list = list()
# top_np_list = top_np_ind.tolist()
# for tag_id in top_np_list:
# top_tag_list.append(tag_id)
#art_max = tag_list[np.argmax(np.dot(xaT, theta_tmp) + cls.alpha * np.sqrt(np.dot(np.dot(xaT, AaI_tmp), xa)))]
top_tag_set = set()
np_score_list = list()
np_score_dict = dict()
for score_index in range(0,np_array.size):
score = np_array.take(score_index)
np_score_list.append(score)
if score not in np_score_dict:
np_score_dict[score] = [score_index]
else:
np_score_dict[score].append(score_index)
sorted_np_score_list = sorted(np_score_list,reverse=True)
for top_score in sorted_np_score_list:
for top_score_index in np_score_dict[top_score]:
top_tag_set.add(str(tag_list[top_score_index], encoding="utf-8"))
if len(top_tag_set) >= 10:
break
logging.info("duan add,device_id:%s,sorted_np_score_list:%s,np_score_dict:%s" % (str(device_id), str(sorted_np_score_list), str(np_score_dict)))
return list(top_tag_set)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
@classmethod
def init_device_id_linucb_info(cls, redis_cli,redis_prefix, device_id, tag_list):
try:
redis_key = redis_prefix + str(device_id)
user_tag_linucb_dict = dict()
for tag_id in tag_list:
init_dict = {
"Aa": np.identity(cls.d),
"theta": np.zeros((cls.d, 1)),
"ba": np.zeros((cls.d, 1)),
"AaI": np.identity(cls.d)
}
pickle_data = pickle.dumps(init_dict)
user_tag_linucb_dict[tag_id] = pickle_data
redis_cli.hmset(redis_key, user_tag_linucb_dict)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def update_linucb_info(cls, user_features,reward, tag_id, device_id, redis_prefix,redis_cli):
try:
if reward == -1:
logging.warning("reward val error!")
elif reward == 1 or reward == 0:
if reward == 1:
r = cls.r1
else:
r = cls.r0
xaT = np.array([user_features])
xa = np.transpose(xaT)
redis_key = redis_prefix + str(device_id)
ori_redis_tag_data = redis_cli.hget(redis_key, tag_id)
if not ori_redis_tag_data:
LinUCB.init_device_id_linucb_info(redis_client, redis_prefix, device_id,[tag_id])
else:
ori_redis_tag_dict = pickle.loads(ori_redis_tag_data)
new_Aa_matrix = ori_redis_tag_dict["Aa"] + np.dot(xa, xaT)
new_AaI_matrix = np.linalg.solve(new_Aa_matrix, np.identity(cls.d))
new_ba_matrix = ori_redis_tag_dict["ba"] + r*xa
user_tag_dict = {
"Aa": new_Aa_matrix,
"ba": new_ba_matrix,
"AaI": new_AaI_matrix,
"theta": np.dot(new_AaI_matrix, new_ba_matrix)
}
redis_cli.hset(redis_key, tag_id, pickle.dumps(user_tag_dict))
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
\ No newline at end of file
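The bookkeeping above is standard LinUCB: Aa += x·xT, ba += r·x, theta = inverse(Aa)·ba, and the recommend-time score is xT·theta + alpha·sqrt(xT·inverse(Aa)·x). A tiny self-contained check of those identities using the class constants (feature values are illustrative):
import numpy as np
d, alpha, r = 2, 0.25, 1.0
xaT = np.array([[1.0, 1.0]])                # user feature row vector
xa = np.transpose(xaT)
Aa = np.identity(d) + np.dot(xa, xaT)       # state after one observed reward
ba = r * xa
AaI = np.linalg.solve(Aa, np.identity(d))   # same inversion trick as update_linucb_info
theta = np.dot(AaI, ba)
score = np.dot(xaT, theta) + alpha * np.sqrt(np.dot(np.dot(xaT, AaI), xa))
print(score.item())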
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import itertools
from django.conf import settings
from __future__ import unicode_literals, print_function, absolute_import
import itertools
from django.conf import settings
import logging
class CeleryTaskRouter(object):
queue_task_map = {
"tapir-alpha":[
"tapir-alpha": [
'injection.data_sync.tasks.write_to_es',
]
}
......@@ -30,4 +31,4 @@ class CeleryTaskRouter(object):
logging.info("duan add,task is:%s" % str(task))
queue_name_or_none = self.task_queue_map.get(task)
return queue_name_or_none
\ No newline at end of file
return queue_name_or_none
......@@ -14,188 +14,4 @@ import os
from .log_settings import *
from datetime import timedelta
from celery.schedules import crontab
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '^j3sg)sj8rc@du74%fb$c2926tv!!4g(kp-=rx1)c5!1&1(dq='
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = False
ALLOWED_HOSTS = []
# Application definition
SENTRY_CELERY_ENDPOINT = "http://60b0004c8884420f8067fb32fc3ed244:20f97fc73ffa4aad9735d0e6542a6d78@sentry.igengmei.com/140"
BROKER_URL = "redis://127.0.0.1:6379/8"
# CELERY_SEND_EVENTS = True
# CELERY_SEND_TASK_SENT_EVENT = True
#
# CELERY_DEFAULT_EXCHANGE = 'celery'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
# CELERY_DEFAULT_ROUTING_KEY = 'celery'
#
# CELERY_QUEUES = {
# 'celery': {
# 'exchange': CELERY_DEFAULT_EXCHANGE,
# 'routing_key': CELERY_DEFAULT_ROUTING_KEY,
# },
# 'order': {
# 'exchange': 'order',
# 'routing_key': 'order',
# },
# }
CELERY_BROKER_URL = "redis://127.0.0.1:6379/8"
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['physical.celery_task_router.CeleryTaskRouter']
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'trans2es',
'search',
'injection.data_sync',
)
CELERYBEAT_SCHEDULE = {
'sync_face_similar_data_to_redis': {
'task': 'injection.data_sync.tasks.sync_face_similar_data_to_redis',
'schedule': timedelta(seconds=120),
'args': ()
},
}
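The beat schedule above only covers the face-similarity task; the newly added sync_user_similar_score is not scheduled here. If it should also run periodically, an entry of the same shape would do (a sketch; the interval is an assumption):
'sync_user_similar_score': {
'task': 'injection.data_sync.tasks.sync_user_similar_score',
'schedule': timedelta(seconds=3600),  # hypothetical interval
'args': ()
},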
"""
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
"""
ROOT_URLCONF = 'physical.urls'
WSGI_APPLICATION = 'physical.wsgi.application'
REDIS_URL = "redis://127.0.0.1:6379"
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASE_ROUTERS = ['physical.DBRouter.DBRouter']
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'alpha',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
"HOST": 'rm-2zeaut61u9sm21m0bjo.mysql.rds.aliyuncs.com',
#'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
},
'face': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'face',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
"HOST": "rm-2zeaut61u9sm21m0bjo.mysql.rds.aliyuncs.com",
# 'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
# 'CONN_MAX_AGE': None,
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
}
}
ES_INFO_LIST = [
{
"host": "10.29.130.141",
"port": 9200
}
]
ES_INDEX_PREFIX = "gm-dbmw"
MIDDLEWARE_CLASSES = (
'gm_tracer.middleware.TracerMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'helios.DjangoL5dMiddleware',
)
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
# Internationalization
# https://docs.djangoproject.com/en/1.10/topics/i18n/
# LANGUAGE_CODE = 'en-us'
#
TIME_ZONE = 'Asia/Shanghai'
#
USE_I18N = True
USE_L10N = True
#
# USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
STATIC_URL = '/static/'
from .settings_local import *
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '^j3sg)sj8rc@du74%fb$c2926tv!!4g(kp-=rx1)c5!1&1(dq='
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = False
ALLOWED_HOSTS = []
# Application definition
SENTRY_CELERY_ENDPOINT = "http://60b0004c8884420f8067fb32fc3ed244:20f97fc73ffa4aad9735d0e6542a6d78@sentry.igengmei.com/140"
BROKER_URL = "redis://127.0.0.1:6379/8"
# CELERY_SEND_EVENTS = True
# CELERY_SEND_TASK_SENT_EVENT = True
#
# CELERY_DEFAULT_EXCHANGE = 'celery'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
# CELERY_DEFAULT_ROUTING_KEY = 'celery'
#
# CELERY_QUEUES = {
# 'celery': {
# 'exchange': CELERY_DEFAULT_EXCHANGE,
# 'routing_key': CELERY_DEFAULT_ROUTING_KEY,
# },
# 'order': {
# 'exchange': 'order',
# 'routing_key': 'order',
# },
# }
CELERY_BROKER_URL = "redis://127.0.0.1:6379/8"
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['physical.celery_task_router.CeleryTaskRouter']
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'trans2es',
'search',
'injection.data_sync',
)
CELERYBEAT_SCHEDULE = {
'sync_face_similar_data_to_redis': {
'task': 'injection.data_sync.tasks.sync_face_similar_data_to_redis',
'schedule': 120.0,
'args': ()
},
}
"""
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
"""
ROOT_URLCONF = 'physical.urls'
WSGI_APPLICATION = 'physical.wsgi.application'
REDIS_URL = "redis://127.0.0.1:6379"
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'alpha',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
},
'face': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'face',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
# 'CONN_MAX_AGE': None,
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
}
}
ES_INFO_LIST = [
{
"host": "10.29.130.141",
"port": 9200
}
]
ES_INDEX_PREFIX = "gm-dbmw"
MIDDLEWARE_CLASSES = (
'gm_tracer.middleware.TracerMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'helios.DjangoL5dMiddleware',
)
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
# Internationalization
# https://docs.djangoproject.com/en/1.10/topics/i18n/
# LANGUAGE_CODE = 'en-us'
#
TIME_ZONE = 'Asia/Shanghai'
#
USE_I18N = True
USE_L10N = True
#
# USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
STATIC_URL = '/static/'
......@@ -12,6 +12,8 @@ PyMySQL==0.9.2
gunicorn==19.9.0
gevent==1.3.7
pypinyin==0.34.1
numpy==1.16.2
lz4==2.1.6
git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@master
git+ssh://git@git.wanmeizhensuo.com/backend/helios.git@master
......
......@@ -47,7 +47,7 @@ class GroupUtils(object):
return {"total_count":0, "hits":[]}
@classmethod
def get_hot_group_recommend_result_list(cls,offset,size,es_cli_obj=None):
def get_hot_pictorial_recommend_result_list(cls,offset,size,es_cli_obj=None):
try:
if not es_cli_obj:
es_cli_obj = ESPerform.get_cli()
......@@ -68,19 +68,19 @@ class GroupUtils(object):
"includes":["id"]
}
result_dict = ESPerform.get_search_results(es_cli_obj,"group",q,offset,size)
result_dict = ESPerform.get_search_results(es_cli_obj,"pictorial",q,offset,size)
group_ids_list = []
pictorial_ids_list = []
if len(result_dict["hits"]) > 0:
group_ids_list = [item["_source"]["id"] for item in result_dict["hits"]]
pictorial_ids_list = [item["_source"]["id"] for item in result_dict["hits"]]
return group_ids_list
return pictorial_ids_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
@classmethod
def get_user_attention_group_list(cls,user_id,offset=0,size=10,es_cli_obj=None):
def get_user_attention_pictorial_list(cls,user_id,offset=0,size=10,es_cli_obj=None):
"""
:remark: fetch the list of pictorials the user follows
:return:
......@@ -100,12 +100,12 @@ class GroupUtils(object):
}
}
q["_source"] = {
"includes":["attention_group_id_list"]
"includes":["attention_pictorial_id_list"]
}
result_dict = ESPerform.get_search_results(es_cli_obj,"user",q,offset,size)
if len(result_dict["hits"])>0:
return result_dict["hits"][0]["_source"]["attention_group_id_list"]
return result_dict["hits"][0]["_source"]["attention_pictorial_id_list"]
else:
return []
except:
......@@ -113,7 +113,7 @@ class GroupUtils(object):
return []
@classmethod
def get_group_ids_by_aggs(cls,group_id_list,es_cli_obj=None):
def get_pictorial_ids_by_aggs(cls,pictorial_ids_list,es_cli_obj=None):
"""
:remark: aggregation query for the pictorial id list
:param pictorial_ids_list:
......@@ -127,13 +127,13 @@ class GroupUtils(object):
q["size"]=0
q["query"] = {
"terms":{
"group_id":group_id_list
"pictorial_id":pictorial_ids_list
}
}
q["aggs"] = {
"group_ids":{
"pictorial_ids":{
"terms":{
"field":"group_id"
"field":"pictorial_id"
},
"aggs":{
"max_date":{
......@@ -146,12 +146,12 @@ class GroupUtils(object):
}
result_dict = ESPerform.get_search_results(es_cli_obj,"topic",q,aggregations_query=True)
buckets_list = result_dict["aggregations"]["group_ids"]["buckets"]
buckets_list = result_dict["aggregations"]["pictorial_ids"]["buckets"]
sorted_buckets_list = sorted(buckets_list,key=lambda item:item["max_date"]["value"],reverse=True)
sorted_group_id_list = [item["key"] for item in sorted_buckets_list]
sorted_pictorial_id_list = [item["key"] for item in sorted_buckets_list]
return sorted_group_id_list
return sorted_pictorial_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
from gm_rpcd.all import bind
import logging
import traceback
import json
from search.utils.topic import TopicUtils
from libs.es import ESPerform
from libs.cache import redis_client
from search.utils.common import *
from libs.es import ESPerform
@bind("physical/search/business/topic")
def business_topic_search(filters, nfilters=None, sorts_by=None, offset=0, size=10):
"""帖子搜索。"""
try:
result_list = TopicUtils.business_topic_ids(
filters=filters, nfilters=nfilters, sorts_by=sorts_by,
offset=offset, size=size, filter_online=False,
index_name="topic"
)
logging.info("get result_lsit:%s"%result_list)
topic_ids = [item["_source"]["id"] for item in result_list["hits"]]
return {"topic_ids": topic_ids, "total_count": result_list["total_count"]}
except:
logging.error("catch exception, err_msg:%s" % traceback.format_exc())
return {"topic_ids": [], "total_count": 0}
......@@ -12,8 +12,8 @@ from search.utils.common import GroupSortTypes
from libs.es import ESPerform
@bind("physical/search/query_group")
def query_group(query="",offset=0,size=10):
@bind("physical/search/query_pictorial")
def query_group(query="", offset=0, size=10):
"""
:remark: pictorial search; a ranking strategy is still missing
:param query:
......@@ -22,7 +22,7 @@ def query_group(query="",offset=0,size=10):
:return:
"""
try:
result_dict = GroupUtils.get_group_query_result(query,offset,size)
result_dict = GroupUtils.get_group_query_result(query, offset, size)
group_ids_list = []
if len(result_dict["hits"]) > 0:
......@@ -31,10 +31,11 @@ def query_group(query="",offset=0,size=10):
return {"group_ids": group_ids_list}
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"group_ids":[]}
return {"group_ids": []}
@bind("physical/search/group_sort")
def group_sort(user_id=-1,sort_type=GroupSortTypes.HOT_RECOMMEND,offset=0,size=10):
@bind("physical/search/pictorial_sort")
def pictorial_sort(user_id=-1, sort_type=GroupSortTypes.HOT_RECOMMEND, offset=0, size=10):
"""
:remark: pictorial ordering; still missing: previous-day commenter count * x
:param user_id:
......@@ -44,34 +45,165 @@ def group_sort(user_id=-1,sort_type=GroupSortTypes.HOT_RECOMMEND,offset=0,size=1
:return:
"""
try:
if not isinstance(user_id,int):
if not isinstance(user_id, int):
user_id = -1
# get the ES client object
es_cli_obj = ESPerform.get_cli()
if sort_type==GroupSortTypes.HOT_RECOMMEND:
group_ids_list = GroupUtils.get_hot_group_recommend_result_list(offset,size,es_cli_obj)
if sort_type == GroupSortTypes.HOT_RECOMMEND:
pictorial_ids_list = GroupUtils.get_hot_pictorial_recommend_result_list(offset, size, es_cli_obj)
return {"group_recommend_ids":group_ids_list}
return {"pictorial_recommend_ids": pictorial_ids_list}
elif sort_type==GroupSortTypes.ATTENTION_RECOMMEND:
attention_group_list = GroupUtils.get_user_attention_group_list(user_id,offset=0,size=1,es_cli_obj=es_cli_obj)
if len(attention_group_list)==0:
return {"group_recommend_ids": []}
elif sort_type == GroupSortTypes.ATTENTION_RECOMMEND:
attention_pictorial_list = GroupUtils.get_user_attention_pictorial_list(user_id, offset=0, size=1,
es_cli_obj=es_cli_obj)
if len(attention_pictorial_list) == 0:
return {"pictorial_recommend_ids": []}
else:
attention_group_id_list = [item["group_id"] for item in attention_group_list]
sorted_group_ids_list = GroupUtils.get_group_ids_by_aggs(attention_group_id_list,es_cli_obj)
attention_pictorial_id_list = [item["pictorial_id"] for item in attention_pictorial_list]
sorted_pictorial_ids_list = GroupUtils.get_pictorial_ids_by_aggs(attention_pictorial_id_list,
es_cli_obj)
pictorial_recommend_ids_list = sorted_pictorial_ids_list
# if len(group_recommend_ids_list) < size and len(group_recommend_ids_list)<len(attention_group_list):
sorted_attention_pictorial_list = sorted(attention_pictorial_list,
key=lambda item: item["update_time_val"], reverse=True)
for item in sorted_attention_pictorial_list:
if item["pictorial_id"] not in pictorial_recommend_ids_list:
pictorial_recommend_ids_list.append(item["pictorial_id"])
return {"pictorial_recommend_ids": pictorial_recommend_ids_list[offset:(offset + size)]}
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"pictorial_recommend_ids": []}
@bind("physical/search/pictorial_topic")
def pictorial_topic(topic_id=-1, offset=0, size=10):
"""
:remark: pictorials this topic was selected into
:param user_id:
:param sort_type:
:param offset:
:param size:
:return:
"""
try:
if not isinstance(topic_id, int):
topic_id = -1
# get the ES client object
es_cli_obj = ESPerform.get_cli()
q = {}
# fetch the pictorials this topic belongs to
q["query"] = {
"term": {
"id": topic_id
}
}
q["_source"] = {
"include": ["id", "pictorial_id", "tag_list"]
}
result_dict = ESPerform.get_search_results(es_cli_obj, "topic", q, offset, size)
logging.info("get result_dict:%s" % result_dict)
pict_pictorial_ids_list = []
topic_tag_list = []
pictorial_id_list = []
if len(result_dict["hits"]) > 0:
for item in result_dict["hits"]:
pict_pictorial_ids_list = item["_source"]["pictorial_id"]
topic_tag_list = item["_source"]["tag_list"]
q["query"] = {
"bool": {
"must": [{
"terms": {
"id": pict_pictorial_ids_list
}
},
{
"term": {
"is_online": True
}
},
{
"term": {
"is_deleted": False
}
},
]
}
}
q["_source"] = {
"include": ["id", "update_time"]
}
q["sort"] = {
'update_time': {
'order': 'desc'
}
}
result_dict = ESPerform.get_search_results(es_cli_obj, "pictorial", q, offset, size)
if len(result_dict["hits"]) > 0:
for item in result_dict["hits"]:
pictorial_id = item["_source"]["id"]
pictorial_id_list.append(pictorial_id)
logging.info("get pictorial_id_list:%s" % pictorial_id_list)
logging.info("get topic_tag_list:%s" % topic_tag_list)
if len(pictorial_id_list) < 10:
num = 10 - len(pictorial_id_list)
q["query"] = {
"bool": {
"must": [{
"terms": {
"tag_id": topic_tag_list
}}, {
"term": {
"is_online": True
}
},{
"term": {
"is_deleted": False
}
}]
}
}
q["_source"] = {
"include": ["id", "tag_id"]}
q["sort"] = {
'update_time': {
'order': 'desc'
}}
q["from"] = 0
q["size"] = 10
result_dict = ESPerform.get_search_results(es_cli_obj, "pictorial", q, offset, size)
if len(result_dict["hits"]) > 0:
for item in result_dict["hits"]:
id = item["_source"]["id"]
if id not in pictorial_id_list:
pictorial_id_list.append(id)
logging.info("get result_dict tag:%s" % result_dict)
logging.info("get pictorial_id_list tag:%s" % pictorial_id_list)
pictorial_list = pictorial_id_list if len(pictorial_id_list) < 10 else pictorial_id_list[:10]
group_recommend_ids_list = sorted_group_ids_list
#if len(group_recommend_ids_list) < size and len(group_recommend_ids_list)<len(attention_group_list):
sorted_attention_group_list = sorted(attention_group_list,key=lambda item:item["update_time_val"],reverse=True)
for item in sorted_attention_group_list:
if item["group_id"] not in group_recommend_ids_list:
group_recommend_ids_list.append(item["group_id"])
return {"pictorial_ids_list": pictorial_list}
return {"group_recommend_ids": group_recommend_ids_list[offset:(offset+size)]}
else:
return {"pictorial_ids_list": pictorial_id_list}
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {"group_recommend_ids":[]}
return {"pictorial_ids_list": []}
......@@ -24,8 +24,10 @@ def recommend_user(self_user_id,interesting_user_id,offset=0,size=10):
:return:
"""
try:
if not isinstance(self_user_id,int):
if not isinstance(self_user_id, int):
self_user_id = -1
if not isinstance(interesting_user_id, int):
interesting_user_id = -1
#获取es链接对象
es_cli_obj = ESPerform.get_cli()
......
......@@ -19,6 +19,15 @@ from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
import json
from search.utils.topic import TopicUtils
from trans2es.models.pick_topic import PickTopic
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.user_extra import UserExtra
from trans2es.models.group import Group
from trans2es.models.topic import Topic,ActionSumAboutTopic
from search.utils.common import *
from linucb.views.collect_data import CollectData
from injection.data_sync.tasks import sync_user_similar_score
class Job(object):
__es = None
......@@ -40,7 +49,6 @@ class Job(object):
def __call__(self):
type_info = get_type_info_map()[self._type_name]
assert isinstance(type_info, TypeInfo)
result = type_info.insert_table_chunk(
sub_index_name=self._sub_index_name,
table_chunk=self._chunk,
......@@ -97,7 +105,9 @@ class Command(BaseCommand):
make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default='')
make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default=''),
make_option('-T', '--test_score', dest='test_score', help='test_score', metavar='TYPE', default='')
)
def __sync_data_by_type(self, type_name):
......@@ -116,6 +126,68 @@ class Command(BaseCommand):
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def generate_topic_score_detail(self):
try:
topic_id_dict = TopicUtils.get_recommend_topic_ids(241432787,0, 0, 500,query_type=TopicPageType.HOME_RECOMMEND,test_score=True)
for topic_id in topic_id_dict:
offline_score = 0.0
user_is_shadow = False
topic_sql_item = Topic.objects.filter(id=topic_id)
user_is_recommend=0.0
# whether the user is an officially recommended user
user_query_results = UserExtra.objects.filter(user_id=topic_sql_item[0].user_id)
if user_query_results.count() > 0:
if user_query_results[0].is_recommend:
offline_score += 2.0
user_is_recommend = 2.0
elif user_query_results[0].is_shadow:
user_is_shadow = True
group_is_recommend=0.0
# whether the group is officially recommended
# if topic_sql_item[0].group and topic_sql_item[0].group.is_recommend:
# offline_score += 4.0
# group_is_recommend = 4.0
topic_level_score = 0.0
# topic content level
if topic_sql_item[0].content_level == '5':
offline_score += 6.0
topic_level_score = 6.0
elif topic_sql_item[0].content_level == '4':
offline_score += 5.0
topic_level_score = 5.0
elif topic_sql_item[0].content_level == '3':
offline_score += 2.0
topic_level_score = 2.0
exposure_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=3).count()
exposure_score = 0.0
uv_score = 0.0
if exposure_count > 0:
offline_score += click_count / exposure_count
exposure_score = click_count / exposure_count
if uv_num > 0:
offline_score += (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num)
uv_score = (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num)
"""
1: should shadow (sockpuppet) accounts down-weight the total score?
"""
if user_is_shadow:
offline_score = offline_score * 0.5
logging.info("test_score######topic_id:%d,score:%f,offline_score:%f,user_is_recommend:%f,group_is_recommend:%f,topic_level_score:%f,exposure_score:%f,uv_score:%f"
% (topic_id,topic_id_dict[topic_id][2],offline_score,user_is_recommend,group_is_recommend,topic_level_score,exposure_score,uv_score))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def handle(self, *args, **options):
try:
type_name_list = get_type_info_map().keys()
......@@ -129,6 +201,14 @@ class Command(BaseCommand):
if len(options["sync_type"]) and options["sync_type"]=="sync_data_to_es":
SyncDataToRedis.sync_face_similar_data_to_redis()
if len(options["test_score"]):
self.generate_topic_score_detail()
if len(options["sync_type"]) and options["sync_type"]=="linucb":
collect_obj = CollectData()
collect_obj.consume_data_from_kafka()
if len(options["sync_type"]) and options["sync_type"]=="similar":
sync_user_similar_score()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
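Assuming this management command is registered under a name like trans2es_data2es (hypothetical; the real name is the module's filename, which this hunk does not show), the new code paths are reached with invocations along these lines:
python manage.py trans2es_data2es -S sync_data_to_es   # face-similarity redis sync
python manage.py trans2es_data2es -S linucb            # start the Kafka/LinUCB consumer
python manage.py trans2es_data2es -S similar           # push user similar scores to redis
python manage.py trans2es_data2es -T 1                 # dump the per-topic score breakdown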
......@@ -41,11 +41,12 @@ class Command(BaseCommand):
official_index_name = ESPerform.get_official_index_name(type_name)
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
logging.info("begin create [%s] index and mapping!" % type_name)
logging.info("begin create [%s] index!" % type_name)
ESPerform.create_index(es_cli, type_name)
ESPerform.put_index_mapping(es_cli, type_name)
else:
logging.warning("index:[%s] has already existing!" % type_name)
logging.info("begin create [%s] mapping!" % type_name)
ESPerform.put_index_mapping(es_cli, type_name, force_sync=True)
if len(options["indices_template"]):
template_file_name = options["indices_template"]
......
{
"index_patterns": ["*"],
"settings":{
"number_of_shards": 9,
"number_of_replicas": 2,
"number_of_shards": 5,
"number_of_replicas": 3,
"index":{
"analysis":{
"filter":{
......
{
"dynamic":"strict",
"properties": {
"id":{"type":"long"},
"is_online":{"type":"boolean"},//上线
"is_deleted":{"type":"boolean"},
"is_recommend":{"type":"boolean"},
"name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"description":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"topic_num":{"type":"long"},
"creator_id":{"type":"long"},
"icon":{"type":"text"},
"high_quality_topic_num":{"type":"long"},//前一天该小组4&5星帖子数量
"create_time":{"type":"date", "format":"date_time_no_millis"},
"update_time":{"type":"date", "format":"date_time_no_millis"},
"tag_id":{"type":"long"},
"tag_name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"topic_id_list":{"type":"long"}
}
}
\ No newline at end of file
......@@ -19,6 +19,9 @@
},
"name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"tag_type":{"type":"long"},
"collection":{"type":"long"},
"is_ai":{"type":"long"},
"is_own":{"type":"long"},
"is_online":{"type":"keyword"},//上线
"is_deleted":{"type":"keyword"},
"near_new_topic_num":{"type":"long","store": true}
......
{
"dynamic":"strict",
"properties": {
"id":{"type":"long"},
"is_online":{"type":"boolean"},//上线
"is_deleted":{"type":"boolean"},
"vote_num":{"type":"long"},
"reply_num":{"type":"long"},
"name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"description":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"content":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"content_level":{"type":"text"},
"user_id":{"type":"long"},
"group_id":{"type":"long"}, //所在组ID
"tag_list":{"type":"long"},//标签属性
"edit_tag_list":{"type":"long"},//编辑标签
"tag_name_list":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"share_num":{"type":"long"},
"pick_id_list":{"type":"long"},
"offline_score":{"type":"double"},//离线算分
"manual_score":{"type":"double"},//人工赋分
"has_image":{"type":"boolean"},//是否有图
"has_video":{"type":"boolean"},//是否是视频
"create_time":{"type":"date", "format":"date_time_no_millis"},
"update_time":{"type":"date", "format":"date_time_no_millis"},
"create_time_val":{"type":"long"},
"update_time_val":{"type":"long"},
"language_type":{"type":"long"},
"is_shadow": {"type": "boolean"},
"is_recommend": {"type": "boolean"},
"is_complaint": {"type": "boolean"}, // 是否被举报
"virtual_content_level":{"type": "text"},
"like_num_crawl": {"type": "long"}, // 爬取点赞数
"comment_num_crawl": {"type": "long"}, // 爬取评论数
"is_crawl": {"type": "boolean"},
"platform": {"type": "long"},
"platform_id": {"type": "long"},
"drop_score":{"type": "double"}, // 人工降分
"sort_score":{"type": "double"}, // 排序分
"pictorial_id":{"type": "long"}, //所在组ID
"pictorial_name":{ // 所在组名称
"type": "text",
"analyzer": "gm_default_index",
"search_analyzer": "gm_default_index"
}
}
}
......@@ -13,15 +13,41 @@
"user_id":{"type":"long"},
"group_id":{"type":"long"}, //所在组ID
"tag_list":{"type":"long"},//标签属性
"edit_tag_list":{"type":"long"},//编辑标签
"tag_name_list":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"share_num":{"type":"long"},
"pick_id_list":{"type":"long"},
"offline_score":{"type":"double"},//离线算分
"manual_score":{"type":"double"},//人工赋分
"has_image":{"type":"boolean"},//是否有图
"has_video":{"type":"boolean"},//是否是视频
"create_time":{"type":"date", "format":"date_time_no_millis"},
"update_time":{"type":"date", "format":"date_time_no_millis"},
"create_time_val":{"type":"long"},
"update_time_val":{"type":"long"}
"update_time_val":{"type":"long"},
"language_type":{"type":"long"},
"is_shadow": {"type": "boolean"},
"is_recommend": {"type": "boolean"},
"is_complaint": {"type": "boolean"}, // 是否被举报
"virtual_content_level":{"type": "text"},
"like_num_crawl": {"type": "long"}, // 爬取点赞数
"comment_num_crawl": {"type": "long"}, // 爬取评论数
"is_crawl": {"type": "boolean"},
"platform": {"type": "long"},
"platform_id": {"type": "long"},
"drop_score":{"type": "double"}, // 人工降分
"sort_score":{"type": "double"}, // 排序分
"pictorial_id":{"type": "long"}, //所在组ID
"pictorial_name":{ // 所在组名称
"type": "text",
"analyzer": "gm_default_index",
"search_analyzer": "gm_default_index"
}
}
}
\ No newline at end of file
}
......@@ -28,17 +28,31 @@
"country_id":{"type":"text"}
}
},
"same_group_user_id_list":{//同组用户列表
// "same_group_user_id_list":{//同组用户列表
// "type":"nested",
// "properties":{
// "user_id":{"type":"long"},
// "country_id":{"type":"text"}
// }
// },
// "attention_group_id_list":{//关注小组列表
// "type":"nested",
// "properties":{
// "group_id":{"type":"long"},
// "update_time_val":{"type":"long"}
// }
// },
"same_pictorial_user_id_list":{//同画报用户列表
"type":"nested",
"properties":{
"user_id":{"type":"long"},
"country_id":{"type":"text"}
}
},
"attention_group_id_list":{//关注小组列表
"attention_pictorial_id_list":{//关注画报列表
"type":"nested",
"properties":{
"group_id":{"type":"long"},
"pictorial_id":{"type":"long"},
"update_time_val":{"type":"long"}
}
},
......
......@@ -12,4 +12,5 @@ from .tag import AccountUserTag
from .user import User
from .group import Group
from .topic import Topic
from .pictorial import PictorialFollow
from .pictorial import Pictorial
......@@ -30,7 +30,7 @@ class Celebrity(models.Model):
def get_pick_id_list(self):
try:
pick_id_list = list()
query_results = PickCelebrity.objects.filter(celebrity_id=self.id,is_deleted=False)
query_results = PickCelebrity.objects.using(settings.SLAVE_DB_NAME).filter(celebrity_id=self.id,is_deleted=False)
for data_item in query_results:
pick_id_list.append(data_item.pick_id)
......
......@@ -27,3 +27,19 @@ class FaceUserContrastSimilar(models.Model):
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
contrast_user_id = models.IntegerField(verbose_name=u'对照者用户ID')
class UserSimilarScore(models.Model):
class Meta:
verbose_name=u"首页推荐用"
db_table="user_similar_score"
id = models.IntegerField(verbose_name=u"主键ID",primary_key=True)
is_deleted = models.BooleanField(verbose_name=u"是否删除")
user_id = models.IntegerField(verbose_name=u"用户ID")
contrast_user_id = models.BigIntegerField(verbose_name="参数对比的用户id", db_index=True)
score = models.FloatField(verbose_name='相似度', default=0)
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
......@@ -27,15 +27,15 @@ class Group(models.Model):
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
# count of the previous day's 4- and 5-star topics
def get_high_quality_topic_num(self):
yesterday = datetime.datetime.now()-datetime.timedelta(days=1)
yesterday_begin_time = "%s-%s-%s 00:00:00" % (yesterday.year, yesterday.month, yesterday.day)
yesterday_end_time = "%s-%s-%s 23:59:59" % (yesterday.year, yesterday.month, yesterday.day)
topic_num = self.group_topics.filter(content_level__in=("4","5"),create_time__gte=yesterday_begin_time,create_time__lte=yesterday_end_time).count()
return topic_num
# # count of the previous day's 4- and 5-star topics
# def get_high_quality_topic_num(self):
# yesterday = datetime.datetime.now()-datetime.timedelta(days=1)
# yesterday_begin_time = "%s-%s-%s 00:00:00" % (yesterday.year, yesterday.month, yesterday.day)
# yesterday_end_time = "%s-%s-%s 23:59:59" % (yesterday.year, yesterday.month, yesterday.day)
#
# topic_num = self.group_topics.filter(content_level__in=("4","5"),create_time__gte=yesterday_begin_time,create_time__lte=yesterday_end_time).count()
#
# return topic_num
def detail(self):
result = {
......
from django.db import models
import datetime
import logging
import traceback
from .tag import Tag
from .topic import Topic
class PictorialFollow(models.Model):
"""画报关注"""
class Meta:
verbose_name = u"画报用户关系"
app_label = "community"
db_table = "community_pictorial_follow"
id = models.IntegerField(verbose_name=u'关注ID', primary_key=True)
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
is_online = models.BooleanField(verbose_name=u'是否上线')
is_deleted = models.BooleanField(verbose_name=u'是否删除')
pictorial_id = models.BigIntegerField(verbose_name=u'画报ID')
user_id = models.BigIntegerField(verbose_name=u'用户ID')
class PictorialTopics(models.Model):
"""画报帖子关系"""
class Meta:
verbose_name = u'画报帖子关系'
app_label = 'community'
db_table = 'community_pictorial_topic'
id = models.IntegerField(verbose_name=u'日记ID', primary_key=True)
pictorial_id = models.BigIntegerField(verbose_name=u'画报ID')
topic_id = models.BigIntegerField(verbose_name=u'帖子ID')
is_online = models.BooleanField(verbose_name=u"是否有效", default=True)
is_online = models.BooleanField(verbose_name=u'是否上线')
is_deleted = models.BooleanField(verbose_name=u'是否删除')
class Pictorial(models.Model):
"""画报关注"""
class Meta:
verbose_name = u"画报"
app_label = "community"
db_table = "community_pictorial"
id = models.IntegerField(verbose_name=u'关注ID', primary_key=True)
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
is_online = models.BooleanField(verbose_name=u'是否上线')
is_deleted = models.BooleanField(verbose_name=u'是否删除')
is_recommend = models.BooleanField(verbose_name=u'推荐')
name = models.CharField(verbose_name=u'画报名称', max_length=100)
description = models.CharField(verbose_name=u'画报描述', max_length=200)
creator_id = models.BigIntegerField(verbose_name=u'画报用户ID')
icon = models.CharField(verbose_name=u'画报名称', max_length=255)
topic_num = models.IntegerField(verbose_name=u'次数')
def get_topic_id(self):
try:
topic_id =[]
topic_id_list = PictorialTopics.objects.filter(pictorial_id=self.id).values_list("topic_id", flat=True)
for i in topic_id_list:
topic_id.append(i)
return topic_id
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
# count of the previous day's 4- and 5-star topics
def get_high_quality_topic_num(self):
try:
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
yesterday_begin_time = "%s-%s-%s 00:00:00" % (yesterday.year, yesterday.month, yesterday.day)
yesterday_end_time = "%s-%s-%s 23:59:59" % (yesterday.year, yesterday.month, yesterday.day)
topic_id_list = PictorialTopics.objects.filter(pictorial_id=self.id).values_list("topic_id", flat=True)
topic_num = Topic.objects.filter(content_level__in=["4", "5"], create_time__gte=yesterday_begin_time,
create_time__lte=yesterday_end_time, id__in=topic_id_list).count()
return topic_num
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def get_tag_by_id(self):
try:
tag_id_list = []
tags = PictorialTag.objects.filter(pictorial_id=self.id, is_online=True).values_list("tag_id", flat=True)
for i in tags:
tag_id_list.append(i)
return tag_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def get_tag_by_name(self, tag_id):
try:
tag_name_list = []
tags = Tag.objects.filter(id__in=tag_id, is_online=True).values_list("name", flat=True)
for i in tags:
tag_name_list.append(i)
logging.info("get tags name i:%s" % i)
return tag_name_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
class PictorialTag(models.Model):
"""Pictorial-tag relation."""
class Meta:
verbose_name = u"pictorial tag"
app_label = "community"
db_table = "community_pictorial_tag"
id = models.IntegerField(verbose_name=u'record ID', primary_key=True)
create_time = models.DateTimeField(verbose_name=u'create time', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'update time', default=datetime.datetime.fromtimestamp(0))
pictorial_id = models.BigIntegerField(verbose_name=u'pictorial ID')
tag_id = models.BigIntegerField(verbose_name=u'tag ID')
is_online = models.BooleanField(verbose_name=u'is online')
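Taken together, Pictorial, PictorialTopics and PictorialTag form a small star schema around the pictorial. A hedged sketch of walking those relations, assuming the models above live in trans2es.models.pictorial:
from trans2es.models.pictorial import PictorialTopics, PictorialTag

def collect_pictorial_relations(pictorial_id):
    # Gather the topic ids and tag ids linked to one pictorial.
    topic_ids = list(PictorialTopics.objects.filter(
        pictorial_id=pictorial_id, is_online=True).values_list("topic_id", flat=True))
    tag_ids = list(PictorialTag.objects.filter(
        pictorial_id=pictorial_id, is_online=True).values_list("tag_id", flat=True))
    return {"topic_ids": topic_ids, "tag_ids": tag_ids}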
......@@ -45,6 +45,9 @@ class Tag(models.Model):
name = models.CharField(verbose_name=u"tag name", max_length=128)
description = models.TextField(verbose_name=u"tag description")
icon_url = models.CharField(verbose_name=u"icon_url", max_length=120)
collection = models.IntegerField(verbose_name=u"is editor-curated")
is_ai = models.IntegerField(verbose_name=u"is AI-generated")
is_own = models.IntegerField(verbose_name=u"is native to ins")
create_time = models.DateTimeField(verbose_name=u'create time', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'update time', default=datetime.datetime.fromtimestamp(0))
......
......@@ -12,11 +12,12 @@ import elasticsearch
import elasticsearch.helpers
import sys
from trans2es.models import topic, user, pick_celebrity, group, celebrity, tag, contrast_similar
from trans2es.models import topic, user, pick_celebrity, group, celebrity, tag, contrast_similar,pictorial
from trans2es.utils.user_transfer import UserTransfer
from trans2es.utils.pick_celebrity_transfer import PickCelebrityTransfer
from trans2es.utils.group_transfer import GroupTransfer
from trans2es.utils.topic_transfer import TopicTransfer
from trans2es.utils.pictorial_transfer import PictorialTransfer
from trans2es.utils.celebrity_transfer import CelebrityTransfer
from trans2es.utils.tag_transfer import TagTransfer
from trans2es.utils.contrast_similar_transfer import Contrast_Similar_Transfer
......@@ -172,57 +173,82 @@ class TypeInfo(object):
)
def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
begin = time.time()
if use_batch_query_set:
qs = self.queryset
else:
qs = self.model.objects.all()
end = time.time()
time0=end-begin
begin = time.time()
instance_list = qs.filter(pk__in=pk_list)
end = time.time()
time1=end-begin
begin = time.time()
data_list = self.bulk_get_data(instance_list)
end = time.time()
time2=end-begin
begin = time.time()
logging.info("get sub_index_name:%s"%sub_index_name)
logging.info("get data_list:%s"%data_list)
self.elasticsearch_bulk_insert_data(
sub_index_name=sub_index_name,
data_list=data_list,
es=es,
)
end = time.time()
time3=end-begin
logging.info("duan add,insert_table_by_pk_list time cost:%ds,%ds,%ds,%ds" % (time0,time1,time2,time3))
def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
try:
start_clock = time.clock()
start_time = time.time()
instance_list = list(table_chunk)
stage_1_time = time.time()
data_list = self.bulk_get_data(instance_list)
stage_2_time = time.time()
es_result = ESPerform.es_helpers_bulk(
es_cli=es,
data_list=data_list,
sub_index_name=sub_index_name,
auto_create_index=True
)
logging.info("es_helpers_bulk,sub_index_name:%s,data_list len:%d" % (sub_index_name,len(data_list)))
stage_3_time = time.time()
end_clock = time.clock()
return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
'{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
'{response}').format(
datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
index_prefix=sub_index_name,
type_name=self.name,
pk_start=repr(table_chunk.get_pk_start()),
pk_stop=repr(table_chunk.get_pk_stop()),
count=len(instance_list),
stage_1_duration=stage_1_time - start_time,
stage_2_duration=stage_2_time - stage_1_time,
stage_3_duration=stage_3_time - stage_2_time,
clock_duration=end_clock - start_clock,
response=es_result,
)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
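time.clock() used above was deprecated in Python 3.3 and removed in 3.8, so this timing only runs on older interpreters. A hedged sketch of the equivalent measurement on modern Python, using time.perf_counter() (a substitution, not part of this commit):
import time

start = time.perf_counter()  # monotonic, high-resolution timer
# ... bulk_get_data + es_helpers_bulk work goes here ...
clock_duration = time.perf_counter() - start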
_get_type_info_map_result = None
......@@ -234,6 +260,7 @@ def get_type_info_map():
return _get_type_info_map_result
type_info_list = [
TypeInfo(
name='topic',  # diary
type='topic',
......@@ -244,6 +271,16 @@ def get_type_info_map():
round_insert_chunk_size=5,
round_insert_period=2,
),
TypeInfo(
name='topic-v1',  # diary
type='topic-v1',
model=topic.Topic,
query_deferred=lambda: topic.Topic.objects.all().query,
get_data_func=TopicTransfer.get_topic_data,
bulk_insert_chunk_size=100,
round_insert_chunk_size=5,
round_insert_period=2,
),
TypeInfo(
name="user", # 用户
type="user",
......@@ -308,7 +345,18 @@ def get_type_info_map():
bulk_insert_chunk_size=100,
round_insert_chunk_size=5,
round_insert_period=2
),
TypeInfo(
name="pictorial", # 画报
type="pictorial",
model=pictorial.Pictorial,
query_deferred=lambda: pictorial.Pictorial.objects.all().query,
get_data_func=PictorialTransfer.get_pictorial_data,
bulk_insert_chunk_size=100,
round_insert_chunk_size=5,
round_insert_period=2,
)
]
type_info_map = {
......@@ -318,3 +366,4 @@ def get_type_info_map():
_get_type_info_map_result = type_info_map
return type_info_map
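A minimal usage sketch of the registry above; how the Elasticsearch client is constructed is outside this diff, so es_client below is a placeholder:
from trans2es.type_info import get_type_info_map

type_info_map = get_type_info_map()
# Sync two pictorials by primary key into the "pictorial" sub-index.
type_info_map["pictorial"].insert_table_by_pk_list(
    sub_index_name="pictorial",
    pk_list=[1, 2],  # placeholder primary keys
    es=es_client,    # placeholder: any configured Elasticsearch client
)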
......@@ -33,7 +33,7 @@ class GroupTransfer(object):
update_time = instance.update_time
tzlc_update_time = tzlc(update_time)
res["update_time"] = tzlc_update_time
# res["high_quality_topic_num"] = instance.get_high_quality_topic_num()
return res
except:
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import logging
import traceback
from libs.tools import tzlc
class PictorialTransfer(object):
def __init__(self):
pass
@classmethod
def get_pictorial_data(cls, instance):
try:
res = dict()
res["id"] = instance.id
res["is_online"] = instance.is_online
res["is_deleted"] = instance.is_deleted
res["is_recommend"] = instance.is_recommend
res["name"] = instance.name
res["description"] = instance.description
res["topic_num"] = instance.topic_num
res["creator_id"] = instance.creator_id
res["icon"] = instance.icon
create_time = instance.create_time
tzlc_create_time = tzlc(create_time)
res["create_time"] = tzlc_create_time
update_time = instance.update_time
tzlc_update_time = tzlc(update_time)
res["update_time"] = tzlc_update_time
res["high_quality_topic_num"] = instance.get_high_quality_topic_num()
tag_id = instance.get_tag_by_id()
res["tag_id"] = tag_id
res["tag_name"] = instance.get_tag_by_name(tag_id)
res["topic_id_list"] =instance.get_topic_id()
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict()
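A hedged usage sketch of the transfer, assuming at least one online Pictorial row exists; the dict keys are exactly the ones built above:
from trans2es.models.pictorial import Pictorial
from trans2es.utils.pictorial_transfer import PictorialTransfer

instance = Pictorial.objects.filter(is_online=True).first()
if instance is not None:
    doc = PictorialTransfer.get_pictorial_data(instance)
    # doc is the flat dict that TypeInfo bulk-indexes into the
    # "pictorial" sub-index (see get_type_info_map above).
    print(doc["name"], doc["topic_id_list"])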
......@@ -10,6 +10,9 @@ from libs.tools import tzlc
from trans2es.models.topic import Topic
from trans2es.models.tag import TopicTag,CommunityTagType,CommunityTagTypeRelation
import datetime
from django.conf import settings
class TagTransfer(object):
......@@ -38,26 +41,34 @@ class TagTransfer(object):
res["is_online"] = instance.is_online
res["is_deleted"] = instance.is_deleted
res["near_new_topic_num"] = 0
topic_num = 0
res["near_new_topic_num"] = topic_num
if instance.is_online and not instance.is_deleted:
topic_id_list = list()
sql_result_results = list(TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(
tag_id=instance.id).values_list("topic_id", "is_online"))
for topic_id,is_online in sql_result_results:
if is_online:
topic_id_list.append(topic_id)
time_base_val = datetime.datetime.strftime(datetime.datetime.now()+datetime.timedelta(-7), "%Y-%m-%d")
for topic_begin_index in range(0,len(topic_id_list),100):
cur_topic_num = Topic.objects.using(settings.SLAVE_DB_NAME).filter(id__in=topic_id_list[topic_begin_index:topic_begin_index+100],create_time__gte=time_base_val).count()
topic_num += cur_topic_num
res["near_new_topic_num"] = topic_num
tag_type_sql_list = CommunityTagTypeRelation.objects.using(settings.SLAVE_DB_NAME).filter(tag_id=instance.id).values_list("tag_type_id",flat=True)
tag_type_list = list()
for tag_type_id in tag_type_sql_list:
tag_type_list.append(tag_type_id)
res["tag_type"] = tag_type_list
res["collection"] = instance.collection
res["is_ai"] = instance.is_ai
res["is_own"] = instance.is_own
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
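The per-100-id counting loop above keeps the SQL IN clause bounded when a tag has many topics. A generalized sketch of that pattern (the helper name is hypothetical, not part of this commit):
def count_in_chunks(queryset, id_list, chunk_size=100, **extra_filters):
    # Hypothetical helper: count rows whose id is in id_list,
    # issuing one bounded IN query per chunk.
    total = 0
    for begin in range(0, len(id_list), chunk_size):
        total += queryset.filter(
            id__in=id_list[begin:begin + chunk_size], **extra_filters
        ).count()
    return total

With this, the loop in TagTransfer reduces to count_in_chunks(Topic.objects.using(settings.SLAVE_DB_NAME), topic_id_list, create_time__gte=time_base_val).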
......@@ -6,6 +6,8 @@ import logging
import traceback
from libs.tools import tzlc
import time
import re
import datetime
class TopicTransfer(object):
......@@ -26,19 +28,60 @@ class TopicTransfer(object):
res["content_level"] = instance.content_level
res["user_id"] = instance.user_id
# if instance.group:
# res["group_id"] = instance.group.id
# else:
# res["group_id"] = -1
res["pictorial_id"] = instance.get_pictorial_id()
res["share_num"] = instance.share_num
begin = time.time()
res["pick_id_list"] = instance.get_pick_id_info()
res["tag_list"] = instance.get_topic_tag_id_list()
end = time.time()
time0 = (end-begin)
begin = time.time()
(topic_tag_id_list, edit_tag_id_list) = instance.get_topic_tag_id_list()
res["tag_list"] = topic_tag_id_list
res["edit_tag_list"] = edit_tag_id_list
end = time.time()
time1 = (end-begin)
begin = time.time()
res["tag_name_list"] = instance.get_tag_name_list(res["tag_list"])
end = time.time()
time2 = (end-begin)
begin = time.time()
res["offline_score"] = instance.get_topic_offline_score()
end = time.time()
time3 = (end-begin)
begin = time.time()
res["manual_score"] = instance.drop_score
res["has_image"] = instance.topic_has_image()
res["has_video"] = instance.has_video
res["language_type"] = instance.language_type
end = time.time()
time4 = (end-begin)
# # Katakana
# re_jp_pian_words = re.compile(u"[\u30a0-\u30ff]+")
# m_pian = re_jp_pian_words.search(instance.content, 0)
#
# # Hiragana
# re_jp_ping_words = re.compile(u"[\u3040-\u309f]+")
# m_ping = re_jp_ping_words.search(instance.content, 0)
# if m_pian or m_ping:
# res["language_type"] = 10
# else:
# res["language_type"] = instance.language_type
create_time = instance.create_time
tzlc_create_time = tzlc(create_time)
......@@ -48,9 +91,10 @@ class TopicTransfer(object):
update_time = instance.update_time
tzlc_update_time = tzlc(update_time)
res["update_time"] = tzlc_update_time
# res["update_time"] = tzlc_update_time
res["update_time_val"] = int(time.mktime(tzlc_update_time.timetuple()))
logging.info("test topic transfer time cost,time0:%d,time1:%d,time2:%d,time3:%d,time4:%d" % (time0,time1,time2,time3,time4))
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
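The repeated begin/end/timeN bookkeeping in get_topic_data is easy to get wrong; a hedged sketch of a small context manager that could replace it (hypothetical, not part of this commit):
import time
from contextlib import contextmanager

@contextmanager
def timed(bucket, key):
    # Hypothetical helper: record elapsed seconds into bucket[key].
    begin = time.time()
    try:
        yield
    finally:
        bucket[key] = time.time() - begin

# Usage sketch:
timings = {}
with timed(timings, "tag_list"):
    pass  # e.g. instance.get_topic_tag_id_list()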
......@@ -7,30 +7,33 @@ import traceback
import time
from libs.tools import tzlc
from trans2es.models.user import User
from django.conf import settings
class UserTransfer(object):
@classmethod
def get_follow_user_id_list(cls, userInstance):
follow_user_id_list = list()
user_follows = userInstance.userfollow.filter(is_online=True)
for user_follow in user_follows:
follow_user_id_list.append(user_follow.follow_id)
follow_user_detail_list = list()
for i in range(0, len(follow_user_id_list), 1000):
sql_data_list = User.objects.using(settings.SLAVE_DB_NAME).filter(user_id__in=follow_user_id_list[i:i + 1000], is_online=True,
is_deleted=False)
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
"country_id": detail_data.country_id
}
follow_user_detail_list.append(item)
return follow_user_detail_list
@classmethod
def get_user_data(cls, instance):
try:
res = dict()
......@@ -71,19 +74,28 @@ class UserTransfer(object):
try:
res["tag_list"] = instance.get_user_tag_id_list()
res["attention_user_id_list"] = cls.get_follow_user_id_list(userInstance=instance)
res["attention_group_id_list"] = instance.get_attention_group_id_list()
# res["attention_group_id_list"] = instance.get_attention_group_id_list()
res["pick_user_id_list"] = instance.get_pick_user_id_list()
res["same_group_user_id_list"] = instance.get_same_group_user_id_list()
res["attention_pictorial_id_list"] = instance.get_attention_pictorial_id_list()
# res["same_group_user_id_list"] = instance.get_same_group_user_id_list()
res["same_pictorial_user_id_list"] = instance.get_same_pictorial_user_id_list()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
res["tag_list"] = []
res["attention_user_id_list"] = []
res["attention_group_id_list"] = []
# res["attention_group_id_list"] = []
res["pick_user_id_list"] = []
res["same_group_user_id_list"] = []
# res["same_group_user_id_list"] = []
res["attention_pictorial_id_list"] = []
res["same_pictorial_user_id_list"] = []
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {}
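get_follow_user_id_list above keeps each IN clause to at most 1000 ids; a generalized fetch variant of that pattern, with hypothetical names, might look like:
def fetch_in_chunks(queryset, field, id_list, chunk_size=1000):
    # Hypothetical helper: yield rows whose `field` is in id_list,
    # issuing one bounded IN query per chunk.
    for begin in range(0, len(id_list), chunk_size):
        lookup = {"%s__in" % field: id_list[begin:begin + chunk_size]}
        for row in queryset.filter(**lookup):
            yield row

# e.g. fetch_in_chunks(User.objects.using(settings.SLAVE_DB_NAME), "user_id", follow_user_id_list)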