# coding=utf-8
import datetime
import hashlib
import json
import pili
import random
import requests
import time
import os
import ffmpy
from urllib.parse import urljoin

from celery import shared_task
# from celery_once import QueueOnce
from django.conf import settings
from django.db.models import Q
from django.utils import timezone
from gm_upload import (
    set_video_watermark, fetch_picture_and_save_to_qiniu, video_clipping, upload_file, video_delete
)
from gm_upload.utils.qiniu_tool import QiniuTool
from gm_types.gaia import VIDEO_CODE_STATUS, QINIU_VIDEO_HANDLE_STATUS_CODE, LIVE_MSG_TYPE
from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH
from gm_protocol import GmProtocol

from live.managers import LiveMsgManager
from live.managers import create_stream_replay_danmu
from live.models import LiveStream, LiveChannel, LiveComment, LiveMsg
from social.models import SocialInfo
from talos.cache.base import community_video_handle_cache
from talos.cache.gaia import (
    sleep_action_fans_cache,
    sleep_noaction_fans_cache,
)
from talos.cache.live import (
    live_msg_cache, live_other_kv_cache, live_fake_vote_cache,
    live_fake_comment_cache, live_fake_comment_turns_cache, live_robot_ratelimit_cache
)
from talos.libs.datetime_utils import get_timestamp_or_none
from talos.models.topic.video import Video, VideoLibrary
from talos.logger import info_logger
from talos.rpc import logging_exception, get_current_rpc_invoker
from talos.services import UserService, UserConvertService
from utils.UploadVideoPicture import UploadVideoPicture
from utils.rpc import get_rpc_invoker
from utils.push import push_task_to_user_multi


def ts_now_as_score():
    """Return ``timezone.now()`` converted by ``get_timestamp_or_none``."""
    return get_timestamp_or_none(timezone.now())


@shared_task
def save_replay_url(stream_id=None):
    """Ask Qiniu to save the replay of live streams and store the file name.

    :param stream_id: when truthy, only that stream is processed; otherwise
        every stream created within the last two days that has an empty
        ``save_replay_url`` and ``status=0`` is processed.

    Streams that are not finished and were updated less than
    ``settings.LIVE_TIME_OUT`` seconds ago are skipped. Unfinished streams
    past that timeout are marked as finished here. Errors for a single
    stream are logged and do not stop the loop.
    """
    mac = pili.Mac(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY)
    client = pili.Client(mac)
    hub = client.hub(settings.QINIU_HUB_NAME)

    if stream_id:
        streams = LiveStream.objects.filter(id=stream_id)
    else:
        two_days_ago = datetime.datetime.now() - datetime.timedelta(days=2)
        streams = LiveStream.objects.filter(
            save_replay_url='', status=0, created_time__gt=two_days_ago
        )

    for stream in streams:
        idle = datetime.datetime.now() - stream.updated_time
        # total_seconds() instead of .seconds: .seconds drops the days part,
        # so a stream idle for more than 24h would look freshly updated.
        if not stream.is_finish and idle.total_seconds() < settings.LIVE_TIME_OUT:
            continue

        try:
            qiniu_stream = hub.get(stream.stream_key)
            saved = qiniu_stream.save_as(0, ts_now_as_score())
            stream.save_replay_url = saved['fname']
            if stream.is_finish:
                stream.save(update_fields=['save_replay_url'])
            else:
                # TODO: keep the finish state consistent by also notifying
                # Qiniu that the live stream has ended.
                stream.is_finish = True
                stream.finish_time = datetime.datetime.now()
                stream.save(update_fields=['save_replay_url', 'is_finish', 'finish_time'])
                # NOTE(review): the original indentation was lost; it is
                # assumed that create_topic()/clear_redis_info() belong to
                # this "force finish" branch and that the replay danmu is
                # created for every processed stream. Confirm against VCS.
                stream.create_topic()
                stream.clear_redis_info()
            create_stream_replay_danmu(stream.id)
        except Exception:
            logging_exception()


@shared_task
def check_live_status():
    """Synchronise ``LiveStream.status`` with what Qiniu reports.

    Every stream whose ``status`` is true is queried on Qiniu; when the
    query raises ``pili.errors.APIError`` the stream status is set to False.
    Afterwards, every stream Qiniu lists as live that is stored with
    ``status=False`` is switched back to True.
    """
    mac = pili.Mac(settings.QINIU_ACCESS_KEY, settings.QINIU_SECRET_KEY)
    client = pili.Client(mac)
    hub = client.hub(settings.QINIU_HUB_NAME)

    for stream in LiveStream.objects.filter(status=True):
        try:
            # Query Qiniu for the stream state.
            qiniu_stream = hub.get(stream.stream_key.encode())
            qiniu_stream.status()
        except pili.errors.APIError:
            logging_exception()
            stream.set_status(False)
        except Exception:
            logging_exception()

    live_listing = hub.list(liveonly=True)
    for item in live_listing["items"]:
        stream = LiveStream.objects.filter(stream_key=item.key, status=False).first()
        if stream:
            stream.set_status(True)


@shared_task
def upload_video_picture_to_qiniu(id):
    """Copy the cover picture of a video to Qiniu and store the result.

    :param id: primary key of the ``Video``.
    """
    video = Video.objects.get(id=id)
    source_pic = video.get_video_info()['video_pic']
    video.video_pic = fetch_picture_and_save_to_qiniu(source_pic)
    video.save(update_fields=['video_pic'])


@shared_task
def upload_video_picture_to_qiniu_v2(id):
    """Build the cover picture from the video itself and store it.

    ``upload_video_picture_to_qiniu`` starts from a picture, whereas this
    task passes the video URL to ``UploadVideoPicture``.

    :param id: primary key of the ``Video``.
    :return: None
    """
    video = Video.objects.get(id=id)
    video.video_pic = UploadVideoPicture(video.video_url)
    video.save(update_fields=['video_pic'])


@shared_task
def set_water_mark_to_video(id):
    """Submit a watermark job for a video and record its persistent id.

    The target file name is the MD5 hex digest of the video URL plus
    ``.mp4``. The video is put into ``VIDEO_CODE_STATUS.WAITING``.

    :param id: primary key of the ``Video``.
    """
    video = Video.objects.get(id=id)
    video.persistent_status = VIDEO_CODE_STATUS.WAITING

    raw_url = video.video_url
    if not isinstance(raw_url, bytes):
        raw_url = raw_url.encode('utf-8')

    video.persistentId = set_video_watermark(
        video.video_url,
        hashlib.md5(raw_url).hexdigest() + '.mp4',
        water_mark_url=settings.WATER_MARK_URL_FOR_VIDEO
    )
    video.save(update_fields=['persistentId', 'persistent_status'])


@shared_task
def check_water_mark_video_is_finish():
    """Poll Qiniu for every video waiting on a watermark job.

    Qiniu job codes: 0 success, 1 waiting, 2 processing, 3 failed,
    4 notification failed. A failed request returns JSON of the form
    ``{"error": "<ErrMsg string>"}``. Only codes 0 and 3 change the video;
    everything else is retried on the next run. Errors for a single video
    are logged and do not stop the loop.
    """
    videos = Video.objects.filter(persistent_status=VIDEO_CODE_STATUS.WAITING)
    for video in videos:
        try:
            if not video.persistentId:
                continue

            response = requests.get(
                'http://api.qiniu.com/status/get/prefop?id=' + video.persistentId
            )
            result = json.loads(response.text)
            code = result.get('code')
            if result.get('error', ''):
                continue
            if code not in (0, 3):
                continue

            if code == 3:
                video.persistent_status = VIDEO_CODE_STATUS.FAIL
                video.save(update_fields=['persistent_status'])
                continue

            first_item = result['items'][0]
            if first_item['code'] == 0:
                water_url = first_item['key']
                # Refresh the CDN cache for the watermarked file.
                QiniuTool.refresh_qiniu_resource_cache(
                    [urljoin(settings.VIDEO_HOST, water_url)]
                )
                video.water_url = water_url
                video.persistent_status = VIDEO_CODE_STATUS.SUCCESS
                video.save(update_fields=['persistent_status', 'water_url'])
            else:
                # The per-item code is stored as the status as-is.
                video.persistent_status = first_item['code']
                video.save(update_fields=['persistent_status'])
        except Exception:
            logging_exception()


@shared_task
def set_stream_to_redis():
    """Cache ``data_for_redis()`` of each channel's current stream.

    The value is stored as JSON under the key ``'channel' + <channel id>``.
    Channels without a current stream are skipped.
    """
    for channel in LiveChannel.objects.all():
        stream = channel.now_stream
        if not stream:
            continue
        live_msg_cache.set(
            'channel' + str(stream.channel.id),
            json.dumps(stream.data_for_redis())
        )


@shared_task
def add_test_redis_msg(channel_id, count, msg=None):
    """Push ``count`` test messages onto the live message list of a channel.

    :param channel_id: id of the ``LiveChannel`` whose current stream is used.
    :param count: number of messages to push.
    :param msg: optional message dict used as the template; its ``text`` is
        overwritten with the loop index for every pushed message (the dict
        passed in is mutated).
    """
    if not msg:
        msg = {
            'id': 0,  # placeholder
            'text': 'hellow',
            'name': '1111111',
            'type': '2',
            'user_id': 0
        }
    stream = LiveChannel.objects.get(id=channel_id).now_stream
    for index in range(0, count):
        key = LiveMsgManager.get_live_redis_key(stream.id)
        msg['text'] = str(index)
        live_msg_cache.rpush(key, json.dumps(msg))
@shared_task
def add_test_redis_like_msg(channel_id, count):
    """Push ``count`` test "like" messages for the current stream of a channel.

    The messages are first pushed through ``add_test_redis_msg`` and then
    added to the like sorted set with increasing timestamps as scores.

    :param channel_id: id of the ``LiveChannel`` whose current stream is used.
    :param count: number of like messages to create.
    """
    msg = {
        'id': 0,  # placeholder
        'text': 'live_like',
        'name': 'live_like',
        'type': '3',
        'user_id': 0
    }
    add_test_redis_msg(channel_id, count, msg)
    stream = LiveChannel.objects.get(id=channel_id).now_stream
    for index in range(0, count):
        key = LiveMsgManager.get_live_like_redis_key(stream.id)
        timestamp = int(time.time()) - count + index
        member = 'live_like' + str(index)
        live_msg_cache.zadd(key, timestamp, member)


@shared_task
def calc_sleep_user_for_prepare(stream_id):
    """Prepare the pools of fake fans for a stream and schedule the first add.

    A random factor ``k`` in [0.3, 0.5] is stored for the stream, the cached
    "action" and "no action" sleep users are shuffled, and slices of them are
    pushed into the per-stream lists. ``calc_add_sleep_user_fans`` is then
    scheduled to run in two minutes.

    :param stream_id: id of the ``LiveStream``.
    """
    fake_fan_manager = FakeFansManager(stream_id)
    k = random.uniform(0.3, 0.5)
    fake_fan_manager.init_random_num(k)

    # NOTE(review): 1129 is an unexplained constant from the original code.
    max_num = int(1129 * k)
    t1_max_num = int(max_num / 3)
    t2_max_num = t1_max_num * 2

    action_user = fake_fan_manager.get_sleep_action_fans()
    no_action_user = fake_fan_manager.get_sleep_noaction_fans()
    random.shuffle(action_user)
    random.shuffle(no_action_user)

    # Initialise all per-stream data.
    sleep_user = action_user[0:t1_max_num] + no_action_user[0:t2_max_num]
    random.shuffle(sleep_user)
    fake_fan_manager.init_live_enter_person_num()
    fake_fan_manager.set_live_action_user(action_user[t1_max_num:t1_max_num + 5])
    fake_fan_manager.set_live_mix_action_user(sleep_user)

    calc_add_sleep_user_fans.apply_async(
        args=(stream_id,), countdown=2 * 60
    )


@shared_task
def calc_add_sleep_user_fans(stream_id):
    """Pick the next batch of fake fans for a stream.

    While ``stream.live_status_stream`` is truthy the task re-schedules
    itself every two minutes and schedules ``add_sleep_user_fans`` five
    minutes later with ``int(entered_people * k)`` users. Otherwise up to
    five "action" users are added immediately and the chain ends.

    :param stream_id: id of the ``LiveStream``.
    """
    fake_fan_manager = FakeFansManager(stream_id)
    stream = LiveStream.objects.get(id=stream_id)

    if stream.live_status_stream:
        # Add people every 2 minutes.
        entered = fake_fan_manager.get_live_enter_person_num()
        k = fake_fan_manager.get_random_num()
        fan_ids = fake_fan_manager.get_live_mix_action_user(int(entered * k))
        calc_add_sleep_user_fans.apply_async(
            args=(stream_id,), countdown=2 * 60
        )
        add_sleep_user_fans.apply_async(
            args=(stream_id, fan_ids), countdown=5 * 60
        )
    else:
        # The stream has ended.
        fan_ids = fake_fan_manager.get_live_action_user(5)
        add_sleep_user_fans(stream_id, fan_ids)


@shared_task
def add_sleep_user_fans(stream_id, ids):
    """Add the given user ids as fans of the stream's channel owner.

    :param stream_id: id of the ``LiveStream``.
    :param ids: iterable of user ids to add as fans; ``None`` entries are
        skipped.
    """
    stream = LiveStream.objects.get(id=stream_id)
    user = UserService.get_user_by_person_id(person_id=stream.channel.person_id)
    social = SocialInfo(user.id)
    for fan_id in ids:
        if fan_id is None:
            continue
        social.add_fans(fan_id)


class FakeFansManager(object):
    """Cache-backed bookkeeping of fake ("sleep") fans for one stream.

    All per-stream values live in ``live_other_kv_cache`` under keys derived
    from the stream id; the global user pools live in
    ``sleep_action_fans_cache`` / ``sleep_noaction_fans_cache`` under the key
    ``'sleep_user'``.
    """

    # Lifetime of the per-stream counter and random factor, in seconds.
    _TTL_SECONDS = 12 * 3600

    def __init__(self, stream_id):
        self.sleep_action_fans_cache = sleep_action_fans_cache
        self.live_other_kv_cache = live_other_kv_cache
        self.sleep_noaction_fans_cache = sleep_noaction_fans_cache
        self.stream_id = stream_id

    @staticmethod
    def _get_live_add_person_random_key(stream_id):
        return str(stream_id) + ':k'

    @staticmethod
    def _get_live_enter_person_key(stream_id):
        return str(stream_id) + ':person_num'

    @staticmethod
    def add_live_enter_person_num_for_sleep_user(stream_id):
        """Increment the entered-people counter of ``stream_id`` by one."""
        live_other_kv_cache.incr(FakeFansManager._get_live_enter_person_key(stream_id), 1)

    @staticmethod
    def _get_live_sleep_user_key(stream_id):
        return str(stream_id) + ':noaction'

    @staticmethod
    def _get_live_action_user_key(stream_id):
        return str(stream_id) + ':action'

    def _pop_users(self, key, num):
        """Pop up to ``num`` entries from the list at ``key``.

        Stops as soon as the list is exhausted so that no ``None``
        placeholders end up in the result.
        """
        users = []
        for _ in range(0, num):
            user = self.live_other_kv_cache.lpop(key)
            if user is None:
                break
            users.append(user)
        return users

    def init_live_enter_person_num(self):
        """Reset the entered-people counter to 0 with a 12 hour expiry."""
        self.live_other_kv_cache.setex(
            self._get_live_enter_person_key(self.stream_id), self._TTL_SECONDS, 0
        )

    def get_live_enter_person_num(self):
        """Return the entered-people counter and reset it to 0.

        A missing (for example expired) counter is treated as 0.
        """
        raw = self.live_other_kv_cache.get(self._get_live_enter_person_key(self.stream_id))
        num = int(raw) if raw is not None else 0
        self.init_live_enter_person_num()
        return num

    def set_live_action_user(self, action_user):
        """Push ``action_user`` onto the stream's "action" user list."""
        if not action_user:
            # lpush needs at least one value.
            return
        self.live_other_kv_cache.lpush(
            self._get_live_action_user_key(self.stream_id), *action_user
        )

    def get_live_action_user(self, num):
        """Pop up to ``num`` users from the stream's "action" user list."""
        return self._pop_users(self._get_live_action_user_key(self.stream_id), num)

    def set_live_mix_action_user(self, noaction_user):
        """Push ``noaction_user`` onto the stream's mixed sleep-user list."""
        if not noaction_user:
            # lpush needs at least one value.
            return
        self.live_other_kv_cache.lpush(
            self._get_live_sleep_user_key(self.stream_id), *noaction_user
        )

    def get_live_mix_action_user(self, num):
        """Pop up to ``num`` users from the stream's mixed sleep-user list."""
        return self._pop_users(self._get_live_sleep_user_key(self.stream_id), num)

    def init_random_num(self, k):
        """Store the random factor ``k`` for the stream with a 12 hour expiry."""
        self.live_other_kv_cache.setex(
            self._get_live_add_person_random_key(self.stream_id), self._TTL_SECONDS, k
        )

    def get_random_num(self):
        """Return the stored random factor as a float (0.0 when missing)."""
        raw = self.live_other_kv_cache.get(
            self._get_live_add_person_random_key(self.stream_id)
        )
        return float(raw) if raw is not None else 0.0

    def set_all_sleep_user(self, action_user, sleep_list):
        """Store both global sleep-user pools as JSON."""
        self.sleep_action_fans_cache.set('sleep_user', json.dumps(action_user))
        self.sleep_noaction_fans_cache.set('sleep_user', json.dumps(sleep_list))

    def get_sleep_action_fans(self):
        """Return the global "action" sleep users ([] when nothing is cached)."""
        raw = self.sleep_action_fans_cache.get('sleep_user')
        return json.loads(raw) if raw else []

    def get_sleep_noaction_fans(self):
        """Return the global "no action" sleep users ([] when nothing is cached)."""
        raw = self.sleep_noaction_fans_cache.get('sleep_user')
        return json.loads(raw) if raw else []


def add_fake_vote_num(stream_id):
    """Increase the fake vote counter of a stream by a random 30-50.

    The counter is rewritten with a ten minute expiry; a missing counter
    starts from 0.
    """
    stream_id = str(stream_id)
    current = live_fake_vote_cache.get(stream_id)
    if current is None:
        current = 0
    live_fake_vote_cache.setex(stream_id, 60 * 10, int(current) + random.randint(30, 50))


@shared_task
def add_fake_vote_num_list():
    """Bump the fake vote counter of every stream with a true ``status`` created within the last day."""
    one_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
    streams = LiveStream.objects.filter(status=True, created_time__gte=one_day_ago)
    for stream in streams:
        add_fake_vote_num(stream.id)


@shared_task
def get_qiniu_persistent_ids(source_id, video_type, url_list):
    """Submit watermark jobs to Qiniu and record the returned persistent ids.

    :param source_id: id of the data source
    :param video_type: type of the data source
    :param url_list: list of video URLs to process
    :return: None
    """

    def cleaned_video_url(video_url):
        return video_url.replace(settings.VIDEO_HOST, '')

    def get_new_video_name(url):
        if not isinstance(url, bytes):
            url = url.encode('utf-8')
        return hashlib.md5(url).hexdigest() + '.mp4'

    for _video_url in url_list:
        video_url = cleaned_video_url(_video_url)
        new_name = get_new_video_name(video_url)

        pid = set_video_watermark(
            video_url,
            new_name,
            water_mark_url=settings.WATER_MARK_URL_FOR_VIDEO
        )

        video_obj, _created = VideoLibrary.objects.get_or_create(**{
            "source_id": source_id,
            "video_type": video_type,
            "raw_video_url": video_url,
        })
        video_obj.persistent_id = pid
        video_obj.persistent_status = VIDEO_CODE_STATUS.WAITING
        video_obj.save(update_fields=['persistent_id', 'persistent_status'])

        # Cache the item to be processed, for asynchronous handling.
        # NOTE(review): the original indentation was lost; the cache write is
        # assumed to happen once per URL (sadd is idempotent). Confirm.
        v = "{_id}:{_type}".format(_id=source_id, _type=video_type)
        community_video_handle_cache.sadd("video_handle_cache", v)


@shared_task
def check_community_video_water_mark_url_is_finish():
    """Poll Qiniu for every ``VideoLibrary`` row waiting on a watermark job.

    Only rows whose job and first item both report
    ``QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS`` are updated; everything else
    is left for the next run. Errors for a single row are logged.
    """
    videos = VideoLibrary.objects.filter(persistent_status=VIDEO_CODE_STATUS.WAITING)
    for video in videos:
        try:
            if not video.persistent_id:
                continue

            response = requests.get(
                'http://api.qiniu.com/status/get/prefop?id=' + video.persistent_id
            )
            result = json.loads(response.text)
            if result['code'] != QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
                continue

            first_item = result['items'][0]
            if first_item['code'] != QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
                continue

            water_video_url = first_item['key']
            # Refresh the CDN cache for the watermarked file.
            QiniuTool.refresh_qiniu_resource_cache(
                [urljoin(settings.VIDEO_HOST, water_video_url)]
            )
            video.water_video_url = water_video_url
            video.persistent_status = VIDEO_CODE_STATUS.SUCCESS
            video.save(update_fields=['persistent_status', 'water_video_url'])
        except Exception:
            logging_exception()


@shared_task
def video_fsm_runner():
    """Dispatch the clip pipeline tasks according to each video's clip status.

    State machine; could be wrapped in a class that hands tasks to the
    matching worker per state.

    :return: None
    """
    # Clip the first two seconds of the video.
    clipping_id_list = list(Video.objects.filter(
        persistent_clip_status=VIDEO_CODE_STATUS.NOSTART
    ).values_list("id", flat=True))
    for video_id in clipping_id_list:
        set_clipping_to_video.delay(video_id)

    # Check the clip job state on Qiniu.
    check_id_list = list(Video.objects.filter(
        persistent_clip_status=VIDEO_CODE_STATUS.WAITING
    ).values_list("id", flat=True))
    for video_id in check_id_list:
        check_video_clipping_is_finish.delay(video_id)

    # Convert successfully clipped mp4 files to animated webP and upload them.
    set_id_list = list(Video.objects.filter(
        persistent_clip_status=VIDEO_CODE_STATUS.OPERATING_LOCAL
    ).values_list("id", flat=True))
    for video_id in set_id_list:
        set_video_webp_pic.delay(video_id)


@shared_task
def set_clipping_to_video(video_id):
    """Submit a clip job for a video that has not been clipped yet.

    Does nothing unless the clip status is ``VIDEO_CODE_STATUS.NOSTART``.
    On submission the status becomes ``WAITING`` and the job id is stored.

    :param video_id: primary key of the ``Video``.
    """
    video = Video.objects.get(id=video_id)
    if video.persistent_clip_status != VIDEO_CODE_STATUS.NOSTART:
        return

    # The video id is mixed in so that tasks started at the same instant for
    # different videos cannot end up with the same target file name.
    hash_source = "{}:{}".format(video_id, time.time())
    hash_key = hashlib.md5(hash_source.encode("utf8")).hexdigest()
    pid = video_clipping(
        video.video_url,
        new_filename='{}_clip.mp4'.format(hash_key),
        video_type='mp4',
        water_mark_url=None
        # settings.WATER_MARK_URL_FOR_VIDEO
    )

    video.persistent_clip_status = VIDEO_CODE_STATUS.WAITING
    video.persistent_clip_id = pid
    video.save(update_fields=['persistent_clip_id', 'persistent_clip_status'])


@shared_task
def check_video_clipping_is_finish(video_id):
    """Poll Qiniu for the clip job of one video and advance its clip status.

    Qiniu job codes: 0 success, 1 waiting, 2 processing, 3 failed,
    4 notification failed. A failed request returns JSON of the form
    ``{"error": "<ErrMsg string>"}``. Only the success and processing-fail
    codes change the video. Any exception is logged and swallowed.

    :param video_id: primary key of the ``Video``.
    """
    try:
        video = Video.objects.get(id=video_id)
        if video.persistent_clip_status != VIDEO_CODE_STATUS.WAITING:
            return
        if not video.persistent_clip_id:
            return

        response = requests.get(
            settings.QINIU_VIDEO_INQUIRE_HOST + video.persistent_clip_id
        )
        result = json.loads(response.text)
        code = result.get('code')
        if result.get('error', ''):
            return
        if code not in (QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS,
                        QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL):
            return

        if code == QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL:
            video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL
            video.save(update_fields=['persistent_clip_status'])
            return

        first_item = result['items'][0]
        if first_item['code'] == QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
            mp4_key = first_item['key']
            # Refresh the CDN cache for the clipped file.
            QiniuTool.refresh_qiniu_resource_cache([
                urljoin(settings.VIDEO_HOST, mp4_key)
            ])
            video.intercept_video_url = mp4_key
            video.persistent_clip_status = VIDEO_CODE_STATUS.OPERATING_LOCAL
            video.save(update_fields=['persistent_clip_status', 'intercept_video_url'])
        else:
            # Store the item code on the *clip* status: this is the field
            # that is saved below. Assigning persistent_status here (as the
            # code used to) persisted nothing and left the video in WAITING.
            video.persistent_clip_status = first_item['code']
            video.save(update_fields=['persistent_clip_status'])
    except Exception:
        logging_exception()


@shared_task
def set_video_webp_pic(video_id):
    """Convert the clipped mp4 of a video to webP and upload it.

    Does nothing unless the clip status is
    ``VIDEO_CODE_STATUS.OPERATING_LOCAL``. The clip is downloaded to
    ``settings.VIDEO_CONVERT_PATH``, converted with FFmpeg using
    ``settings.VIDEO_FF_ARGS``, uploaded, and the intermediate clip is
    deleted. On any error the clip status becomes ``FAIL``. Local temporary
    files are always removed.

    :param video_id: primary key of the ``Video``.
    """
    video = Video.objects.get(id=video_id)
    if video.persistent_clip_status != VIDEO_CODE_STATUS.OPERATING_LOCAL:
        return

    mp4_key = video.intercept_video_url
    # The video id is mixed in so that concurrent conversions of different
    # videos never share (and clobber) the same temporary files.
    hash_source = "{}:{}".format(video_id, time.time())
    hash_key = hashlib.md5(hash_source.encode("utf8")).hexdigest()
    input_path = os.path.join(settings.VIDEO_CONVERT_PATH, "{}.mp4".format(hash_key))
    output_path = os.path.join(settings.VIDEO_CONVERT_PATH, '{}.webp'.format(hash_key))

    try:
        clipping_video_url = settings.VIDEO_HOST + mp4_key
        res = requests.get(clipping_video_url)
        # Fail early instead of feeding an HTTP error body to FFmpeg.
        res.raise_for_status()

        # Store the file locally.
        with open(input_path, 'wb') as f:
            f.write(res.content)

        # Transcode with the underlying FFmpeg binary.
        ff = ffmpy.FFmpeg(
            inputs={input_path: None},
            outputs={output_path: settings.VIDEO_FF_ARGS}
        )
        ff.run()

        # Upload the webP picture.
        video.webp_url = upload_file(output_path)
        video.persistent_clip_status = VIDEO_CODE_STATUS.SUCCESS
        video.save(update_fields=['webp_url', 'persistent_clip_status'])

        # Delete the intermediate data.
        video_delete(mp4_key)

        # Refresh the CDN cache.
        QiniuTool.refresh_qiniu_resource_cache(
            [urljoin(settings.VIDEO_HOST, mp4_key)]
        )
    except Exception:
        logging_exception()
        video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL
        video.save(update_fields=['persistent_clip_status'])
    finally:
        for path in (input_path, output_path):
            if os.path.exists(path):
                os.remove(path)


@shared_task
def push_follow_user(user_id, channel_id, user_nick_name=""):
    """Push a "now live" notification to all fans of a user.

    :param user_id: id of the user who went live.
    :param channel_id: id of the live channel used to build the push URL.
    :param user_nick_name: nickname inserted into the push text.

    If fetching the fan list fails, the error is logged and nothing is
    pushed.
    """
    push_msg = "你关注的{nick_name}正在直播,速来围观>>>".format(nick_name=user_nick_name)
    try:
        user_ids = get_rpc_invoker()['api/user/get_all_fans'](user_id=user_id).unwrap()
    except Exception:
        logging_exception()
        return

    push_url = GmProtocol().get_live_list(channel_id)
    p_type = AUTOMATED_PUSH.FOLLOWED_USER_POSTED_LIVE
    kwargs = {
        "user_ids": user_ids,
        "platform": ['android', 'iPhone'],
        "alert": push_msg,
        "extra": {
            'type': PUSH_INFO_TYPE.GM_PROTOCOL,
            'msgType': 4,
            'pushUrl': push_url,
            'push_url': push_url,
        },
        "push_type": p_type,
    }
    push_task_to_user_multi(**kwargs)


@shared_task
def add_live_robot_comment(stream, comment_count=None) -> None:
    """
    Send one robot comment (danmu) to a live stream.

    :param stream: the ``LiveStream`` instance to comment on.
    :param comment_count: number of non-deleted ``LiveComment`` rows; it is
        counted here when ``None``.

    Nothing is sent when a text message was posted within the last 15
    seconds, when no fake user is queued for the stream, or when there are
    no comments to choose from.
    """
    # Skip if there was a comment within the last 15 seconds.
    last_msg = LiveMsg.objects.filter(stream_id=stream.id, type=LIVE_MSG_TYPE.TEXT_MSG).last()
    if last_msg:
        age = datetime.datetime.now() - last_msg.created_time
        # total_seconds() instead of .seconds, which drops the days part.
        if age.total_seconds() < 15:
            return

    # Find a fake user.
    cache_key = str(stream.id)
    user_id = live_fake_comment_cache.rpop(cache_key)
    if not user_id:
        return

    # Find a fake comment.
    turns = live_fake_comment_turns_cache.incr(cache_key)
    if turns == 1:
        # Set the expiry the first time the counter is created.
        live_fake_comment_turns_cache.expire(cache_key, 60*60*24)

    if comment_count is None:
        comment_count = LiveComment.objects.filter(is_deleted=False).count()

    if not comment_count:
        # There are no comments.
        return

    start = (turns - 1) % comment_count
    comment = LiveComment.objects.filter(is_deleted=False).order_by("weight")[start:start + 1].first()
    if comment is None:
        # The comment table shrank since comment_count was taken.
        return
    puppet_user = UserService.get_user_by_user_id(user_id=user_id)

    # Send the comment.
    live_msg_manager = LiveMsgManager()
    live_msg_manager.add_send_msg(stream, puppet_user, msg=comment.comment, is_vest_send=True)
    info_logger.info("add_live_robot_comment: stream:{stream}, puppet_user:{puppet_user}, msg:{msg}".format(
        stream=stream.id, puppet_user=puppet_user.nickname, msg=comment.comment
    ))


@shared_task
def add_live_robot_user_enter(stream_id: int, run_num=None) -> None:
    """
    Send robot "user entered" actions to a live stream.

    :param stream_id: id of the ``LiveStream``.
    :param run_num: number of puppet users to let enter; chosen randomly
        when falsy.

    The users that entered are queued so that ``add_live_robot_comment`` can
    later comment with them.
    """
    # Skip if the stream is missing or has ended.
    stream = LiveStream.objects.using(settings.SLAVE_DB_NAME).filter(id=stream_id).last()
    if stream is None or stream.is_finish:
        return

    # Rate limit, to cope with piled-up tasks.
    current_time = int(time.time())
    expire_time = 60 * 60 * 24
    key = "last-fake-enter-time:{}".format(stream_id)
    last_enter_time = live_robot_ratelimit_cache.get(key)

    # Skip if the previous robot entered less than 10 seconds ago.
    if last_enter_time and (current_time - int(last_enter_time) < 10):
        return

    live_robot_ratelimit_cache.set(key, current_time, expire_time)

    # Product rule: robots trigger 2-4 times every 10-30 seconds.
    # NOTE(review): randrange(1, 3) yields 1 or 2, which does not obviously
    # match "2-4"; left unchanged, confirm with product.
    if not run_num:
        run_num = random.randrange(1, 3)

    # Find fake users.
    rpc = get_current_rpc_invoker()
    user_ids = rpc['hera/zhubo/get_puppet_user_ids'](num=run_num).unwrap()
    puppet_users = UserService.get_users_by_user_ids(user_ids=user_ids)

    # Add the enter actions.
    live_msg_manager = LiveMsgManager()
    for puppet_user in puppet_users.values():
        live_msg_manager.add_enter_msg(stream, puppet_user)

        info_logger.info("add_live_robot_user_enter: stream:{stream}, puppet_user:{puppet_user}, msg:{msg}".format(
            stream=stream.id, puppet_user=puppet_user.nickname, msg="进入直播间"
        ))

    # Remember the users so they can be used for comments. The TTL must be
    # set with expire(): every extra positional argument of lpush is pushed
    # as a list value, so passing the TTL there queued 86400 as a user id.
    if user_ids:
        fake_user_key = str(stream.id)
        live_fake_comment_cache.lpush(fake_user_key, *user_ids)
        live_fake_comment_cache.expire(fake_user_key, expire_time)


@shared_task
def add_live_robot():
    """Trigger robot enters and comments for unfinished streams created within the last day.

    For each stream a robot enter task is scheduled with a random 1-19
    second delay and one robot comment is attempted synchronously.
    """
    one_day_ago = datetime.datetime.now() - datetime.timedelta(days=1)
    # Materialise once: the result is iterated twice below.
    streams = list(LiveStream.objects.using(settings.SLAVE_DB_NAME).filter(
        is_finish=False, created_time__gt=one_day_ago
    ))
    stream_ids = [stream.id for stream in streams]
    if not stream_ids:
        return

    comment_total_count = LiveComment.objects.filter(is_deleted=False).count()
    for stream in streams:
        countdown = random.randrange(1, 20)
        add_live_robot_user_enter.apply_async(args=[stream.id], countdown=countdown)
        add_live_robot_comment(stream, comment_total_count)

    info_logger.info("add_live_robot: stream_ids: {stream_ids} success".format(stream_ids=stream_ids))