# coding=utf8 from __future__ import unicode_literals, absolute_import, print_function import datetime import json import pili from django.conf import settings from django.db.models import Q from gm_rpcd.internals.exceptions import RPCDFaultException from gm_types.gaia import LIVE_QINIU_STATUS_TYPE, LIVE_MSG_TYPE, LIVE_LIST_TYPE, FILTER_WORD_TYPE from gm_types.mimas import ATTENTION_NOTIFY_TYPE, LIVE_CLARITY from live.tasks import FakeFansManager, push_follow_user, save_replay_url, add_live_robot_comment, \ add_live_robot_user_enter from talos.logger import qiniu_logger, info_logger from talos.statistic import diary_view_increase_num, topic_view_increase_num from talos.rpc import ( bind, bind_context, ) from talos.decorators import cache_page, list_interface from talos.rpc import CODES, gen from talos.cache.live import live_msg_cache from talos.services.doctor import DoctorService from live.managers import LiveMsgManager, get_live_notice_status from live.managers import ( get_live_notice_info_array_for_live_list, get_live_notice_info_for_index, get_live_replay_list_by_user_id, get_live_notice_info_for_index_v1, get_live_notice_info_for_index_v2, ) from social.models import UserFollow from live.models import LiveChannel from live.models import LiveStream from live.models import LiveWhiteList from live.models import ZhiboConfig from live.models import Problem from live.models import LiveFloatingConfig from live.services import LiveService from talos.services import get_user_from_context from talos.services.user import UserService from talos.services.goods import GoodsService from talos.tasks.follow_msg_create import create_information_db from talos.tools.filterword_tool import filterword_by_custom from talos.tasks.follow_msg_create import create_information_db @bind_context('mimas/live/live_enable') def check_live_enable(ctx, _user_id=None): if _user_id is None: user = get_user_from_context(ctx) if not user: return {'live_enable': False} _user_id = user.id if not 
_user_id: return {'live_enable': False} live_enable = LiveWhiteList.objects.filter(user_id=_user_id, is_online=True).exists() return {'live_enable': live_enable} @bind_context('mimas/live/get_clannel_url/audience') def get_stream_url(ctx, channel): user = get_user_from_context(ctx) stream = LiveStream.objects.filter(channel_id=channel).order_by('-updated_time').first() if not stream: return LiveChannel.objects.get(id=channel).empty_data(user) return stream.data_live(user) @bind_context('mimas/live/get_clannels_url/audience') def get_streams_url(ctx, ids): user = get_user_from_context(ctx) q = Q(pk__in=ids) & (Q(status=True, is_finish=False) | Q(topic__isnull=False, is_finish=True, topic__is_online=True)) streams = LiveStream.objects.filter(q) if not streams: return [] return LiveStream.batch_data_live(streams, user) @bind('mimas/live/get_floating_live') def get_floating_live(is_test=False): """获取悬浮窗展示的直播 如果运营设置显示的直播正在直播中,返回该直播 否则取商户白名单里最新开始的直播 """ lfc = LiveFloatingConfig() config_stream_id = lfc.get() live = LiveService.current_living_with_floating_config(config_stream_id, is_test=is_test) if not live: lives = LiveService.current_living_with_whitelist(is_test=is_test) if lives: live = lives[0] data = {} if live: data = { 'channel_id': live.channel_id, 'title': live.title, 'url': live.url, 'stream_id': live.id, } return data @bind('mimas/live/has_living_now') def check_has_living_now(is_test=False): """判断当前是否有正在进行的直播""" ret = LiveService.has_living_now(is_test=is_test) return {'has_living_now': ret} @bind_context('mimas/live/feeds/v2') def get_live_feed_v2(ctx, start_num, count, is_finish=False): user = get_user_from_context(ctx) data = [] if not is_finish: query = Q(status=True, is_finish=False) streams = LiveStream.objects.filter(query).order_by('-id')[start_num: start_num + count] person_ids = [stream.channel.person_id for stream in streams] doctors = DoctorService.list_doctors_by_person_ids(person_ids) data = LiveStream.batch_data(streams, user) for item in 
data: doctor = doctors.get(item.get("person_id")) item["doctor"] = doctor return data query = Q(topic__isnull=False, is_finish=is_finish, topic__is_online=True) streams = LiveStream.objects.filter(query).order_by('-id')[start_num: start_num + count] ids = [] for stream in streams: ids.append(stream.topic_id) topic_view_increase_num(ids) data.extend(LiveStream.batch_replay_data(streams, user)) return data @bind_context('mimas/live/get_clannel_url/anchor', login_required=True) def get_clannel_url_anchor(ctx, channel): user = get_user_from_context(ctx) stream = LiveStream.objects.filter(channel_id=channel).order_by('-updated_time').first() stream.set_first_user_enter_time_cache() if not stream: gen(CODES.LIVE_IS_NOT_FOUND) if user.person_id != stream.channel.person_id: gen(CODES.OPERATION_NOT_SUPPORTED) return stream.data(user) @bind_context('mimas/live/live_create', login_required=True) def create_live_stream(ctx, title, cover_url, tag_id, hospital_id, service_id, notice='', clarity=LIVE_CLARITY.DEFAULT, is_test=False): user = get_user_from_context(ctx) if not user: gen(CODES.OPERATION_NOT_SUPPORTED) if not LiveWhiteList.objects.filter(user_id=user.id).exists(): gen(CODES.OPERATION_NOT_SUPPORTED) # 对直播标题进行敏感词过滤,本次不对数字进行过滤 filterword_by_custom(filter_type=FILTER_WORD_TYPE.TOPIC_CONTENT, content=title, isfilternum=False) mac = pili.Mac(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY) if not LiveChannel.objects.filter(person_id=user.person_id).exists(): channel = LiveChannel() channel.user_id = user.id channel.person_id = user.person_id else: channel = LiveChannel.objects.get(person_id=user.person_id) channel.updated_time = datetime.datetime.now() channel.save() if not LiveStream.objects.filter(channel=channel).exists(): stream = LiveStream.create_stream(user, channel, title, cover_url, tag_id, hospital_id, service_id, notice, clarity, is_test=is_test) is_create = True else: stream = channel.now_stream stream, is_create = LiveStream.create_or_update_stream(stream, 
user, channel, title, cover_url, tag_id, hospital_id, service_id, notice, clarity, is_test=is_test) url = pili.rtmp_publish_url(settings.QINIU_LIVE_DOMAIN, settings.QINIU_HUB_NAME, stream.stream_key, mac, settings.QINIU_LIVE_TIMEOUT) data = {'url': url, 'channel': channel.id} live_msg_cache.set('channel' + str(stream.channel.id), json.dumps(stream.data_for_redis())) if is_create: # 添加假用户进入 # add_live_robot_user_enter(stream_id=stream.id) push_follow_user.delay(user_id=user.id, user_nick_name=user.nickname, channel_id=channel.id) # 粉丝消息页面通知数据保存到db result = create_information_db.delay( user_id=user.id, source_type=ATTENTION_NOTIFY_TYPE.LIVE, business_id=stream.id, ) info_logger.info('开始保存给粉丝的消息:{},直播id:{}'.format(result.task_id, stream.id)) return data @bind('mimas/live/callback') def qiniu_callback(data): qiniu_logger.info(json.dumps(data)) call_data = json.loads(data) stream_id = call_data['data']['id'] stream_id = stream_id.split('.')[-1] stream = LiveStream.objects.get(stream_key=stream_id) status = call_data['data']['status'] channel = stream.channel if status == LIVE_QINIU_STATUS_TYPE.DISCONNECTED: stream.status = False channel.status = False elif status == LIVE_QINIU_STATUS_TYPE.CONNECTED: stream.status = True channel.status = True stream.updated_time = datetime.datetime.now() channel.updated_time = datetime.datetime.now() stream.save() channel.save() @bind_context('mimas/live/sendmsg', login_required=True) def send_live_msg(ctx, channel, msg, is_vest_send=False): user = get_user_from_context(ctx) # 对直播互动进行敏感词过滤,本次不对数字进行过滤 try: filterword_by_custom(filter_type=FILTER_WORD_TYPE.TOPIC_REPLY, content=msg, isfilternum=False) except RPCDFaultException: gen(CODES.LIVE_DANMU_CONTAIN_FORBID_WORD) if not LiveMsgManager.check_msg_enable(user): gen(CODES.LIVE_MSG_TOO_FREQUENTLY) if UserService.user_in_black_list(user_id=user.id): return gen(CODES.LIVE_IS_BAN_SEND_MSG) stream = LiveChannel.objects.get(id=channel).now_stream if not stream: gen(CODES.LIVE_IS_NOT_FOUND) 
live_msg_manager = LiveMsgManager() msg = live_msg_manager.add_send_msg(stream, user, msg, is_vest_send) return msg @bind_context('mimas/live/getmsg') def get_live_msg(ctx, channel, id): live_msg_manager = LiveMsgManager() user = get_user_from_context(ctx) channel = channel.strip() if channel else channel stream = LiveChannel.objects.get(id=channel).now_stream if not stream: gen(CODES.LIVE_IS_NOT_FOUND) data = live_msg_manager.get_user_send_msg(stream, id, user) return data @bind_context('mimas/live/feed') def get_live_feed(ctx, start_num, count): user = get_user_from_context(ctx) data = [] if start_num == 0: streams = LiveStream.objects.select_related('topic').filter( status=True, is_finish=False ).order_by('-created_time') for stream in streams: data.append(stream.data(user)) streams = LiveStream.objects.filter( topic__isnull=False, topic__is_online=True ).order_by('-id')[start_num: start_num + count] topic_ids = [] diary_ids = [] for stream in streams: topic_ids.append(stream.topic_id) diary_ids.append(stream.topic.diary_id) topic_view_increase_num(topic_ids) diary_view_increase_num(diary_ids) for stream in streams: data.append(stream.replay_data_for_list(user)) return data @bind_context('mimas/live/feed/v1') def get_live_feed_v1(ctx, start_num, count, type=LIVE_LIST_TYPE.INDEX): user = get_user_from_context(ctx) data = [] if start_num == 0: streams = LiveStream.objects.filter(status=True, is_finish=False).order_by('-created_time') data = LiveStream.batch_data(streams, user) notice_info = get_live_notice_info_array_for_live_list(user, type) data = data + notice_info streams = LiveStream.objects.filter(topic__isnull=False, topic__is_online=True).order_by('-id')[start_num: start_num + count] ids = [] for stream in streams: ids.append(stream.topic_id) topic_view_increase_num(ids) data.extend(LiveStream.batch_replay_data(streams, user)) return data @bind_context('mimas/live/like', login_required=True) def set_live_like(ctx, channel_id=None): user = 
get_user_from_context(ctx) user_id = user.id key = 'live_like:{user_id}:{channel_id}'.format(user_id=user_id, channel_id=channel_id) has_like = live_msg_cache.get(key) if has_like: gen(CODES.LIVE_LIKE_TIME_LIMIT) if not LiveMsgManager.check_msg_enable(user): gen(CODES.LIVE_MSG_TOO_FREQUENTLY) if UserService.user_in_black_list(user_id=user.id): return gen(CODES.LIVE_IS_BAN_SEND_MSG) stream = LiveChannel.objects.get(id=channel_id).now_stream if not stream: gen(CODES.LIVE_IS_NOT_FOUND) live_msg_manager = LiveMsgManager() msg = u'点亮了你的房间' msg = live_msg_manager.add_live_like_msg(stream, user, msg, type=LIVE_MSG_TYPE.LIKE_MSG) live_msg_cache.setex(key, 60 * 2, json.dumps('like')) return msg @bind_context('mimas/live/enter') def person_enter_live_room(ctx, channel): user = get_user_from_context(ctx) live_channel = LiveChannel.objects.filter(id=channel).first() if not live_channel: gen(CODES.LIVE_IS_NOT_FOUND) if user: live_msg_manager = LiveMsgManager() live_msg_manager.add_enter_msg(live_channel.now_stream, user) live_channel.add_view_num() info_logger.info("live_enter: user_id={user_id}, stream_id={stream_id}".format(user_id=user and user.id or '', stream_id=live_channel.now_stream.id)) FakeFansManager.add_live_enter_person_num_for_sleep_user(live_channel.now_stream.id) return '' @bind_context('mimas/live/leave') def person_leave_live_room(ctx, channel): user = get_user_from_context(ctx) if user: pass stream = LiveChannel.objects.get(id=channel).now_stream live_msg_manager = LiveMsgManager() if user: live_msg_manager.add_leave_msg(stream, user) ''' ctime = ts_now_as_score() noti_key = 'noti_key' + channel if channel.person.user == user: notification_cache.zadd(noti_key, ctime, False) ''' stream.channel.sub_view_num() return '' @bind_context('mimas/live/change_title', login_required=True) def change_channel_title(ctx, channel, title): user = get_user_from_context(ctx) if not user: gen(CODES.OPERATION_NOT_SUPPORTED) 
filterword_by_custom(filter_type=FILTER_WORD_TYPE.TOPIC_CONTENT, content=title, isfilternum=False) stream = LiveChannel.objects.get(id=channel).now_stream if stream.channel.person_id != user.person_id: gen(CODES.OPERATION_NOT_SUPPORTED) stream.title = title stream.updated_time = datetime.datetime.now() stream.save() live_msg_cache.set( 'channel' + str(stream.channel.id), json.dumps(stream.data_for_redis()) ) return '' @bind('mimas/live/get_channel_status') @cache_page(10) def get_channel_status(channel): channel = LiveChannel.objects.get(id=channel) if not channel: return {'status': False} stream = channel.now_stream if not stream: return {'status': False} return {'status': stream.live_status_stream} @bind_context('mimas/live/get_live_notice') def get_live_notice(ctx): user = get_user_from_context(ctx) return get_live_notice_info_for_index(user) @bind_context('mimas/live/get_live_notice_v1') def get_live_notice_v1(ctx, just_notice=False): user = get_user_from_context(ctx) return get_live_notice_info_for_index_v1(user, just_notice=True) @bind_context('mimas/live/get_live_notice_v2') def get_live_notice_v2(ctx, live_ids): user = get_user_from_context(ctx) return get_live_notice_info_for_index_v2(user, live_ids) @bind('mimas/live/get_live_notice_ship') def get_live_notice_ship(): now = datetime.datetime.now() config = ZhiboConfig.objects.filter(is_online=True, start_time__lte=now, end_time__gte=now).order_by('-start_time').first() if config: return config.data_for_ship() return '' @bind_context('mimas/live/finish', login_required=True) def finish_live(ctx, channel_id): user = get_user_from_context(ctx) try: channel = LiveChannel.objects.get(id=channel_id, person_id=user.person_id) except: gen(CODES.OPERATION_NOT_SUPPORTED) stream = channel.now_stream stream.is_finish = True stream.finish_time = datetime.datetime.now() stream.save(update_fields=['is_finish', 'finish_time']) save_replay_url.delay(stream_id=stream.id) 
@bind_context('mimas/live/get_replay_info_by_topic_id')
def get_replay_topic_info(ctx, topic_id):
    """Return replay data (plus share data) for the stream attached to a topic.

    Signals TOPIC_NOT_FOUND when the topic does not exist.
    """
    user = get_user_from_context(ctx)
    try:
        topic = Problem.objects.get(id=topic_id)
    except Problem.DoesNotExist:
        # Returning here guarantees ``topic`` is never used unbound.
        return gen(CODES.TOPIC_NOT_FOUND)

    data = topic.stream.replay_data(user)
    data['share_data'] = topic.get_topic_share_data_from_db()
    return data


@bind('mimas/live/get_replay_danmu')
def get_replay_danmu(topic_id):
    """Return ``replay_danmu`` of the stream attached to a topic.

    Signals TOPIC_NOT_FOUND when the topic does not exist.
    """
    try:
        topic = Problem.objects.get(id=topic_id)
    except Problem.DoesNotExist:
        return gen(CODES.TOPIC_NOT_FOUND)
    return topic.stream.replay_danmu


@bind('mimas/live/get_services_by_hospital')
@cache_page(30 * 60)
def get_services_by_hospital(hospital_id, q=None):
    """Return the services of a hospital, optionally narrowed by ``q``."""
    return GoodsService.get_services_by_hospital_id(hospital_id=hospital_id, q=q)


@bind('mimas/live/get_person_live_count')
def get_person_live_count(user_id):
    """Count the live streams attached to the user's online topics."""
    problem_ids = Problem.objects.filter(user_id=user_id, is_online=True).values_list('id', flat=True)
    if not problem_ids:
        return 0
    # filter()/count() never raise DoesNotExist, so no exception handling is
    # needed around this query.
    return LiveStream.objects.filter(topic__id__in=problem_ids).count()


@bind_context('mimas/live/get_list_by_user_id')
@list_interface(offset_name='start_num', limit_name='count')
def get_live_list_by_user_id(ctx, user_id, start_num=0, count=10, is_hospital=False):
    """Get the replay list of ``user_id`` as seen by the current user."""
    viewer_user = get_user_from_context(ctx)
    return get_live_replay_list_by_user_id(
        user_id,
        view_user=viewer_user,
        start_num=start_num,
        count=count,
        need_service_or_live_reply=not is_hospital
    )


@bind_context('mimas/live/get_channel_info_by_person_id')
def get_channel_info_by_person_id(ctx, person_id):
    """Return the channel's ``empty_data`` for a person, or ``{}`` if no channel exists."""
    viewer_user = get_user_from_context(ctx)
    try:
        channel = LiveChannel.objects.get(person_id=person_id)
    except LiveChannel.DoesNotExist:
        return {}
    return channel.empty_data(viewer_user)


@bind_context('mimas/live/get_channel_info_by_zhiboconfig_id')
def get_channel_info_by_zhiboconfig_id(ctx, zhiboconfig_id):
    """Return the channel's ``empty_data`` for the anchor of a ZhiboConfig.

    Returns ``{}`` when the config, its anchor user, or the anchor's channel
    cannot be found.
    """
    # NOTE: awkward lookup chain (config -> anchor user -> channel).
    try:
        zc = ZhiboConfig.objects.get(id=zhiboconfig_id)
    except ZhiboConfig.DoesNotExist:
        return {}

    user = UserService.get_user_by_user_id(zc.anchor_user_id)
    if not user:
        # Treated like any other missing link in the chain.
        return {}

    viewer_user = get_user_from_context(ctx)
    try:
        channel = LiveChannel.objects.get(person_id=user.person_id)
    except LiveChannel.DoesNotExist:
        return {}
    return channel.empty_data(viewer_user)


@bind_context('mimas/user/follow/live/list')
def user_follow_live_list(ctx, offset=0, count=10):
    """
    Live list of the users the current user follows.

    The first page (``offset == 0``) holds all currently-live streams plus
    notice entries; every page then appends a slice of replays.

    :param ctx: RPC context used to resolve the current user.
    :param offset: start index into the replay list.
    :param count: number of replays per page.
    :return: list of stream/notice/replay dicts.
    """
    user = get_user_from_context(ctx)
    if not user:
        return gen(CODES.LOGIN_REQUIRED)

    data = []
    user_follow_ids = list(UserFollow.objects.filter(
        user_id=user.id, bond=True).order_by('-id').values_list('follow_id', flat=True))
    stream_users = UserService.get_users_by_user_ids(user_follow_ids)

    # The loop variable must not be called ``user``: that would rebind the
    # logged-in viewer, and every call below would receive the last followed
    # user instead.
    persion_ids = []
    user_ids = []
    for followed_user in stream_users.values():
        persion_ids.append(str(followed_user.person_id))
        user_ids.append(int(followed_user.id))

    channel_ids = list(LiveChannel.objects.filter(person_id__in=persion_ids).values_list('id', flat=True))

    if offset == 0:
        living = LiveStream.objects.filter(
            status=True, is_finish=False, channel_id__in=channel_ids).order_by('-id')
        data = LiveStream.batch_data(living, user)
        notice_info = get_live_notice_status(user, user_ids)
        data = data + notice_info

    streams = LiveStream.objects.filter(
        channel_id__in=channel_ids, topic_id__isnull=False, topic__is_online=True
    ).order_by('-id')[offset: offset + count]
    data.extend(LiveStream.batch_replay_data(streams, user))
    return data


@bind_context('mimas/user/my_live')
def get_my_live_by_user_id(ctx, user_id, offset=0, count=10):
    """Live entries shown on a user's homepage.

    The first page holds the user's live streams, notices and replays,
    truncated to ``count`` items; later pages hold replays only. Increases
    the view counters of the listed replay topics.
    """
    viewer = get_user_from_context(ctx)
    user = UserService.get_user_by_user_id(user_id=user_id)
    data = []

    if offset == 0:
        living = LiveStream.objects.filter(
            status=True, is_finish=False, channel__person_id=user.person_id
        ).order_by('-created_time')
        data = LiveStream.batch_data(living, viewer)
        # NOTE(review): ``type`` below is the Python builtin (this function
        # has no such parameter), not a LIVE_LIST_TYPE value -- confirm which
        # list type the notice lookup should receive.
        notice_info = get_live_notice_info_array_for_live_list(
            user, type, channel_person_ids=[user.person_id])
        data = data + notice_info

    streams = LiveStream.objects.filter(
        topic__isnull=False, channel__person_id=user.person_id, topic__is_online=True
    ).order_by('-id')[offset: offset + count]
    topic_view_increase_num([stream.topic_id for stream in streams])

    # NOTE(review): live data above is built for ``viewer`` while replay data
    # is built for the profile owner ``user`` -- confirm this is intended.
    data.extend(LiveStream.batch_replay_data(streams, user))

    if offset == 0:
        data = data[:count]
    return data


@bind('mimas/live/floating_config/get')
def get_live_floating_config():
    """Return the configured floating stream id as ``{'floating_stream_id': ...}``."""
    ret = LiveFloatingConfig().get()
    return {'floating_stream_id': ret}


@bind('mimas/live/floating_config/set')
def set_live_floating_config(stream_id):
    """Store ``stream_id`` as the live floating-window configuration."""
    LiveFloatingConfig().set(stream_id)
    return {}


@bind('mimas/live/get_topics_views')
def get_topics_views(topic_ids):
    """
    Return the view number of each topic in ``topic_ids``.

    :return: dict mapping ``str(topic.id)`` to ``topic.view_num``.
    """
    topics = Problem.objects.filter(id__in=topic_ids)
    return {str(topic.id): topic.view_num for topic in topics}


@bind('mimas/live/notimsg')
def live_notimsg(stream_id, user_id, username, msg, msg_type):
    """
    Internal entry point for sending a system danmu message to a live stream.
    """
    LiveMsgManager().add_system_msg(stream_id, user_id, username, msg, msg_type)
    return {"success": True}