# -*- coding: UTF-8 -*- # coding=utf8 from __future__ import unicode_literals, absolute_import, print_function import datetime import json import requests import time from django.conf import settings from django.db import IntegrityError from django.db.models import Q from gm_types.gaia import LIVE_LIST_TYPE from gm_types.gaia import LIVE_MSG_TYPE from live.models import LiveChannel, LiveStream, ZhiboConfig from live.models import LiveMsg from talos.cache.live import live_msg_cache, live_user_enter_check from talos.cache.live import live_user_msg_check from talos.manager.topic import topic_list_manager from talos.services.user import UserService from talos.services import DoctorService from qa.utils.time import get_timestamp_or_none def get_or_create_channel_by_user_id(user_id): try: user = UserService.get_user_by_user_id(user_id) channel, _ = LiveChannel.objects.get_or_create(person_id=user.person_id, user_id=user_id) return channel except IntegrityError: channel = LiveChannel.objects.get(person_id=user.person_id) return channel def get_calc_times(url): url = settings.QINIU_REPLAY_LIVE_DOMAIN + url times = [] r = requests.get(url) lines = r.text.split('\n') for line in lines: if line.startswith('/fragments'): temp = line.split('/')[-1].split('.')[0] t = temp.split('-') times.append({'start': int(int(t[0]) / 1000), 'end': int(int(t[1]) / 1000)}) if len(times) >= 2: atime = times[-1] btime = times[-2] if atime['start'] < btime['end']: atime['start'] = btime['end'] return times def create_stream_replay_danmu(stream_id): ''' #具体算法实现参考 http://wiki.gengmei.cc/pages/viewpage.action?pageId=4129172 ''' result = {} stream = LiveStream.objects.get(id=stream_id) msgs = LiveMsg.objects.filter(stream_id=stream_id) start_time = stream.created_time items = get_calc_times(stream.save_replay_url) live_start_time = items[0]['start'] need_del_time = 0 for i in range(0, len(items)): live_time = items[i]['end'] - live_start_time live_before_time = items[i]['start'] - live_start_time if i > 0: need_del_time = need_del_time + items[i]['start'] - items[i - 1]['end'] for msg in msgs: msg_time = int((msg.created_time - start_time).total_seconds()) if live_before_time <= msg_time < live_time: if result.get(str(msg_time - need_del_time)): result[str(msg_time - need_del_time)].append(msg.data()) else: result[str(msg_time - need_del_time)] = [msg.data()] stream.replay_danmu = json.dumps(result) stream.save(update_fields=['replay_danmu']) def get_live_notice_info_for_index(user): now = datetime.datetime.now() config = ZhiboConfig.objects.filter( is_online=True, start_time__lte=now, end_time__gte=now, is_shouyejingxuan=True).order_by('-start_time').first() if config: config = config.data_for_backend(user) channel_id = config['channel_id'] channel = LiveChannel.objects.get(id=channel_id) stream = channel.now_stream if stream: if stream.live_status_stream: return stream.data(user) else: return config else: return config return None def get_live_notice_info_for_index_v1(user, just_notice=False): now = datetime.datetime.now() zhibo = ZhiboConfig.objects.filter( is_online=True, start_time__lte=now, end_time__gte=now, is_shouyejingxuan=True).order_by('-start_time').first() if not zhibo: return None doctor_info = DoctorService.get_doctor_by_user_id(user_id=zhibo.anchor_user_id) doctor = None if doctor_info: doctor = { "id": doctor_info.id, "user_id": doctor_info.user_id, "name": doctor_info.name, "portrait": doctor_info.portrait, } stream = get_or_create_channel_by_user_id(zhibo.anchor_user_id).now_stream if stream and stream.live_status_stream: if just_notice: # 直播开始了不展示 return None conf = zhibo.data_for_backend(user, trailer=False) data = stream.data_v1(user) data['content'] = conf['content'] data['desc'] = conf['desc'] data['conf_id'] = conf['id'] data['choice_cover_url'] = conf['choice_cover_url'] else: data = zhibo.data_for_backend(user) data["doctor"] = doctor return data def get_live_notice_info_for_index_v2(user, live_ids): now = datetime.datetime.now() zhibos = ZhiboConfig.objects.filter( id__in=live_ids, is_online=True, start_time__lte=now, end_time__gte=now, ) if not zhibos: return [] res = [] for zhibo in zhibos: stream = get_or_create_channel_by_user_id(zhibo.anchor_user_id).now_stream doctor = {} if zhibo.anchor_user_id: doctor_info = DoctorService.get_doctor_by_user_id(user_id=zhibo.anchor_user_id) if doctor_info: doctor = { "id": doctor_info.id, "user_id": doctor_info.user_id, "name": doctor_info.name, "portrait": doctor_info.portrait, "hospital_id": doctor_info.hospital_id, } if stream and stream.live_status_stream: conf = zhibo.data_for_backend(user, trailer=False) data = stream.data_v1(user) data['content'] = conf['content'] data['desc'] = conf['desc'] data['conf_id'] = conf['id'] data['zhibo_time'] = conf['zhibo_time'] data['choice_cover_url'] = conf['choice_cover_url'] else: data = zhibo.data_for_backend(user) data['topic_id'] = stream.topic_id if stream and stream.topic else None data["doctor"] = doctor res.append(data) return res def get_live_notice_info_array_for_live_list(user, live_type, channel_person_ids=[]): # channel_person_ids 渠道对应的person_id result = [] now = datetime.datetime.now() configs = ZhiboConfig.objects.filter( is_online=True, start_time__lte=now, end_time__gte=now).order_by('-start_time') if live_type == LIVE_LIST_TYPE.INDEX: configs = configs.filter(is_shouyejingxuan=True) elif live_type == LIVE_LIST_TYPE.INDEX_VIDEO: configs = configs.filter(is_shouyezhibo=True) elif live_type == LIVE_LIST_TYPE.VIDEO_LIST: configs = configs.filter(is_zhiboliebiao=True) for config in configs: config = config.data_for_backend(user) channel_id = config['channel_id'] if channel_person_ids: channel = LiveChannel.objects.filter(id=channel_id, person_id__in=channel_person_ids).order_by('updated_time').first() if not channel: continue else: channel = LiveChannel.objects.get(id=channel_id) stream = channel.now_stream # 如果没在直播 就添加预告状态 在直播的剔除预告 if stream: if not stream.live_status_stream: result.append(config) else: result.append(config) return result def get_live_replay_list_by_user_id(user_id, view_user, start_num=0, count=10, need_service_or_live_reply=True): if not user_id: return [] ls = LiveStream.objects.select_related('topic').filter( topic__user_id=user_id, topic__is_online=True ).order_by('-created_time')[start_num: start_num + count] if not ls: return [] topic_objs = [l.topic for l in ls] data = topic_list_manager.get_list_data_by_topic_objs( topic_objs, view_user.id, need_service_or_live_reply=need_service_or_live_reply ) for live_topic in data: for l in ls: if l.topic.id == live_topic['problem']['topic_id']: live_topic['live_update_time'] = get_timestamp_or_none(l.updated_time) return data class LiveMsgManager(object): def __init__(self): self.msg_cache = live_msg_cache @staticmethod def get_live_redis_key(id): return 'msg' + str(id) @staticmethod def get_live_like_redis_key(id): return 'live_like' + str(id) def get_user_send_msg(self, stream, msg_id, user): if settings.LIVE_MSG_REDIS_FLAG: return self._get_user_send_msg_redis(stream, msg_id, user) else: return self._get_user_send_msg_mysql(stream, msg_id, user) def _get_user_send_msg_redis(self, stream, msg_id, user): key = self.get_live_redis_key(stream.id) newest_id = self.msg_cache.llen(key) # id 为空 为第一次进入 直接返回无数据即可 或者消息id 等于最新的消息数量 if msg_id == '' or str(msg_id) == str(newest_id): return { 'news': [], 'newest_id': newest_id, 'audience_num': stream.channel.live_view_num, 'title': stream.title } msg_id = int(msg_id) msgs = self.msg_cache.lrange(key, msg_id, newest_id) news = [] for msg in msgs: msg = json.loads(msg.decode()) msg_id += 1 if user and user.id == msg.get('user_id', 0): continue else: msg['id'] = msg_id if msg.get('user_id'): del msg['user_id'] news.append(msg) return { 'news': news, 'newest_id': newest_id, 'audience_num': stream.live_view_num, 'title': stream.title } @staticmethod def _get_user_send_msg_mysql(stream, msg_id, user): newest_msg = LiveMsg.objects.filter(stream=stream).order_by('-id').first() newest_id = newest_msg.id if msg_id == '': now = datetime.datetime.now() msgs = LiveMsg.objects.filter(stream=stream, created_time__gte=now) else: msgs = LiveMsg.objects.filter(stream=stream, id__gt=msg_id) if user: msgs = msgs.filter(~Q(person_id=user.person_id)) msgs = msgs.order_by('id')[:30] news = [] for msg in msgs: news.append(msg.data()) return { 'news': news, 'newest_id': newest_id, 'audience_num': stream.channel.live_view_num, 'title': stream.title } def add_system_msg(self, stream_id, user_id, username, msg, msg_type): """ 发送 系统弹幕. """ k = self.get_live_redis_key(stream_id) msg = { 'text': msg, 'name': username, 'type': msg_type, 'user_id': 0 } self.msg_cache.rpush(k, json.dumps(msg)) def add_enter_msg(self, stream, user): # if not self.check_user_enter_enable(stream, user): # return if settings.LIVE_MSG_REDIS_FLAG: self._add_enter_msg_redis(stream, user) else: self._add_enter_msg_mysql(stream, user) def _add_enter_msg_redis(self, stream, user): k = self.get_live_redis_key(stream.id) if stream.channel.person_id == user.person_id: msg = { 'text': u'主播回来啦,马上恢复直播', 'name': '', 'type': LIVE_MSG_TYPE.NOTIFI_MSG, 'user_id': '0' } self.msg_cache.rpush(k, json.dumps(msg)) else: user_name = self.format_msg_username(user.nickname, is_sensitive=False) msg = { 'text': u'进入房间', 'name': user_name, 'type': LIVE_MSG_TYPE.ENTER_MSG, 'user_id': '0' } self.msg_cache.rpush(k, json.dumps(msg)) stream.set_first_user_enter_time_cache() def format_msg_username(self, username: str, is_sensitive=True): """ 格式化 弹幕消息中的 用户名. """ length = len(username) if length == 1: if is_sensitive: return "*" else: return username elif length == 2: if is_sensitive: return username[0] + "*" else: return username elif length <= 5: if is_sensitive: return username[0] + (length - 2) * "*" + username[-1] else: return username return username[:2] + "..." + username[-3:] @staticmethod def _add_enter_msg_mysql(stream, user): if stream.channel.person_id == user.person_id: msg = LiveMsg.objects.create(person_id=user.person_id, stream=stream, msg=u'主播回来啦,马上恢复直播', type=LIVE_MSG_TYPE.NOTIFI_MSG) else: msg = LiveMsg.objects.create(person_id=user.person_id, stream=stream, msg=u'进入房间', type=LIVE_MSG_TYPE.ENTER_MSG) return msg.data() def add_leave_msg(self, stream, user): if settings.LIVE_MSG_REDIS_FLAG: self._add_leave_msg_redis(stream, user) else: self._add_leave_msg_mysql(stream, user) @staticmethod def _add_leave_msg_mysql(stream, user): if stream.channel.person_id == user.person_id: LiveMsg.objects.create(person_id=user.person_id, stream=stream, msg=u'主播离开一会,不要走开哇', type=LIVE_MSG_TYPE.NOTIFI_MSG) def _add_leave_msg_redis(self, stream, user): k = self.get_live_redis_key(stream.id) if stream.channel.person_id == user.person_id: msg = { 'text': u'主播离开一会,不要走开哇', 'name': '', 'type': LIVE_MSG_TYPE.NOTIFI_MSG, 'user_id': 0 } self.msg_cache.rpush(k, json.dumps(msg)) def add_send_msg(self, stream, user, msg, is_vest_send=False, type=LIVE_MSG_TYPE.TEXT_MSG): if settings.LIVE_MSG_REDIS_FLAG: msg = self._add_send_msg_redis(stream, user, msg, is_vest_send, type) else: msg = self._add_send_msg_mysql(stream, user, msg, is_vest_send) return msg @staticmethod def _add_send_msg_mysql(stream, user, msg, is_vest_send=False): msg = LiveMsg.objects.create(person_id=user.person_id, stream=stream, msg=msg, is_vest_send=is_vest_send) return msg.data() def _add_send_msg_redis(self, stream, user, msg, is_vest_send=False, type=LIVE_MSG_TYPE.TEXT_MSG): self._add_send_msg_mysql(stream, user, msg, is_vest_send) msg = { 'id': 0, # 用来占位 'text': msg, 'name': user.nickname, 'type': type, 'user_id': user.id } key = self.get_live_redis_key(stream.id) self.msg_cache.rpush(key, json.dumps(msg)) return msg def add_live_like_msg(self, stream, user, msg, type=LIVE_MSG_TYPE.TEXT_MSG): if settings.LIVE_MSG_REDIS_FLAG: msg = self._add_live_like_msg_redis(stream, user, msg, type) else: msg = self._add_send_msg_mysql(stream, user, msg) return msg def _add_live_like_msg_redis(self, stream, user, msg, type=LIVE_MSG_TYPE.TEXT_MSG): msg = { 'id': 0, # 用来占位 'text': msg, 'name': user.nickname, 'type': type, 'user_id': user.id } live_like_key = self.get_live_like_redis_key(stream.id) timestamp = int(time.time()) time_key = str(timestamp) self.msg_cache.zadd(live_like_key, timestamp, time_key) key = self.get_live_redis_key(stream.id) self.msg_cache.rpush(key, json.dumps(msg)) return msg @staticmethod def check_msg_enable(user): if settings.SEND_MSG_TIME == 0: return True if live_user_msg_check.get(str(user.id)): return False else: live_user_msg_check.setex(str(user.id), settings.SEND_MSG_TIME, settings.SEND_MSG_TIME) return True @staticmethod def user_enter_stream_key(stream, user): return str(stream.id) + ':' + str(user.id) @staticmethod def check_user_enter_enable(stream, user): if live_user_enter_check.get(LiveMsgManager.user_enter_stream_key(stream, user)): return False else: live_user_enter_check.setex(LiveMsgManager.user_enter_stream_key(stream, user), settings.ENTER_MSG_TIME, settings.ENTER_MSG_TIME) return True def clear_redis(self, stream_id): self.msg_cache.delete(self.get_live_redis_key(stream_id)) self.msg_cache.delete(self.get_live_like_redis_key(stream_id)) def get_live_notice_status(user, persion_ids): result = [] now = datetime.datetime.now() configs = ZhiboConfig.objects.filter( is_online=True, start_time__lte=now, end_time__gte=now, anchor_user_id__in=persion_ids ).order_by('-id') for config in configs: config = config.data_for_backend(user) channel_id = config['channel_id'] channel = LiveChannel.objects.get(id=channel_id) stream = channel.now_stream # 如果没在直播 就添加预告状态 在直播的剔除预告 if stream: if not stream.live_status_stream: result.append(config) else: result.append(config) return result