Commit 86df2634 authored by 张宇's avatar 张宇

list_v2

parent ea0deec2
...@@ -9,6 +9,7 @@ from gm_rpcd.internals.configuration.model import Config, environ, from_property ...@@ -9,6 +9,7 @@ from gm_rpcd.internals.configuration.model import Config, environ, from_property
from gm_rpcd.internals.configuration.model_base import literal, xml_text, xml_text_list, \ from gm_rpcd.internals.configuration.model_base import literal, xml_text, xml_text_list, \
DefaultConfigPropertyWrapper, EnvironmentConfigProperty DefaultConfigPropertyWrapper, EnvironmentConfigProperty
DISPATCHER = None
GM_RPCD_APP_CONF_PATH_KEY = 'GM_RPCD_APP_CONF_PATH' GM_RPCD_APP_CONF_PATH_KEY = 'GM_RPCD_APP_CONF_PATH'
GM_RPCD_DEVELOP_CONF_PATH_KEY = 'GM_RPCD_DEVELOP_CONF_PATH' GM_RPCD_DEVELOP_CONF_PATH_KEY = 'GM_RPCD_DEVELOP_CONF_PATH'
...@@ -95,7 +96,8 @@ def setup_rpcd(): ...@@ -95,7 +96,8 @@ def setup_rpcd():
from gm_rpcd.internals.initializations import initialize from gm_rpcd.internals.initializations import initialize
# expose to module scope to make dispatcher singleton # expose to module scope to make dispatcher singleton
global DISPATCHER global DISPATCHER
DISPATCHER = initialize().dispatcher if not DISPATCHER:
DISPATCHER = initialize().dispatcher
# avoid call `setup_rpcd` repeatly # avoid call `setup_rpcd` repeatly
global setup_rpcd global setup_rpcd
......
from __future__ import unicode_literals
from gm_types.error import ERROR as CODES
class GaiaRPCFaultException(Exception):
    """Base exception for RPC fault responses.

    Carries an error code, a human-readable message and an optional extra
    ``data`` payload that is serialized into the fault response.
    """

    def __init__(self, error, message, data):
        # Fix: forward the message to Exception.__init__ so that str(exc)
        # and exc.args are populated (the original left them empty).
        super(GaiaRPCFaultException, self).__init__(message)
        self.error = error
        self.message = message
        self.data = data

    def __repr__(self):
        return "Error %d, %s" % (self.error, self.message)
class UniError(GaiaRPCFaultException):
    """Generic fault carrying the shared CODES.UNIVERSAL error code."""

    def __init__(self, message):
        # Fix: delegate to the parent constructor instead of re-assigning the
        # attributes by hand, so Exception state stays consistent with the base.
        super(UniError, self).__init__(
            error=CODES.UNIVERSAL, message=message, data=None)
class GaiaRPCSpecificExceptionBase(GaiaRPCFaultException):
    """Base for concrete fault types.

    Subclasses pin a class-level ``error`` code and ``default_message``;
    the constructor allows overriding the message and attaching data.
    """
    error = None
    default_message = None

    def __init__(self, message=None, data=None):
        # Fall back to the class-level default when no message is supplied.
        effective_message = self.default_message if message is None else message
        super(GaiaRPCSpecificExceptionBase, self).__init__(
            error=self.error,
            message=effective_message,
            data=data,
        )
# Concrete fault types.  Each subclass fixes an error code and a default
# message; raise with an optional override message and extra data.

class RPCPermanentError(GaiaRPCSpecificExceptionBase):
    error = 2
    default_message = "Permanent Error"


class RPCTemporaryError(GaiaRPCSpecificExceptionBase):
    error = 3
    default_message = "Temporary Error"


class RPCValidationError(GaiaRPCSpecificExceptionBase):
    error = 17
    default_message = "Params/Result Validation Error"


class RPCLoginRequiredException(GaiaRPCSpecificExceptionBase):
    error = 401
    default_message = "Login Required"


class RPCPermissionDeniedException(GaiaRPCSpecificExceptionBase):
    error = 1001
    default_message = "Permission Denied"


class RPCTagRelationCycleException(RPCPermanentError):
    # Code and description come from the shared gm_types error table.
    error = CODES.TAG_RELATION_CYCLE
    default_message = CODES.getDesc(error)


class RPCStaffRequiredException(GaiaRPCSpecificExceptionBase):
    error = 1002
    default_message = "Staff Required"


class RPCNotFoundException(GaiaRPCSpecificExceptionBase):
    error = 1404
    default_message = "Not Found"


class RPCIntegrityError(GaiaRPCSpecificExceptionBase):
    error = 1601
    default_message = "Integrity Error"
...@@ -73,7 +73,6 @@ class V1BatchView(View): ...@@ -73,7 +73,6 @@ class V1BatchView(View):
session_key=request_v1_value.get('session_key'), session_key=request_v1_value.get('session_key'),
environment=request_v1_value.get('environment'), environment=request_v1_value.get('environment'),
) )
# print(DISPATCHER._method_table, DISPATCHER._method_table._MethodTable__method_map)
response = DISPATCHER.process_single_request(request) response = DISPATCHER.process_single_request(request)
response_v1_list.append(response_to_v1_json_value(response)) response_v1_list.append(response_to_v1_json_value(response))
......
# -*- coding: utf-8 -*-
'''
gaia api module -> 使用 app_label = 'api' 的model
'''
# -*- coding: utf-8 -*-
This diff is collapsed.
# -*- coding: utf-8 -*-
...@@ -37,6 +37,7 @@ INSTALLED_APPS = [ ...@@ -37,6 +37,7 @@ INSTALLED_APPS = [
# 'django.contrib.sessions', # 'django.contrib.sessions',
'django.contrib.messages', 'django.contrib.messages',
# 'django.contrib.staticfiles', # 'django.contrib.staticfiles',
'api'
] ]
MIDDLEWARE = [ MIDDLEWARE = [
...@@ -115,7 +116,7 @@ AUTH_PASSWORD_VALIDATORS = [ ...@@ -115,7 +116,7 @@ AUTH_PASSWORD_VALIDATORS = [
LANGUAGE_CODE = 'en-us' LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC' TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True USE_I18N = True
...@@ -203,6 +204,13 @@ LOGGING = { ...@@ -203,6 +204,13 @@ LOGGING = {
'level': 'DEBUG' if DEBUG else 'WARNING', 'level': 'DEBUG' if DEBUG else 'WARNING',
'propagate': False 'propagate': False
}, },
# debug
'kafka': {
'level': 'FATAL'
},
'sentry': {
'level': 'FATAL'
},
} }
} }
...@@ -222,3 +230,9 @@ try: ...@@ -222,3 +230,9 @@ try:
from courier.settings_local import * from courier.settings_local import *
except ModuleNotFoundError: except ModuleNotFoundError:
pass pass
COUNT_LIMIT = 100
ES_SEARCH_TIMEOUT = '10s'
DATABASE_ROUTERS = ['courier.db_routers.MessageRouter']
MESSAGE_DB_NAME = 'message'
import multiprocessing import multiprocessing
workers = multiprocessing.cpu_count() + 1 workers = multiprocessing.cpu_count() + 1
bind = '0.0.0.0:8000' bind = '0.0.0.0:8005'
proc_name = 'courier' proc_name = 'courier'
#pidfile = '/var/courier/gunicorn.pid'
timeout = 3600 timeout = 3600
preload_app = True
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s "%(f)s" "%(a)s"'
#pidfile = '/var/courier/gunicorn.pid'
#accesslog = '/var/log/courier/access.log' #accesslog = '/var/log/courier/access.log'
#access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' # default #access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' # default
#access_log_format = '"%({X-Real-IP}i)s" %(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"' #access_log_format = '"%({X-Real-IP}i)s" %(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s"'
#errorlog = '/var/log/courier/error.log' #errorlog = '/var/log/courier/error.log'
preload_app = True
#worker_class = 'gevent' #worker_class = 'gevent'
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s "%(f)s" "%(a)s"'
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from . import signal_handlers # fire sigal regist
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
class Signal(object):
    """Minimal synchronous signal: connected handlers fire in order."""

    def __init__(self, func=None):
        self.__handlers = []
        if func:
            self.connect(func)

    @classmethod
    def create(cls, func):
        """Decorator form: wrap ``func`` as the first handler of a new Signal."""
        return cls(func)

    def connect(self, func):
        """Register ``func`` as a handler; returns it so it can decorate."""
        self.__handlers.append(func)
        return func

    def __call__(self, *args, **kwargs):
        # Invoke every registered handler with the same arguments;
        # return values are ignored.
        for handler in self.__handlers:
            handler(*args, **kwargs)
# Module-level signal instances; the decorated bodies are no-op default
# handlers.  Subscribers attach via `.connect` (see signal_handlers).

@Signal.create
def post_create_conversation(conversation, conversation_extra):
    pass


@Signal.create
def post_touch_conversation(user, conversation):
    pass


@Signal.create
def post_send_message(conversation):
    pass
\ No newline at end of file
# coding=utf-8
from __future__ import unicode_literals, print_function, absolute_import
from elasticsearch import TransportError
# from message.utils.es_abstract import get_esop, get_migrate_esop
# from rpc.tool.log_tool import logging_exception, conversation_logger
from . import message_signals
def sync_conversation(conversation, conversation_extra=None):
    """Push a conversation head document into Elasticsearch.

    NOTE(review): the entire implementation is commented out, so this is
    currently a no-op — presumably disabled on purpose; confirm before
    re-enabling.
    """
    # from message.utils.conversation import get_conversation_head_from_conversation, get_conversation_head_bulk_action
    # conversation_extra = conversation_extra or {}
    # conversation_head = get_conversation_head_from_conversation(
    #     conversation,
    #     conversation_extra)
    # action = get_conversation_head_bulk_action(conversation_head)
    # get_esop().bulk_single(action)
    pass
# Wire conversation lifecycle signals to the ES sync (currently a no-op,
# see sync_conversation above).

@message_signals.post_create_conversation.connect
def post_create_conversation(conversation, conversation_extra):
    sync_conversation(conversation, conversation_extra)


@message_signals.post_touch_conversation.connect
def post_touch_conversation(user, conversation):
    sync_conversation(conversation)


@message_signals.post_send_message.connect
def post_send_message(conversation):
    sync_conversation(conversation)
This diff is collapsed.
...@@ -5,13 +5,16 @@ git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git ...@@ -5,13 +5,16 @@ git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git
git+ssh://git@git.wanmeizhensuo.com/backend/helios.git git+ssh://git@git.wanmeizhensuo.com/backend/helios.git
git+ssh://git@git.wanmeizhensuo.com/system/gm-tracer.git git+ssh://git@git.wanmeizhensuo.com/system/gm-tracer.git
#git+ssh://git@git.wanmeizhensuo.com/system/kafka-python.git #git+ssh://git@git.wanmeizhensuo.com/system/kafka-python.git
git+ssh://git@git.wanmeizhensuo.com/backend/gm-upload.git@master
jsonschema==2.5.1
Django==3.0.1 Django==3.0.1
PyMySQL==0.9.3 PyMySQL==0.9.3
mysqlclient==1.4.6 mysqlclient==1.4.6
redis==3.3.11 redis==3.3.11
django-redis==4.11.0 django-redis==4.11.0
raven==6.10.0 raven==6.10.0
elasticsearch==2.3.0
kafka-python==1.4.7 kafka-python==1.4.7
gunicorn==20.0.4 gunicorn==20.0.4
djangorestframework==3.11.0 djangorestframework==3.11.0
\ No newline at end of file
# -*- coding: utf-8 -*-
from importlib import import_module
from django.conf import settings
from django.contrib.auth import SESSION_KEY, BACKEND_SESSION_KEY, HASH_SESSION_KEY, load_backend
from django.utils.translation import LANGUAGE_SESSION_KEY
from django.utils.crypto import constant_time_compare
from django.contrib.auth.models import AnonymousUser
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
# copy from django.contrib.sessions.middleware.SessionMiddleware
_engine = import_module(settings.SESSION_ENGINE)
_SessionStore = _engine.SessionStore
# copy and modified from django.contrib.sessions.middleware.SessionMiddleware
def get_django_session(session_key):
    """Load the Django session store for ``session_key``.

    A None key yields a fresh, empty session.
    Copied and modified from django.contrib.sessions.middleware.SessionMiddleware.
    """
    return _SessionStore(session_key)
# copy and modified from django.contrib.auth.get_user
def get_user_from_django_session(django_session):
    """Resolve the authenticated user stored in ``django_session``.

    Returns AnonymousUser when no user is stored, the auth backend is not
    configured, or session auth hash verification fails (in which case the
    session is flushed).  Copied and modified from django.contrib.auth.get_user.
    """
    user = None
    try:
        user_id = django_session[SESSION_KEY]
        backend_path = django_session[BACKEND_SESSION_KEY]
    except KeyError:
        pass
    else:
        if backend_path in settings.AUTHENTICATION_BACKENDS:
            backend = load_backend(backend_path)
            user = backend.get_user(user_id)
            # Verify the session auth hash.  Fix: settings.MIDDLEWARE_CLASSES
            # was removed in Django >= 2.0 (requirements pin Django 3.0), so
            # accessing it unconditionally raises AttributeError; fall back
            # to settings.MIDDLEWARE.
            middleware = (getattr(settings, 'MIDDLEWARE_CLASSES', None)
                          or getattr(settings, 'MIDDLEWARE', None) or [])
            if ('django.contrib.auth.middleware.SessionAuthenticationMiddleware'
                    in middleware and hasattr(user, 'get_session_auth_hash')):
                session_hash = django_session.get(HASH_SESSION_KEY)
                session_hash_verified = session_hash and constant_time_compare(
                    session_hash,
                    user.get_session_auth_hash()
                )
                if not session_hash_verified:
                    django_session.flush()
                    user = None
    return user or AnonymousUser()
# copy and modified from django.contrib.auth.login
# copy and modified from django.contrib.auth.login
def login(django_session, user):
    """Persist ``user`` as logged in on ``django_session`` and emit user_logged_in.

    Unlike Django's login(), this operates on a bare session object instead
    of an HttpRequest (request=None is sent with the signal).
    """
    session_auth_hash = ''
    assert user is not None
    if hasattr(user, 'get_session_auth_hash'):
        session_auth_hash = user.get_session_auth_hash()
    if SESSION_KEY in django_session:
        if django_session[SESSION_KEY] != user.pk or (
                session_auth_hash and
                django_session.get(HASH_SESSION_KEY) != session_auth_hash):
            # To avoid reusing another user's session, create a new, empty
            # session if the existing session corresponds to a different
            # authenticated user.
            django_session.flush()
    else:
        # Rotate the session key on login to prevent fixation.
        django_session.cycle_key()
    django_session[SESSION_KEY] = user.pk
    django_session[BACKEND_SESSION_KEY] = user.backend
    django_session[HASH_SESSION_KEY] = session_auth_hash
    django_session.save()
    user_logged_in.send(sender=user.__class__, request=None, user=user)
# copy and modified from django.contrib.auth.logout
# copy and modified from django.contrib.auth.logout
def logout(django_session, user):
    """Flush ``django_session`` and emit user_logged_out.

    Fixes relative to the original:
    - `is_authenticated` became a property in Django 1.10+ (requirements pin
      Django 3.0), so calling it raised TypeError; evaluate it as a value.
    - user_logged_out was sent twice (before and after the flush); send it
      exactly once, after the session is cleared, matching Django's logout().
    """
    if hasattr(user, 'is_authenticated') and not user.is_authenticated:
        user = None
    # remember language choice saved to session
    # for backwards compatibility django_language is also checked (remove in 1.8)
    language = django_session.get(LANGUAGE_SESSION_KEY, django_session.get('django_language'))
    django_session.flush()
    if language is not None:
        django_session[LANGUAGE_SESSION_KEY] = language
    user_logged_out.send(sender=user.__class__, request=None, user=user)
This diff is collapsed.
import contextlib
import logging
import threading
from django.contrib.auth.models import AnonymousUser
from django.conf import settings
from cached_property import cached_property
from gm_logging.utils import get_exception_logging_func
from helios.rpc import create_default_invoker
from adapter.rpcd.exceptions import RPCLoginRequiredException
from . import auth
# from .tool.log_tool import logging_exception, info_logger
info_logger = logging.getLogger(__name__)
exception_logger = logging.getLogger('exception_logger')
from raven.contrib.django.raven_compat.models import client as _sentry_client
logging_exception = get_exception_logging_func(exception_logger, _sentry_client)
class Session(object):
    """Wraps a Django session plus its authenticated user for RPC requests."""

    def __init__(self, session_key=None):
        assert session_key is None or isinstance(session_key, str)
        if session_key == '':  # treat an empty session_key as "no session"
            session_key = None
        django_session = auth.get_django_session(session_key)
        django_user = auth.get_user_from_django_session(django_session)
        self._django_session = django_session
        self._django_user = django_user

    def do_login(self, user):
        """Log ``user`` in on this session and persist it."""
        auth.login(self._django_session, user)
        self._django_user = user
        self._django_session.save()

    def do_logout(self):
        """Log the current user out and persist the (flushed) session."""
        auth.logout(self._django_session, self._django_user)
        self._django_user = auth.AnonymousUser()
        self._django_session.save()

    @property
    def session_key(self):
        return self._django_session.session_key

    @property
    def has_login(self):
        """True when the session belongs to an authenticated, active user."""
        user = self.user
        info_logger.info(user)
        # Fix: `is_authenticated` is a property in Django >= 1.10 (plain bool
        # in 3.x); calling it raised TypeError.  Evaluate it as a value.
        return bool(user.is_authenticated)

    def login_required(self):
        """Raise RPCLoginRequiredException unless a user is logged in."""
        if not self.has_login:
            raise RPCLoginRequiredException

    def set_wechat_unionid(self, unionid):
        if unionid:
            sk = "wx_unionid"
            self._django_session[sk] = unionid
            self._django_session.save()

    def get_wechat_unionid(self):
        sk = "wx_unionid"
        result = self._django_session.get(sk, None)
        return result

    def set_wechat_openid(self, wechat_appid, openid):
        if wechat_appid and openid:
            sk = "wx_openid_for_app_{}".format(wechat_appid)
            self._django_session[sk] = openid
            self._django_session.save()

    def get_wechat_openid(self, wechat_appid):
        result = None
        if wechat_appid:
            sk = "wx_openid_for_app_{}".format(wechat_appid)
            result = self._django_session.get(sk, None)
        return result

    @property
    def user_id(self):
        return self.user.id

    @property
    def user(self):
        # Inactive users are masked as anonymous.
        user = self._django_user
        return user if user.is_active else AnonymousUser()

    @property
    def groups(self):
        """Names of the groups the current user belongs to."""
        return self.user.belong_groups.values_list('name', flat=True)
# Module-wide base RPC invoker; per-request invokers derive from it with
# session/client configuration (see Context.rpc_remote below).
_base_invoker = create_default_invoker(
    debug=settings.DEBUG
).with_config(
    dump_curl=True
)
class NoCurrentContextError(Exception):
    """Raised when no RPC Context is active on the current thread."""
    pass
def get_current_context_or_throw_exception():
    """Return the thread's active Context; raise NoCurrentContextError if absent."""
    context = ContextManager.get_active_context()
    if not context:
        raise NoCurrentContextError
    return context
def _do_get_rpc_remote_invoker():
    """Active context's remote invoker, or the module-level base invoker."""
    context = ContextManager.get_active_context()
    return context.rpc_remote if context else _base_invoker
def get_rpc_remote_invoker():
    """Public accessor for the (context-aware) remote RPC invoker."""
    return _do_get_rpc_remote_invoker()
def get_gaia_local_invoker():
    """Return the active context's local (in-process) invoker.

    NOTE(review): work in progress — `Context.gaia_local` is commented out
    and a None context is not handled, so this raises AttributeError either
    way; confirm before relying on it.
    """
    # TODO: gaia_loal_invoker
    context = ContextManager.get_active_context()
    # if context:
    return context.gaia_local
    # else:
    #     from .nested_invoker import NestedInvoker
    #     return NestedInvoker(ctx=context)
class Context(object):
    """Per-request execution context: session plus local/remote RPC access."""
    # True when a non-empty session_key was supplied for this request.
    has_session = None
    logger = None

    def __init__(self, session_key, request=None):
        self.__session_key = session_key
        self.has_session = bool(session_key)
        self._request = request

    @cached_property
    def session(self):
        # Built lazily so requests that never touch the session skip the
        # session backend entirely.
        return Session(session_key=self.__session_key)

    # @cached_property
    # def gaia_local(self):
    #     from .nested_invoker import NestedInvoker
    #     return NestedInvoker(self)

    @property
    def rpc(self):
        # Deprecated accessor: raises in DEBUG to flush out callers,
        # otherwise logs the misuse and falls through to gaia_local.
        # NOTE(review): `gaia_local` is commented out above, so the non-DEBUG
        # path would raise AttributeError — confirm whether this is reachable.
        try:
            raise Exception(u'should not use Context.rpc, use Context.gaia_local or Context.rpc_remote')
        except Exception:
            if settings.DEBUG:
                raise
            else:
                logging_exception()
        return self.gaia_local

    @cached_property
    def rpc_remote(self):
        # Remote invoker bound to this request's session and client info.
        if self._request:
            client_info = self._request.client_info
        else:
            client_info = None
        return _base_invoker.with_config(
            session_key=self.__session_key,
            client_info=client_info,
        )
class ContextManager(object):
    """Tracks the active Context for each thread via a thread-local slot."""
    _active_context_local = threading.local()

    @classmethod
    @contextlib.contextmanager
    def with_active_context(cls, context):
        """Make ``context`` the active one for the duration of the with-block.

        Nests correctly: the previously active context is restored on exit.

        :type context: Context
        """
        store = cls._active_context_local
        saved = getattr(store, 'context', None)
        store.context = context
        try:
            yield
        finally:
            store.context = saved

    @classmethod
    def get_active_context(cls):
        """Return this thread's active Context, or None.

        :rtype: Context | None
        """
        return getattr(cls._active_context_local, 'context', None)
class ConnectionInfo(object):
    """Captures the originating request and a best-effort client IP."""
    request = None
    client_ip = None

    def __init__(self, request, client_ip=None):
        self.request = request
        # Xing Ye tells me that there are these settings on proxy:
        #     proxy_set_header X-Real-IP $remote_addr;
        #     proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # so the first X-Forwarded-For entry is the real client address.
        if not client_ip:
            try:
                forwarded = request.META.get('HTTP_X_FORWARDED_FOR')
                client_ip = forwarded.split(',')[0]
            except Exception:
                # Header missing or malformed: leave client_ip as None.
                pass
        self.client_ip = client_ip
class Request(object):
    """A single RPC call: method, params, session, and its execution Context."""
    method = None
    params = None
    session_key = None
    environment = None
    is_nested_call = None
    context = None
    # method_info / request_info stay None here; presumably assigned by the
    # dispatcher after construction — confirm in the dispatcher code.
    method_info = None
    request_info = None

    def __init__(self, method, params, session_key, environment, is_nested_call=False):
        self.method = method
        self.params = params
        self.session_key = session_key
        self.environment = environment
        self.is_nested_call = is_nested_call
        # Each request owns a Context that back-references it.
        self.context = Context(session_key=session_key, request=self)

    @property
    def client_info(self):
        """Client info from request_info when available, else None."""
        # return (self.environment or {}).get(u'client_info')
        request_info = self.request_info
        if request_info:
            return request_info.get_client_info()
        return None
def create_fake_context():
    """Build a Context with no session, for internal/background calls."""
    return Context(session_key=None)
# -*- coding: utf-8 -*-
from typing import List, Dict
from rpc.context import get_rpc_remote_invoker
def get_doctor_id_by_user_id(user_id: int) -> int:
    """Resolve a doctor id from a user id via the gaia RPC service.

    NOTE(review): this returns ``result.get('user_id')`` although the name
    promises a doctor id — verify the RPC payload key; this looks like a bug.
    """
    result = get_rpc_remote_invoker()['gaia/get_doctor_id_by_user_id'](
        user_id=user_id,
    ).unwrap()
    return result.get('user_id')
def get_message_mark_user(user_id_list: List[int],
                          target_user_id_list: List[int]) \
        -> List[Dict]:
    """Call gaia/courier/mark_user for the given user id lists and return its payload."""
    return get_rpc_remote_invoker()['gaia/courier/mark_user'](
        user_id_list=user_id_list,
        target_user_id_list=target_user_id_list
    ).unwrap()
\ No newline at end of file
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import itertools
import threading
import functools
import elasticsearch
from django.conf import settings
from .config import config, ESDatabaseConfig, ESTableSchema, ESVersion, \
load_mapping
from .models import ESBulkAction
class ESClientManagerInterface(object):
    """Operations an ES client manager must provide.

    Implemented by ESClientManager (single cluster) and
    ESHostBackupClientManager (host + backup pair).
    """

    def search(self, table, **kwargs):
        raise NotImplementedError

    def index(self, table, body, **kwargs):
        raise NotImplementedError

    def mget(self, table, body, **kwargs):
        raise NotImplementedError

    def bulk_single(self, action):
        raise NotImplementedError

    def helper_bulk(self, action_iter, **kwargs):
        raise NotImplementedError

    def helper_scan(self, table, **kwargs):
        raise NotImplementedError

    def alter_table(self, table, drop_if_exists=False):
        raise NotImplementedError
def _create_es_client(hosts):
    """Create an Elasticsearch client for ``hosts``.

    DEBUG disables sniffing entirely; otherwise the client sniffs the
    cluster at startup, on connection failure, and every 60 seconds.
    """
    if settings.DEBUG:
        options = {
            # no sniffing
            'sniff_on_start': False,
            'sniff_on_connection_fail': False,
        }
    else:
        options = {
            # sniff before doing anything
            'sniff_on_start': True,
            # refresh nodes after a node fails to respond
            'sniff_on_connection_fail': True,
            # and also every 60 seconds
            'sniffer_timeout': 60,
            'sniff_timeout': 1,
        }
    return elasticsearch.Elasticsearch(hosts=hosts, **options)
class ESClientManager(ESClientManagerInterface):
    """ES client manager for a single cluster.

    Resolves logical ESTableSchema tables to the concrete index/doc_type
    configured for this database, then delegates to the elasticsearch client.
    """
    INDEX = 'index'
    DOC_TYPE = 'doc_type'

    def __init__(self, database_config: ESDatabaseConfig):
        # database_config = config.get_database_config(database_name)
        assert isinstance(database_config, ESDatabaseConfig)
        self.client = _create_es_client(hosts=database_config.hosts)
        self.database_config = database_config

    def get_table_index(self, table):
        """Concrete index name configured for ``table``."""
        return self.database_config[table].index

    def get_table_doc_type(self, table):
        """Concrete doc_type configured for ``table``."""
        return self.database_config[table].doc_type

    def params_add_table(self, table, params):
        """Inject index/doc_type for ``table`` into ``params`` (mutated in place).

        Raises ValueError if the caller already supplied either key.
        """
        assert isinstance(params, dict)
        if self.INDEX in params or self.DOC_TYPE in params:
            raise ValueError(
                'params contains {} or {}'.format(self.INDEX, self.DOC_TYPE))
        table_config = self.database_config[table]
        params[self.INDEX] = table_config.index
        params[self.DOC_TYPE] = table_config.doc_type
        return params

    def bulk_action_to_dict(self, bulk_action):
        """Convert an ESBulkAction into a raw bulk-helper action dict."""
        assert isinstance(bulk_action, ESBulkAction)
        d = dict(bulk_action.params)
        if bulk_action.table:
            table_config = self.database_config[bulk_action.table]
            d[bulk_action.INDEX] = table_config.index
            d[bulk_action.DOC_TYPE] = table_config.doc_type
        return d

    def search(self, table, **kwargs):
        """Run a search against ``table``."""
        assert isinstance(table, ESTableSchema)
        return self.client.search(
            index=self.get_table_index(table),
            doc_type=self.get_table_doc_type(table),
            **kwargs
        )

    def index(self, table, body, **kwargs):
        """Index a single document into ``table``."""
        assert isinstance(table, ESTableSchema)
        return self.client.index(
            index=self.get_table_index(table),
            doc_type=self.get_table_doc_type(table),
            body=body,
            **kwargs
        )

    def mget(self, table, body, **kwargs):
        """Multi-get documents from ``table``."""
        assert isinstance(table, ESTableSchema)
        return self.client.mget(
            index=self.get_table_index(table),
            doc_type=self.get_table_doc_type(table),
            body=body,
            **kwargs
        )

    def bulk_single(self, action):
        """Execute one bulk action through the low-level bulk API."""
        assert isinstance(action, ESBulkAction)
        action_dict = self.bulk_action_to_dict(action)
        expanded = elasticsearch.helpers.expand_action(action_dict)
        # expand_action yields (action_line, data_line); data_line is None
        # for actions without a document body.
        if expanded[1] is None:
            bulk_actions = (expanded[0],)
        else:
            bulk_actions = expanded
        self.client.bulk(bulk_actions)

    def helper_scan(self, table, **kwargs):
        """Scan/scroll over ``table`` via elasticsearch.helpers.scan."""
        params = dict(kwargs)
        self.params_add_table(table=table, params=params)
        return elasticsearch.helpers.scan(
            client=self.client,
            **params
        )

    def helper_bulk(self, action_iter, **kwargs):
        """Bulk-execute an iterable of ESBulkAction via helpers.bulk.

        Bug fix: the original used ``itertools.imap``, which does not exist
        on Python 3 (this codebase uses py3 annotations); the builtin
        ``map`` is the lazy equivalent.
        """
        return elasticsearch.helpers.bulk(
            client=self.client,
            actions=map(self.bulk_action_to_dict, action_iter),
            **kwargs
        )

    def alter_table(self, table, drop_if_exists=False):
        """Create the index if needed and (re)put the mapping for ``table``."""
        assert isinstance(table, ESTableSchema)
        if self.database_config.es_version == ESVersion.V1:
            mapping_name = table.mapping_v1_name
        elif self.database_config.es_version == ESVersion.V2:
            mapping_name = table.mapping_v2_name
        else:
            raise Exception('invalid es_version: {}'.format(
                self.database_config.es_version))
        mapping = load_mapping(mapping_name)
        cl = self.client.indices
        index = self.get_table_index(table)
        doc_type = self.get_table_doc_type(table)
        if not cl.exists(index=index):
            cl.create(index=index)
        if cl.exists_type(index=index, doc_type=doc_type) and drop_if_exists:
            cl.delete_mapping(index=index, doc_type=doc_type)
        return cl.put_mapping(index=[index], doc_type=doc_type, body=mapping)
class ESOperationType(object):
    """How an ES operation is dispatched across the host/backup pair."""
    HOST_ONLY = 1   # run only on the default (host) operator
    EXTENSIVE = 2   # run on the default operator, then the secondary
def es_operation_type_seletor(optype):
    """Decorator factory dispatching a method to one or both ES operators.

    HOST_ONLY: delegate to ``self.default_operator`` only.
    EXTENSIVE: call the default operator, then the secondary operator.

    NOTE(review): the EXTENSIVE wrapper ``return``s inside ``finally``, which
    discards the default operator's return value AND swallows any exception
    it raised — callers always get the secondary's result.  Confirm this is
    intended before reusing.
    (Name keeps the original "seletor" spelling: callers below reference it.)
    """
    def decorator(f):
        name = f.__name__
        if optype == ESOperationType.HOST_ONLY:
            @functools.wraps(f)
            def wrapper(self, *args, **kwargs):
                return getattr(self.default_operator, name)(*args, **kwargs)
            return wrapper
        elif optype == ESOperationType.EXTENSIVE:
            @functools.wraps(f)
            def wrapper(self, *args, **kwargs):
                try:
                    return getattr(self.default_operator, name)(*args, **kwargs)
                finally:
                    # return-in-finally overrides the try result/exception.
                    return getattr(self.secondary_operator, name)(*args, **kwargs)
            return wrapper
        else:
            raise Exception('invalid operation type: {}'.format(optype))
    return decorator
class ESHostBackupClientManager(ESClientManager):
    """Pairs a host (default) and a backup (secondary) ESClientManager.

    Reads (search/mget/helper_scan) go to the host only; writes
    (index/bulk_single) go to both — see es_operation_type_seletor.
    Note: it deliberately skips ESClientManager.__init__ and owns no
    client of its own; the method bodies below are never executed, the
    decorators replace them entirely.
    """

    def __init__(self, default, secondary):
        assert isinstance(default, ESClientManager)
        assert isinstance(secondary, ESClientManager)
        self.default_operator = default  # for host
        self.secondary_operator = secondary  # for backup

    @es_operation_type_seletor(ESOperationType.HOST_ONLY)
    def search(self, table, **kwargs):
        raise NotImplementedError

    @es_operation_type_seletor(ESOperationType.EXTENSIVE)
    def index(self, table, body, **kwargs):
        raise NotImplementedError

    @es_operation_type_seletor(ESOperationType.HOST_ONLY)
    def mget(self, table, body, **kwargs):
        raise NotImplementedError

    @es_operation_type_seletor(ESOperationType.EXTENSIVE)
    def bulk_single(self, action):
        raise NotImplementedError

    @es_operation_type_seletor(ESOperationType.HOST_ONLY)
    def helper_scan(self, table, **kwargs):
        raise NotImplementedError

    def alter_table(self, table, drop_if_exists=False):
        raise NotImplementedError('alter_table does not work for ESOperatorCombined')
\ No newline at end of file
# -*- coding: utf-8 -*-
import json
import os
import re
import jsonschema
import six
from django.conf import settings
def load_mapping(mapping_name):
    """Load ``<BASE_DIR>/message/mapping/<mapping_name>.json``.

    ``//`` line comments are stripped before JSON parsing.

    NOTE(review): the regex also strips ``//`` occurring inside JSON string
    values (e.g. "http://..."), corrupting them — confirm mapping files
    never contain ``//`` inside strings.
    """
    mapping_file_path = os.path.join(
        settings.BASE_DIR,
        'message', 'mapping', '%s.json' % (mapping_name,)
    )
    mapping = ''
    with open(mapping_file_path, 'r') as f:
        for line in f:
            # Strip // line comments (non-standard JSON).
            mapping += re.sub(r'//.*$', '', line)
    mapping = json.loads(mapping)
    return mapping
class ESVersion(object):
    """Supported ES protocol generations; selects which mapping file to use."""
    V1 = 'v1'
    V2 = 'v2'
class ESTableSchema(object):
    """Identity of a logical ES table plus its v1/v2 mapping file names.

    Instances are used as lookup keys (see ESDatabaseConfig.__getitem__) and
    are deliberately unserializable so they never cross process boundaries.
    """

    def __init__(self, table_name, mapping_v1_name, mapping_v2_name):
        # `six.string_types` replaced with `str`: this codebase is Python 3
        # (it uses function annotations elsewhere in the same package).
        assert isinstance(table_name, str)
        assert isinstance(mapping_v1_name, str)
        # Fix: the original validated only the v1 name; validate v2 as well.
        assert isinstance(mapping_v2_name, str)
        self.table_name = table_name
        self.mapping_v1_name = mapping_v1_name
        self.mapping_v2_name = mapping_v2_name

    def __repr__(self):
        return '{}(table_name={})'.format(
            self.__class__.__name__,
            self.table_name,
        )

    def __reduce__(self):
        raise Exception('unserializable')

    def __reduce_ex__(self, *args, **kwargs):
        raise Exception('unserializable')
# Logical table schemas: stable identities used as keys throughout eswrapper.
table_message = ESTableSchema(
    table_name='message',
    mapping_v1_name='message.v1',
    mapping_v2_name='message.v2',
)
table_conversation = ESTableSchema(
    table_name='conversation',
    mapping_v1_name='conversation.v1',
    mapping_v2_name='conversation.v2',
)
# Name -> schema lookup; also drives the allowed names in _config_schema.
table_schema_map = {
    ts.table_name: ts
    for ts in [table_conversation, table_message]
}
# JSON Schema (draft-04) for settings.ES_MSG.
# Fix: minItems/maxItems/uniqueItems are array-level keywords; the original
# nested them inside `items` (where they constrain each string element and
# therefore validate nothing).  Moved to the `order` array itself so the
# 1-or-2-unique-names constraint is actually enforced.
_config_schema = {
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'type': 'object',
    'properties': {
        'order': {
            # Database names in priority order: [host] or [host, backup].
            'type': 'array',
            'minItems': 1,
            'maxItems': 2,
            'uniqueItems': True,
            'items': {
                'type': 'string',
            }
        },
        'database': {
            'type': 'array',
            'items': {
                'type': 'object',
                'properties': {
                    'name': {'type': 'string'},
                    'es_version': {'enum': [ESVersion.V1, ESVersion.V2]},
                    'hosts': {'type': 'array'},
                    'table': {
                        'type': 'array',
                        'items': {
                            'type': 'object',
                            'properties': {
                                'name': {
                                    'type': 'string',
                                    'enum': list(table_schema_map),
                                },
                                'index': {'type': 'string'},
                                'doc_type': {'type': 'string'},
                            },
                            'required': ['name', 'index', 'doc_type'],
                        }
                    }
                },
                'required': ['name', 'es_version', 'hosts', 'table'],
            },
        },
    },
    'required': ['order', 'database'],
}
class ESTableConfig(object):
    """Concrete index/doc_type binding for one logical table."""

    def __init__(self, name, index, doc_type):
        # `six.string_types` replaced with `str`: Python 3 codebase.
        assert isinstance(name, str)
        assert isinstance(index, str)
        assert isinstance(doc_type, str)
        self.name = name
        self.index = index
        self.doc_type = doc_type
class ESDatabaseConfig(object):
    """One ES cluster: name, protocol version, hosts, and its table bindings."""

    def __init__(self, name, es_version, hosts, table_list):
        self.name = name
        self.es_version = es_version
        self.hosts = hosts
        self.table_list = table_list
        # Index tables by name for schema-keyed lookup in __getitem__.
        self.__table_map = {t.name: t for t in table_list}

    def __getitem__(self, item):
        """Look up the ESTableConfig for an ESTableSchema key."""
        assert isinstance(item, ESTableSchema)
        return self.__table_map[item.table_name]
class Config(object):
    """Validated eswrapper configuration parsed from settings.ES_MSG."""

    def __init__(self, config_data):
        # Fail fast on malformed settings.
        jsonschema.validate(config_data, _config_schema)
        self.config_data = config_data
        order = config_data['order']
        assert isinstance(order, list)
        # Database names in priority order (host first, optional backup second).
        self.order = order
        database_list = []
        for database_config in config_data['database']:
            table_list = []
            for table_config in database_config['table']:
                table_list.append(ESTableConfig(
                    name=table_config['name'],
                    index=table_config['index'],
                    doc_type=table_config['doc_type'],
                ))
            database_list.append(ESDatabaseConfig(
                name=database_config['name'],
                es_version=database_config['es_version'],
                hosts=database_config['hosts'],
                table_list=table_list,
            ))
        self.database_list = database_list
        self.__database_map = {
            database.name: database
            for database in database_list
        }

    def get_database_config(self, name):
        """Return the ESDatabaseConfig registered as ``name`` (KeyError if absent)."""
        return self.__database_map[name]


# Module-level singleton built from Django settings at import time.
config = Config(settings.ES_MSG)
# -*- coding: utf-8 -*-
class ESBulkAction(object):
    """One bulk-API action; `_index`/`_type` are resolved later from `table`
    by the client manager, so callers may not pre-set them in params."""
    INDEX = '_index'
    DOC_TYPE = '_type'

    def __init__(self, table=None, params=None):
        params = params or {}
        if self.INDEX in params or self.DOC_TYPE in params:
            raise ValueError(
                'params contains {} or {}'.format(self.INDEX, self.DOC_TYPE))
        self.table = table
        self.params = params
# -*- coding: utf-8 -*-
import threading
from typing import Type
from .config import config
from .client import ESClientManager, ESHostBackupClientManager
_esop_instance_lock = threading.Lock()
_esop_instance = None # singleton
_esop_migrate_instance = None
def create_esop_for_database(database_name) -> ESClientManager:
    """Build an ESClientManager for the named database in `config`."""
    database_config = config.get_database_config(database_name)
    return ESClientManager(database_config=database_config)
def _create_esop() -> Type[ESClientManager]:
    """Build the ES operator from `config.order`.

    One configured database yields a plain ESClientManager; two yield a
    host+backup ESHostBackupClientManager.  The schema guarantees 1-2 entries.
    """
    managers = [
        create_esop_for_database(database_name)
        for database_name in config.order
    ]
    if len(managers) == 1:
        return managers[0]
    if len(managers) == 2:
        default, secondary = managers
        return ESHostBackupClientManager(default=default, secondary=secondary)
    raise Exception('impossible')
def get_esop() -> Type[ESClientManager]:
    '''Thread-safe, lazily-created singleton ES operator (double-checked locking).'''
    global _esop_instance
    if _esop_instance is None:
        with _esop_instance_lock:
            # Re-check under the lock: another thread may have built it.
            if _esop_instance is None:
                _esop_instance = _create_esop()
    return _esop_instance
# -*- coding: utf-8 -*-
import functools
from typing import Dict
from django.conf import settings
from gm_types.msg import CONVERSATION_TYPE, CONVERSATION_ORDER
limited_size = functools.partial(min, settings.COUNT_LIMIT)
def search_conversation_from_es(offset=0,
                                size=50,
                                filters=None,
                                query=None,
                                sort_type=CONVERSATION_ORDER.LAST_REPLY_TIME)\
        -> dict:
    """Search conversations in ES; return total count and conversation ids.

    Fix: `filters={}` / `query={}` were shared mutable default arguments
    (a classic Python pitfall); replaced with None sentinels.
    """
    filters = {} if filters is None else filters
    query = {} if query is None else query
    res = search_conversation_es(offset, size, filters, query, sort_type)
    conversation_ids = [int(s['_id']) for s in res['hits']['hits']]
    total_count = res['hits']['total']
    return {
        'total_count': total_count,
        'conversation_ids': conversation_ids,
    }
def search_conversation_es(offset: int=0,
                           size: int=50,
                           filters: Dict=None,
                           query: Dict=None,
                           sort_type: CONVERSATION_ORDER=CONVERSATION_ORDER.LAST_REPLY_TIME):
    """Build and run the ES conversation search; return the raw response.

    :param offset: pagination offset (`from_`).
    :param size: page size, capped at settings.COUNT_LIMIT.
    :param filters: exact-match constraints; recognized keys handled below.
    :param query: full-text constraints (content / comment / user_last_name /
        user_id), combined with `should`.
    :param sort_type: CONVERSATION_ORDER value selecting the sort clause.

    Fix: `filters={}` / `query={}` were shared mutable default arguments;
    replaced with None sentinels.  The query body is otherwise unchanged.
    """
    filters = {} if filters is None else filters
    query = {} if query is None else query
    size = limited_size(size)
    filter_element_list = []
    query_element_list = []
    for k, v in filters.items():
        if k == 'user_ids':
            # Conversations containing any of the given participant ids.
            filter_element_list.append({
                'nested': {
                    'path': 'users',
                    'query': {
                        'terms': {'users.id': v}
                    }
                }
            })
        elif k == 'multi_filter':
            # Participants matching both the unread flag and the id list.
            filter_element_list.append({
                'nested': {
                    'path': 'users',
                    'query': {
                        'bool': {
                            'must': [
                                {'match': {'users.is_unread': v['is_unread']}},
                                {'terms': {'users.id': v['user_ids']}}
                            ]
                        }
                    }
                }
            })
        elif k == 'multi_filter_status':
            # Participants matching both status and the id list.
            filter_element_list.append({
                'nested': {
                    'path': 'users',
                    'query': {
                        'bool': {
                            'must': [
                                {'match': {'users.status': v['status']}},
                                {'terms': {'users.id': v['user_ids']}}
                            ]
                        }
                    }
                }
            })
        elif k == 'conversation_type':
            v = int(v)
            local_filters = []
            local_filters.append({
                'term': {'conversation_type': v}
            })
            if v == CONVERSATION_TYPE.MESSAGE:
                # Documents without a conversation_type field are treated
                # as plain MESSAGE conversations.
                local_filters.append({
                    'bool': {
                        'must_not': {
                            'exists': {'field': 'conversation_type'}
                        }
                    }
                })
            filter_element_list.append({
                'bool': {
                    'should': local_filters
                }
            })
        elif k == 'is_star':
            filter_element_list.append({
                'bool': {
                    'must': {
                        'term': {'is_star_by_doctor': v}
                    }
                }
            })
        elif k == 'last_reply_time_start_gte':
            filter_element_list.append({
                'bool': {
                    'must': {
                        'range': {'last_reply_time': {
                            "gte": v
                        }}
                    }
                }
            })
        elif k == 'last_reply_time_end_lte':
            filter_element_list.append({
                'bool': {
                    'must': {
                        'range': {'last_reply_time': {
                            "lte": v
                        }}
                    }
                }
            })
        elif k == 'status':
            filter_element_list.append({
                'bool': {
                    'must': {
                        'term': {'status': v}
                    }
                }
            })
    for k, v in query.items():
        if k == 'content' and v:
            query_element_list.append({
                'nested': {
                    'path': 'messages',
                    'score_mode': 'max',
                    'query': {
                        'match_phrase': {'messages.content.text': v}
                    }
                }
            })
        if k == 'comment' and v:
            query_element_list.append({
                'nested': {
                    'path': 'users',
                    'score_mode': 'max',
                    'query': {
                        'match_phrase': {'users.comment': v}
                    }
                }
            })
        if k == 'user_last_name' and v:
            query_element_list.append({
                'nested': {
                    'path': 'users',
                    'score_mode': 'max',
                    'query': {
                        'match_phrase': {'users.last_name': v}
                    }
                }
            })
        if k == 'user_id' and v:
            query_element_list.append({
                'nested': {
                    'path': 'users',
                    'score_mode': 'max',
                    'query': {
                        'match_phrase': {'users.id': v}
                    }
                }
            })
    # Legacy (ES 1.x-style) `filtered` query: unscored filters combined with
    # the scored full-text `should` clauses.
    es_query = {
        'query': {
            'filtered': {
                'filter': {
                    'bool': {
                        'must': filter_element_list,
                    },
                },
                'query': {
                    'bool': {
                        'should': query_element_list,
                    }
                }
            },
        },
    }
    if sort_type == CONVERSATION_ORDER.UNREAD:
        # Unread-first via a server-side sort script, then recency.
        es_query['sort'] = [
            {'_script': {
                'lang': settings.ES_SCRIPT_LANG,
                'script_file': 'sort_conversation-default',
                'type': 'number',
                'params': {
                    'context_user_id': filters['user_id'] if 'user_id' in filters else -1,
                },
                'order': 'desc',
            }},
            {'last_reply_time': {'order': 'desc'}},
        ]
    elif sort_type == CONVERSATION_ORDER.LAST_REPLY_TIME:
        es_query['sort'] = [
            {'last_reply_time': {'order': 'desc'}},
        ]
    # Local imports preserved from the original — presumably to avoid an
    # import cycle with the search package; confirm before hoisting.
    from search.eswrapper.config import table_conversation
    from search.eswrapper.shortcuts import get_esop
    res = get_esop().search(
        table=table_conversation,
        timeout=settings.ES_SEARCH_TIMEOUT,
        body=es_query,
        from_=offset,
        size=size,
    )
    return res
# -*- coding: utf-8 -*-
#!/usr/bin/env python
# coding=utf-8
#!/usr/bin/env python
# coding=utf-8
import base64
class BaseUnread(object):
    """Common helpers for per-user / per-doctor unread counters.

    Subclasses must provide ``self.user_id`` and/or ``self.doctor_id``;
    the key helpers below build cache key names from those ids.
    """

    _cache_time = 86400  # cache TTL: one day, in seconds
    _time_fmt = '%Y-%m-%d %H:%M:%S'  # storage format of the user's latest-action time

    def __init__(self, user_id=None, doctor_id=None):
        # At least one identity must be supplied.
        assert any([user_id, doctor_id])
        # BUGFIX: the arguments were previously discarded (both attributes
        # were unconditionally reset to None), breaking any direct use of
        # this constructor.
        self.user_id = user_id
        self.doctor_id = doctor_id

    def _reload_user(self):
        """Resolve ``self.user_id`` from ``self.doctor_id`` when missing.

        BUGFIX: restored from commented-out code -- subclass code
        (``get_conversation_all_unread``) calls this method, which would
        otherwise raise AttributeError.
        """
        if self.user_id is not None:
            return
        assert self.doctor_id
        # NOTE(review): the original commented code imported Doctor from
        # hippo.models -- confirm this is still the correct model.
        from hippo.models import Doctor
        self.user_id = Doctor.objects.get(id=self.doctor_id).user_id

    def gen_cache_key(self, event, *args, **kwargs):
        """Build the unread cache key for *event*.

        1. For user-scoped caches pass ``is_user=True``.
        2. Subclasses must have initialized ``self.user_id`` or
           ``self.doctor_id`` accordingly.
        """
        is_user = kwargs.get('is_user', False)
        if is_user:
            key = u'user:{}:{}'.format(self.user_id, event)
        else:
            key = u'doctor:{}:{}'.format(self.doctor_id, event)
        return key

    def gen_order_viewtime_key(self, pre):
        """Key storing the time the doctor last viewed the order list."""
        return u'{}:time'.format(pre)

    def gen_conversation_key(self, conversation_id=None):
        """Return ``(hash_name, field)`` for per-conversation unread counts."""
        name = u'conversation:{}'.format(self.user_id)
        return name, conversation_id
def gen_poll_channel(doctor_id):
    """Return the poll channel_name used to push unread events to a doctor.

    :param doctor_id: doctor id; any value with a string representation
        (callers pass both raw strings and model pks).
    :rtype: unicode
    """
    # BUGFIX: coerce to text first (callers sometimes pass non-str ids,
    # which have no .encode) and decode the base64 result so that under
    # Python 3 the formatted name does not embed a bytes repr like "b'...'".
    doctor_id_b64 = base64.b64encode(str(doctor_id).encode('utf-8')).decode('ascii')
    return u"gaia:doctor:unread:{}".format(doctor_id_b64)
def gen_poll_named_channel(user_id):
    """Build the poll channel_name for a user.

    :param user_id: user ID (stringified by callers; coerced here as well)
    :rtype: unicode
    """
    # BUGFIX: decode the base64 bytes so the channel name does not embed a
    # bytes repr ("b'...'") under Python 3; str() keeps non-str ids working.
    userid_base64 = base64.b64encode(str(user_id).encode('utf-8')).decode('ascii')
    return u"poll_channel:{}".format(userid_base64)
#!/usr/bin/env python
# coding=utf-8
import json
import datetime
import time
from django.contrib.auth.models import User
from gm_types.gaia import NOTIFY_EVENT, NOTIFY_OPERATION, DOCTOR_TYPE
# from api.models import Doctor
from api.tool.user_tool import get_doctor_by_user_id
from hippo.tool.chain_hospital_tools import get_master_merchant
from rpc.all import get_rpc_remote_invoker
from rpc.cache import unread_cache
from rpc.tool.log_tool import doctor_unread_logger
from services.unread.base import BaseUnread, gen_poll_channel, gen_poll_named_channel
def noti_operation(event, operation, user_id=None, doctor_id=None, data=None):
    """Dispatch an unread notification to the matching handler.

    Pass ``user_id`` for a User-driven operation and ``doctor_id`` for a
    Doctor-driven one; at least one of them must be given.

    1. When ``event == NOTIFY_EVENT.CONVERSATION`` (private message):
        data = {
            'conversation_id': 1518193,
            'send_uid': 1938785,
            'target_uid': 602329,
            'conversation_type': 1,
        }
    Note: only unread messages with conversation_type == 1 are handled.
    """
    method = "{}_{}".format(event, operation)
    doctor_unread_logger.info(
        u'event_operation is {}, user_id is {}, doctor_id is {}, data is {}'.format(
            method, user_id, doctor_id, json.dumps(data) if data else ''
        )
    )
    # Conversation events target a User; everything else targets a Doctor.
    if event == NOTIFY_EVENT.CONVERSATION:
        dispatcher = NotiUserUnread(user_id)
    else:
        dispatcher = NotiDoctorUnread(doctor_id)
    getattr(dispatcher, method)(data=data)
def noti_poll(doctor_id, event, operation, content):
    """Publish an unread notification onto the doctor's poll channel."""
    payload = json.dumps({
        'event': event,
        'operation': operation,
        'content': content,
        'send_timestamp': time.time(),
    })
    invoker = get_rpc_remote_invoker()
    invoker['poll/named_channel/publish_text'](
        named_channel=gen_poll_channel(doctor_id),
        text=payload,
    ).unwrap()
def noti_user_poll(user_id, event, operation, content):
    """Push a notification to the given user via the poll service.

    :param user_id: user ID
    :param event: event name, e.g. ``conversation``
    :param operation: operation name, e.g. ``add``
    :param content: payload data
    :rtype: None
    """
    payload = json.dumps({
        'event': event,
        'operation': operation,
        'content': content,
    })
    get_rpc_remote_invoker()['poll/named_channel/publish_text'](
        named_channel=gen_poll_named_channel(str(user_id)),
        text=payload,
    ).unwrap()
class _NotiUnread(BaseUnread):
    """Base class that turns ``<event>_<operation>`` attribute access into
    calls to ``self._operation(event, operation, ...)``.

    Subclasses whitelist the dispatched names in ``_hacked_methods`` and
    implement ``_operation``.
    """

    _hacked_methods = []

    def __getattribute__(self, name):
        hacked = object.__getattribute__(self, '_hacked_methods')
        if name not in hacked:
            return super(_NotiUnread, self).__getattribute__(name)

        def dispatch(*args, **kwargs):
            # Split e.g. "order_add" into event="order", operation="add".
            parts = name.split('_')
            return self._operation(parts[0], parts[1], *args, **kwargs)

        return dispatch

    def _operation(self, *args, **options):
        raise NotImplementedError('subclasses of _NotiUnread must provide a _operation() method')
class NotiUserUnread(_NotiUnread):
    """Handle unread-notification operations that target a User."""

    _hacked_methods = [
        # conversation_clear clears the unread count of one specific conversation_id
        'conversation_add', 'conversation_clear',  # 'conversation_delete',
    ]

    def __init__(self, user_id):
        self.user_id = user_id

    def _gen_user_cache_key(self, event):
        """!!! Build user cache keys through this wrapper;
        do NOT call gen_cache_key directly."""
        return self.gen_cache_key(event, is_user=True)

    def _operation(self, event, operation, *args, **kwargs):
        """Apply *operation* for *event*: update counters and notify via poll."""
        if event == NOTIFY_EVENT.CONVERSATION:  # private message
            # Conversation operations always carry
            # data={'conversation_id': xxx, 'send_uid': xxx, 'target_uid': xxx}
            data = kwargs['data']
            # per-conversation unread counter (stored in a hash)
            name, key = self.gen_conversation_key(data['conversation_id'])
            if operation == NOTIFY_OPERATION.ADD:
                unread_cache.hincrby(name, key)
            elif operation == NOTIFY_OPERATION.CLEAR:
                _unread_num = unread_cache.hdel(name, key)
                data['unread_num'] = _unread_num
            # drop the aggregated total so it gets recomputed on next read
            unread_cache.delete(self._gen_user_cache_key(event))
            doctor = get_doctor_by_user_id(self.user_id)
            if doctor:
                noti_poll(doctor.id, event, operation, data)
                if doctor.doctor_type == DOCTOR_TYPE.DOCTOR:
                    # notify the hospital administrator as well.
                    # BUGFIX: `Doctor` was referenced here although its
                    # module-level import is commented out, raising
                    # NameError; use the model class of the doctor we
                    # already fetched instead.
                    doctor_office = type(doctor).objects.filter(
                        doctor_type=DOCTOR_TYPE.OFFICER, hospital_id=doctor.hospital.id
                    ).first()
                    if doctor_office:
                        noti_poll(doctor_office.id, event, operation, data)
                _merchant = doctor.merchant
                if _merchant:
                    # notify the (master) merchant of the chain
                    _master_merchant = get_master_merchant(_merchant)
                    if _master_merchant:
                        noti_poll(_master_merchant.doctor.id, event, operation, data)
            else:
                noti_user_poll(self.user_id, event, operation, data)
class NotiDoctorUnread(_NotiUnread):
    """Handle unread-notification operations that target a Doctor."""

    _hacked_methods = [
        'order_add', 'order_delete', 'order_clear',
        'refund_add', 'refund_delete',  # 'refund_clear',
        'reserve_add', 'reserve_delete',  # 'reserve_clear',
        'system_add',
        'deal_add',
    ]

    def __init__(self, doctor_id):
        self.doctor_id = doctor_id

    def _operation(self, event, operation, *args, **kwargs):
        """Invalidate the cached unread count for *event* and push a poll message."""
        cache_key = self.gen_cache_key(event, *args, **kwargs)
        removed = unread_cache.delete(cache_key)
        doctor_unread_logger.info(
            u'NotiDoctorUnread._operation, key is {}, del_num is {}'.format(cache_key, removed)
        )
        if event == NOTIFY_EVENT.ORDER and operation == NOTIFY_OPERATION.CLEAR:
            # Viewing the order list clears the red dot; remember when it was viewed.
            unread_cache.set(
                self.gen_order_viewtime_key(cache_key),
                datetime.datetime.now().strftime(self._time_fmt),
            )
        elif event == NOTIFY_EVENT.REFUND:
            # A refund request flips the order into WAIT_REFUNDED = ('6', refunding);
            # accepting/rejecting a refund also changes the order status, so the
            # cached order unread count must be dropped as well.
            order_del_num = unread_cache.delete(self.gen_cache_key(NOTIFY_EVENT.ORDER))
            doctor_unread_logger.info(
                u'delete order cache when operate refund, doctor_id is {}, del_num is {}'.format(
                    self.doctor_id, order_del_num
                )
            )
        # push the notification
        noti_poll(self.doctor_id, event, operation, None)
#!/usr/bin/env python
# coding=utf-8
import datetime
from django.conf import settings
# from api.models import Doctor, Order, RefundOrder, Reservation
from rpc.cache import unread_cache
# from rpc.tool.log_tool import doctor_unread_logger
from gm_types.gaia import NOTIFY_EVENT
from gm_types.gaia import ORDER_STATUS, REFUND_STATUS, RESERVATION_STATUS
from services.unread.base import BaseUnread
class _Unread(BaseUnread):
    """
    1. When an event gets a new operation, the cached unread count is
       deleted and poll is notified (see the Noti* classes).
    2. ``get`` is client-driven; on a cache miss the subclass recomputes
       the count once and refills the cache.
    """

    def _get_unread(self, event, *args, **kwargs):
        """Return ``(cache_key, unread)`` for *event*.

        ``unread`` is ``None`` on a cache miss, signalling the caller to
        recompute and refill the cache.

        BUGFIX: this method was commented out (marked TODO) even though
        the subclasses below call ``self._get_unread(...)``, which raised
        AttributeError at runtime.  Restored from the commented-out code,
        minus its logging line (that logger's import is also commented
        out at module level).
        """
        key = self.gen_cache_key(event, *args, **kwargs)
        unread = unread_cache.get(key)
        if unread is not None:
            unread = int(unread)
        return key, unread
class UserUnread(_Unread):
    """Read-side unread counters for a User."""

    def __init__(self, user_id):
        self.user_id = user_id

    def _get_user_unread(self, event, *args, **kwargs):
        """!!! Users must read unread counts through this wrapper;
        do NOT call ``_get_unread`` directly."""
        return self._get_unread(event, *args, is_user=True, **kwargs)

    def get_conversation_unread(self, conversation_id, default=0):
        """Return the unread count of a single conversation."""
        name, field = self.gen_conversation_key(conversation_id)
        cached = unread_cache.hget(name, field) or 0
        if cached == 0 and default > 0:
            # seed the hash with the caller-provided default on a miss
            unread_cache.hsetnx(name, field, default)
            return default
        return int(cached)

    def get_conversation_all_unread(self):
        """Return the total unread count across all conversations.

        1. Try the aggregated cache first.
        2. On a miss, sum every field of the per-conversation hash and
           cache the result.
        """
        self._reload_user()
        key, total = self._get_user_unread(NOTIFY_EVENT.CONVERSATION)
        if total is None:
            name, _ = self.gen_conversation_key()
            per_conversation = unread_cache.hgetall(name)
            total = sum(int(v) for v in per_conversation.values())
            unread_cache.set(key, total, ex=self._cache_time, nx=True)
        return total

    @classmethod
    def get_all_conversation_unread_num_by_ids(cls, user_ids):
        """Batch-fetch total unread counts (optimisation for the Bybo group)."""
        key_template = 'user:{}' + ':{}'.format(NOTIFY_EVENT.CONVERSATION)
        keys = [key_template.format(uid) for uid in user_ids]
        if not keys:
            return {}
        counts = {}
        for uid, cached in zip(user_ids, unread_cache.mget(keys)):
            if cached is None:
                cached = cls(user_id=uid).get_conversation_all_unread()
            counts[uid] = int(cached)
        return counts
class DoctorUnread(UserUnread):
    """Read-side unread counters for a Doctor (orders, refunds, reservations)."""

    def __init__(self, doctor_id):
        self.user_id = None
        self.doctor_id = doctor_id

    def get_order_unread(self):
        """Return the number of unseen paid orders for this doctor."""
        # BUGFIX: the `from api.models import ...` line is commented out at
        # module level, so Order/RefundOrder/Reservation/Doctor resolved to
        # nothing (NameError at runtime); restore them as local imports.
        from api.models import Order
        key, unread = self._get_unread(NOTIFY_EVENT.ORDER)
        if unread is None:
            orders = Order.objects.filter(service__doctor_id=self.doctor_id, status=ORDER_STATUS.PAID)
            time_record = unread_cache.get(self.gen_order_viewtime_key(key))
            if time_record:  # if the list was viewed before, count from that moment on
                orders = orders.filter(pay_time__gt=datetime.datetime.strptime(time_record, self._time_fmt))
            unread = orders.count()
            unread_cache.set(key, unread, self._cache_time)
        return unread

    def get_refund_unread(self):
        """Return the number of refund requests the doctor can still handle."""
        from api.models import RefundOrder  # see BUGFIX note in get_order_unread
        key, unread = self._get_unread(NOTIFY_EVENT.REFUND)
        if unread is None:
            # doctors may only handle refunds within the configured window (48h)
            time_limit = datetime.datetime.now() - datetime.timedelta(hours=settings.DOCTOR_HANDLE_TIME)
            unread = RefundOrder.objects.filter(
                order__service__doctor_id=self.doctor_id,
                status=REFUND_STATUS.PROCESSING,
                lastest_apply_refund__gte=time_limit,
            ).count()
            unread_cache.set(key, unread, self._cache_time)
        return unread

    def get_reserve_unread(self):
        """Return the number of pending reservations."""
        from api.models import Reservation  # see BUGFIX note in get_order_unread
        key, unread = self._get_unread(NOTIFY_EVENT.RESERVE)
        if unread is None:
            unread = Reservation.objects.filter(
                schedule__doctor_id=self.doctor_id,
                status=RESERVATION_STATUS.RESERVING,
            ).count()
            unread_cache.set(key, unread, self._cache_time)
        return unread

    @classmethod
    def get_all_conversation_unread_num_by_ids(cls, doctor_ids):
        """Batch-fetch conversation unread counts for doctors (Bybo group optimisation)."""
        from api.models import Doctor  # see BUGFIX note in get_order_unread
        user_ids = list(Doctor.objects.filter(id__in=doctor_ids).values_list('user_id', flat=True))
        return UserUnread.get_all_conversation_unread_num_by_ids(user_ids)
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
from datetime import datetime, date, timedelta, timezone
from typing import Union
def get_timestamp_epoch(the_time: Union[date, datetime, None]):
    """Convert a ``datetime``/``date`` to an integer Unix timestamp.

    ``None`` passes through as ``None``.  A bare ``date`` is treated as
    midnight (naive, local time) of that day.

    :raises TypeError: for any other input type.
    """
    if the_time is None:
        return None
    # datetime must be tested first conceptually (it subclasses date);
    # anything that is neither raises.
    if not isinstance(the_time, datetime):
        if not isinstance(the_time, date):
            raise TypeError(
                "datetime.datetime or datetime.date expected. [%s]" % type(
                    the_time))
        # promote the date to a naive datetime at midnight
        the_time = datetime(the_time.year, the_time.month, the_time.day)
    return int(the_time.timestamp())
def get_timestamp(the_time: Union[date, datetime]):
    """Backwards-compatible alias for :func:`get_timestamp_epoch`."""
    return get_timestamp_epoch(the_time)
def get_timestamp_or_none(the_time: Union[date, datetime, None]):
    """None-safe timestamp conversion.

    NOTE: ``get_timestamp_epoch`` already maps ``None`` to ``None``, so
    the guard here is redundant but harmless.
    """
    if the_time is None:
        return None
    return get_timestamp(the_time)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment