Commit a807b8fa authored by zhanglu's avatar zhanglu

init card service

parents
# Created by https://www.gitignore.io/api/python,django,pycharm
# Edit at https://www.gitignore.io/?templates=python,django,pycharm
### Django ###
*.log
*.pot
*.pyc
__pycache__/
local_settings.py
pims/settings_local.py
db.sqlite3
db.sqlite3-journal
media
*/migrations/
.devcontainer/
# If your build process includes running collectstatic, then you probably don't need or want to include staticfiles/
# in your Git repository. Update and uncomment the following line accordingly.
# <django-project-name>/staticfiles/
### Django.Python Stack ###
# Byte-compiled / optimized / DLL files
*.py[cod]
*$py.class
# C extensions
*.so
*.env
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### PyCharm ###
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm Patch ###
# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721
# *.iml
# modules.xml
# .idea/misc.xml
# *.ipr
# Sonarlint plugin
.idea/**/sonarlint/
# SonarQube Plugin
.idea/**/sonarIssues.xml
# Markdown Navigator plugin
.idea/**/markdown-navigator.xml
.idea/**/markdown-navigator/
### Python ###
# Byte-compiled / optimized / DLL files
# C extensions
# Distribution / packaging
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
# Installer logs
# Unit test / coverage reports
# Translations
# Scrapy stuff:
# Sphinx documentation
# PyBuilder
# pyenv
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
# celery beat schedule file
# SageMath parsed files
# Spyder project settings
# Rope project settings
# Mr Developer
# mkdocs documentation
# mypy
# Pyre type checker
# End of https://www.gitignore.io/api/python,django,pycharm
.idea/*
.venv
venv/
.vscode/
.DS_Store/
static_files/~$*.xlsx
sqls/sqitch.conf
celerybeat-schedule.db
celerybeat.pid
node_modules/
rpcd.env
apidoc/
card/settings_local.py
FROM ccr.ccs.tencentyun.com/gm-base/py3.8-buster:v1.0.1
RUN cat /etc/apt/sources.list
RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN cat /etc/apt/sources.list
RUN apt-get clean
RUN apt-get update
RUN apt-get install -y --no-install-recommends \
# 业务相关依赖和安装工具
libjemalloc-dev \
git \
g++ \
gcc \
binutils \
clang \
cmake \
coreutils \
dpkg-dev \
dpkg \
findutils \
libc-dev \
libffi-dev \
libtirpc-dev \
make \
ncurses-dev \
pax-utils \
tcl-dev \
tk \
tk-dev \
python3-dev \
librdkafka-dev \
libxml2-dev \
libxslt-dev \
librdkafka-dev \
libaio-dev \
libffi-dev \
gfortran \
swig \
liblapack-dev \
musl-dev \
&& ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1
# ENV LD_PRELOAD $LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
WORKDIR /srv/apps/card/
ENV POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/srv/apps/card" \
VENV_PATH="/srv/apps/card/.venv"
# prepend poetry and venv to path
ENV PATH="$VENV_PATH/bin:$PATH"
# 提前安装 依赖, 使用 docker cache
COPY ./pyproject.toml ./poetry.lock /srv/apps/card/
RUN poetry install -v
COPY . .
RUN poetry run pip install -i https://pypi-1593408770345:241c0cbfa8437730ad982ece687b5a06e78dcf83@gengmei-pypi.pkg.coding.net/tob/pypi/simple gm-types -U
RUN poetry run pip install -i https://pypi-1593408770345:241c0cbfa8437730ad982ece687b5a06e78dcf83@gengmei-pypi.pkg.coding.net/tob/pypi/simple redis
RUN mkdir -p /data/log/card/app/
ENV GM_RPCD_MODE "deploy"
ENV RPCD_SETTINGS_MODULE "card.settings"
CMD poetry run gunicorn fastapi_rpcd.asgi:application -w 1 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 --worker-tmp-dir /dev/shm
FROM ccr.ccs.tencentyun.com/gm-base/py3.8-buster:v1.0.1
RUN cat /etc/apt/sources.list
RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN cat /etc/apt/sources.list
RUN apt-get clean
RUN apt-get update
RUN apt-get install -y --no-install-recommends \
# 业务相关依赖和安装工具
libjemalloc-dev \
git \
g++ \
gcc \
binutils \
clang \
cmake \
coreutils \
dpkg-dev \
dpkg \
findutils \
libc-dev \
libffi-dev \
libtirpc-dev \
make \
ncurses-dev \
pax-utils \
tcl-dev \
tk \
tk-dev \
python3-dev \
librdkafka-dev \
libxml2-dev \
libxslt-dev \
librdkafka-dev \
libaio-dev \
libffi-dev \
gfortran \
swig \
liblapack-dev \
musl-dev \
&& ln -s /usr/lib/x86_64-linux-musl/libc.so /lib/libc.musl-x86_64.so.1
# ENV LD_PRELOAD $LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
WORKDIR /srv/apps/card/
RUN python -m venv "/srv/apps/card/venv" \
&& pip config set global.index-url https://pypi-1593408770345:241c0cbfa8437730ad982ece687b5a06e78dcf83@gengmei-pypi.pkg.coding.net/tob/pypi/simple
ENV VIRTUAL_ENV "/srv/apps/card/venv"
ENV PATH "$VIRTUAL_ENV:$PATH"
# 编译安装 xlearn
COPY ./gm_xlearn /tmp/gm_xlearn
RUN cd /tmp/gm_xlearn \
&& mkdir /tmp/gm_xlearn/build \
&& cd build \
&& cmake ../ \
&& cp /tmp/gm_xlearn/gm_lib/libxlearn_api.so /tmp/gm_xlearn/build/lib \
&& cd python-package/ \
&& bash -c "source /srv/apps/card/venv/bin/activate && pip install scipy && ./install-python.sh" \
&& /srv/apps/card/venv/bin/python -c "import xlearn;"
# 提前安装 依赖, 使用 docker cache
COPY ./pyproject.toml ./poetry.lock /srv/apps/card/
RUN poetry install
COPY . .
RUN poetry update gm-types
RUN mkdir -p /data/log/card/app/
ENV GM_RPCD_MODE "deploy"
ENV RPCD_SETTINGS_MODULE "card.settings"
CMD poetry run gunicorn fastapi_rpcd.asgi:application -w 1 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 --worker-tmp-dir /dev/shm
@Library('gm-pipeline-library') _
pipeline {
agent any
options {
// Console output add timestamps
timestamps()
// Disallow concurrent executions of the Pipeline
disableConcurrentBuilds()
// On failure, retry the entire Pipeline the specified number of times.
retry(1)
}
parameters {
choice(name: 'CACHE', choices: ['', '--no-cache'], description: 'docker build 是否使用cache,默认使用,不使用为--no-cache')
}
environment {
// Image Tag branch.time.hash
TAG = dockerTag()
// Image Full Tag
IMAGE = "${DOCKER_REGISTRY}/gm-backend/card:$TAG"
}
stages {
stage("Begin") {
steps {
dingNotify "before"
}
}
stage('Build Image') {
steps {
sh "docker build . ${params.CACHE} -t $IMAGE -f ./Dockerfile"
}
}
stage('Deploy') {
steps {
sh "docker push $IMAGE"
}
}
}
post {
always {
dingNotify "after", "${currentBuild.currentResult}"
}
}
}
# card
## 本地启动
1. 准备 Python3.8 虚拟环境
2. 激活虚拟环境,安装 poetry 管理Python依赖
3. 执行 poetry install 安装依赖
4. 创建日志目录 mkdir -p /data/log/card/app (若本地没有gm-config,则需要先配置 /etc/gm-config )
4. 使用 poetry run gunicorn fastapi_rpcd.asgi:application -w 1 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000 启动服务
{
"name": "card APIs",
"version": "0.1.1",
"description": "",
"title": "",
"url" : ""
}
__version__ = "0.1.0"
import os
from card.utils.log_utils import info_logger
info_logger.info("starting card, pid:{}".format(os.getpid()))
from gm_protocol import GmProtocol
from card import settings
gm_protocol = GmProtocol(settings.API_HOST, settings.WEB_API_HOST)
# -*- coding: utf-8 -*-
import os
from typing import Dict
from pydantic import BaseModel
LOG_DIR = "/data/log/card/app/"
class LoggingSettings(BaseModel):
version: int = 1
disable_existing_loggers: bool = False
formatters: Dict = {
"verbose": {
"format": "%(asctime)s %(levelname)s %(module)s.%(funcName)s Line:%(lineno)d %(message)s"
},
"simple": {"format": "%(levelname)s %(message)s"},
"profile": {"format": "%(asctime)s %(message)s"},
"raw": {"format": "%(message)s"},
}
handlers: Dict = {
"null": {"level": "DEBUG", "class": "logging.NullHandler", },
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "verbose",
},
"info_handler": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "info.log"),
"formatter": "verbose",
},
"error_handler": {
"level": "ERROR",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "error_logger.log"),
"formatter": "verbose",
},
"profile_handler": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "profile.log"),
"formatter": "profile",
},
"exception_handler": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "exception.log"),
"formatter": "verbose",
},
"tracer_handler": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "tracer.log"),
"formatter": "raw",
},
"sql_handler": {
"level": "INFO",
"class": "logging.handlers.RotatingFileHandler",
"filename": os.path.join(LOG_DIR, "sql.log"),
"formatter": "verbose",
},
}
loggers: Dict = {
"fastapi.request": {
"handlers": ["error_handler"],
"level": "ERROR",
"propagate": False,
},
"info_logger": {
"handlers": ["info_handler", "console"],
"level": "DEBUG",
"propagate": False,
},
"error_logger": {
"handlers": ["error_handler"],
"level": "ERROR",
"propagate": False,
},
"exception_logger": {
"handlers": ["exception_handler"],
"level": "ERROR",
"propagate": False,
},
"sql_logger": {
"handlers": ["sql_handler", "console"],
"level": "DEBUG",
"propagate": False,
},
}
LOGGING = LoggingSettings().dict()
# -*- coding: utf-8 -*-
def setup():
"""
Configure the settings (this happens as a side effect of accessing the
first setting), configure logging and populate the app registry.
"""
from card.utils.log_utils import configure_logging
configure_logging()
import time
import asyncio
from typing import List, Any, Dict
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import AsyncRPCInvoker, get_current_rpc_invoker
class RPC(object):
def __init__(self, name: str, method: str, future: Any, **kwargs: Any) -> None:
self.name: str = name
self.method: str = method
self.kwargs: Dict = kwargs
self.future: Any = future
class RPCCalls(object):
"""rpc批量请求"""
def __init__(self):
self.client: AsyncRPCInvoker = get_current_rpc_invoker()
self.rpcs: Dict[str, RPC] = {}
self._done: bool = False
self.result: Dict[str, Any] = {}
def add(self, name: str, method: str, **kwargs: Any) -> None:
future = self.client[method](**kwargs)
rpc = RPC(name=name, method=method, future=future, **kwargs)
self.rpcs[name] = rpc
def __getitem__(self, name: str) -> Any:
if name not in self.rpcs:
raise Exception("method not found")
if not self._done:
raise Exception("those rpcs must gather before use them")
future = self.result[name]
return future.unwrap()
async def gather(self) -> List:
futures_dict = {rpc.name: rpc.future for rpc in self.rpcs.values()}
self._done = True
names, futures = [], []
for name, future in futures_dict.items():
names.append(name)
futures.append(future)
for idx, item in enumerate(await asyncio.gather(*futures)):
self.result[names[idx]] = item
# -*- coding: utf-8 -*-
import asyncio
import sentry_sdk
from card import settings
from card.utils import redis
async def startup_redis_pool():
await redis.connect()
async def shutdown_redis_pool():
await redis.disconnect()
async def sentry_sdk_init():
""" 初始化 sentry. """
try:
sentry_sdk.init(settings.RAVEN_CONFIG["dsn"])
except:
pass
fastapi_on_startup = [startup_redis_pool, sentry_sdk_init]
fastapi_on_shutdown = [shutdown_redis_pool]
# -*- coding: utf-8 -*-
import copy
import operator
empty = object()
def new_method_proxy(func):
def inner(self, *args):
if self._wrapped is empty:
self._setup()
return func(self._wrapped, *args)
return inner
def unpickle_lazyobject(wrapped):
"""
Used to unpickle lazy objects. Just return its argument, which will be the
wrapped object.
"""
return wrapped
class LazyObject:
"""
A wrapper for another class that can be used to delay instantiation of the
wrapped class.
By subclassing, you have the opportunity to intercept and alter the
instantiation. If you don't need to do that, use SimpleLazyObject.
"""
# Avoid infinite recursion when tracing __init__ (#19456).
_wrapped = None
def __init__(self):
# Note: if a subclass overrides __init__(), it will likely need to
# override __copy__() and __deepcopy__() as well.
self._wrapped = empty
__getattr__ = new_method_proxy(getattr)
def __setattr__(self, name, value):
if name == "_wrapped":
# Assign to __dict__ to avoid infinite __setattr__ loops.
self.__dict__["_wrapped"] = value
else:
if self._wrapped is empty:
self._setup()
setattr(self._wrapped, name, value)
def __delattr__(self, name):
if name == "_wrapped":
raise TypeError("can't delete _wrapped.")
if self._wrapped is empty:
self._setup()
delattr(self._wrapped, name)
def _setup(self):
"""
Must be implemented by subclasses to initialize the wrapped object.
"""
raise NotImplementedError(
"subclasses of LazyObject must provide a _setup() method"
)
# Because we have messed with __class__ below, we confuse pickle as to what
# class we are pickling. We're going to have to initialize the wrapped
# object to successfully pickle it, so we might as well just pickle the
# wrapped object since they're supposed to act the same way.
#
# Unfortunately, if we try to simply act like the wrapped object, the ruse
# will break down when pickle gets our id(). Thus we end up with pickle
# thinking, in effect, that we are a distinct object from the wrapped
# object, but with the same __dict__. This can cause problems (see #25389).
#
# So instead, we define our own __reduce__ method and custom unpickler. We
# pickle the wrapped object as the unpickler's argument, so that pickle
# will pickle it normally, and then the unpickler simply returns its
# argument.
def __reduce__(self):
if self._wrapped is empty:
self._setup()
return (unpickle_lazyobject, (self._wrapped,))
def __copy__(self):
if self._wrapped is empty:
# If uninitialized, copy the wrapper. Use type(self), not
# self.__class__, because the latter is proxied.
return type(self)()
else:
# If initialized, return a copy of the wrapped object.
return copy.copy(self._wrapped)
def __deepcopy__(self, memo):
if self._wrapped is empty:
# We have to use type(self), not self.__class__, because the
# latter is proxied.
result = type(self)()
memo[id(self)] = result
return result
return copy.deepcopy(self._wrapped, memo)
__bytes__ = new_method_proxy(bytes)
__str__ = new_method_proxy(str)
__bool__ = new_method_proxy(bool)
# Introspection support
__dir__ = new_method_proxy(dir)
# Need to pretend to be the wrapped class, for the sake of objects that
# care about this (especially in equality tests)
__class__ = property(new_method_proxy(operator.attrgetter("__class__"))) # type: ignore
__eq__ = new_method_proxy(operator.eq)
__lt__ = new_method_proxy(operator.lt)
__gt__ = new_method_proxy(operator.gt)
__ne__ = new_method_proxy(operator.ne)
__hash__ = new_method_proxy(hash)
# List/Tuple/Dictionary methods support
__getitem__ = new_method_proxy(operator.getitem)
__setitem__ = new_method_proxy(operator.setitem)
__delitem__ = new_method_proxy(operator.delitem)
__iter__ = new_method_proxy(iter)
__len__ = new_method_proxy(len)
__contains__ = new_method_proxy(operator.contains)
class SimpleLazyObject(LazyObject):
"""
A lazy object initialized from any function.
Designed for compound objects of unknown type. For builtins or objects of
known type, use django.utils.functional.lazy.
"""
def __init__(self, func):
"""
Pass in a callable that returns the object to be wrapped.
If copies are made of the resulting SimpleLazyObject, which can happen
in various circumstances within Django, then you must ensure that the
callable can be safely run more than once and will return the same
value.
"""
self.__dict__["_setupfunc"] = func
super().__init__()
def _setup(self):
self._wrapped = self._setupfunc()
# Return a meaningful representation of the lazy object for debugging
# without evaluating the wrapped object.
def __repr__(self):
if self._wrapped is empty:
repr_attr = self._setupfunc
else:
repr_attr = self._wrapped
return "<%s: %r>" % (type(self).__name__, repr_attr)
def __copy__(self):
if self._wrapped is empty:
# If uninitialized, copy the wrapper. Use SimpleLazyObject, not
# self.__class__, because the latter is proxied.
return SimpleLazyObject(self._setupfunc)
else:
# If initialized, return a copy of the wrapped object.
return copy.copy(self._wrapped)
def __deepcopy__(self, memo):
if self._wrapped is empty:
# We have to use SimpleLazyObject, not self.__class__, because the
# latter is proxied.
result = SimpleLazyObject(self._setupfunc)
memo[id(self)] = result
return result
return copy.deepcopy(self._wrapped, memo)
# -*- coding: utf-8 -*-
from card import rpcd
def initialize():
from card.views import registe_views
registe_views()
rpcd.setup()
# fire all binds
# -*- coding: utf-8 -*-
from typing import Any, Dict
from os.path import abspath, dirname, join as path_join
from gm_types.gaia import TAG_V3_TYPE
# redis配置
REDIS_DSN = "redis://redis-service:6379/0"
REDIS_URL = REDIS_DSN
REDIS_URL2 = REDIS_DSN
REDIS_URL3 = REDIS_DSN
REDIS_URL4 = REDIS_DSN
LBS_REDIS_URL = REDIS_DSN
RAVEN_CONFIG = {
'dsn': 'http://29e77782db3c4429857cb1b6d69d946300:d55eafe28ae64f6e8c4a82fa3bc50fdd00@sentry.gengmei.cc/80',
}
API_HOST = "http://backend.paas-test.env/"
HTTPS_HOST = "http://backend.paas-test.env/"
WEB_API_HOST = "http://m.paas-test.env/"
# Ignore view path's some module
IGNORE_VIEW_PATHS = (
"tests", "libs", "utils", "validation", "schemas", "format"
)
TIME_ZONE = 'Asia/Shanghai'
# 在首页feed展示的标签类型
FEED_SHOW_TAG_TYPES = [
TAG_V3_TYPE.NORMAL,
TAG_V3_TYPE.SECOND_APPEAL,
TAG_V3_TYPE.CARD_DISPLAY,
TAG_V3_TYPE.SUPERSTAR
]
VIDEO_TAG_ID = 4706 # 视频tag
TRACTATE_TAG_DISPLAY_NUM_CONTROL = 5
EXPERT_MEMBERSHIP_IMG = 'https://heras.igengmei.com/2019/06/27/1469deaf2b'
try:
from card.log_settings import LOG_DIR, LOGGING
from card.settings_local import *
except ModuleNotFoundError:
pass
from typing import Any, Dict
#redis配置
REDIS_DSN = "redis://redis.paas-test.env:6379/0"
REDIS_URL=REDIS_DSN
REDIS_URL2=REDIS_DSN
REDIS_URL3=REDIS_DSN
REDIS_URL4=REDIS_DSN
LBS_REDIS_URL=REDIS_DSN
RAVEN_CONFIG = {
'dsn': 'http://29e77782db3c4429857cb1b6d69d9463:d55eafe28ae64f6e8c4a82fa3bc50fdd@sentry.gengmei.cc/80',
}
API_HOST = "http://backend.paas-test.env/"
HTTPS_HOST = "http://backend.paas-test.env/"
WEB_API_HOST = "http://m-test.igengmei.com/"
TIME_ZONE = 'Asia/Shanghai'
try:
from card.log_settings import LOG_DIR, LOGGING
from card.settings_local import *
except ModuleNotFoundError:
pass
from gm_protocol import GmProtocol # type: ignore
from card import settings
gm_protocol = GmProtocol(api_host=settings.API_HOST, msite_host=settings.WEB_API_HOST)
import logging
import logging.config
import traceback
import sentry_sdk # type: ignore
exception_logger = logging.getLogger("exception_logger")
info_logger = logging.getLogger("info_logger")
err_logger = logging.getLogger("error_logger")
sql_logger = logging.getLogger("sql_logger")
def logging_exception():
traceback.format_exc()
err_logger.error(traceback.format_exc())
try:
sentry_sdk.capture_exception()
except:
pass
def configure_logging():
""" 日志组件初始化. """
from card.log_settings import LOGGING
logging.config.dictConfig(LOGGING)
def log_response(url):
import functools
def fun_tool(f):
result = None
@functools.wraps(f)
async def wrapped(*args):
nonlocal result,url
try:
result =await f(*args)
info_logger.info("{} request_params:{} response:{}".format(url,args[1].dict(),result))
return result
except:
logging_exception()
return result
return wrapped
return fun_tool
\ No newline at end of file
# -*- coding: utf-8 -*-
from typing import Optional
import aioredis # type: ignore
from card import settings
from aioredis import ConnectionsPool # type: ignore
class _LazyRedisPool:
def __init__(self, redis_dsn: str, **options):
self._pool: Optional[ConnectionsPool] = None
self._connected = False
self._redis_dsn = redis_dsn
self._options = options
def __getattribute__(self, name): # type: ignore
if name.startswith("_"):
return object.__getattribute__(self, name)
return getattr(object.__getattribute__(self, '_pool'), name)
async def _connect(self) -> None:
""" 初始化 连接池. """
self._pool = await aioredis.create_redis_pool(self._redis_dsn, **self._options)
self._connected = True
async def _disconnect(self) -> None:
self._pool.close()
await self._pool.wait_closed()
self._connected = False
# fastapi 服务启动会通过 connect() 初始化为 aioredis.ConnectionsPool
redis_client = _LazyRedisPool(settings.REDIS_URL)
redis_client2 = _LazyRedisPool(settings.REDIS_URL2)
redis_client3 = _LazyRedisPool(settings.REDIS_URL3)
redis_client4 = _LazyRedisPool(settings.REDIS_URL4)
redis_ctr_push = _LazyRedisPool(settings.REDIS_URL4)
pool = _LazyRedisPool(settings.REDIS_URL3, maxsize=30)
# FIXME: aioredis 没有 decode_responses 参数, 需要注意该参数是否兼容以前的代码
# `encoding`: Codec to use for response decoding.
decode_pool = _LazyRedisPool(settings.REDIS_URL3, maxsize=30, encoding="utf8")
lbs_redis_client = _LazyRedisPool(settings.LBS_REDIS_URL)
async def connect():
""" 初始化 redis 连接池. """
await redis_client._connect()
await redis_client2._connect()
await redis_client3._connect()
await redis_client4._connect()
await redis_ctr_push._connect()
await pool._connect()
await decode_pool._connect()
await lbs_redis_client._connect()
async def disconnect():
""" 销毁连接池 """
await redis_client._disconnect()
await redis_client2._disconnect()
await redis_client3._disconnect()
await redis_client4._disconnect()
await redis_ctr_push._disconnect()
await pool._disconnect()
await decode_pool._disconnect()
await lbs_redis_client._disconnect()
import redis
from card import settings
pool = redis.ConnectionPool.from_url(settings.REDIS_URL3, port=6379, max_connections=30)
sync_decode_pool = redis.ConnectionPool.from_url(settings.REDIS_URL3, port=6379, max_connections=30,decode_responses=True)
# -*- coding: utf-8 -*-
from typing import Any, Dict, Optional
from fastapi_rpcd.all import Schema # type: ignore
from fastapi_rpcd.all import RPCDFaultException, get_current_rpc_invoker
from gm_types.error import ERROR # type: ignore
class User(Schema):
user_id: Optional[int] = None
person_id: Optional[str] = None
nick_name: Optional[str] = None
portrait: Optional[str] = None
last_name: Optional[str] = None
def gen(code):
raise RPCDFaultException(
code=code, message=ERROR.getDesc(code),
)
def dict_to_user_obj(user_dict: Dict[str, Any]) -> User:
return User(**user_dict)
async def get_current_logined_user() -> Optional[User]:
client_context = get_current_rpc_invoker()
try:
defered = await client_context["passport/info/by_session"]()
res = defered.unwrap()
except Exception:
return None
return dict_to_user_obj(res)
from pkgutil import iter_modules, walk_packages
from inspect import getmembers
from card import views
from card.settings import IGNORE_VIEW_PATHS
from card.utils.log_utils import info_logger
def registe_views():
"""导入相关模块"""
for mod_info in walk_packages(views.__path__,
prefix=views.__name__+'.',
onerror=lambda x: None):
if mod_info.ispkg or mod_info.name.split('.')[-1] in IGNORE_VIEW_PATHS:
continue
info_logger.info(
"Loading views in module: {} ...".format(mod_info.name))
__import__(mod_info.name)
import time
import asyncio
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import get_current_rpc_invoker
from card.rpcd.batch_rpc import RPCCalls
from .schemas import DemoRequest, DemoResponse
from .schemas import DiaryListRequest, DiaryList
from .format import DiaryFormat
@bind("card/demo/mul")
async def card_demo_mul(ctx: Context, request: DemoRequest) -> DemoResponse:
rpc_calls = RPCCalls()
rpc_calls.add("query", "poseidon/promotion/commodity/promotion/query", commodity_ids=[1], user_id=1)
rpc_calls.add("price", "poseidon/promotion/commodity/price", commodity_ids=[1], user_id=1)
await rpc_calls.gather()
query = rpc_calls["query"]
price = rpc_calls["price"]
resp = DemoResponse(
service_name="card", process_time=int(time.time()),
output=request.echo
)
return resp
@bind("card/diary/list_by_ids")
async def diary_list_by_ids(ctx: Context, request: DiaryListRequest) -> DiaryList:
if not request.diary_ids:
return DiaryList(diaries=[])
user_id = request.user_id
res = (await get_current_rpc_invoker()["diary/simple_diary_list"](
diary_ids=request.diary_ids,
# user_id=user_id,
)).unwrap()
return DiaryFormat(request.version, request.device_id).format_feed_card(res)
from typing import Dict, List
from gm_types.gaia import SEARCH_TAB_TYPE
from gm_upload.utils.image_utils import Picture
from card.views.schemas import User, Tag
from card.views.utils.tag import transform_tags
from card.libs.protocol import gm_protocol
from card.views.utils.format import BaseFormat
from .schemas import DiaryList, DiaryInfo
class DiaryFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
def format_feed_card(self, diary_list: List[Dict]) -> DiaryList:
res: List[DiaryInfo] = []
for item in diary_list:
user = User(**item["user"])
self.modify_user(user)
item["user"] = user
for image in item.get("images", []):
image_url = Picture.get_aspectscale_path(image["image"])
image["image_url"] = self.revert_image(image_url)
diary: DiaryInfo = DiaryInfo(**item)
diary.gm_url = gm_protocol.get_diary_detail(id=diary.id)
tags_v3 = item["tags_v3"]
# if not hit_gray(self.version, Rules.TAG_MIGRATE_724, self.device_id):
# tags_v3 = []
tags = transform_tags(
tags=item["tags"],
tab_type=SEARCH_TAB_TYPE.DIARY,
filter_free_tag=True,
tags_v3=tags_v3,
)
diary.tags = [Tag(**tag) for tag in tags]
if item.get("video_url", ""):
diary.video = self.get_video(
video_cover_url=self.revert_image(item.get("video_cover", "")),
video_url=item.get("video_url", ""),
short_video_url=item.get("short_video_url", ""),
)
res.append(diary)
return DiaryList(diaries=res)
from typing import List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
from card.views.schemas import User, Tag, Video
class DemoRequest(Schema):
echo: str = Field(..., description="输入参数")
class DemoResponse(Schema):
service_name: str = Field(..., description="服务名字")
process_time: int = Field(..., description="服务处理时间")
output: str = Field(..., description="输出参数")
class DiaryListRequest(Schema):
user_id: int = Field(None, description="当前用户ID")
diary_ids: List[int] = Field([], description="日记本ID列表")
version: str = Field(None, description="当前app version")
device_id: str = Field(None, description="当前用户设备ID")
class DiaryImage(Schema):
image_url: str = Field(..., description="图片地址")
desc: str = Field(..., description="描述:Before/After")
# "image_wide": "https://pic.igengmei.com/2018/11/03/1623/99b3374ffe11-wide",
# "image_slimwidth": "https://pic.igengmei.com/2018/11/03/1623/99b3374ffe11-slimwidth",
# "small_wide": "https://pic.igengmei.com/2018/11/03/1623/99b3374ffe11-smallwide",
# "image_thumb": "https://pic.igengmei.com/2018/11/03/1623/99b3374ffe11-thumb",
# "image_half": "https://pic.igengmei.com/2018/11/03/1623/99b3374ffe11-half",
class DiaryInfo(Schema):
"""日记本信息"""
id: int = Field(..., description="日记本ID")
title: str = Field(..., description="日记本标题")
content: str = Field(..., description="内容")
service_id: str = Field(None, description="日记关联美购ID")
is_voted: bool = Field(False, description="是否点赞")
vote_num: int = Field(0, description="点赞数")
reply_num: int = Field(0, description="评论数")
gm_url: str = Field(None, description="跳转协议")
tags: Optional[List[Tag]] = Field([], description="标签")
user: User = Field(..., description="用户信息")
images: Optional[List[DiaryImage]] = Field([], description="术前术后图")
video: Optional[Video] = Field({}, description="日记视频")
class DiaryList(Schema):
"""日记本卡片列表信息"""
diaries: Optional[List[DiaryInfo]] = Field([], description="日记本列表")
from typing import List
from gm_types.gaia import USER_TYPE, LIVE_STATUS
from gm_types.merchant.live import LIVE_PLAYER_ENTRANCE
from card import settings
from card.libs.protocol import gm_protocol
from card.views.utils.format import BaseFormat
from .schemas import LiveInfo, LiveList
class LiveFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
@classmethod
def _get_live_gm_url(cls, status, topic_id, channel_id):
"""获取直播相关的gm_url。"""
gm_url = ''
if status == LIVE_STATUS.NOLIVE:
gm_url = gm_protocol.get_live_playback(topic_id=topic_id)
elif status == LIVE_STATUS.LIVE:
gm_url = gm_protocol.get_live_list(channel_id=channel_id, entrance=LIVE_PLAYER_ENTRANCE.FEED)
elif status == LIVE_STATUS.NOTICE:
url = settings.API_HOST + "/hybrid/broadcast/prevue/"
gm_url = gm_protocol.get_webview(url)
return gm_url
def format_live(self, info):
"""直播、预告信息"""
live = {
"id": info["id"],
"notice_id": info.get("conf_id"),
"channel_id": info.get("channel_id"),
"stream_id": info.get("stream_id"),
"status": info.get("status"),
"zhibo_time": info.get("zhibo_time", 0),
"topic_id": info.get("topic_id"),
"images": {
"image_url": self.revert_image(info.get("cover_url", '')),
"height": info.get("height", 0),
"width": info.get("width", 0),
},
"content": info.get("desc", '') or info.get("title", ''),
}
user = {
"user_id": info.get("user_id"),
"user_name": info.get("uname", ""),
"portrait": info.get("user_portrait", ""),
"user_level": info.get("user_level", {}),
"user_type": USER_TYPE.NORMAL,
"gm_url": gm_protocol.get_user_homepage(info.get("user_id")),
}
if info.get("doctor"):
_did = info.get('doctor', {}).get('id')
user["doctor_id"] = _did
user["user_type"] = USER_TYPE.EXPERT
user["user_level"]["membership_icon"] = settings.EXPERT_MEMBERSHIP_IMG
user["gm_url"] = gm_protocol.get_expert_home(_did)
live["user"] = user
if live["status"] == LIVE_STATUS.NOLIVE and not live["topic_id"]:
return None
live["gm_url"] = self._get_live_gm_url(live["status"], live["topic_id"], live["channel_id"])
return LiveInfo(**live)
def format_feed_card(self, lives) -> LiveList:
res: List[LiveInfo] = []
for item in lives:
live = self.format_live(info=item)
if live:
res.append(live)
return LiveList(lives=res)
from typing import Any, List
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import get_current_rpc_invoker
from .schemas import LiveListRequest, LiveList, LiveInfo, LiveNoticeListRequest, LiveUser
from .format import LiveFormat
async def format_lives(version, device_id, res: List[Any]) -> LiveList:
lives = LiveFormat(version, device_id).format_feed_card(lives=res)
doctor_ids = [live.user.doctor_id for live in lives.lives if live.user.doctor_id]
practice_time = (
await get_current_rpc_invoker()['api/doctors/practice_time'](doctor_ids=doctor_ids)
).unwrap_or({})
for live in lives.lives:
if not live.user.doctor_id:
continue
_practice_year = practice_time.get(live.user.doctor_id, 0)
if not _practice_year:
continue
live.user.doctor_practice_desc = '从业{}年'.format(_practice_year)
return lives
@bind("card/live/list_by_ids")
async def live_list_by_ids(ctx: Context, request: LiveListRequest) -> LiveList:
if not request.live_ids:
return LiveList(lives=[])
user_id = request.user_id
res = (await get_current_rpc_invoker()["mimas/live/get_clannels_url/audience"](
ids=request.live_ids,
# user_id=user_id,
)).unwrap()
return await format_lives(request.version, request.device_id, res)
@bind("card/live_notice/list_by_ids")
async def live_notice_list_by_ids(ctx: Context, request: LiveNoticeListRequest) -> LiveList:
if not request.live_notice_ids:
return LiveList(lives=[])
user_id = request.user_id
res = (await get_current_rpc_invoker()["mimas/live/get_live_notice_v2"](
live_ids=request.live_notice_ids,
# user_id=user_id,
)).unwrap()
return await format_lives(request.version, request.device_id, res)
from typing import List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
from card.views.schemas import User
class LiveListRequest(Schema):
user_id: Optional[int] = Field(None, description="用户ID")
live_ids: List[int] = Field([], description="ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class LiveNoticeListRequest(Schema):
user_id: Optional[int] = Field(None, description="用户ID")
live_notice_ids: List[int] = Field([], description="ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class Image(Schema):
height: int = Field(0, description="图片地址")
width: str = Field(0, description="图片地址")
image_url: str = Field(..., description="图片地址")
class LiveUser(User):
doctor_practice_desc: Optional[str] = Field('', description="从业n年")
doctor_id: Optional[str] = Field(None, description="从业n年")
class LiveInfo(Schema):
"""直播相关信息"""
id: int = Field(..., description="ID")
notice_id: Optional[int] = Field(None, description="预告ID")
content: str = Field(..., description="内容")
gm_url: str = Field(None, description="跳转协议")
user: Optional[LiveUser] = Field({}, description="用户信息")
id: int = Field(..., description="ID")
zhibo_time: Optional[int] = Field(0, description="直播时间")
images: Optional[Image] = Field({}, description="图片")
topic_id: Optional[int] = Field(None, description="关联帖子ID")
status: Optional[int] = Field(None, description="关联帖子ID")
stream_id: Optional[int] = Field(None, description="stream id")
class LiveList(Schema):
"""直播相关信息列表"""
lives: Optional[List[LiveInfo]] = Field([], description="列表")
default_pic = [
{
"image_url": "https://heras.igengmei.com/2020/02/17/b50cc3abdc",
"outline_hex": "D1E3D8",
"inner_hex": "778F80",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/a44af791df",
"outline_hex": "D1E3D8",
"inner_hex": "778F80",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/2e55e07f09",
"outline_hex": "D1E3D8",
"inner_hex": "778F80",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/a5f0f498de",
"outline_hex": "D1E3D8",
"inner_hex": "778F80",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/0f393c973a",
"outline_hex": "D1E3D8",
"inner_hex": "778F80",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/dd02257598",
"outline_hex": "404655",
"inner_hex": "554269",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/6ff880547f",
"outline_hex": "404655",
"inner_hex": "554269",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/aca696419b",
"outline_hex": "404655",
"inner_hex": "554269",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/2ab2162114",
"outline_hex": "404655",
"inner_hex": "554269",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/cadc4b872a",
"outline_hex": "404655",
"inner_hex": "554269",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/572b9d83c4",
"outline_hex": "482424",
"inner_hex": "603836",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/160d66e9b7",
"outline_hex": "482424",
"inner_hex": "603836",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/f061ad58f9",
"outline_hex": "482424",
"inner_hex": "603836",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/c715bca973",
"outline_hex": "482424",
"inner_hex": "603836",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/e2dce83374",
"outline_hex": "482424",
"inner_hex": "603836",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/5884cc88c5",
"outline_hex": "FDE8CD",
"inner_hex": "6D604A",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/6ff57d40d9",
"outline_hex": "FDE8CD",
"inner_hex": "6D604A",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/7ef53a76cc",
"outline_hex": "FDE8CD",
"inner_hex": "6D604A",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/941d1aa23d",
"outline_hex": "FDE8CD",
"inner_hex": "6D604A",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/7041ad8029",
"outline_hex": "FDE8CD",
"inner_hex": "6D604A",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/f858db8b98",
"outline_hex": "A7B5CE",
"inner_hex": "3E5463",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/607f22328c",
"outline_hex": "A7B5CE",
"inner_hex": "3E5463",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/74481b511b",
"outline_hex": "A7B5CE",
"inner_hex": "3E5463",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/ef82043ebd",
"outline_hex": "A7B5CE",
"inner_hex": "3E5463",
"width": 525,
"height": 417,
},
{
"image_url": "https://heras.igengmei.com/2020/02/17/fc77bb4da5",
"outline_hex": "A7B5CE",
"inner_hex": "3E5463",
"width": 525,
"height": 417,
},
]
from card.views.schemas import video
import random
from typing import List
from gm_types.gaia import SEARCH_TAB_TYPE
from gm_upload.utils.image_utils import Picture
from card.views.schemas import User, Tag
from card.views.utils.tag import transform_tags
from card.libs.protocol import gm_protocol
from card.views.utils.format import BaseFormat
from .schemas import AnswerList, AnswerInfo
from .default_pic import default_pic
class AnswerFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
@classmethod
def deal_with_pic_hex(cls, image):
if image.get("image_url"):
return image
pic_info = random.choice(default_pic)
return pic_info
def format_feed_card(self, answer_list) -> AnswerList:
res: List[AnswerInfo] = []
for item in answer_list:
user = User(**item["user"])
self.modify_user(user)
item["user"] = user
answer: AnswerInfo = AnswerInfo(**item)
answer.gm_url = gm_protocol.get_question_detail(item["question_id"], top_answer_id=item["answer_id"])
tags_v3 = item["tags_v3"]
# if not hit_gray(self.version, Rules.TAG_MIGRATE_724, self.device_id):
# tags_v3 = []
tags = transform_tags(
tags=item["tags"],
tab_type=SEARCH_TAB_TYPE.ALL,
filter_free_tag=True,
tags_v3=tags_v3,
)
answer.tags = [Tag(**tag) for tag in tags]
_intact_answer_images = item.pop("intact_answer_images", [])
_question_intact_question_images = item.pop("question_intact_question_images", [])
header_images = item.pop('header_images', [])
_image = {}
if item.get("cover_url"):
_image = {
"image": item.get("cover_url"),
}
elif header_images:
_image = header_images[0]
elif _intact_answer_images:
_image = _intact_answer_images[0]
elif _question_intact_question_images:
_image = _question_intact_question_images[0]
if _image:
image_url = Picture.get_aspectscale_path(_image.pop("image", ""))
_image["image_url"] = self.revert_image(image_url)
image = self.deal_with_pic_hex(_image)
answer.images = image # 此处是一个image对象
video_info = {}
if item.get("video_cover_list"):
video_info = item.get("video_cover_list")[0]
elif item.get("question_videos"):
video_info = item.get("question_videos")[0]
if video_info.get("video_url", ''):
video = self.get_video(
video_cover=self.revert_image(video_info.get("video_pic", '')),
video_url=video_info.get("video_url", ''),
short_video_url=video_info.get("short_video_url", ''),
width=video_info.get("width", 0),
height=video_info.get("height", 0),
)
answer.video = video
res.append(answer)
return AnswerList(answers=res)
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import get_current_rpc_invoker
from .schemas import AnswerListRequest, AnswerList
from .format import AnswerFormat
@bind("card/answer/list_by_ids")
async def diary_list_by_ids(ctx: Context, request: AnswerListRequest) -> AnswerList:
if not request.answer_ids:
return AnswerList(answers=[])
user_id = request.user_id
res = (await get_current_rpc_invoker()["qa/question_answer/list_by_ids_v3"](
answer_ids=request.answer_ids,
# user_id=user_id,
)).unwrap()
return AnswerFormat(request.version, request.device_id).format_feed_card(res)
from typing import List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
from card.views.schemas import User, Tag, Video
class AnswerListRequest(Schema):
user_id: int = Field(None, description="当前用户ID")
answer_ids: List[int] = Field([], description="问答ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class QaImage(Schema):
image_url: str = Field(..., description="图片地址")
image_webp: str = Field(..., description="webp图片地址")
height: int = Field(0, description="图片地址")
width: str = Field(0, description="图片地址")
image_url: str = Field(..., description="图片地址")
class AnswerInfo(Schema):
"""问答信息"""
id: int = Field(..., description="问答ID")
title: str = Field(..., description="问答标题")
content: str = Field(..., description="内容")
service_id: str = Field(None, description="日记关联美购ID")
is_voted: bool = Field(False, description="是否点赞")
vote_num: int = Field(0, description="点赞数")
answer_num: int = Field(0, description="对应问题回答数")
comment_num: int = Field(0, description="评论数")
gm_url: str = Field(None, description="跳转协议")
tags: Optional[List[Tag]] = Field([], description="标签")
user: User = Field(..., description="用户信息")
images: Optional[QaImage] = Field({}, description="图片")
video: Optional[Video] = Field({}, description="日记视频")
class AnswerList(Schema):
"""问答卡片列表信息"""
answers: Optional[List[AnswerInfo]] = Field([], description="问答列表")
from .tag import *
from .user import *
from .video import *
\ No newline at end of file
from fastapi_rpcd.all import Schema
from pydantic import Field
class Tag(Schema):
id: int = Field(None, description="标签ID")
name: str = Field(None, description="标签名")
type: str = Field(None, description="标签类型")
gm_url: str = Field(None, description="跳转协议")
from enum import IntEnum
from typing import Any, List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
class UserTypeEnum(IntEnum):
NORMAL = 0 # 普通用户
EXPERT = 1 # 专家
OFFICER = 2 # 机构
class UserTypeEnum(IntEnum):
NORMAL = 0 # 通用户
EXPERT = 1 # 专家
OFFICER = 2 # 机构
class UserLevel(Schema):
membership_icon: str = Field(None, description="达人icon")
level_icon: str = Field(None, description="等级icon")
constellation_icon: str = Field(None, description="星座icon")
class User(Schema):
"""用户信息"""
id: int = Field(None, description="用户ID")
user_id: int = Field(None, description="用户ID")
user_name: str = Field(None, description="用户名")
portrait: str = Field(None, description="用户头像")
gm_url: str = Field(None, description="用户协议跳转")
user_type: UserTypeEnum = Field(..., description="用户类型, 0:普通用户, 1:专家, 2:机构")
doctor_id: str = Field(None, description="医生ID")
hospital_id: Optional[str] = Field(None, description="医院ID")
membership_level: Optional[str] = Field(None, description="用户名")
user_level: UserLevel= Field(None, description="用户Level相关信息")
from enum import IntEnum
from typing import Any, List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
class Video(Schema):
"""视频信息"""
video_cover_url: str = Field(..., description="视频封面")
video_url: str = Field(..., description="视频地址")
short_video_url: str = Field(..., description="视频短视频url")
width: Optional[int] = Field(0, description="封面图片宽")
heigth: Optional[int] = Field(0, description="封面图片高")
from typing import List
from card import settings
from card.libs.protocol import gm_protocol
from card.views.utils.format import BaseFormat
from .schemas import SpecialPoolInfo, SpecialPoolList, VisualPageInfo, VisualPageList
class SpecialPoolFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
def format_special_pool(self, info):
special_pool = {
"id": info["id"],
"visual_page_id": info["visual_page_id"],
"content": '',
"images": {
"width": 350,
"height": 350,
"image_url": info.get("home_feed_cover_image", ''),
},
"gm_url": gm_protocol.get_webview(settings.API_HOST +
'/phantom/visual_special/{}?usepage=wk'.format(info["visual_page_id"]))
}
return SpecialPoolInfo(**special_pool)
def format_feed_card(self, special_pools) -> SpecialPoolList:
res: SpecialPoolList = []
for item in special_pools:
res.append(self.format_special_pool(info=item))
return SpecialPoolList(special_pools=res)
class VisualPageFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
def format_visual_page(self, info):
visual_page = {
"id": info["id"],
"content": '',
"images": {
"width": 350,
"height": 350,
"image_url": info.get("home_feed_cover_image", ''),
},
"gm_url": gm_protocol.get_webview(settings.API_HOST +
'/phantom/visual_special/{}?usepage=wk'.format(info["id"]))
}
return VisualPageInfo(**visual_page)
def format_feed_card(self, visual_pages) -> VisualPageList:
res: List[VisualPageInfo] = []
for item in visual_pages:
res.append(self.format_visual_page(info=item))
return VisualPageList(visual_pages=res)
from typing import List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
class SpecialPoolListRequest(Schema):
special_pool_ids: List[int] = Field([], description="ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class VisualPageListRequest(Schema):
visualpage_ids: List[int] = Field([], description="ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class Image(Schema):
height: int = Field(0, description="图片地址")
width: str = Field(0, description="图片地址")
image_url: str = Field(..., description="图片地址")
class SpecialPoolInfo(Schema):
"""自定义专题信息"""
id: int = Field(..., description="ID")
visual_page_id: int = Field(..., description="自定义专题ID")
content: str = Field(..., description="内容")
gm_url: str = Field(None, description="跳转协议")
images: Optional[Image] = Field({}, description="图片")
class SpecialPoolList(Schema):
"""自定义专题信息列表"""
special_pools: Optional[List[SpecialPoolInfo]] = Field([], description="列表")
class VisualPageInfo(Schema):
"""自定义专题信息"""
id: int = Field(..., description="ID")
content: str = Field(..., description="内容")
gm_url: str = Field(None, description="跳转协议")
images: Optional[Image] = Field({}, description="图片")
class VisualPageList(Schema):
"""自定义专题信息列表"""
visual_pages: Optional[List[VisualPageInfo]] = Field([], description="列表")
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import get_current_rpc_invoker
from .schemas import SpecialPoolListRequest, SpecialPoolList, VisualPageList, VisualPageListRequest
from .format import SpecialPoolFormat, VisualPageFormat
@bind("card/special_pool/list_by_ids")
async def special_pools(ctx: Context, request: SpecialPoolListRequest) -> SpecialPoolList:
if not request.special_pool_ids:
return SpecialPoolList(special_pools=[])
res = (await get_current_rpc_invoker()["api/special_pool/info"](
ids=request.special_pool_ids,
)).unwrap()
return SpecialPoolFormat(request.version, request.device_id).format_feed_card(special_pools=res)
@bind("card/visualpage/list_by_ids")
async def visualpages(ctx: Context, request: VisualPageListRequest) -> VisualPageList:
if not request.visualpage_ids:
return VisualPageList(visualpages=[])
res = (await get_current_rpc_invoker()["poseidon/visual/get_visualpage_by_ids"](
visualpage_ids=request.visualpage_ids,
)).unwrap()
visualpage_list = VisualPageFormat(request.version, request.device_id).format_feed_card(visual_pages=res)
ids = [item.id for item in visualpage_list.visual_pages]
res = await _get_hera_tab_operate_img(sorted(ids))
for visualpage in visualpage_list.visual_pages:
image_url = res.get(str(visualpage.id), '')
visualpage.images.image_url = image_url
return visualpage_list
async def _get_hera_tab_operate_img(virsual_page_ids):
res = (await get_current_rpc_invoker()["api/hera_tab_operate/image"](
ids=virsual_page_ids
)).unwrap_or({})
return res
\ No newline at end of file
from card.views.utils.html import gm_decode_html
import copy
from typing import List
from gm_types.gaia import TAG_VERSION
from gm_types.mimas import TRACTATE_COVER_TYPE
from gm_upload.utils.image_utils import Picture
from card import settings
from card.views.schemas import User, Tag
from card.views.utils.tag import feed_tag_v3_show_filter, transform_tags
from card.libs.protocol import gm_protocol
from card.views.utils.format import BaseFormat
from .schemas import Image, TractateList, TractateInfo
class TractateFormat(BaseFormat):
def __init__(self, version, device_id):
self.version = version
self.device_id = device_id
@classmethod
def convert_tags(cls, tags, tag_version):
ts: List[Tag] = []
for tag in tags:
ts.append(Tag(**{
"id": tag.get("id"),
"name": tag.get("name"),
"type": tag.get("tag_type"),
"gm_url": gm_protocol.get_polymer_detail(tag_id=tag.get("id"), new_tag=tag_version),
}))
return ts
@classmethod
def get_media_info(cls, video_list, images_list, content_videos, content_images):
"""
转换用户帖卡片媒体信息
:param video_list:
:param images_list:
:param kwargs:
:return:
"""
def package_media_info(media_info):
_image = cls.revert_image(media_info.get("image_url", ''))
return {
"image_url": _image,
"video_url": media_info.get("video_url", ''),
"width": media_info.get("width", 0),
"height": media_info.get("height", 0),
"video_cover_url": _image,
"short_video_url": media_info.get("short_video_url", ""),
}
video_list = video_list + content_videos
images_list = images_list + content_images
video = {}
if video_list:
video_dic = video_list[0]
video_dic["image_url"] = video_dic.pop("video_cover_url", "")
video = package_media_info(video_dic)
images = []
if images_list:
# 取前9张图片
for image_info in images_list[: 9]:
images.append(package_media_info(image_info))
return video, images
@classmethod
def get_video_and_image(cls, info):
video, images = cls.get_media_info(
video_list=info.pop("videos", []),
images_list=info.pop("images", []),
content_videos=info.pop("content_videos", []),
content_images=info.pop("content_images", []),
)
image = copy.deepcopy(video if video else images and images[0] or {})
# 封面图逻辑:
# 有视频但是封面是图片,将video下的video_cover_url, short_video_url 替换为封面图
# 有视频且封面是短视频,逻辑不变
# 没有视频,封面是图片,替换图片第一张的image_url为封面图
# 客户端卡片展示取值逻辑:ios有视频取video 没有视频取images; Android都取images
cover_url = info.pop('cover_url', '')
cover_type = info.pop('cover_type', TRACTATE_COVER_TYPE.NO_COVER)
if all([video, cover_type == TRACTATE_COVER_TYPE.IMAGE]):
cover_url = Picture.get_aspectscale_path(cover_url)
video['video_cover_url'] = cover_url
video['short_video_url'] = cover_url
image['video_cover_url'] = cover_url
image['short_video_url'] = cover_url
elif all([not video, cover_type == TRACTATE_COVER_TYPE.IMAGE]):
image['image_url'] = Picture.get_aspectscale_path(cover_url)
return video, image
def format_tractate(self, info, tag_v3_display):
"""处理用户帖的基础数据"""
tractate_data = {
"id": info["tractate_id"],
"title": info["title"],
"content": gm_decode_html(info["content"]),
"is_voted": info.get("is_voted", False),
"vote_num": info.get("is_voted", 0),
"reply_amount": info.get("reply_amount", 0),
"user": info["user"],
}
tractate = TractateInfo(**tractate_data)
video, images = self.get_video_and_image(info)
if video:
tractate.video = self.get_video(
video_cover=video.get("video_cover_url", ''),
video_url=video.get("video_url", ''),
short_video_url=video.get("short_video_url", ''),
width=video.get("width", '0'),
height=video.get("height", '0'),
)
if images:
tractate.images = Image(**images)
tags_v2 = self.convert_tags(info.pop("new_tags", []), tag_version=TAG_VERSION.V2)
tags_v3 = feed_tag_v3_show_filter(info.pop("tags_v3", []))
tags_v3 = self.convert_tags(tags_v3, tag_version=TAG_VERSION.V3)
if tag_v3_display and tags_v3:
_tags = tags_v3
else:
_tags = tags_v2
tractate.tags = _tags[:settings.TRACTATE_TAG_DISPLAY_NUM_CONTROL]
return tractate
def format_feed_card(self, tractate_list) -> TractateList:
res: TractateList = []
for item in tractate_list:
res.append(self.format_tractate(info=item, tag_v3_display=True))
return TractateList(tractates=res)
from typing import List, Optional
from fastapi_rpcd.all import Schema
from pydantic import Field
from card.views.schemas import User, Tag, Video
class TractateListRequest(Schema):
user_id: int = Field(None, description="当前用户ID")
tractate_ids: List[int] = Field([], description="ID列表")
version: Optional[str] = Field('', description="app版本")
device_id: Optional[str] = Field('', description="设备ID")
class Image(Schema):
image_webp: str = Field('', description="webp图片地址")
height: int = Field(0, description="图片地址")
width: str = Field(0, description="图片地址")
image_url: str = Field(..., description="图片地址")
class TractateInfo(Schema):
"""问答信息"""
id: int = Field(..., description="ID")
title: str = Field(..., description="标题")
content: str = Field(..., description="内容")
is_voted: bool = Field(False, description="是否点赞")
vote_num: int = Field(0, description="点赞数")
reply_amount: int = Field(0, description="评论数")
gm_url: str = Field(None, description="跳转协议")
tags: Optional[List[Tag]] = Field([], description="标签")
user: User = Field(..., description="用户信息")
images: Optional[Image] = Field({}, description="图片")
video: Optional[Video] = Field({}, description="视频")
class TractateList(Schema):
"""问答卡片列表信息"""
tractates: Optional[List[TractateInfo]] = Field([], description="列表")
from fastapi_rpcd.all import Context, bind # type: ignore
from fastapi_rpcd.all import get_current_rpc_invoker
from .schemas import TractateListRequest, TractateList
from .format import TractateFormat
@bind("card/tractate/list_by_ids")
async def diary_list_by_ids(ctx: Context, request: TractateListRequest) -> TractateList:
if not request.tractate_ids:
return TractateList(tractates=[])
user_id = request.user_id
res = (await get_current_rpc_invoker()["mimas/tractate/list_by_ids"](
tractate_ids=request.tractate_ids,
need_special_info=True,
# user_id=user_id,
)).unwrap()
tractates = res.get("tractate_list", [])
return TractateFormat(request.version, request.device_id).format_feed_card(tractate_list=tractates)
from typing import Optional
from gm_types.gaia import USER_TYPE
from card.views.schemas import User, Video
from card.libs.protocol import gm_protocol
from card import settings
class BaseFormat(object):
@classmethod
def revert_image(cls, image):
"""传入图片地址,返回原图(不带后缀)"""
if not image:
return ""
return image.rsplit('-', 1)[0]
@classmethod
def modify_user(cls, user: User) -> User:
if user.user_type == USER_TYPE.NORMAL:
user.gm_url = gm_protocol.get_user_homepage(user.user_id)
elif user.user_type == USER_TYPE.EXPERT:
user.user_level.membership_icon = settings.EXPERT_MEMBERSHIP_IMG
user.gm_url = gm_protocol.get_expert_home(user.doctor_id)
@classmethod
def get_video(cls,
video_cover: str, video_url: str, short_video_url: str,
width: Optional[int]=0, height: Optional[int]=0) -> Video:
return Video(
**{
"video_cover_url": cls.revert_image(video_cover),
"video_url": video_url,
"short_video_url": short_video_url,
"width": width,
"height": height,
}
)
import re
from html.parser import HTMLParser
from lxml import html
def _get_rich_text(rich_text):
"""
富文本标签转成标签
:param rich_text:
:return:
"""
h = HTMLParser()
rich_text = h.unescape(rich_text.replace("&amp;", "&").replace("\n", "<br>")) # 富文本标签转成标签对象
return rich_text
def gm_decode_html(rich_text, reserve_spaces=False):
"""
匹配富文本信息
:param rich_text: 包含html标签(实体标签)的文本信息
:param reserve_spaces: 是否保留空格,多个空格替换为一个空格
:return: 仅展示 纯文本
"""
if not rich_text:
return ""
rich_text = _get_rich_text(rich_text)
element_obj = html.fromstring(rich_text) # 转成 element 对象处理标签
_text = html.tostring(element_obj, encoding="unicode", method="text") # 仅获取文本
safe_text = _text.replace("\n", "").replace("\r", "").replace("\t", "")
if reserve_spaces:
safe_text = re.sub(" +", " ", safe_text)
else:
safe_text = safe_text.replace(" ", "")
return safe_text
\ No newline at end of file
from collections import defaultdict
from gm_types.gaia import TAG_TYPE, TAG_VERSION
from card import settings
from card.libs.protocol import gm_protocol
def feed_tag_v3_show_filter(tags):
"""feed按标签类型过滤需要展示的v3标签"""
type2tag = defaultdict(list)
for tag in tags:
tag_type = tag["tag_type"]
if tag_type in settings.FEED_SHOW_TAG_TYPES:
type2tag[tag_type].append(tag)
tags_filtered_sorted = []
for tag_type in settings.FEED_SHOW_TAG_TYPES:
tags_filtered_sorted.extend(type2tag[tag_type])
return tags_filtered_sorted
def transform_tags(tags, tab_type, filter_free_tag=False, tags_v3=[]):
if tags_v3:
tags_v3 = feed_tag_v3_show_filter(tags_v3)
if tags_v3:
tag = tags_v3[0]
tag["gm_url"] = gm_protocol.get_polymer_detail(
tag_id=tag["id"], new_tag=TAG_VERSION.V3
)
return [tag]
tags = filter(lambda item: item["id"] != settings.VIDEO_TAG_ID, tags)
exclude_tag_types = [TAG_TYPE.CITY, TAG_TYPE.PROVINCE, TAG_TYPE.COUNTRY]
tags = list(filter(lambda item: item["type"] not in exclude_tag_types, tags))
if filter_free_tag:
tags = list(filter(lambda item: not item["type"] == TAG_TYPE.FREE, tags))
tags.sort(key=lambda x: (x["type"] != TAG_TYPE.YUNYING, -x["id"]))
if not tags:
return []
tag = tags[0]
if tag["type"] == TAG_TYPE.YUNYING: # or hit_gray(self.version, Rules.NEW_POLYMER)
tag["gm_url"] = gm_protocol.get_polymer_detail(tag_id=tag["id"])
else:
tag["gm_url"] = gm_protocol.get_search_result(query=tag["name"], tab_type=tab_type)
return [tag]
\ No newline at end of file
pipeline {
agent {
docker {
reuseNode true
registryUrl 'https://gengmei-docker.pkg.coding.net'
registryCredentialsId "${env.DOCKER_REGISTRY_CREDENTIALS_ID}"
image 'tob/image/ci-image:v1.0.1'
args '-v /root/.cache/pypoetry/virtualenvs:/root/.cache/pypoetry/virtualenvs'
}
}
stages {
stage('检出') {
steps {
checkout([$class: 'GitSCM', branches: [[name: env.GIT_BUILD_REF]],
userRemoteConfigs: [[url: env.GIT_REPO_URL, credentialsId: env.CREDENTIALS_ID]]])
}
}
stage("Poetry Install") {
steps {
echo "install..."
sh 'git reset $MR_TARGET_SHA --soft && git restore --staged -- .'
sh 'poetry install'
sh 'cp card/settings_local.py.template card/settings_local.py'
sh 'cp rpcd.env.template rpcd.env'
}
}
stage('Code Check') {
parallel {
stage("Format") {
steps {
echo "formating..."
sh "poetry run scripts/format"
}
}
stage("Lint") {
steps {
echo "linting..."
sh "poetry run scripts/lint"
}
}
}
}
stage('发布文档') {
steps {
script {
echo '生成 API 文档'
sh 'apidoc -i card -o apidoc'
echo '发布到 CODING API 文档'
sh 'curl --fail -X POST -H "Authorization: token ${CODING_API_DOCS_DEPLOY_TOKEN}" -H "Accept:application/json" ${CODING_API_DOCS_DEPLOY_URL} -F "filename=@apidoc/api_data.json"'
echo '完成.'
}
}
}
}
}
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
[tool.poetry]
name = "card"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
[[tool.poetry.source]]
name = "coding"
url = "https://pypi-1593408770345:241c0cbfa8437730ad982ece687b5a06e78dcf83@gengmei-pypi.pkg.coding.net/tob/pypi/simple/"
default = true
[tool.poetry.dependencies]
python = "^3.8"
gm-types = "^7.74.18"
gm-protocol = "*"
raven = "6.10.0"
python-dateutil = "2.4.2"
uvicorn = "0.11.5"
gunicorn = "20.0.4"
aioredis = "1.3.1"
mkdocs = "1.0.4"
markdown-include = "0.5.1"
mkdocs-material = "4.6.3"
fastapi-rpcd = "1.6.23"
gm-client = "0.4.2"
pytest = "6.0.1"
pytest-asyncio = "0.14.0"
pytest-cov = "2.10.0"
pydantic = "1.6.1"
isort = "5.4.1"
autoflake = "1.3.1"
black = "19.10b0"
mypy = "0.770"
gm-upload = "0.0.91"
lxml = "4.6.1"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
#!/usr/bin/env bash
set -e
set -x
scripts/format
scripts/lint
scripts/test
\ No newline at end of file
#!/usr/bin/env bash
set -e
set -x
uvicorn fastapi_rpcd.asgi:application --reload
\ No newline at end of file
#!/usr/bin/env bash
echo "用法一, 校验 git changes not staged文件: ./script/format"
echo "用法一, 校验 指定文件: ./script/format app"
declare -a filteredFiles
if [ $# -eq 0 ]; then
files=`git diff --diff-filter=d --name-only | grep -E '^app.*$'`
if [ -z "${files}" ]; then
echo "no files changed to format..."
exit 0
fi
for file in "${files[@]}"
do
if [[ $file =~ \.py$ ]]; then
filteredFiles+=($file)
fi
done
else
filteredFiles=$@
fi
set -e
if [ -z "${filteredFiles}" ]; then
echo "no files changed to format..."
exit 0
else
autoflake --recursive --exclude settings* --remove-all-unused-imports --remove-unused-variables --ignore-init-module-imports --in-place ${filteredFiles[@]}
black ${filteredFiles[@]}
isort ${filteredFiles[@]}
fi
\ No newline at end of file
#!/usr/bin/env bash
echo "用法一, 校验 git changes not staged文件: ./script/lint"
echo "用法一, 校验 指定文件: ./script/lint app"
declare -a filteredFiles
if [ $# -eq 0 ]; then
files=`git diff --diff-filter=d --name-only | grep -E '^app.*$'`
if [ -z "${files}" ]; then
echo "no files changed to format..."
exit 0
fi
for file in "${files[@]}"
do
if [[ $file =~ \.py$ ]]; then
filteredFiles+=($file)
fi
done
else
filteredFiles=$@
fi
set -e
set -x
if [ -z "${filteredFiles}" ]; then
echo "no files changed to lint..."
exit
else
mypy ${filteredFiles[@]}
isort --check-only ${filteredFiles[@]}
fi
#!/usr/bin/env bash
set -e
set -x
pytest --cov=test --cov=sales_lead --cov=tests --cov-report=term-missing --cov-fail-under=45 ${@}
from card import __version__
def test_version():
assert __version__ == '0.1.0'
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment