Commit b8f43e9a authored by 段英荣's avatar 段英荣

Merge branch 'dev' into 'master'

Dev

See merge request !12
parents 020bc43e 29f2b897
# !/usr/bin/env python
# encoding=utf-8
from __future__ import absolute_import
import os
# set the default Django settings module for the 'celery' program.
# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'physical.settings')
import raven
from raven.contrib.celery import register_signal, register_logger_signal
from celery import Celery
from django.conf import settings
class Celery(Celery):
"""wrap for celery.Celery."""
def on_configure(self):
# check if sentry settings provided
if not settings.SENTRY_CELERY_ENDPOINT:
return
client = raven.Client(settings.SENTRY_CELERY_ENDPOINT)
# register a custom filter to filter out duplicate logs
register_logger_signal(client)
# hook into the Celery error handler
register_signal(client)
app = Celery('physical_tasks')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
......@@ -12,5 +12,6 @@
<element value="search.views.pick"/>
<element value="search.views.group"/>
<element value="search.views.user"/>
<element value="injection.data_sync.tasks"/>
</config>
</gm_rpcd_config>
default_app_config = 'injection.data_sync.apps.DataSyncApp'
\ No newline at end of file
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from django.apps import AppConfig
class DataSyncApp(AppConfig):
name = 'injection.data_sync'
label = 'injected_data_sync'
......@@ -4,17 +4,25 @@ from celery import shared_task
from django.conf import settings
from django.core import serializers
from trans2es.type_info import get_type_info_map
from rpc.all import get_rpc_remote_invoker
#from rpc.all import get_rpc_remote_invoker
from libs.es import ESPerform
import logging
import traceback
@shared_task
def write_to_es(es_type, pk_list, configuration, use_batch_query_set=False):
pk_list = list(frozenset(pk_list))
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
def write_to_es(es_type, pk_list, use_batch_query_set=False):
try:
pk_list = list(frozenset(pk_list))
type_info_map = get_type_info_map()
type_info = type_info_map[es_type]
logging.info("duan add,es_type:%s" % str(es_type))
type_info.insert_table_by_pk_list(
sub_index_name=es_type,
pk_list=pk_list,
use_batch_query_set=use_batch_query_set,
es=ESPerform.get_cli()
)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
from __future__ import unicode_literals, absolute_import, print_function
import pymysql
from _celery import app as celery_app
pymysql.install_as_MySQLdb()
#__all__ = ('celery_app',)
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'physical.settings')
app = Celery('physical')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
app.conf.broker_url = settings.CELERY_BROKER_URL
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
......@@ -4,13 +4,13 @@ from __future__ import unicode_literals, print_function, absolute_import
import itertools
from django.conf import settings
import logging
class CeleryTaskRouter(object):
queue_task_map = {
"gaia-dbmw":{
"tapir-alpha":[
'injection.data_sync.tasks.write_to_es',
}
]
}
# Map[TaskName, QueueName]
......@@ -28,5 +28,6 @@ class CeleryTaskRouter(object):
return "slow"
"""
logging.info("duan add,task is:%s" % str(task))
queue_name_or_none = self.task_queue_map.get(task)
return queue_name_or_none
\ No newline at end of file
......@@ -24,17 +24,38 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SECRET_KEY = '^j3sg)sj8rc@du74%fb$c2926tv!!4g(kp-=rx1)c5!1&1(dq='
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
DEBUG = False
ALLOWED_HOSTS = []
# Application definition
BROKER_URL = "redis://127.0.0.1:6379/0"
SENTRY_CELERY_ENDPOINT="http://60b0004c8884420f8067fb32fc3ed244:20f97fc73ffa4aad9735d0e6542a6d78@sentry.igengmei.com/140"
BROKER_URL = "redis://127.0.0.1:6379/8"
# CELERY_SEND_EVENTS = True
# CELERY_SEND_TASK_SENT_EVENT = True
#
# CELERY_DEFAULT_EXCHANGE = 'celery'
# CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
# CELERY_DEFAULT_ROUTING_KEY = 'celery'
#
# CELERY_QUEUES = {
# 'celery': {
# 'exchange': CELERY_DEFAULT_EXCHANGE,
# 'routing_key': CELERY_DEFAULT_ROUTING_KEY,
# },
# 'order': {
# 'exchange': 'order',
# 'routing_key': 'order',
# },
# }
CELERY_BROKER_URL = "redis://127.0.0.1:6379/8"
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ROUTES = ['physical.celery_task_router.CeleryTaskRouter']
INSTALLED_APPS = [
INSTALLED_APPS = (
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
......@@ -43,9 +64,10 @@ INSTALLED_APPS = [
'django.contrib.staticfiles',
'trans2es',
'search',
'injection.data_sync'
]
'injection.data_sync',
)
"""
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
......@@ -55,9 +77,6 @@ MIDDLEWARE = [
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'physical.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
......@@ -73,6 +92,23 @@ TEMPLATES = [
},
},
]
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
"""
ROOT_URLCONF = 'physical.urls'
WSGI_APPLICATION = 'physical.wsgi.application'
......@@ -88,6 +124,7 @@ DATABASES = {
'PASSWORD': 'Gengmei123',
'HOST': 'rm-2ze5k2we69904787l.mysql.rds.aliyuncs.com',
'PORT': '3306',
'CONN_MAX_AGE': None,
'OPTIONS': {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4", # 为?~F?~T??~L~Aemoji表?~C~E
......@@ -103,37 +140,33 @@ ES_INFO_LIST = [
ES_INDEX_PREFIX="gm-dbmw"
MIDDLEWARE_CLASSES = (
'gm_tracer.middleware.TracerMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'helios.DjangoL5dMiddleware',
)
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/1.10/topics/i18n/
LANGUAGE_CODE = 'en-us'
# LANGUAGE_CODE = 'en-us'
#
TIME_ZONE = 'Asia/Shanghai'
#
USE_I18N = True
USE_L10N = True
USE_TZ = True
#
# USE_TZ = True
# Static files (CSS, JavaScript, Images)
......
......@@ -28,7 +28,7 @@ class User(models.Model):
profile_pic = models.CharField(verbose_name=u'头像', max_length=300)
gender = models.SmallIntegerField(verbose_name=u'性别')
city_id = models.CharField(verbose_name=u'城市id', max_length=60)
country_id = models.CharField(verbose_name='国家id', max_length=40)
country_id = models.CharField(verbose_name=u'国家id', max_length=40)
is_online = models.BooleanField(verbose_name="是否上线")
is_deleted = models.BooleanField(verbose_name='是否删除')
create_time = models.DateTimeField(verbose_name=u'创建时间',default=datetime.datetime.fromtimestamp(0))
......@@ -37,7 +37,7 @@ class User(models.Model):
def get_is_recommend_flag(self):
is_shadow = False
is_recommend = False
query_sql = UserExtra.objects.filter(user_id=self.user_id)
query_sql = UserExtra.objects.filter(user_id=self.user_id,is_deleted=False,is_online=True)
for record in query_sql:
is_recommend = record.is_recommend
is_shadow = record.is_shadow
......@@ -45,17 +45,13 @@ class User(models.Model):
return (is_recommend,is_shadow)
def get_latest_topic_time_val(self):
# 获取该用户最新发帖时间
latest_topic_time_val = -1
topic_records = Topic.objects.filter(user_id=self.user_id).order_by("-update_time")
check_index = 0
for record in topic_records:
topic_update_time = record.update_time
tzlc_topic_update_time = tzlc(topic_update_time)
# 获取该用户最新发帖时间
topic_records = Topic.objects.filter(user_id=self.user_id).order_by("-update_time").values_list("update_time",flat=True).first()
if topic_records:
tzlc_topic_update_time = tzlc(topic_records)
latest_topic_time_val = int(time.mktime(tzlc_topic_update_time.timetuple()))
check_index += 1
if check_index >= 1:
break
return latest_topic_time_val
......@@ -141,12 +137,12 @@ class User(models.Model):
try:
user_tag_id_list = list()
query_results = AccountUserTag.objects.filter(user=self.user_id)
query_results = AccountUserTag.objects.filter(user=self.user_id,is_deleted=False)
for item in query_results:
user_tag_id_list.append(item.tag_id)
return user_tag_id_list
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
return []
......@@ -137,33 +137,29 @@ class TypeInfo(object):
return data_list
def elasticsearch_bulk_insert_data(self, sub_index_name, data_list, es=None):
if es is None:
es = get_es_list_by_type(self.type)
if not isinstance(es, (list, tuple,)):
es = [es]
index = ESPerform.get_official_index_name(sub_index_name=sub_index_name,index_flag="write")
bulk_actions = []
for data in data_list:
bulk_actions.append({
'_op_type': 'index',
'_index': index,
'_type': self.type,
'_id': data['id'],
'_source': data,
})
es_result = None
if bulk_actions:
for t in es:
try:
es_result = elasticsearch.helpers.bulk(client=t, actions=bulk_actions)
except Exception as e:
traceback.print_exc()
es_result = 'error'
return es_result
# assert (es is not None)
# index = ESPerform.get_official_index_name(sub_index_name=sub_index_name,index_flag="write")
# bulk_actions = []
# for data in data_list:
# bulk_actions.append({
# '_op_type': 'index',
# '_index': index,
# '_type': "_doc",
# '_id': data['id'],
# '_source': data,
# })
#
# es_result = None
# if bulk_actions:
# for t in es:
# try:
# es_result = elasticsearch.helpers.bulk(client=t, actions=bulk_actions)
# except Exception as e:
# traceback.print_exc()
# es_result = 'error'
return ESPerform.es_helpers_bulk(es,data_list,sub_index_name,True)
def elasticsearch_bulk_insert(self, sub_index_name, instance_iterable, es=None):
data_list = self.bulk_get_data(instance_iterable)
......
......@@ -6,9 +6,29 @@ import logging
import traceback
import time
from libs.tools import tzlc
from trans2es.models.user import User
class UserTransfer(object):
@classmethod
def get_follow_user_id_list(cls,userInstance):
follow_user_id_list = list()
user_follows = userInstance.userfollow.filter(is_online=True)
for user_follow in user_follows:
follow_user_id_list.append(user_follow.follow_id)
follow_user_detail_list = list()
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list,is_online=True,is_deleted=False)
for detail_data in sql_data_list:
item = {
"user_id":detail_data.user_id,
"country_id":detail_data.country_id
}
follow_user_detail_list.append(item)
return follow_user_detail_list
@classmethod
def get_user_data(cls,instance):
try:
......@@ -22,13 +42,23 @@ class UserTransfer(object):
res["city_id"] = instance.city_id
res["country_id"] = instance.country_id
res["is_online"] = instance.is_online
res["is_deleted"] = instance.is_deleted
(is_recommend,is_shadow) = instance.get_is_recommend_flag()
res["is_recommend"] = is_recommend
res["is_shadow"] = is_shadow
try:
(is_recommend, is_shadow) = instance.get_is_recommend_flag()
res["is_recommend"] = is_recommend
res["is_shadow"] = is_shadow
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
res["is_recommend"] = False
res["is_shadow"] = False
latest_topic_time_val = instance.get_latest_topic_time_val()
res["latest_topic_time_val"] = latest_topic_time_val
try:
latest_topic_time_val = instance.get_latest_topic_time_val()
res["latest_topic_time_val"] = latest_topic_time_val
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
res["latest_topic_time_val"] = -1
tzlc_create_time = tzlc(instance.create_time)
res["create_time"] = tzlc_create_time
......@@ -38,13 +68,22 @@ class UserTransfer(object):
res["update_time"] = tzlc_update_time
res["update_time_val"] = int(time.mktime(tzlc_update_time.timetuple()))
res["tag_list"] = instance.get_user_tag_id_list()
res["attention_user_id_list"] = instance.get_follow_user_id_list()
res["attention_group_id_list"] = instance.get_attention_group_id_list()
res["pick_user_id_list"] = instance.get_pick_user_id_list()
res["same_group_user_id_list"] = instance.get_same_group_user_id_list()
try:
res["tag_list"] = instance.get_user_tag_id_list()
res["attention_user_id_list"] = cls.get_follow_user_id_list(userInstance=instance)
res["attention_group_id_list"] = instance.get_attention_group_id_list()
res["pick_user_id_list"] = instance.get_pick_user_id_list()
res["same_group_user_id_list"] = instance.get_same_group_user_id_list()
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
res["tag_list"] = []
res["attention_user_id_list"] = []
res["attention_group_id_list"] = []
res["pick_user_id_list"] = []
res["same_group_user_id_list"] = []
return res
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return None
\ No newline at end of file
return {}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment