Commit d8fdc11c authored by lixiaofang's avatar lixiaofang

add

parents 80814216 227e7e01
......@@ -24,5 +24,29 @@
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
<DBN-PSQL>
<case-options enabled="false">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false" />
</DBN-PSQL>
<DBN-SQL>
<case-options enabled="false">
<option name="KEYWORD_CASE" value="lower" />
<option name="FUNCTION_CASE" value="lower" />
<option name="PARAMETER_CASE" value="lower" />
<option name="DATATYPE_CASE" value="lower" />
<option name="OBJECT_CASE" value="preserve" />
</case-options>
<formatting-settings enabled="false">
<option name="STATEMENT_SPACING" value="one_line" />
<option name="CLAUSE_CHOP_DOWN" value="chop_down_if_statement_long" />
<option name="ITERATION_ELEMENTS_WRAPPING" value="chop_down_if_not_single" />
</formatting-settings>
</DBN-SQL>
</code_scheme>
</component>
\ No newline at end of file
......@@ -15,5 +15,6 @@
<element value="search.views.tag"/>
<element value="search.views.contrast_similar"/>
<element value="injection.data_sync.tasks"/>
<element value="search.views.contrast_similar"/>
</config>
</gm_rpcd_config>
......@@ -4,14 +4,15 @@ from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
#from rpc.all import get_rpc_remote_invoker
# from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
import logging
import traceback
from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar,UserSimilarScore
import json
@shared_task
def write_to_es(es_type, pk_list, use_batch_query_set=False):
try:
......@@ -33,7 +34,8 @@ def write_to_es(es_type, pk_list, use_batch_query_set=False):
@shared_task
def sync_face_similar_data_to_redis():
try:
result_items = FaceUserContrastSimilar.objects.filter(is_online=True,is_deleted=False).distinct().values("participant_user_id").values_list("participant_user_id",flat=True)
result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False).distinct().values(
"participant_user_id").values_list("participant_user_id", flat=True)
logging.info("duan add,begin sync_face_similar_data_to_redis!")
......@@ -41,19 +43,45 @@ def sync_face_similar_data_to_redis():
for participant_user_id in result_items:
redis_key = redis_key_prefix + str(participant_user_id)
similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True,is_deleted=False,participant_user_id=participant_user_id,similarity__gt=0.4).order_by("-similarity").limit(100)
similar_result_items = FaceUserContrastSimilar.objects.filter(is_online=True, is_deleted=False,
participant_user_id=participant_user_id,
similarity__gt=0.3).order_by(
"-similarity")
item_list = list()
for item in similar_result_items:
item_list.append(
{
"contrast_user_id":item.contrast_user_id,
"similarity":item.similarity
"contrast_user_id": item.contrast_user_id,
"similarity": item.similarity
}
)
redis_client.set(redis_key,json.dumps(item_list))
redis_client.set(redis_key, json.dumps(item_list))
logging.info("duan add,participant_user_id:%d set data done!" % participant_user_id)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
@shared_task
def sync_user_similar_score():
try:
results_items = UserSimilarScore.objects.filter(is_deleted=False).distinct().values("user_id").values_list("user_id",flat=True)
redis_key_prefix = "physical:user_similar_score:user_id:"
logging.info("duan add,begin sync user_similar_score!")
for user_id in results_items:
redis_key = redis_key_prefix + str(user_id)
similar_results_items = UserSimilarScore.objects.filter(is_deleted=False,user_id=user_id).order_by("-score")
item_list = list()
for item in similar_results_items:
contrast_user_id = item.contrast_user_id
score = item.score
item_list.append(
[contrast_user_id,score]
)
redis_client.set(redis_key, json.dumps(item_list))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -94,7 +94,7 @@ class ESPerform(object):
return False
@classmethod
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc"):
def put_index_mapping(cls,es_cli,sub_index_name,mapping_type="_doc",force_sync=False):
"""
:remark: put index mapping
:param es_cli:
......@@ -107,7 +107,7 @@ class ESPerform(object):
write_alias_name = cls.get_official_index_name(sub_index_name,"write")
index_exist = es_cli.indices.exists(write_alias_name)
if not index_exist:
if not index_exist and not force_sync:
return False
mapping_dict = cls.__load_mapping(sub_index_name)
......
......@@ -5,6 +5,8 @@ from __future__ import unicode_literals, print_function, absolute_import
import six
import random
from django.db import models
import logging
import traceback
class ITableChunk(object):
......@@ -147,36 +149,47 @@ class TableSlicerChunk(ITableChunk):
class TableSlicer(object):
def __init__(self, queryset, chunk_size=None, chunk_count=None, sep_list=None):
assert isinstance(queryset, models.QuerySet)
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
assert chunk_count is None or isinstance(chunk_count, six.integer_types)
assert sep_list is None or isinstance(sep_list, list)
try:
assert isinstance(queryset, models.QuerySet)
assert (chunk_size is not None) + (chunk_count is not None) + (sep_list is not None) == 1
assert chunk_size is None or isinstance(chunk_size, six.integer_types)
if sep_list is not None:
sep_list = list(sep_list)
else:
count = queryset.count()
if chunk_size is None:
chunk_size = count / chunk_count
index_list = list(range(0, count, chunk_size))
sep_list = [
queryset.order_by('pk').values_list('pk', flat=True)[index]
for index in index_list
]
assert chunk_count is None or isinstance(chunk_count, six.integer_types)
self._model = queryset.model
self._query = queryset.query
self._sep_list = [None] + sep_list + [None]
assert sep_list is None or isinstance(sep_list, list)
assert (chunk_size is not None) + (chunk_count is not None) + (sep_list is not None) == 1
if sep_list is not None:
sep_list = list(sep_list)
else:
count = queryset.count()
if chunk_size is None:
chunk_size = count / chunk_count
index_list = list(range(0, count, chunk_size))
sep_list = [
queryset.order_by('pk').values_list('pk', flat=True)[index]
for index in index_list
]
self._model = queryset.model
self._query = queryset.query
self._sep_list = [None] + sep_list + [None]
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def chunks(self):
reversed_sep_list = list(reversed(self._sep_list))
for i in range(len(self._sep_list) - 1):
pk_start = reversed_sep_list[i+1]
pk_stop = reversed_sep_list[i]
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=pk_start, pk_stop=pk_stop)
try:
reversed_sep_list = list(reversed(self._sep_list))
logging.info("duan add,reversed_sep_list:%d" % (len(self._sep_list) - 1))
for i in range(len(self._sep_list) - 1):
pk_start = reversed_sep_list[i + 1]
pk_stop = reversed_sep_list[i]
yield TableSlicerChunk(model=self._model, query=self._query, pk_start=pk_start, pk_stop=pk_stop)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
class TableStreamingSlicer(object):
......
from django.contrib import admin
# Register your models here.
from django.db import models
# Create your models here.
from django.test import TestCase
# Create your tests here.
from django.shortcuts import render
# Create your views here.
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from kafka import KafkaConsumer
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
from trans2es.models.tag import TopicTag
import traceback
class KafkaManager(object):
# kafka信息
kafka_broker_list = "192.168.13.114:9092,192.168.13.116:9092,192.168.13.115:9092"
topic_name = "alpha-maidian-data"
consumser_obj = None
@classmethod
def get_kafka_consumer_ins(cls, topic_name=None):
if not cls.consumser_obj:
topic_name = cls.topic_name if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=cls.kafka_broker_list)
# cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj
class CollectData(object):
def __init__(self):
self.linucb_matrix_redis_prefix = "physical:linucb:device_id:"
self.linucb_recommend_redis_prefix = "physical:linucb:tag_recommend:device_id:"
# 默认
self.user_feature = [0,1]
def _get_user_linucb_info(self, device_id):
try:
redis_key = self.linucb_matrix_redis_prefix + str(device_id)
# dict的key为标签ID,value为4个矩阵
redis_linucb_tag_data_dict = redis_client.hgetall(redis_key)
return redis_linucb_tag_data_dict
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return dict()
def update_recommend_tag_list(self, device_id,user_feature=None):
try:
recommend_tag_list = list()
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id)
if len(redis_linucb_tag_data_dict) == 0:
recommend_tag_list = LinUCB.get_default_tag_list()
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
recommend_tag_list = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
logging.info("duan add,device_id:%s,recommend_tag_list:%s" % (str(device_id), str(recommend_tag_list)))
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
# Todo:设置过期时间,调研set是否支持
redis_client.expire(tag_recommend_redis_key, 7*24*60*60)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature=None):
try:
user_feature = user_feature if user_feature else self.user_feature
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id,self.linucb_matrix_redis_prefix,redis_client)
except:
logging.error("update_user_linucb_tag_info error!")
return False
def consume_data_from_kafka(self,topic_name=None):
try:
user_feature = [1,1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
while True:
msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg:
try:
logging.info(ori_msg)
raw_val_dict = json.loads(ori_msg.value)
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"]
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(topic_id), str(tag_list)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
elif isinstance(raw_val_dict["params"]["exposure_cards"],list):
exposure_cards_list = raw_val_dict["params"]["exposure_cards"]
else:
exposure_cards_list = list()
device_id = raw_val_dict["device"]["device_id"]
exposure_topic_id_list = list()
for item in exposure_cards_list:
if "card_id" not in item:
continue
exposure_topic_id = item["card_id"]
logging.info(
"consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
exposure_topic_id_list.append(exposure_topic_id)
topic_tag_id_dict = dict()
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
if sql_item.topic_id not in topic_tag_id_dict:
topic_tag_id_dict[sql_item.topic_id] = list()
topic_tag_id_dict[sql_item.topic_id].append(sql_item.tag_id)
is_click = 0
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
str(device_id), str(topic_tag_id_dict)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
import numpy as np
import redis
from libs.cache import redis_client
from trans2es.models.tag import Tag
import logging
import traceback
import json
import pickle
class LinUCB:
d = 2
alpha = 0.25
r1 = 1
r0 = -16
default_tag_list = list()
@classmethod
def get_default_tag_list(cls):
try:
if len(cls.default_tag_list) == 0:
query_item_results = Tag.objects.filter(is_online=True)
for item in query_item_results:
cls.default_tag_list.append(item.id)
return cls.default_tag_list[:20]
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return list()
@classmethod
def linucb_recommend_tag(cls,device_id,redis_linucb_tag_data_dict,user_features_list,tag_list):
"""
:remark 获取推荐标签
:param redis_linucb_tag_data_dict:
:param user_features_list:
:param tag_list:
:return:
"""
try:
Aa_list = list()
theta_list = list()
for tag_id in tag_list:
tag_dict = pickle.loads(redis_linucb_tag_data_dict[tag_id])
Aa_list.append(tag_dict["Aa"])
theta_list.append(tag_dict["theta"])
xaT = np.array([user_features_list])
xa = np.transpose(xaT)
art_max = -1
old_pa = 0
AaI_tmp = np.array(Aa_list)
theta_tmp = np.array(theta_list)
np_array = np.dot(xaT, theta_tmp) + cls.alpha * np.sqrt(np.dot(np.dot(xaT, AaI_tmp), xa))
# top_tag_list_len = int(np_array.size/2)
# top_np_ind = np.argpartition(np_array, -top_tag_list_len)[-top_tag_list_len:]
#
# top_tag_list = list()
# top_np_list = top_np_ind.tolist()
# for tag_id in top_np_list:
# top_tag_list.append(tag_id)
#art_max = tag_list[np.argmax(np.dot(xaT, theta_tmp) + cls.alpha * np.sqrt(np.dot(np.dot(xaT, AaI_tmp), xa)))]
top_tag_set = set()
np_score_list = list()
np_score_dict = dict()
for score_index in range(0,np_array.size):
score = np_array.take(score_index)
np_score_list.append(score)
if score not in np_score_dict:
np_score_dict[score] = [score_index]
else:
np_score_dict[score].append(score_index)
sorted_np_score_list = sorted(np_score_list,reverse=True)
for top_score in sorted_np_score_list:
for top_score_index in np_score_dict[top_score]:
top_tag_set.add(str(tag_list[top_score_index], encoding="utf-8"))
if len(top_tag_set) >= 10:
break
logging.info("duan add,device_id:%s,sorted_np_score_list:%s,np_score_dict:%s" % (str(device_id), str(sorted_np_score_list), str(np_score_dict)))
return list(top_tag_set)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
@classmethod
def init_device_id_linucb_info(cls, redis_cli,redis_prefix, device_id, tag_list):
try:
redis_key = redis_prefix + str(device_id)
user_tag_linucb_dict = dict()
for tag_id in tag_list:
init_dict = {
"Aa": np.identity(cls.d),
"theta": np.zeros((cls.d, 1)),
"ba": np.zeros((cls.d, 1)),
"AaI": np.identity(cls.d)
}
pickle_data = pickle.dumps(init_dict)
user_tag_linucb_dict[tag_id] = pickle_data
redis_cli.hmset(redis_key, user_tag_linucb_dict)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
@classmethod
def update_linucb_info(cls, user_features,reward, tag_id, device_id, redis_prefix,redis_cli):
try:
if reward == -1:
logging.warning("reward val error!")
elif reward == 1 or reward == 0:
if reward == 1:
r = cls.r1
else:
r = cls.r0
xaT = np.array([user_features])
xa = np.transpose(xaT)
redis_key = redis_prefix + str(device_id)
ori_redis_tag_data = redis_cli.hget(redis_key, tag_id)
if not ori_redis_tag_data:
LinUCB.init_device_id_linucb_info(redis_client, redis_prefix, device_id,[tag_id])
else:
ori_redis_tag_dict = pickle.loads(ori_redis_tag_data)
new_Aa_matrix = ori_redis_tag_dict["Aa"] + np.dot(xa, xaT)
new_AaI_matrix = np.linalg.solve(new_Aa_matrix, np.identity(cls.d))
new_ba_matrix = ori_redis_tag_dict["ba"] + r*xa
user_tag_dict = {
"Aa": new_Aa_matrix,
"ba": new_ba_matrix,
"AaI": new_AaI_matrix,
"theta": np.dot(new_AaI_matrix, new_ba_matrix)
}
redis_cli.hset(redis_key, tag_id, pickle.dumps(user_tag_dict))
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
\ No newline at end of file
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
import itertools
from django.conf import settings
from __future__ import unicode_literals, print_function, absolute_import
import itertools
from django.conf import settings
import logging
class CeleryTaskRouter(object):
queue_task_map = {
"tapir-alpha":[
"tapir-alpha": [
'injection.data_sync.tasks.write_to_es',
]
}
......@@ -30,4 +31,4 @@ class CeleryTaskRouter(object):
logging.info("duan add,task is:%s" % str(task))
queue_name_or_none = self.task_queue_map.get(task)
return queue_name_or_none
\ No newline at end of file
return queue_name_or_none
......@@ -14,6 +14,7 @@ import os
from .log_settings import *
from datetime import timedelta
from celery.schedules import crontab
<<<<<<< HEAD
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
......@@ -199,3 +200,4 @@ USE_L10N = True
# https://docs.djangoproject.com/en/1.10/howto/static-files/
STATIC_URL = '/static/'
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '^j3sg)sj8rc@du74%fb$c2926tv!!4g(kp-=rx1)c5!1&1(dq='
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = False
ALLOWED_HOSTS = []
# Application definition
SENTRY_CELERY_ENDPOINT = "http://60b0004c8884420f8067fb32fc3ed244:20f97fc73ffa4aad9735d0e6542a6d78@sentry.igengmei.com/140"
BROKER_URL = "redis://127.0.0.1:6379/8"
# CELERY_SEND_EVENTS = True
# CELERY_SEND_TASK_SENT_EVENT = True
#
# CELERY_DEFAULT_EXCHANGE = 'celery'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
# CELERY_DEFAULT_ROUTING_KEY = 'celery'
#
# CELERY_QUEUES = {
# 'celery': {
# 'exchange': CELERY_DEFAULT_EXCHANGE,
# 'routing_key': CELERY_DEFAULT_ROUTING_KEY,
# },
# 'order': {
# 'exchange': 'order',
# 'routing_key': 'order',
# },
# }
CELERY_BROKER_URL = "redis://127.0.0.1:6379/8"
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['physical.celery_task_router.CeleryTaskRouter']
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'trans2es',
'search',
'injection.data_sync',
)
CELERYBEAT_SCHEDULE = {
'sync_face_similar_data_to_redis': {
'task': 'injection.data_sync.tasks.sync_face_similar_data_to_redis',
'schedule': 120.0,
'args': ()
},
}
"""
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
"""
ROOT_URLCONF = 'physical.urls'
WSGI_APPLICATION = 'physical.wsgi.application'
REDIS_URL = "redis://127.0.0.1:6379"
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'alpha',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
},
'face': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'face',
'USER': 'work',
'PASSWORD': 'Gengmei123',
# 'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
'HOST': 'rm-2zeaut61u9sm21m0b.mysql.rds.aliyuncs.com',
'PORT': '3306',
# 'CONN_MAX_AGE': None,
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4",
},
}
}
ES_INFO_LIST = [
{
"host": "10.29.130.141",
"port": 9200
}
]
ES_INDEX_PREFIX = "gm-dbmw"
MIDDLEWARE_CLASSES = (
'gm_tracer.middleware.TracerMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'helios.DjangoL5dMiddleware',
)
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
# Internationalization
# https://docs.djangoproject.com/en/1.10/topics/i18n/
# LANGUAGE_CODE = 'en-us'
#
TIME_ZONE = 'Asia/Shanghai'
#
USE_I18N = True
USE_L10N = True
#
# USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
STATIC_URL = '/static/'
......@@ -12,6 +12,8 @@ PyMySQL==0.9.2
gunicorn==19.9.0
gevent==1.3.7
pypinyin==0.34.1
numpy==1.16.2
lz4==2.1.6
git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@master
git+ssh://git@git.wanmeizhensuo.com/backend/helios.git@master
......
This diff is collapsed.
This diff is collapsed.
......@@ -19,6 +19,15 @@ from libs.cache import redis_client
from trans2es.models.face_user_contrast_similar import FaceUserContrastSimilar
import json
from search.utils.topic import TopicUtils
from trans2es.models.pick_topic import PickTopic
from trans2es.models.tag import TopicTag,Tag
from trans2es.models.user_extra import UserExtra
from trans2es.models.group import Group
from trans2es.models.topic import Topic,ActionSumAboutTopic
from search.utils.common import *
from linucb.views.collect_data import CollectData
from injection.data_sync.tasks import sync_user_similar_score
class Job(object):
__es = None
......@@ -97,7 +106,9 @@ class Command(BaseCommand):
make_option('-s', '--pks', dest='pks', help='specify sync pks, comma separated', metavar='PKS', default=''),
make_option('--streaming-slicing', dest='streaming_slicing', action='store_true', default=True),
make_option('--no-streaming-slicing', dest='streaming_slicing', action='store_false', default=True),
make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default='')
make_option('-S', '--sync_type',dest='sync_type', help='sync data to es',metavar='TYPE',default=''),
make_option('-T', '--test_score', dest='test_score', help='test_score', metavar='TYPE', default='')
)
def __sync_data_by_type(self, type_name):
......@@ -116,6 +127,68 @@ class Command(BaseCommand):
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def generate_topic_score_detail(self):
try:
topic_id_dict = TopicUtils.get_recommend_topic_ids(241432787,0, 0, 500,query_type=TopicPageType.HOME_RECOMMEND,test_score=True)
for topic_id in topic_id_dict:
offline_score = 0.0
user_is_shadow = False
topic_sql_item = Topic.objects.filter(id=topic_id)
user_is_recommend=0.0
# 是否官方推荐用户
user_query_results = UserExtra.objects.filter(user_id=topic_sql_item[0].user_id)
if user_query_results.count() > 0:
if user_query_results[0].is_recommend:
offline_score += 2.0
user_is_recommend = 2.0
elif user_query_results[0].is_shadow:
user_is_shadow = True
group_is_recommend=0.0
# 是否官方推荐小组
# if topic_sql_item[0].group and topic_sql_item[0].group.is_recommend:
# offline_score += 4.0
# group_is_recommend = 4.0
topic_level_score = 0.0
# 帖子等级
if topic_sql_item[0].content_level == '5':
offline_score += 6.0
topic_level_score = 6.0
elif topic_sql_item[0].content_level == '4':
offline_score += 5.0
topic_level_score = 5.0
elif topic_sql_item[0].content_level == '3':
offline_score += 2.0
topic_level_score = 2.0
exposure_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.filter(topic_id=topic_id, data_type=3).count()
exposure_score = 0.0
uv_score = 0.0
if exposure_count > 0:
offline_score += click_count / exposure_count
exposure_score = click_count / exposure_count
if uv_num > 0:
offline_score += (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num)
uv_score = (topic_sql_item[0].vote_num / uv_num + topic_sql_item[0].reply_num / uv_num)
"""
1:马甲账号是否对总分降权?
"""
if user_is_shadow:
offline_score = offline_score * 0.5
logging.info("test_score######topic_id:%d,score:%f,offline_score:%f,user_is_recommend:%f,group_is_recommend:%f,topic_level_score:%f,exposure_score:%f,uv_score:%f"
% (topic_id,topic_id_dict[topic_id][2],offline_score,user_is_recommend,group_is_recommend,topic_level_score,exposure_score,uv_score))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
def handle(self, *args, **options):
try:
type_name_list = get_type_info_map().keys()
......@@ -129,6 +202,14 @@ class Command(BaseCommand):
if len(options["sync_type"]) and options["sync_type"]=="sync_data_to_es":
SyncDataToRedis.sync_face_similar_data_to_redis()
if len(options["test_score"]):
self.generate_topic_score_detail()
if len(options["sync_type"]) and options["sync_type"]=="linucb":
collect_obj = CollectData()
collect_obj.consume_data_from_kafka()
if len(options["sync_type"]) and options["sync_type"]=="similar":
sync_user_similar_score()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -37,6 +37,7 @@ class Command(BaseCommand):
for type_name in type_name_list:
if len(options["type"]):
<<<<<<< HEAD
if options["type"] == "all" or type_name == options["type"]:
official_index_name = ESPerform.get_official_index_name(type_name)
index_exists = es_cli.indices.exists(official_index_name)
......@@ -46,6 +47,20 @@ class Command(BaseCommand):
ESPerform.put_index_mapping(es_cli, type_name)
else:
logging.warning("index:[%s] has already existing!" % type_name)
=======
if type_name==options["type"]:
# ESPerform.put_index_mapping(es_cli, type_name, force_sync=True)
official_index_name = ESPerform.get_official_index_name(type_name)
index_exists = es_cli.indices.exists(official_index_name)
if not index_exists:
logging.info("begin create [%s] index!" % type_name)
ESPerform.create_index(es_cli,type_name)
logging.info("begin create [%s] mapping!" % type_name)
ESPerform.put_index_mapping(es_cli,type_name,force_sync=True)
>>>>>>> homeq
if len(options["indices_template"]):
template_file_name = options["indices_template"]
......
......@@ -19,6 +19,9 @@
},
"name":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"tag_type":{"type":"long"},
"collection":{"type":"long"},
"is_ai":{"type":"long"},
"is_own":{"type":"long"},
"is_online":{"type":"keyword"},//上线
"is_deleted":{"type":"keyword"},
"near_new_topic_num":{"type":"long","store": true}
......
......@@ -13,15 +13,18 @@
"user_id":{"type":"long"},
"group_id":{"type":"long"}, //所在组ID
"tag_list":{"type":"long"},//标签属性
"edit_tag_list":{"type":"long"},//编辑标签
"tag_name_list":{"type":"text","analyzer":"gm_default_index","search_analyzer":"gm_default_index"},
"share_num":{"type":"long"},
"pick_id_list":{"type":"long"},
"offline_score":{"type":"double"},//离线算分
"manual_score":{"type":"double"},//人工赋分
"has_image":{"type":"boolean"},//是否有图
"has_video":{"type":"boolean"},//是否是视频
"create_time":{"type":"date", "format":"date_time_no_millis"},
"update_time":{"type":"date", "format":"date_time_no_millis"},
"create_time_val":{"type":"long"},
"update_time_val":{"type":"long"}
"update_time_val":{"type":"long"},
"language_type":{"type":"long"}
}
}
\ No newline at end of file
......@@ -27,3 +27,22 @@ class FaceUserContrastSimilar(models.Model):
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
contrast_user_id = models.IntegerField(verbose_name=u'对照者用户ID')
<<<<<<< HEAD
=======
class UserSimilarScore(models.Model):
class Meta:
verbose_name=u"首页推荐用"
db_table="user_similar_score"
id = models.IntegerField(verbose_name=u"主键ID",primary_key=True)
is_deleted = models.BooleanField(verbose_name=u"是否删除")
user_id = models.IntegerField(verbose_name=u"用户ID")
contrast_user_id = models.BigIntegerField(verbose_name="参数对比的用户id", db_index=True)
score = models.FloatField(verbose_name='相似度', default=0)
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
>>>>>>> homeq
......@@ -45,6 +45,9 @@ class Tag(models.Model):
name = models.CharField(verbose_name=u"标签名称",max_length=128)
description = models.TextField(verbose_name=u"标签描述")
icon_url=models.CharField(verbose_name=u"icon_url",max_length=120)
collection = models.IntegerField(verbose_name=u"是否编辑")
is_ai = models.IntegerField(verbose_name=u"是否ai")
is_own = models.IntegerField(verbose_name=u"是否ins上自带")
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import, print_function
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
......@@ -10,34 +11,35 @@ from django.db import models
import datetime
from .pick_topic import PickTopic
from .tag import TopicTag,Tag
from .tag import TopicTag, Tag
from .user_extra import UserExtra
from .group import Group
class ActionSumAboutTopic(models.Model):
class Meta:
verbose_name=u"帖子埋点数据汇总"
db_table="action_sum_about_topic"
verbose_name = u"帖子埋点数据汇总"
db_table = "action_sum_about_topic"
partiton_date = models.CharField(verbose_name=u"日期",max_length=20)
device_id = models.CharField(verbose_name=u"用户设备号",max_length=50)
topic_id = models.CharField(verbose_name=u"帖子ID",max_length=50)
user_id = models.CharField(verbose_name=u"用户ID",max_length=50)
partiton_date = models.CharField(verbose_name=u"日期", max_length=20)
device_id = models.CharField(verbose_name=u"用户设备号", max_length=50)
topic_id = models.CharField(verbose_name=u"帖子ID", max_length=50)
user_id = models.CharField(verbose_name=u"用户ID", max_length=50)
data_type = models.IntegerField(verbose_name=u"动作类型")
data_value = models.BigIntegerField(verbose_name=u"值")
class TopicImage(models.Model):
class Meta:
verbose_name = u'日记图片'
db_table = 'topic_image'
id = models.IntegerField(verbose_name='日记图片ID',primary_key=True)
id = models.IntegerField(verbose_name='日记图片ID', primary_key=True)
topic_id = models.IntegerField(verbose_name=u'日记ID')
url = models.CharField(verbose_name=u'图片URL',max_length=300)
url = models.CharField(verbose_name=u'图片URL', max_length=300)
is_online = models.BooleanField(verbose_name='是否上线')
is_deleted = models.BooleanField(verbose_name='是否删除')
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
......@@ -46,33 +48,35 @@ class Topic(models.Model):
verbose_name = u'日记'
db_table = 'topic'
id = models.IntegerField(verbose_name=u'日记ID',primary_key=True)
name = models.CharField(verbose_name=u'日记名称',max_length=100)
#group_id = models.IntegerField(verbose_name='用户所在组ID',default=-1)
id = models.IntegerField(verbose_name=u'日记ID', primary_key=True)
name = models.CharField(verbose_name=u'日记名称', max_length=100)
# group_id = models.IntegerField(verbose_name='用户所在组ID',default=-1)
group = models.ForeignKey(
Group, verbose_name=u"关联的小组", related_name=u"group_topics",null=True, blank=True, default=None, on_delete=models.CASCADE)
Group, verbose_name=u"关联的小组", related_name=u"group_topics", null=True, blank=True, default=None,
on_delete=models.CASCADE)
user_id = models.IntegerField(verbose_name=u'用户ID')
drop_score = models.IntegerField(verbose_name=u'人工赋分',default=0)
description = models.CharField(verbose_name=u'日记本描述',max_length=200)
content = models.CharField(verbose_name=u'日记本内容',max_length=1000)
has_video = models.BooleanField(verbose_name=u'是否是视频日记')
drop_score = models.IntegerField(verbose_name=u'人工赋分', default=0)
description = models.CharField(verbose_name=u'日记本描述', max_length=200)
content = models.CharField(verbose_name=u'日记本内容', max_length=1000)
share_num = models.IntegerField(verbose_name='')
vote_num = models.IntegerField(verbose_name=u'点赞数')
reply_num = models.IntegerField(verbose_name=u'回复数')
cover = models.CharField(verbose_name='',max_length=200)
cover = models.CharField(verbose_name='', max_length=200)
is_online = models.BooleanField(verbose_name=u'是否上线')
is_deleted = models.BooleanField(verbose_name=u'是否删除')
content_level = models.CharField(verbose_name=u'内容等级',max_length=3)
create_time = models.DateTimeField(verbose_name=u'日记创建时间',default=datetime.datetime.fromtimestamp(0))
content_level = models.CharField(verbose_name=u'内容等级', max_length=3)
language_type = models.IntegerField(verbose_name=u'语种类型')
create_time = models.DateTimeField(verbose_name=u'日记创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'日记更新时间', default=datetime.datetime.fromtimestamp(0))
def topic_has_image(self):
try:
has_image = False
query_list = TopicImage.objects.filter(topic_id=self.id,is_deleted=False,is_online=True)
if len(query_list)>0:
has_image=True
query_list = TopicImage.objects.filter(topic_id=self.id, is_deleted=False, is_online=True)
if len(query_list) > 0:
has_image = True
return has_image
except:
......@@ -82,7 +86,7 @@ class Topic(models.Model):
def get_pick_id_info(self):
try:
pick_id_list = list()
query_list = PickTopic.objects.filter(topic_id=self.id,is_deleted=False)
query_list = PickTopic.objects.filter(topic_id=self.id, is_deleted=False)
for item in query_list:
pick_id_list.append(item.pick_id)
......@@ -94,23 +98,33 @@ class Topic(models.Model):
def get_topic_tag_id_list(self):
try:
topic_tag_id_list = list()
edit_tag_id_list = list()
query_results = TopicTag.objects.filter(topic_id=self.id)
for item in query_results:
topic_tag_id_list.append(item.tag_id)
tag_id_list = TopicTag.objects.filter(topic_id=self.id).values_list("tag_id", flat=True)
tag_query_results = Tag.objects.filter(id__in=tag_id_list)
for tag_item in tag_query_results:
is_online=tag_item.is_online
is_deleted=tag_item.is_deleted
collection=tag_item.collection
return topic_tag_id_list
if is_online and not is_deleted:
topic_tag_id_list.append(tag_item.id)
if collection:
edit_tag_id_list.append(tag_item.id)
return (topic_tag_id_list, edit_tag_id_list)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
return ([],[])
def get_tag_name_list(self,tag_id_list):
def get_tag_name_list(self, tag_id_list):
try:
tag_name_list = list()
query_results = Tag.objects.filter(id__in=tag_id_list).values_list("name",flat=True)
for item in query_results:
tag_name_list.append(item)
logging.info("get tag_id_list :%s" % tag_id_list)
for i in range(0, len(tag_name_list), 1000):
query_results = Tag.objects.filter(id__in=tag_id_list[i:i + 1000])
for item in query_results:
tag_name_list.append(item)
return tag_name_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -121,7 +135,7 @@ class Topic(models.Model):
offline_score = 0.0
user_is_shadow = False
#是否官方推荐用户
# 是否官方推荐用户
user_query_results = UserExtra.objects.filter(user_id=self.user_id)
if user_query_results.count() > 0:
if user_query_results[0].is_recommend:
......@@ -130,32 +144,32 @@ class Topic(models.Model):
user_is_shadow = True
# 是否官方推荐小组
if self.group and self.group.is_recommend:
offline_score += 4.0
# if self.group and self.group.is_recommend:
# offline_score += 4.0
#帖子等级
# 帖子等级
if self.content_level == '5':
offline_score += 5.0
offline_score += 6.0
elif self.content_level == '4':
offline_score += 3.0
offline_score += 5.0
elif self.content_level == '3':
offline_score += 2.0
exposure_count = ActionSumAboutTopic.objects.filter(topic_id=self.id,data_type=1).count()
exposure_count = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.filter(topic_id=self.id,data_type=3).count()
uv_num = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=3).count()
if exposure_count>0:
offline_score += click_count/exposure_count
if uv_num>0:
offline_score += (self.vote_num/uv_num + self.reply_num/uv_num)
if exposure_count > 0:
offline_score += click_count / exposure_count
if uv_num > 0:
offline_score += (self.vote_num / uv_num + self.reply_num / uv_num)
"""
1:马甲账号是否对总分降权?
"""
if user_is_shadow:
offline_score = offline_score*0.5
offline_score = offline_score * 0.5
return offline_score
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return 0.0
\ No newline at end of file
return 0.0
......@@ -22,7 +22,7 @@ class User(models.Model):
verbose_name = u'用户'
db_table = 'account_user'
id = models.IntegerField(verbose_name="主键ID",primary_key=True)
id = models.IntegerField(verbose_name="主键ID", primary_key=True)
user_id = models.BigIntegerField(verbose_name=u'用户id', unique=True)
nick_name = models.CharField(verbose_name=u'昵称', max_length=255, default='')
profile_pic = models.CharField(verbose_name=u'头像', max_length=300)
......@@ -31,56 +31,58 @@ class User(models.Model):
country_id = models.CharField(verbose_name=u'国家id', max_length=40)
is_online = models.BooleanField(verbose_name="是否上线")
is_deleted = models.BooleanField(verbose_name='是否删除')
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
create_time = models.DateTimeField(verbose_name=u'创建时间', default=datetime.datetime.fromtimestamp(0))
update_time = models.DateTimeField(verbose_name=u'更新时间', default=datetime.datetime.fromtimestamp(0))
def get_is_recommend_flag(self):
is_shadow = False
is_recommend = False
query_sql = UserExtra.objects.filter(user_id=self.user_id,is_deleted=False,is_online=True)
query_sql = UserExtra.objects.filter(user_id=self.user_id, is_deleted=False, is_online=True)
for record in query_sql:
is_recommend = record.is_recommend
is_shadow = record.is_shadow
return (is_recommend,is_shadow)
return (is_recommend, is_shadow)
def get_latest_topic_time_val(self):
latest_topic_time_val = -1
# 获取该用户最新发帖时间
topic_records = Topic.objects.filter(user_id=self.user_id).order_by("-update_time").values_list("update_time",flat=True).first()
topic_records = Topic.objects.filter(user_id=self.user_id).order_by("-update_time").values_list("update_time",
flat=True).first()
if topic_records:
tzlc_topic_update_time = tzlc(topic_records)
latest_topic_time_val = int(time.mktime(tzlc_topic_update_time.timetuple()))
return latest_topic_time_val
def get_follow_user_id_list(self):
follow_user_id_list = list()
user_follows = self.userfollow.filter(is_online=True)
for user_follow in user_follows:
follow_user_id_list.append(user_follow.follow_id)
follow_user_detail_list = list()
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list)
for detail_data in sql_data_list:
item = {
"user_id":detail_data.user_id,
"country_id":detail_data.country_id
}
follow_user_detail_list.append(item)
logging.info("get follow_user_id_list :%s" % follow_user_id_list)
for i in range(0, len(follow_user_id_list), 1000):
logging.info("get follow_user_id_list :%s" % follow_user_id_list[i:i + 1000])
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
"country_id": detail_data.country_id
}
follow_user_detail_list.append(item)
return follow_user_detail_list
def get_attention_group_id_list(self):
try:
attention_group_id_list = list()
query_results = GroupUserRole.objects.filter(is_online=True,user_id=self.user_id)
query_results = GroupUserRole.objects.filter(is_online=True, user_id=self.user_id)
for item in query_results:
item_dict = {
"group_id": item.group_id,
"update_time_val":time.mktime(tzlc(item.update_time).timetuple())
"update_time_val": time.mktime(tzlc(item.update_time).timetuple())
}
attention_group_id_list.append(item_dict)
......@@ -91,24 +93,27 @@ class User(models.Model):
def get_pick_user_id_list(self):
pick_topic_id_list = list()
user_picks = self.user_pick.filter(is_deleted=False,is_pick=True)
user_picks = self.user_pick.filter(is_deleted=False, is_pick=True)
for user_pick in user_picks:
pick_topic_id_list.append(user_pick.picktopic_id)
pick_user_id_list = []
topic_sql_list = Topic.objects.filter(id__in=pick_topic_id_list)
for topic_data in topic_sql_list:
pick_user_id_list.append(topic_data.user_id)
for i in range(0, len(pick_topic_id_list), 1000):
topic_sql_list = Topic.objects.filter(id__in=pick_topic_id_list[i:i + 1000])
for topic_data in topic_sql_list:
pick_user_id_list.append(topic_data.user_id)
pick_user_id_list = tuple(pick_user_id_list)
pick_user_detail_list = list()
sql_data_list = User.objects.filter(user_id__in=pick_user_id_list)
for detail_data in sql_data_list:
item = {
"user_id":detail_data.user_id,
"country_id":detail_data.country_id
}
pick_user_detail_list.append(item)
for i in range(0, len(pick_user_id_list), 1000):
sql_data_list = User.objects.filter(user_id__in=pick_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
"country_id": detail_data.country_id
}
pick_user_detail_list.append(item)
return pick_user_detail_list
......@@ -122,22 +127,22 @@ class User(models.Model):
same_group_user_id_list.append(user_items_list.user_id)
same_group_detail_list = list()
sql_data_list = User.objects.filter(user_id__in=same_group_user_id_list)
for detail_data in sql_data_list:
item = {
"user_id":detail_data.user_id,
"country_id":detail_data.country_id
}
same_group_detail_list.append(item)
for i in range(0, len(same_group_user_id_list), 1000):
sql_data_list = User.objects.filter(user_id__in=same_group_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
"country_id": detail_data.country_id
}
same_group_detail_list.append(item)
return same_group_detail_list
def get_user_tag_id_list(self):
try:
user_tag_id_list = list()
query_results = AccountUserTag.objects.filter(user=self.user_id,is_deleted=False)
query_results = AccountUserTag.objects.filter(user=self.user_id, is_deleted=False)
for item in query_results:
user_tag_id_list.append(item.tag_id)
......@@ -145,4 +150,3 @@ class User(models.Model):
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
......@@ -172,57 +172,79 @@ class TypeInfo(object):
)
def insert_table_by_pk_list(self, sub_index_name, pk_list, es=None, use_batch_query_set=False):
begin = time.time()
if use_batch_query_set:
qs = self.queryset
else:
qs = self.model.objects.all()
end = time.time()
time0=end-begin
begin = time.time()
instance_list = qs.filter(pk__in=pk_list)
end = time.time()
time1=end-begin
begin = time.time()
data_list = self.bulk_get_data(instance_list)
end = time.time()
time2=end-begin
begin = time.time()
self.elasticsearch_bulk_insert_data(
sub_index_name=sub_index_name,
data_list=data_list,
es=es,
)
end = time.time()
time3=end-begin
logging.info("duan add,insert_table_by_pk_list time cost:%ds,%ds,%ds,%ds" % (time0,time1,time2,time3))
def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
start_clock = time.clock()
start_time = time.time()
instance_list = list(table_chunk)
stage_1_time = time.time()
data_list = self.bulk_get_data(instance_list)
stage_2_time = time.time()
es_result = ESPerform.es_helpers_bulk(
es_cli=es,
data_list=data_list,
sub_index_name=sub_index_name,
auto_create_index=True
)
stage_3_time = time.time()
end_clock = time.clock()
return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
'{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
'{response}').format(
datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
index_prefix=sub_index_name,
type_name=self.name,
pk_start=repr(table_chunk.get_pk_start()),
pk_stop=repr(table_chunk.get_pk_stop()),
count=len(instance_list),
stage_1_duration=stage_1_time - start_time,
stage_2_duration=stage_2_time - stage_1_time,
stage_3_duration=stage_3_time - stage_2_time,
clock_duration=end_clock - start_clock,
response=es_result,
)
def insert_table_chunk(self, sub_index_name, table_chunk, es=None):
try:
start_clock = time.clock()
start_time = time.time()
instance_list = list(table_chunk)
stage_1_time = time.time()
data_list = self.bulk_get_data(instance_list)
stage_2_time = time.time()
es_result = ESPerform.es_helpers_bulk(
es_cli=es,
data_list=data_list,
sub_index_name=sub_index_name,
auto_create_index=True
)
logging.info("es_helpers_bulk,sub_index_name:%s,data_list len:%d" % (sub_index_name,len(data_list)))
stage_3_time = time.time()
end_clock = time.clock()
return ('{datetime} {index_prefix} {type_name:10s} {pk_start:>15s} {pk_stop:>15s} {count:5d} '
'{stage_1_duration:6.3f} {stage_2_duration:6.3f} {stage_3_duration:6.3f} {clock_duration:6.3f} '
'{response}').format(
datetime=datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%f'),
index_prefix=sub_index_name,
type_name=self.name,
pk_start=repr(table_chunk.get_pk_start()),
pk_stop=repr(table_chunk.get_pk_stop()),
count=len(instance_list),
stage_1_duration=stage_1_time - start_time,
stage_2_duration=stage_2_time - stage_1_time,
stage_3_duration=stage_3_time - stage_2_time,
clock_duration=end_clock - start_clock,
response=es_result,
)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
_get_type_info_map_result = None
......
......@@ -57,7 +57,9 @@ class TagTransfer(object):
tag_type_list.append(tag_type_id)
res["tag_type"] = tag_type_list
res["collection"] = instance.collection
res["is_ai"] = instance.is_ai
res["is_own"] = instance.is_own
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -6,6 +6,8 @@ import logging
import traceback
from libs.tools import tzlc
import time
import re
import datetime
class TopicTransfer(object):
......@@ -32,13 +34,50 @@ class TopicTransfer(object):
res["group_id"] = -1
res["share_num"] = instance.share_num
begin = time.time()
res["pick_id_list"] = instance.get_pick_id_info()
res["tag_list"] = instance.get_topic_tag_id_list()
end = time.time()
time0 = (end-begin)
begin = time.time()
(topic_tag_id_list, edit_tag_id_list) = instance.get_topic_tag_id_list()
res["tag_list"] = topic_tag_id_list
res["edit_tag_list"] = edit_tag_id_list
end = time.time()
time1 = (end-begin)
begin = time.time()
res["tag_name_list"] = instance.get_tag_name_list(res["tag_list"])
end = time.time()
time2 = (end-begin)
begin = time.time()
res["offline_score"] = instance.get_topic_offline_score()
end = time.time()
time3 = (end-begin)
begin = time.time()
res["manual_score"] = instance.drop_score
res["has_image"] = instance.topic_has_image()
res["has_video"] = instance.has_video
res["language_type"] = instance.language_type
end = time.time()
time4 = (end-begin)
# # 片假名
# re_jp_pian_words = re.compile(u"[\u30a0-\u30ff]+")
# m_pian = re_jp_pian_words.search(instance.content, 0)
#
# # 平假名
# re_jp_ping_words = re.compile(u"[\u3040-\u309f]+")
# m_ping = re_jp_ping_words.search(instance.content, 0)
# if m_pian or m_ping:
# res["language_type"] = 10
# else:
# res["language_type"] = instance.language_type
create_time = instance.create_time
tzlc_create_time = tzlc(create_time)
......@@ -51,6 +90,7 @@ class TopicTransfer(object):
res["update_time"] = tzlc_update_time
res["update_time_val"] = int(time.mktime(tzlc_update_time.timetuple()))
logging.info("test topic transfer time cost,time0:%d,time1:%d,time2:%d,time3:%d,time4:%d" % (time0,time1,time2,time3,time4))
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -8,29 +8,31 @@ import time
from libs.tools import tzlc
from trans2es.models.user import User
class UserTransfer(object):
@classmethod
def get_follow_user_id_list(cls,userInstance):
def get_follow_user_id_list(cls, userInstance):
follow_user_id_list = list()
user_follows = userInstance.userfollow.filter(is_online=True)
for user_follow in user_follows:
follow_user_id_list.append(user_follow.follow_id)
follow_user_detail_list = list()
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list,is_online=True,is_deleted=False)
for detail_data in sql_data_list:
item = {
"user_id":detail_data.user_id,
"country_id":detail_data.country_id
}
follow_user_detail_list.append(item)
for i in range(0, len(follow_user_id_list), 1000):
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list[i:i + 1000], is_online=True,
is_deleted=False)
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
"country_id": detail_data.country_id
}
follow_user_detail_list.append(item)
return follow_user_detail_list
@classmethod
def get_user_data(cls,instance):
def get_user_data(cls, instance):
try:
res = dict()
......@@ -80,10 +82,9 @@ class UserTransfer(object):
res["attention_user_id_list"] = []
res["attention_group_id_list"] = []
res["pick_user_id_list"] = []
res["same_group_user_id_list"] = []
res["same_group_user_id_list"] = []
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return {}
\ No newline at end of file
return {}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment