1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
import ffmpy
import hashlib
import json
import requests
import random
import datetime
from django.conf import settings
from django.db.models import F
from celery import shared_task
# from celery_once import QueueOnce
from urllib.parse import urljoin
from gm_types.push import PUSH_INFO_TYPE, AUTOMATED_PUSH
from gm_types.mimas import TRACTATE_PLATFORM, TRACTATE_DATA_TYPE, TRACTATE_CONTENT_LEVEL, APPLET_PAGE_FROM, \
APPLET_SUBSCRIBE_MSG_TYPE, TRACATE_VIDEO_URL_SOURCE, PGC_TYPE, TRACTATE_STATUS
from gm_types.gaia import (
VIDEO_CODE_STATUS,
QINIU_VIDEO_HANDLE_STATUS_CODE,
LIST_TRACTATE_FROM,
)
from gm_upload import (
set_video_watermark,
fetch_picture_and_save_to_qiniu,
video_clipping,
upload_file,
video_delete,
)
from gm_upload.utils.qiniu_tool import QiniuTool
from gm_upload.utils.image_utils import Picture
from talos.libs.image_utils import fetch_picture_and_save_to_qiniu_v2
from talos.logger import info_logger
from talos.models.soft_article.soft_article import SoftArticleVideo
from talos.models.soft_article.reply import SoftArticleReply
from talos.models.tractate.vote import TractateReplyVote
from talos.models.tractate.tractate import TractateTag
from talos.models.tractate import TractateVideo, TractateExtra, TractateVote, Tractate, TractateReply, \
TractateReplyImages
from talos.rpc import logging_exception
from utils.common import get_new_video_name, replace_video_url_for_rich_text
from utils.push import push_task_to_user_multi, vote_push, send_applet_subscribe_msg
from utils.protocol import gm_protocol
from talos.cache.base import (
tractate_pv_cache,
tractate_vote_count_cache,
vote_cache,
pgc_tractate_cache
)
from talos.services import UserService
from talos.tasks import get_sleep_user
from talos.tools.vote_tool import VoteTool
from talos.cache.base import fake_vote_cache
from communal.normal_manager import (
tag_manager,
)
from talos.rpc import get_current_rpc_invoker
from talos.models.tractate import TractateScore
from communal.tasks import intelligent_push_task
def fake_view_num(start, end):
"""
帖子的浏览量灌水
:param start:
:param end:
:return:
"""
return random.randint(start, end)
def upload_new_image_and_get_base_info(video_url):
result = {
"video_cover_url": "",
"width": 0,
"height": 0,
}
_video_cover_url = "{url}{params}".format(
url=urljoin(settings.VIDEO_HOST, video_url), params=settings.VIDEO_PIC_URL)
try:
new_video_caver_url = fetch_picture_and_save_to_qiniu_v2(_video_cover_url)
_base_data = Picture.get_image_base_info(new_video_caver_url)
if not all(_base_data.values()):
_base_data = {
"width": 0,
"height": 0,
}
result["video_cover_url"] = new_video_caver_url
result.update(_base_data)
return result
except: # 上传失败则返回默认值
return result
@shared_task
def set_tractate_rich_text_water_video_url(tractate_id, raw_video_url, water_video_url):
try:
tractate_obj = Tractate.objects.get(id=tractate_id)
except:
tractate_obj = None
if not tractate_obj:
return
rich_text, _ = replace_video_url_for_rich_text(tractate_obj.content, {raw_video_url: water_video_url})
tractate_obj.content = rich_text
tractate_obj.save(update_fields=["content"])
@shared_task
def set_tractate_video_water_mark_url(video_id):
video = TractateVideo.objects.get(id=video_id)
raw_video_url = video.raw_video_url
pid = set_video_watermark(
raw_video_url,
get_new_video_name(raw_video_url),
water_mark_url=settings.WATER_MARK_URL_FOR_VIDEO
)
video.persistent_id = pid
video.persistent_status = VIDEO_CODE_STATUS.WAITING
video.save(update_fields=["persistent_id", "persistent_status"])
@shared_task
def check_tractate_video_water_mark_url_is_finish():
videos = TractateVideo.objects.filter(persistent_status=VIDEO_CODE_STATUS.WAITING)
for video in videos:
_pid = video.persistent_id
try:
if _pid:
result = json.loads(requests.get('http://api.qiniu.com/status/get/prefop?id=' + _pid).text)
# 状态码0成功,1等待处理,2正在处理,3处理失败,4通知提交失败。
# 如果请求失败,返回包含如下内容的JSON字符串{"error": "<ErrMsg string>"}
code = result.get('code')
error = result.get('error', '')
if error:
continue
if code != QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
continue
else:
if result['items'][0]['code'] == QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
water_video_url = result['items'][0]['key']
# 刷新cdn缓存
QiniuTool.refresh_qiniu_resource_cache([urljoin(settings.VIDEO_HOST, water_video_url)])
# 上传一下有水印的视频第一帧图片
others = upload_new_image_and_get_base_info(water_video_url)
if others.get("video_cover_url", ""):
for k, v in others.items():
setattr(video, k, v)
video.water_video_url = water_video_url
video.persistent_status = VIDEO_CODE_STATUS.SUCCESS
video.save()
# 如果视频来源于帖子富文本中,则更新帖子富文本中的视频地址
if video.video_url_source == TRACATE_VIDEO_URL_SOURCE.RICH_TEXT:
set_tractate_rich_text_water_video_url.delay(video.tractate_id, video.raw_video_url, water_video_url)
else:
continue
except:
logging_exception()
@shared_task
def tractate_fake_vote(
repeat_times, tractate_id, need_view_increase=True, incr_range=[1, 1], alert='', force_push=False
):
"""
异步增加虚拟点赞量、浏览量
:param repeat_times:
:param tractate_id:
:param incr_range:
:param need_view_increase:
:param alert:
:return:
"""
now_date = datetime.datetime.now()
if not 9 <= now_date.hour < 22: # 超过晚十点灌水任务不执行
return
tractate_fake_vote_key = "tractate_fake_vote_{id}".format(id=tractate_id)
is_set = fake_vote_cache.set(tractate_fake_vote_key, 1, nx=True, ex=30)
if not is_set:
return
tractate = Tractate.objects.get(id=tractate_id)
if tractate.user_del or not tractate.is_online or tractate.content_level == TRACTATE_CONTENT_LEVEL.BAD:
info_logger.info('帖子:{}已下线, 不执行点赞灌水.'.format(tractate.id))
return
users = get_sleep_user(repeat_times)
cnt = 0
for u in users:
u_nick_name = ''
tractate_vote, created = TractateVote.objects.get_or_create(user_id=u, tractate_id=tractate.id, is_fake=True)
if created:
# 对作者增加点赞数
author = UserService.get_user_by_user_id(tractate.user_id)
author.incr_vote_count()
cnt += 1
if not u_nick_name:
vote_user = UserService.get_user_by_user_id(u)
u_nick_name = vote_user.nickname or u'更美用户'
vote_tool = VoteTool(redis_c=vote_cache, user_id=tractate.user_id, new_version=True)
vote_tool.receive_tractate_vote(tractate_vote.id)
# 点赞推送
push_url = gm_protocol.get_tractate_detail(
tractate_id=tractate.id,
tractate_detail_from=LIST_TRACTATE_FROM.NOTICE_VOTE
)
if not alert:
alert = u'{user_name}赞了你的帖子{content}'.format(
user_name=str(u_nick_name), content=tractate.content[:10])
if force_push:
vote_push(user_id=tractate.user_id, alert=alert, push_type=AUTOMATED_PUSH.TRACTATE_GET_VOTED, push_url=push_url)
if not need_view_increase:
return
# 增加可视点赞数
te, _ = TractateExtra.objects.get_or_create(tractate_id=tractate.id)
te.favor_count = F('vote_count') + cnt
te.save(update_fields=['vote_count'])
tractate_vote_count_cache.incrby(str(tractate.id), cnt)
# 每个点赞增加随机浏览量
view_num = 0
for i in range(cnt):
view_num += random.randint(*incr_range)
# 增加可视浏览数
tractate_pv_cache.incrby(str(tractate.id), view_num)
@shared_task
def reply_push(user_id, tractate_id, reply_id, content=None):
"""
:param user_id: 回复人的user_id
:param tractate_id:
:param reply_id: 发出的那条回复ID
:param content: 评论的内容
:return:
"""
if not tractate_id and not reply_id:
return
try:
reply_info = TractateReply.objects.get(id=reply_id)
except TractateReply.DoesNotExist:
return
if reply_info.replied_id:
try:
replied_info = TractateReply.objects.get(id=reply_info.replied_id)
except TractateReply.DoesNotExist:
return
push_user_id = replied_info.user_id
push_type = AUTOMATED_PUSH.TRACTATE_REPLY_GET_REPLY
content_id = replied_info.id # 二级评论传给demeter的id是二级评论id
push_image = '' # 帖子评论暂时没有图片
else:
try:
tractate = Tractate.objects.get(id=tractate_id)
except Tractate.DoesNotExist:
return
push_user_id = tractate.user_id
images = list(TractateReplyImages.objects.using(settings.SLAVE_DB_NAME).filter(
reply_id=reply_id,
).values_list('image_url', flat=True))
push_image = images and Picture(images[0]).watermarked or ''
content_id = tractate.id
push_type = AUTOMATED_PUSH.TRACTATE_GET_REPLY
push_url = gm_protocol.get_user_answer_list()
user = UserService.get_user_by_user_id(user_id=user_id)
alert = u'@{}:{}...'.format(user.nickname if user else '更美用户', content[:25])
intelligent_push_task.apply_async(
args=(
content_id, [push_user_id], push_type,
{
'type': PUSH_INFO_TYPE.GM_PROTOCOL,
'pushUrl': push_url,
'push_url': push_url,
'image': push_image,
'push_image': push_image,
}
),
kwargs={
"platform": None,
"alert": alert,
"others": {
"title": "你收到了一条新评论",
"alert": alert,
},
"labels": {
'event_type': 'push',
'event': 'tractate_received_reply'
},
},
)
@shared_task
def set_softarticle_video_water_mark_url(video_id):
# 医生后台帖子封面图修改
video = SoftArticleVideo.objects.get(id=video_id)
raw_video_url = video.raw_video_url
pid = set_video_watermark(
raw_video_url,
get_new_video_name(raw_video_url),
water_mark_url=settings.WATER_MARK_URL_FOR_VIDEO
)
video.persistent_id = pid
video.persistent_status = VIDEO_CODE_STATUS.WAITING
video.save(update_fields=["persistent_id", "persistent_status"])
@shared_task
def check_soft_article_video_water_mark_url_is_finish():
videos = SoftArticleVideo.objects.filter(persistent_status=VIDEO_CODE_STATUS.WAITING)
for video in videos:
_pid = video.persistent_id
try:
if _pid:
result = json.loads(requests.get('http://api.qiniu.com/status/get/prefop?id=' + _pid).text)
# 状态码0成功,1等待处理,2正在处理,3处理失败,4通知提交失败。
# 如果请求失败,返回包含如下内容的JSON字符串{"error": "<ErrMsg string>"}
code = result.get('code')
error = result.get('error', '')
if error:
continue
if code != QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
continue
else:
if result['items'][0]['code'] == QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
water_video_url = result['items'][0]['key']
# 刷新cdn缓存
QiniuTool.refresh_qiniu_resource_cache([urljoin(settings.VIDEO_HOST, water_video_url)])
# 上传一下有水印的视频第一帧图片
others = upload_new_image_and_get_base_info(water_video_url)
if others.get("video_cover_url", ""):
for k, v in others.items():
setattr(video, k, v)
video.water_video_url = water_video_url
video.persistent_status = VIDEO_CODE_STATUS.SUCCESS
video.save()
else:
continue
except:
logging_exception()
@shared_task
def doctor_reply_push(soft_article_id, reply_id):
"""
:param soft_article_id:
:param reply_id: 发出的那条回复ID
:return:
"""
if not soft_article_id and not reply_id:
return
try:
reply_info = SoftArticleReply.objects.get(id=reply_id)
except SoftArticleReply.DoesNotExist:
return
if reply_info.replied_id:
try:
replied_info = SoftArticleReply.objects.get(id=reply_info.replied_id, source_id=TRACTATE_PLATFORM.GM)
except SoftArticleReply.DoesNotExist:
return
user = UserService.get_user_by_user_id(user_id=reply_info.user_id)
if not user:
return
push_user_id = replied_info.user_id
content = reply_info.content if len(reply_info.content) >= 10 else reply_info.content[:10]
push_msg = "{user_name}回复了你的回复{content}".format(user_name=user.nickname, content=content)
push_type = AUTOMATED_PUSH.TRACTATE_REPLY_GET_REPLY
else:
return
push_url = gm_protocol.get_tractate_detail(
comment_id=reply_id,
tractate_id=replied_info.softarticle_id,
data_type=TRACTATE_DATA_TYPE.DOCTOR,
tractate_detail_from=LIST_TRACTATE_FROM.NOTICE_REPLY
) if reply_id else gm_protocol.get_comment_detail(tractate_id=replied_info.softarticle_id)
extra = {
'type': PUSH_INFO_TYPE.GM_PROTOCOL,
'msgType': 4,
'pushUrl': push_url,
'push_url': push_url,
}
push_task_to_user_multi(
user_ids=[push_user_id], push_type=push_type,
extra=extra,
labels={'event_type': 'push', 'event': 'question_received_answer'},
alert=push_msg,
)
@shared_task
def tractate_video_fsm_runner():
"""
状态机
考虑封装成类,根据状态给对应worker添加任务
:return:
"""
# 截取视频前两秒
clipping_id_list = list(TractateVideo.objects.filter(
persistent_clip_status=VIDEO_CODE_STATUS.NOSTART
).values_list("id", flat=True))
for video_id in clipping_id_list:
set_tractate_video_clipping_to_video.delay(video_id)
# 检查七牛云截取状态
check_id_list = list(TractateVideo.objects.filter(
persistent_clip_status=VIDEO_CODE_STATUS.WAITING
).values_list("id", flat=True))
for video_id in check_id_list:
check_tractate_video_clipping_is_finish.delay(video_id)
# 将七牛云截取成功的 mp4 转换为 webP 动图并上传
set_id_list = list(TractateVideo.objects.filter(
persistent_clip_status=VIDEO_CODE_STATUS.OPERATING_LOCAL
).values_list("id", flat=True))
for video_id in set_id_list:
set_tractate_video_webp_pic.delay(video_id)
@shared_task
def set_tractate_video_clipping_to_video(video_id):
video = TractateVideo.objects.get(id=video_id)
if video.persistent_clip_status != VIDEO_CODE_STATUS.NOSTART:
return
hash_key = hashlib.md5(str(time.time()).encode("utf8")).hexdigest()
pid = video_clipping(
video.raw_video_url,
new_filename='{}_clip.mp4'.format(hash_key),
video_type='mp4',
water_mark_url=None,
start_time=3,
# water_mark_url=settings.WATER_MARK_URL_FOR_VIDEO
)
video.persistent_clip_status = VIDEO_CODE_STATUS.WAITING
video.persistent_clip_id = pid
video.save(update_fields=['persistent_clip_id', 'persistent_clip_status'])
@shared_task
def check_tractate_video_clipping_is_finish(video_id):
try:
video = TractateVideo.objects.get(id=video_id)
if video.persistent_clip_status != VIDEO_CODE_STATUS.WAITING:
return
if video.persistent_clip_id:
result = json.loads(requests.get('{}{}'.format(
settings.QINIU_VIDEO_INQUIRE_HOST,
video.persistent_clip_id
)).text)
# 状态码0成功,1等待处理,2正在处理,3处理失败,4通知提交失败。
# 如果请求失败,返回包含如下内容的JSON字符串{"error": "<ErrMsg string>"}
code = result.get('code')
error = result.get('error', '')
if error:
return
if code not in (
QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS,
QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL
):
return
elif code == QINIU_VIDEO_HANDLE_STATUS_CODE.PROCESSING_FAIL:
video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL
video.save(update_fields=['persistent_clip_status'])
else:
if result['items'][0]['code'] == QINIU_VIDEO_HANDLE_STATUS_CODE.SUCCESS:
mp4_key = result['items'][0]['key']
# 刷新cdn缓存
QiniuTool.refresh_qiniu_resource_cache([urljoin(settings.VIDEO_HOST, mp4_key)])
video.intercept_video_url = mp4_key
video.persistent_clip_status = VIDEO_CODE_STATUS.OPERATING_LOCAL
video.save(update_fields=['persistent_clip_status', 'intercept_video_url'])
# set_tractate_video_webp_pic(video.id)
else:
video.persistent_status = result['items'][0]['code']
video.save(update_fields=['persistent_clip_status'])
except:
logging_exception()
@shared_task
def set_tractate_video_webp_pic(video_id):
video = TractateVideo.objects.get(id=video_id)
if video.persistent_clip_status != VIDEO_CODE_STATUS.OPERATING_LOCAL:
return
mp4_key = video.intercept_video_url
hash_key = hashlib.md5(str(time.time()).encode("utf8")).hexdigest()
input_path = os.path.join(settings.VIDEO_CONVERT_PATH, "{}.mp4".format(hash_key))
output_path = os.path.join(settings.VIDEO_CONVERT_PATH, '{}.webp'.format(hash_key))
try:
clipping_video_url = settings.VIDEO_HOST + mp4_key
res = requests.get(clipping_video_url)
# 文件存储到本地
with open(input_path, 'wb') as f:
f.write(res.content)
# 使用底层 FFmpeg 库进行转码
ff = ffmpy.FFmpeg(
inputs={input_path: None},
outputs={output_path: settings.VIDEO_FF_ARGS}
)
ff.run()
# 上传webP图片
video.webp_url = upload_file(output_path)
video.persistent_clip_status = VIDEO_CODE_STATUS.SUCCESS
video.save(update_fields=['webp_url', 'persistent_clip_status'])
# 删除中间数据
video_delete(mp4_key)
# 刷新缓存
QiniuTool.refresh_qiniu_resource_cache(
[urljoin(settings.VIDEO_HOST, mp4_key)]
)
except:
logging_exception()
video.persistent_clip_status = VIDEO_CODE_STATUS.FAIL
video.save(update_fields=['persistent_clip_status'])
finally:
if os.path.exists(input_path):
os.remove(input_path)
if os.path.exists(output_path):
os.remove(output_path)
@shared_task
def applet_replied_push(reply_id):
"""
帖子的评论被评论,给帖子评论的用户发送小程序推送 自己给自己评论无效
:param reply_id: 发出的那条回复ID
:return:
"""
if not reply_id:
return
try:
reply_info = TractateReply.objects.get(id=reply_id)
except TractateReply.DoesNotExist:
return
if not reply_info.replied_id:
return
try:
replied_info = TractateReply.objects.get(id=reply_info.replied_id)
except TractateReply.DoesNotExist:
return
reply_content = reply_info.content[:20]
nickname = UserService.get_user_by_user_id(reply_info.user_id).nickname[:10]
data = {
"name1": {
"value": nickname
},
"thing2": {
"value": reply_content
},
"date3": {
"value": "{date}".format(date=datetime.datetime.now().strftime('%Y年%m月%d日 %H:%M'))
},
"thing4": {
"value": "点击快速查看>>"
}
}
# 模板id
template_id = settings.APPLET_SUBSCRIBE_MSG.get("comment", "")
# 跳转页面
page = '/packageBBS/pages/posts/main?tractate_id={tractate_id}&top_id={top_id}&reply_id=' \
'{reply_id}&from={from_page}&from_action={from_action}'.format(
tractate_id=reply_info.tractate_id,
top_id=reply_info.top_id,
reply_id=reply_info.id,
from_page=APPLET_PAGE_FROM.CARD,
from_action=APPLET_SUBSCRIBE_MSG_TYPE.COMMENT
)
# 给一级评论用户发
if reply_info.replied_id != reply_info.top_id:
try:
level_1_reply = TractateReply.objects.get(id=reply_info.top_id)
except:
level_1_reply = None
if level_1_reply:
# 自己给自己评论无效
if reply_info.user_id != level_1_reply.user_id:
# 发送小程序推送
send_applet_subscribe_msg(level_1_reply.user_id, template_id, data=data, page=page)
# 给被评论用户发,自己给自己评论无效
if reply_info.user_id == replied_info.user_id:
return
# 发送小程序推送
send_applet_subscribe_msg(replied_info.user_id, template_id, data=data, page=page)
@shared_task
def applet_reply_summary_push(reply_id):
"""
用户评论完24小时后,
如果没有收到评论或点赞,则给用户推送帖子新增评论总数,或帖子的相关内容
如果被赞,且被赞数大于1,则推送新增被赞数
:param reply_id: 当前评论id
:return:
"""
if not reply_id:
return
try:
reply_info = TractateReply.objects.get(id=reply_id)
except TractateReply.DoesNotExist:
return
user_id = reply_info.user_id
# 查询当前用户有没有新的对帖子的评论
new_tractate_reply = TractateReply.objects.using(settings.SLAVE_DB_NAME).filter(id__gt=reply_id, user_id=user_id,
is_online=True).exists()
if new_tractate_reply:
return
replied_count = TractateReply.objects.using(settings.SLAVE_DB_NAME).\
filter(replied_id=reply_id, is_online=True).exclude(user_id=user_id).count()
voted_count = TractateReplyVote.objects.using(settings.SLAVE_DB_NAME).\
filter(tractate_id=reply_info.tractate_id, reply_id=reply_id, is_online=True).exclude(user_id=user_id).count()
tractate_id = reply_info.tractate_id
tractate_content = Tractate.objects.filter(id=tractate_id).first().content[:10]
# 当前评论有被评论或被赞
if replied_count or voted_count:
return
# 帖子新增的一级评论数(不包含自己的)
additional_reply_count = TractateReply.objects.using(settings.SLAVE_DB_NAME).\
filter(id__gt=reply_id, tractate_id=tractate_id, top_id=0, is_online=True).exclude(user_id=user_id).count()
# 有新增评论,发送24小内新增评论总数
if additional_reply_count:
data = {
"thing1": {
"value": tractate_content
},
"thing2": {
"value": "你评论的帖子,新增{reply_count}条吐槽,立即查看".format(reply_count=additional_reply_count)[:20]
}
}
# 模板id
template_id = settings.APPLET_SUBSCRIBE_MSG.get("vote", "")
# 跳转页面
page = '/packageBBS/pages/posts/main?tractate_id={tractate_id}&data_type=user_post&from={from_page}' \
'&from_action={from_action}'\
.format(
tractate_id=reply_info.tractate_id,
from_page=APPLET_PAGE_FROM.CARD,
from_action=APPLET_SUBSCRIBE_MSG_TYPE.NEW_COMMENT
)
# 无新增评论,推送评论的帖子的相同标签下的其它内容详情页
else:
data = {
"thing2": {
"value": tractate_content
},
"thing4": {
"value": "亲!你关注过的话题又有新内容啦>>"
}
}
# 模板id
template_id = settings.APPLET_SUBSCRIBE_MSG.get("recommend", "")
# 获取帖子标签
tag_id_list = TractateTag.objects.filter(tractate_id=tractate_id).values_list('tag_id', flat=True)
tag_info = tag_manager.get_tags_info_by_ids(tag_id_list)
tag_info_list = list(tag_info.values())
tag_name = tag_info_list and tag_info_list[0] and tag_info_list[0].get("tag_name")
if not tag_name:
return
# 调策略 获取相关内容
def get_new_tractate_id(rpc_client, offset):
res = rpc_client['doris/search/query_filter_tractate'](query=tag_name, size=1, offset=offset).unwrap()
new_tractate_id = res and res.get("tractate_id_list") and res.get("tractate_id_list")[0]
return new_tractate_id
rpc_client = get_current_rpc_invoker()
offset = random.randint(0, 200)
origin_offset = offset
new_tractate_id = None
num = 6
while not new_tractate_id and num :
try:
new_tractate_id = get_new_tractate_id(rpc_client, offset)
except:
new_tractate_id = None
offset = origin_offset % num
if num > 1:
offset += random.randint(0, 10)
num -= 1
if not new_tractate_id:
return
# 跳转页面
page = '/packageBBS/pages/posts/main?tractate_id={tractate_id}&data_type=user_post&from={from_page}' \
'&from_action={from_action}' \
.format(
tractate_id=new_tractate_id,
from_page=APPLET_PAGE_FROM.CARD,
from_action=APPLET_SUBSCRIBE_MSG_TYPE.RELATED_CONTENT
)
# 发送小程序推送
send_applet_subscribe_msg(user_id, template_id, data=data, page=page)
@shared_task
def refresh_pgc_info():
"""
定时更新有pgc属性的帖子状态:1. 更新新加入的帖子;2. 更新 smart_rank 值
直接做好排序,写入redis
:return:
"""
tractate_list = list(Tractate.objects.using(settings.SLAVE_DB_NAME).filter(
pgc_type=PGC_TYPE.COMMUNITY,
is_online=True,
status=TRACTATE_STATUS.AUDIT_SUCCESS
).values_list(
'id', 'create_time', 'pgc_type'
))
tractate_info = {
i[0]: {
'tractate_id': i[0],
'tractate_create_time': i[1],
'pgc_type': i[2],
'tractate_score': 0,
'is_online': 1
} for i in tractate_list
}
tractate_ids = [i[0] for i in tractate_list]
start_pos, end_pos = 0, 0
offset = 100
smart_rank_info = []
tractate_num = len(tractate_ids)
while end_pos < tractate_num:
end_pos = start_pos + offset
if end_pos > tractate_num:
end_pos = tractate_num
smart_rank_info.extend(list(TractateScore.objects.filter(
tractate_id__in=tractate_ids[start_pos:end_pos]
).values_list('tractate_id', 'tractate_score')))
start_pos = end_pos
for item in smart_rank_info:
tractate_info[item[0]]['tractate_score'] = item[1]
# 先按发布时间倒叙排列,发布时间相同再按smr分数从高到低排列
scored_list = list(tractate_info.values())
scored_list.sort(key=lambda x: (x['tractate_create_time'], x['tractate_score']), reverse=True)
sorted_ids = json.dumps([i['tractate_id'] for i in scored_list])
info_logger.info("更新pgc帖子缓存:{}".format(sorted_ids))
pgc_tractate_cache.set('pgc_tractate_sorted_ids', sorted_ids, 86400)
pgc_tractate_cache.set('pgc_tractate_update_time', int(time.time()), 86400)