Commit 66c78b42 authored by 吴升宇's avatar 吴升宇

Merge branch 'master' of git.wanmeizhensuo.com:alpha/physical into like-pre/r01

parents e146f846 ab172f78
Pipeline #3954 canceled with stage
...@@ -4,7 +4,7 @@ __pycache__/ ...@@ -4,7 +4,7 @@ __pycache__/
*~ *~
# C extensions # C extensions
*.so *.so
venv
# Distribution / packaging # Distribution / packaging
.Python .Python
.vscode .vscode
......
crontab: crontab:
cp crontab.py /data/log/physical/app/crontab.py && python /data/log/physical/app/crontab.py && python /data/log/physical/app/crontabs.py cp crontab.py /data/log/physical/app/crontab.py && python /data/log/physical/app/crontab.py && python /data/log/physical/app/crontabs.py
celery: celery:
celery -A physical worker -c 1 -Q tapir-alpha -l debug --max-tasks-per-child == 500 celery -A physical worker -c 1 -Q vest -l debug
...@@ -4,7 +4,7 @@ ontime_list = [ ...@@ -4,7 +4,7 @@ ontime_list = [
"0 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontab.py", "0 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontab.py",
"10 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontabs.py", "10 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontabs.py",
"0 9 * * * sh /data/log/cybertron/app/statistics_query.sh > /data/log/cybertron/app/statistics_query.log", "0 9 * * * sh /data/log/cybertron/app/statistics_query.sh > /data/log/cybertron/app/statistics_query.log",
"54 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_2h_by_post", "54 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_2h_by_post",
# "*/5 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_one", # "*/5 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_one",
# "02,12,22,32,42,52 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es-m true_click_two", # "02,12,22,32,42,52 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es-m true_click_two",
# "00,10,20,30,40,50 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_three", # "00,10,20,30,40,50 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_three",
...@@ -16,11 +16,11 @@ ontime_list = [ ...@@ -16,11 +16,11 @@ ontime_list = [
"0 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app", "0 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app",
"30 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app2", "30 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app2",
# "*/5 * * * 1 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_5m_by_followed", # "*/5 * * * 1 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_5m_by_followed",
"1 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_2h_by_post_and_regist", "1 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_2h_by_post_and_regist",
"0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_login_session", "0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_login_session",
"0 0 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_user_id", "0 0 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_user_id",
# "0 14,18,22 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m principal_online_comment1", # "0 14,18,22 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m principal_online_comment1",
"25 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_reply_per_2h_to_topic", "25 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_reply_per_2h_to_topic",
"0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_1d_by_post", "0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_1d_by_post",
"1 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_regist", "1 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_regist",
"2 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_post", "2 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_post",
......
...@@ -27,7 +27,7 @@ def get_rand_time(hourlow=0, hourup=13, minutelow=0, minuteup=60): ...@@ -27,7 +27,7 @@ def get_rand_time(hourlow=0, hourup=13, minutelow=0, minuteup=60):
hours = random.randint(hourlow, hourup) hours = random.randint(hourlow, hourup)
minutes = random.randint(minutelow, minuteup) minutes = random.randint(minutelow, minuteup)
# todo redis会自动给加8个小时,所以这边先写死减少8小时 # todo redis会自动给加8个小时,所以这边先写死减少8小时
now_time = NOW + timedelta(hours=hours, minutes=minutes) - timedelta(hours=8) now_time = NOW + timedelta(hours=hours, minutes=minutes)
time = eta_2_push_time(now_time.strftime("%Y-%m-%d %H:%M:%S")) time = eta_2_push_time(now_time.strftime("%Y-%m-%d %H:%M:%S"))
print(datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai'))) print(datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai')))
return datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai')) return datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai'))
......
...@@ -7,7 +7,7 @@ from libs.cache import redis_client ...@@ -7,7 +7,7 @@ from libs.cache import redis_client
import logging import logging
from linucb.views.linucb import LinUCB from linucb.views.linucb import LinUCB
import json import json
from trans2es.models.tag import TopicTag,Tag from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend from trans2es.models.topic import TopicHomeRecommend
import traceback import traceback
from django.conf import settings from django.conf import settings
...@@ -26,11 +26,11 @@ def loads_data(data): ...@@ -26,11 +26,11 @@ def loads_data(data):
try: try:
result = json.loads(data) result = json.loads(data)
msg = True msg = True
return result,msg return result, msg
except: except:
result = msgpack.loads(data) result = msgpack.loads(data)
msg = False msg = False
return result,msg return result, msg
class KafkaManager(object): class KafkaManager(object):
...@@ -38,7 +38,6 @@ class KafkaManager(object): ...@@ -38,7 +38,6 @@ class KafkaManager(object):
@classmethod @classmethod
def get_kafka_consumer_ins(cls, topic_name=None): def get_kafka_consumer_ins(cls, topic_name=None):
if not cls.consumser_obj: if not cls.consumser_obj:
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
gm_logging_name = settings.KAFKA_GM_LOGGING_TOPIC_NAME gm_logging_name = settings.KAFKA_GM_LOGGING_TOPIC_NAME
...@@ -47,6 +46,7 @@ class KafkaManager(object): ...@@ -47,6 +46,7 @@ class KafkaManager(object):
return cls.consumser_obj return cls.consumser_obj
class CollectData(object): class CollectData(object):
def __init__(self): def __init__(self):
...@@ -68,7 +68,6 @@ class CollectData(object): ...@@ -68,7 +68,6 @@ class CollectData(object):
# 默认 # 默认
self.user_feature = [0, 1] self.user_feature = [0, 1]
def _get_user_linucb_info(self, device_id, linucb_matrix_prefix): def _get_user_linucb_info(self, device_id, linucb_matrix_prefix):
try: try:
redis_key = linucb_matrix_prefix + str(device_id) redis_key = linucb_matrix_prefix + str(device_id)
...@@ -83,7 +82,8 @@ class CollectData(object): ...@@ -83,7 +82,8 @@ class CollectData(object):
return dict() return dict()
def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None, def update_recommend_tag_list(self, device_id, user_feature=None, user_id=None, click_topic_tag_list=None,
new_user_click_tag_list=[], linucb_matrix_prefix=None, linucb_recommend_tag_prefix=None, new_user_click_tag_list=[], linucb_matrix_prefix=None,
linucb_recommend_tag_prefix=None,
linucb_topic_ids_prefix=None, linucb_pictorial_ids_prefix=None): linucb_topic_ids_prefix=None, linucb_pictorial_ids_prefix=None):
try: try:
redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id, linucb_matrix_prefix) redis_linucb_tag_data_dict = self._get_user_linucb_info(device_id, linucb_matrix_prefix)
...@@ -101,13 +101,16 @@ class CollectData(object): ...@@ -101,13 +101,16 @@ class CollectData(object):
if len(recommend_tag_list) > 0: if len(recommend_tag_list) > 0:
tag_recommend_redis_key = linucb_recommend_tag_prefix + str(device_id) tag_recommend_redis_key = linucb_recommend_tag_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list)) redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
redis_client.expire(tag_recommend_redis_key, 30*24*60*60) redis_client.expire(tag_recommend_redis_key, 30 * 24 * 60 * 60)
have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id,user_id,TopicPageType.HOME_RECOMMEND) have_read_topic_id_list = Tools.get_have_read_topic_id_list(device_id, user_id,
TopicPageType.HOME_RECOMMEND)
have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id, have_read_lin_pictorial_id_list = Tools.get_have_read_lin_pictorial_id_list(device_id, user_id,
TopicPageType.HOME_RECOMMEND) TopicPageType.HOME_RECOMMEND)
promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE1_DB_NAME).filter(is_online=1).values_list("topic_id",flat=True) promote_recommend_topic_id_list = TopicHomeRecommend.objects.using(settings.SLAVE1_DB_NAME).filter(
promote_lin_pictorial_id_list = CommunityPictorialHomeFeed.objects.using(settings.SLAVE1_DB_NAME).filter( is_online=1).values_list("topic_id", flat=True)
promote_lin_pictorial_id_list = CommunityPictorialHomeFeed.objects.using(
settings.SLAVE1_DB_NAME).filter(
is_deleted=0, is_online=1).values_list("pictorial_id", flat=True) is_deleted=0, is_online=1).values_list("pictorial_id", flat=True)
have_read_topic_id_list.extend(promote_recommend_topic_id_list) have_read_topic_id_list.extend(promote_recommend_topic_id_list)
have_read_lin_pictorial_id_list.extend(promote_lin_pictorial_id_list) have_read_lin_pictorial_id_list.extend(promote_lin_pictorial_id_list)
...@@ -118,10 +121,11 @@ class CollectData(object): ...@@ -118,10 +121,11 @@ class CollectData(object):
recommend_topic_id_list_click_dict = dict() recommend_topic_id_list_click_dict = dict()
recommend_lin_pictorial_id_list = list() recommend_lin_pictorial_id_list = list()
if click_topic_tag_list and len(click_topic_tag_list)>0: if click_topic_tag_list and len(click_topic_tag_list) > 0:
click_topic_tag_list_same_tagset_ids = get_same_tagset_ids(click_topic_tag_list) click_topic_tag_list_same_tagset_ids = get_same_tagset_ids(click_topic_tag_list)
recommend_topic_id_list_click,recommend_topic_id_list_click_dict = ESPerform.get_tag_topic_list_dict(click_topic_tag_list_same_tagset_ids, recommend_topic_id_list_click, recommend_topic_id_list_click_dict = ESPerform.get_tag_topic_list_dict(
have_read_topic_id_list,size=2) click_topic_tag_list_same_tagset_ids,
have_read_topic_id_list, size=2)
if len(recommend_topic_id_list_click) > 0: if len(recommend_topic_id_list_click) > 0:
recommend_topic_id_list.extend(recommend_topic_id_list_click) recommend_topic_id_list.extend(recommend_topic_id_list_click)
recommend_topic_id_list_dict.update(recommend_topic_id_list_click_dict) recommend_topic_id_list_dict.update(recommend_topic_id_list_click_dict)
...@@ -145,29 +149,31 @@ class CollectData(object): ...@@ -145,29 +149,31 @@ class CollectData(object):
# b"data"] else [] # b"data"] else []
# cursor = int(str(redis_topic_data_dict[b"cursor"], encoding="utf-8")) # cursor = int(str(redis_topic_data_dict[b"cursor"], encoding="utf-8"))
# if len(recommend_topic_id_list)==0 and cursor==0 and len(redis_topic_list)>0: # if len(recommend_topic_id_list)==0 and cursor==0 and len(redis_topic_list)>0:
# have_read_topic_id_list.extend(redis_topic_list[:2]) # have_read_topic_id_list.extend(redis_topic_list[:2])
if len(new_user_click_tag_list)>0: if len(new_user_click_tag_list) > 0:
new_user_click_tag_list_same_tagset_ids = get_same_tagset_ids(new_user_click_tag_list) new_user_click_tag_list_same_tagset_ids = get_same_tagset_ids(new_user_click_tag_list)
tag_topic_id_list,tag_topic_dict = ESPerform.get_tag_topic_list_dict(new_user_click_tag_list_same_tagset_ids, have_read_topic_id_list) tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(new_user_click_tag_list_same_tagset_ids, new_user_click_tag_list_same_tagset_ids, have_read_topic_id_list)
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(
new_user_click_tag_list_same_tagset_ids,
have_read_lin_pictorial_id_list) have_read_lin_pictorial_id_list)
else: else:
tag_id_list_same_tagset_ids = get_same_tagset_ids(tag_id_list) tag_id_list_same_tagset_ids = get_same_tagset_ids(tag_id_list)
tag_topic_id_list,tag_topic_dict = ESPerform.get_tag_topic_list_dict(tag_id_list_same_tagset_ids,have_read_topic_id_list) tag_topic_id_list, tag_topic_dict = ESPerform.get_tag_topic_list_dict(tag_id_list_same_tagset_ids,
have_read_topic_id_list)
recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(tag_id_list_same_tagset_ids, recommend_lin_pictorial_id_list = ESPerform.get_tag_pictorial_id_list(tag_id_list_same_tagset_ids,
have_read_lin_pictorial_id_list) have_read_lin_pictorial_id_list)
if len(recommend_topic_id_list)>0 or len(tag_topic_id_list)>0 or len(new_user_click_tag_list) > 0: if len(recommend_topic_id_list) > 0 or len(tag_topic_id_list) > 0 or len(new_user_click_tag_list) > 0:
tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list tag_topic_id_list = recommend_topic_id_list + tag_topic_id_list
tag_topic_dict.update(recommend_topic_id_list_dict) tag_topic_dict.update(recommend_topic_id_list_dict)
redis_data_dict = { redis_data_dict = {
"data": json.dumps(tag_topic_id_list), "data": json.dumps(tag_topic_id_list),
"datadict":json.dumps(tag_topic_dict), "datadict": json.dumps(tag_topic_dict),
"cursor":0 "cursor": 0
} }
redis_client.hmset(topic_recommend_redis_key,redis_data_dict) redis_client.hmset(topic_recommend_redis_key, redis_data_dict)
if len(recommend_lin_pictorial_id_list) > 0: if len(recommend_lin_pictorial_id_list) > 0:
pictorial_data_dict = { pictorial_data_dict = {
"data": json.dumps(recommend_lin_pictorial_id_list), "data": json.dumps(recommend_lin_pictorial_id_list),
...@@ -184,19 +190,19 @@ class CollectData(object): ...@@ -184,19 +190,19 @@ class CollectData(object):
def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature, linucb_matrix_redis_prefix): def update_user_linucb_tag_info(self, reward, device_id, tag_id, user_feature, linucb_matrix_redis_prefix):
try: try:
user_feature = user_feature if user_feature else self.user_feature user_feature = user_feature if user_feature else self.user_feature
return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id, linucb_matrix_redis_prefix, redis_client) return LinUCB.update_linucb_info(user_feature, reward, tag_id, device_id, linucb_matrix_redis_prefix,
redis_client)
except: except:
logging_exception() logging_exception()
logging.error("update_user_linucb_tag_info error!") logging.error("update_user_linucb_tag_info error!")
return False return False
def transfer_old_info2ctr_feature_key(self, device_id): def transfer_old_info2ctr_feature_key(self, device_id):
try: try:
# 移植老用户的lin标签参数信息到ctr特征策略 # 移植老用户的lin标签参数信息到ctr特征策略
ctr_linucb_matrix_redis_prefix_key = self.ctr_linucb_matrix_redis_prefix + str(device_id) ctr_linucb_matrix_redis_prefix_key = self.ctr_linucb_matrix_redis_prefix + str(device_id)
linucb_matrix_redis_prefix_key = self.linucb_matrix_redis_prefix + str(device_id) linucb_matrix_redis_prefix_key = self.linucb_matrix_redis_prefix + str(device_id)
if redis_client.exists(ctr_linucb_matrix_redis_prefix_key): #如果新策略存在lin信息,则不需要移植 if redis_client.exists(ctr_linucb_matrix_redis_prefix_key): # 如果新策略存在lin信息,则不需要移植
return True return True
else: else:
if redis_client.exists(linucb_matrix_redis_prefix_key): if redis_client.exists(linucb_matrix_redis_prefix_key):
...@@ -223,7 +229,7 @@ class CollectData(object): ...@@ -223,7 +229,7 @@ class CollectData(object):
if redis_client.exists(linucb_recommend_pictorial_id_prefix): if redis_client.exists(linucb_recommend_pictorial_id_prefix):
older_device_info = redis_client.hgetall(linucb_recommend_pictorial_id_prefix) older_device_info = redis_client.hgetall(linucb_recommend_pictorial_id_prefix)
redis_client.hmset(ctr_linucb_recommend_pictorial_id_prefix, older_device_info) redis_client.hmset(ctr_linucb_recommend_pictorial_id_prefix, older_device_info)
logging.info("transfer_old_info2ctr_feature_key sucess:"+str(device_id)) logging.info("transfer_old_info2ctr_feature_key sucess:" + str(device_id))
return True return True
except: except:
logging_exception() logging_exception()
...@@ -246,7 +252,41 @@ class CollectData(object): ...@@ -246,7 +252,41 @@ class CollectData(object):
logging.error("get_device_tag_ctr error!") logging.error("get_device_tag_ctr error!")
return 0.001 return 0.001
def consume_data_from_kafka(self,topic_name=None): # 用户打标签加分
# 新增四种用户兴趣分行为
# 四种日志均为后端埋点日志
# Scores user tag interest from backend tracking logs (four new
# interest-signal behaviors all arrive through this path).
def transfer_update_recommend_tag_list(self, device_id, user_feature, user_id, tag_list, score_loop=1):
    """Boost the LinUCB interest score of every tag in ``tag_list`` for one device.

    Applies a positive reward ``score_loop`` times per tag (a crude weighting
    knob: more loops == stronger interest signal) to both the plain LinUCB
    matrix and the CTR-feature LinUCB matrix, then refreshes the device's
    recommended tag/topic/pictorial redis caches under both key prefixes.

    Args:
        device_id: client device identifier, used as the redis key suffix.
        user_feature: feature vector for the plain-strategy LinUCB update.
        user_id: logged-in user id, or None for anonymous devices.
        tag_list: iterable of tag ids to reward; no-op when empty.
        score_loop: number of reward repetitions per tag (default 1).
    """
    if not tag_list:
        return
    # Every caller of this path represents a positive, click-like action,
    # so the reward is unconditionally 1 (the original is_click/is_vote
    # flags were constants that always produced 1).
    reward = 1
    # Migrate a legacy user's LinUCB state into the CTR-feature keys first,
    # so the CTR-strategy updates below start from the old matrix.
    self.transfer_old_info2ctr_feature_key(device_id)
    for _ in range(score_loop):
        for tag_id in tag_list:
            self.update_user_linucb_tag_info(reward, device_id, tag_id,
                                             user_feature,
                                             self.linucb_matrix_redis_prefix)
            # Per-tag CTR becomes the feature vector for the CTR-strategy
            # LinUCB update.
            device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
            user_feature_ctr = [device_tag_ctr, device_tag_ctr]
            self.update_user_linucb_tag_info(reward, device_id, tag_id,
                                             user_feature_ctr,
                                             self.ctr_linucb_matrix_redis_prefix)
    # Recompute this user's recommendation caches only AFTER all reward
    # updates have landed — once per strategy (plain and CTR).
    self.update_recommend_tag_list(device_id, user_feature, user_id,
                                   click_topic_tag_list=tag_list,
                                   linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
                                   linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
                                   linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
                                   linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
    self.update_recommend_tag_list(device_id, user_feature, user_id,
                                   click_topic_tag_list=tag_list,
                                   linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
                                   linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
                                   linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
                                   linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
def consume_data_from_kafka(self, topic_name=None):
try: try:
user_feature = [1, 1] user_feature = [1, 1]
...@@ -257,11 +297,11 @@ class CollectData(object): ...@@ -257,11 +297,11 @@ class CollectData(object):
consume_msg = msg_dict[msg_key] consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg: for ori_msg in consume_msg:
try: try:
raw_val_dict,msg = loads_data(ori_msg.value) raw_val_dict, msg = loads_data(ori_msg.value)
if msg: if msg:
logging.info(ori_msg.value) logging.info(ori_msg.value)
if "type" in raw_val_dict and \ if "type" in raw_val_dict and \
(raw_val_dict["type"] in ("on_click_feed_topic_card","on_click_button")): (raw_val_dict["type"] in ("on_click_feed_topic_card", "on_click_button")):
click_topic_tag_list = list() click_topic_tag_list = list()
device_id = "" device_id = ""
if "on_click_feed_topic_card" == raw_val_dict["type"]: if "on_click_feed_topic_card" == raw_val_dict["type"]:
...@@ -283,7 +323,8 @@ class CollectData(object): ...@@ -283,7 +323,8 @@ class CollectData(object):
if is_collection: if is_collection:
topic_tag_list.append(tag_id) topic_tag_list.append(tag_id)
tag_query_results = Tag.objects.using(settings.SLAVE1_DB_NAME).filter( tag_query_results = Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=topic_tag_list, is_online=True, is_deleted=False, is_category=False).values_list("id", id__in=topic_tag_list, is_online=True, is_deleted=False,
is_category=False).values_list("id",
"is_ai") "is_ai")
for id, is_ai in tag_query_results: for id, is_ai in tag_query_results:
click_topic_tag_list.append(id) click_topic_tag_list.append(id)
...@@ -298,13 +339,17 @@ class CollectData(object): ...@@ -298,13 +339,17 @@ class CollectData(object):
tag_name = raw_val_dict["params"]["extra_param"] tag_name = raw_val_dict["params"]["extra_param"]
device_id = raw_val_dict["device"]["device_id"] device_id = raw_val_dict["device"]["device_id"]
user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None user_id = raw_val_dict["user_id"] if "user_id" in raw_val_dict else None
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(name=tag_name,is_online=True,is_deleted=False, is_category=False).values_list("id",flat=True)) tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(name=tag_name,
is_online=True,
is_deleted=False,
is_category=False).values_list(
"id", flat=True))
click_topic_tag_list.extend(tag_list) click_topic_tag_list.extend(tag_list)
logging.info("query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % ( logging.info(
"query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
str(device_id), tag_name, str(click_topic_tag_list))) str(device_id), tag_name, str(click_topic_tag_list)))
logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))
logging.info("click_topic_tag_list:%s"%(str(click_topic_tag_list)))
is_click = 1 is_click = 1
is_vote = 0 is_vote = 0
...@@ -324,9 +369,8 @@ class CollectData(object): ...@@ -324,9 +369,8 @@ class CollectData(object):
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr, self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix) self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if len(click_topic_tag_list)>0: if len(click_topic_tag_list) > 0:
self.update_recommend_tag_list(device_id, user_feature, user_id, self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=click_topic_tag_list, click_topic_tag_list=click_topic_tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix, linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
...@@ -392,9 +436,9 @@ class CollectData(object): ...@@ -392,9 +436,9 @@ class CollectData(object):
# # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 # # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
# self.update_recommend_tag_list(device_id, user_feature, user_id) # self.update_recommend_tag_list(device_id, user_feature, user_id)
elif "type" in raw_val_dict and "interest_choice_click_next" == raw_val_dict["type"]: elif "type" in raw_val_dict and "interest_choice_click_next" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["tagid_list"],str): if isinstance(raw_val_dict["params"]["tagid_list"], str):
tagid_list = json.loads(raw_val_dict["params"]["tagid_list"]) tagid_list = json.loads(raw_val_dict["params"]["tagid_list"])
elif isinstance(raw_val_dict["params"]["tagid_list"],list): elif isinstance(raw_val_dict["params"]["tagid_list"], list):
tagid_list = raw_val_dict["params"]["tagid_list"] tagid_list = raw_val_dict["params"]["tagid_list"]
else: else:
tagid_list = list() tagid_list = list()
...@@ -408,7 +452,7 @@ class CollectData(object): ...@@ -408,7 +452,7 @@ class CollectData(object):
if len(tagid_list) > 0: if len(tagid_list) > 0:
tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter( tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tagid_list, is_online=True, is_deleted=False, id__in=tagid_list, is_online=True, is_deleted=False,
is_category=False).values_list("id",flat =True)) is_category=False).values_list("id", flat=True))
is_click = 1 is_click = 1
is_vote = 0 is_vote = 0
...@@ -491,7 +535,8 @@ class CollectData(object): ...@@ -491,7 +535,8 @@ class CollectData(object):
# 用户点击问题清单进linucb # 用户点击问题清单进linucb
elif b'content' in raw_val_dict: elif b'content' in raw_val_dict:
data = json.loads(raw_val_dict[b'content']) data = json.loads(raw_val_dict[b'content'])
if 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS']['action'] == "venus/community/skin_check/submit_questions": if 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS'][
'action'] == "venus/community/skin_check/submit_questions":
device_id = data['SYS']['cl_id'] device_id = data['SYS']['cl_id']
tagid_list = list(data['APP'].get('answer_tag', [])) tagid_list = list(data['APP'].get('answer_tag', []))
user_id = data['SYS'].get('user_id', None) user_id = data['SYS'].get('user_id', None)
...@@ -511,7 +556,8 @@ class CollectData(object): ...@@ -511,7 +556,8 @@ class CollectData(object):
for i in range(5): for i in range(5):
for tag_id in tag_query_results_multi: for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature, self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix) self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息 # 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id) device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
...@@ -532,9 +578,11 @@ class CollectData(object): ...@@ -532,9 +578,11 @@ class CollectData(object):
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix, linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix) linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" % logging.info("skin_check topic type:%s, device_id:%s, tag_query_results:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results_multi))) (str(data['SYS']['action']), str(device_id),
str(tag_query_results_multi)))
# 品牌问卷进linucb # 品牌问卷进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS']['action'] == "venus/community/survey_question/submit": elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and data['SYS'][
'action'] == "venus/community/survey_question/submit":
device_id = data['SYS']['cl_id'] device_id = data['SYS']['cl_id']
tagid_list = list(data['APP'].get('answer_tag', [])) tagid_list = list(data['APP'].get('answer_tag', []))
user_id = data['SYS'].get('user_id', None) user_id = data['SYS'].get('user_id', None)
...@@ -555,7 +603,8 @@ class CollectData(object): ...@@ -555,7 +603,8 @@ class CollectData(object):
for i in range(5): for i in range(5):
for tag_id in tag_query_results_multi: for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature, self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix) self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息 # 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id) device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
...@@ -576,16 +625,87 @@ class CollectData(object): ...@@ -576,16 +625,87 @@ class CollectData(object):
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix, linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix) linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
logging.info("survey_question type:%s, device_id:%s, tagid_list:%s" % logging.info("survey_question type:%s, device_id:%s, tagid_list:%s" %
(str(data['SYS']['action']), str(device_id), str(tag_query_results_multi))) (str(data['SYS']['action']), str(device_id),
str(tag_query_results_multi)))
# 首页搜索精准匹配标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"api/v1/cards/topic" in str(data['SYS'].get('action',"")):
logging.info("action=api/v1/cards/topic")
tag_name = data["APP"].get("query", [])
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
name=tag_name).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
5)
logging.info(
"api/v1/cards/topic,device_id:%s,tag_list:%s" % (str(device_id), str(tag_list)))
# (客户端创建回答,后台创建回答或修改回答关联标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
("venus/community/topic/create" in str(data['SYS'].get('action',"")) or
"venus/sun/topic/edit" in str(data['SYS'].get('action',""))
):
action=str(data['SYS'].get('action',''))
logging.info("action=%s"%(action))
tag_ids = list(data["APP"].get("tag_ids", []))
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
10)
logging.info("%s,device_id:%s,tag_list:%s" % (action,
str(device_id), str(tag_list)))
# 创建问题关注标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"venus/sun/pictorial/edit" in str(data['SYS'].get('action',"")):
action = str(data['SYS'].get('action', ''))
logging.info("action=%s" % (action))
tag_ids = list(data["APP"].get("tag_ids", []))
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
20)
logging.info("%s,device_id:%s,tag_list:%s" % (action,
str(device_id), str(tag_list)))
# kyc最后一题
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"venus/community/survey_question/record_kyc_last_question" in str(data['SYS'].get('action',"")):
action = str(data['SYS'].get('action', ''))
tag_ids = list(data["APP"].get("tag_ids", []))
logging.info('action:%s,tag_list:%s' % (action, str(tag_ids)))
tag_query_results = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
tag_query_results = [i[0] for i in tag_query_results]
logging.info('action:%s,mysql query taglist:%s' % (action, str(tag_query_results)))
tag_query_results_multi = [i for i in tag_ids if i in tag_query_results]
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_query_results_multi,
5)
logging.info("action:%s,device_id:%s,tag_list:%s" % (action,
str(device_id),
str(tag_query_results_multi)))
else: else:
if msg: if msg:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type")) logging.warning(
"unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except: except:
logging_exception() logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
# 假设数据库连接异常,强制退出程序,supervisor重启linub # 假设数据库连接异常,强制退出程序,supervisor重启linub
os._exit(0) os._exit(0)
return True
except: except:
logging_exception() logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
...@@ -13,13 +13,6 @@ app = Celery('physical') ...@@ -13,13 +13,6 @@ app = Celery('physical')
# - namespace='CELERY' means all celery-related configuration keys # - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix. # should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY') app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': settings.CELERY_BROKER_URL,
'default_timeout': 60 * 60
}
}
# Load task modules from all registered Django app configs. # Load task modules from all registered Django app configs.
......
...@@ -9,6 +9,8 @@ class CeleryTaskRouter(object): ...@@ -9,6 +9,8 @@ class CeleryTaskRouter(object):
queue_task_map = { queue_task_map = {
"tapir-alpha": [ "tapir-alpha": [
'injection.data_sync.tasks.write_to_es', 'injection.data_sync.tasks.write_to_es',
],
"vest": [
'vest.request.auto_request.click', 'vest.request.auto_request.click',
'vest.request.auto_request.reply', 'vest.request.auto_request.reply',
'vest.request.auto_request.follow', 'vest.request.auto_request.follow',
......
...@@ -14,7 +14,6 @@ gevent==1.3.7 ...@@ -14,7 +14,6 @@ gevent==1.3.7
pypinyin==0.34.1 pypinyin==0.34.1
numpy==1.16.2 numpy==1.16.2
lz4==2.1.6 lz4==2.1.6
celery_once==3.0.1
git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@master git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@master
git+ssh://git@git.wanmeizhensuo.com/backend/helios.git@master git+ssh://git@git.wanmeizhensuo.com/backend/helios.git@master
......
...@@ -16,6 +16,7 @@ from trans2es.models.tag import CommunityTagSetRelation ...@@ -16,6 +16,7 @@ from trans2es.models.tag import CommunityTagSetRelation
from django.conf import settings from django.conf import settings
from libs.error import logging_exception from libs.error import logging_exception
from django.db import connection from django.db import connection
from trans2es.models.account_reg_extra import AccountRegExtra
def get_highlight(fields=[]): def get_highlight(fields=[]):
...@@ -143,6 +144,27 @@ def choice_pictorial_push_tag(device_id, user_id): ...@@ -143,6 +144,27 @@ def choice_pictorial_push_tag(device_id, user_id):
return {"pictorial_tag_list": []} return {"pictorial_tag_list": []}
@bind("physical/search/lintag_by_user_id")
def get_lintags_by_user_id(user_id):
    """Return up to 3 LinUCB-recommended tags for the user's first device.

    Looks up the user's online, non-deleted devices in AccountRegExtra and
    reads the cached LinUCB tag recommendation for the first device id from
    redis.

    :param user_id: account user id.
    :return: dict ``{"lin_tag_list": [...]}`` with at most 3 entries;
             empty list on cache miss, no device, or any error.
    """
    try:
        devices = AccountRegExtra.objects.filter(
            user_id=user_id, is_online=True, is_deleted=False
        ).values_list("device_id", flat=True)
        if not devices:
            return {"lin_tag_list": []}
        redis_key = "physical:linucb:tag_recommend:device_id:" + str(devices[0])
        tag_data = redis_client.get(redis_key)
        # cache miss: no recommendation computed for this device yet
        if tag_data is None:
            return {"lin_tag_list": []}
        lintags = json.loads(str(tag_data, encoding="utf-8"))
        return {"lin_tag_list": lintags[:3]}
    except Exception:
        # best-effort RPC endpoint: never propagate, log and return empty
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return {"lin_tag_list": []}
@bind("physical/search/choice_push_tag") @bind("physical/search/choice_push_tag")
def choice_push_tag(device_id, user_id): def choice_push_tag(device_id, user_id):
""" """
......
import datetime
from django.db import models
class AccountRegExtra(models.Model):
    """ORM mapping for the device/user registration relation table."""

    class Meta:
        verbose_name = u"设备用户关系表"
        db_table = "account_reg_extra"

    # primary key of the relation row
    id = models.IntegerField(verbose_name="主键ID", primary_key=True)
    # whether the record is currently active/online
    is_online = models.BooleanField(verbose_name=u"是否上线")
    # row timestamps; Unix-epoch default stands in for "not set"
    create_time = models.DateTimeField(verbose_name=u"创建时间", default=datetime.datetime.fromtimestamp(0))
    update_time = models.DateTimeField(verbose_name=u"更新时间", default=datetime.datetime.fromtimestamp(0))
    # soft-delete flag (queries elsewhere filter is_deleted=False)
    is_deleted = models.BooleanField(verbose_name=u"")
    # NOTE(review): geo/model/share_code semantics are not visible here —
    # presumably location string, device hardware model and invite code;
    # confirm against the table's writer before relying on them.
    geo = models.CharField(verbose_name=u"", max_length=300)
    model = models.CharField(verbose_name=u"", max_length=64)
    device_id = models.CharField(verbose_name=u"设备ID", max_length=64)
    share_code = models.CharField(verbose_name=u"", max_length=64)
    user_id = models.IntegerField(verbose_name="用户ID")
# coding=utf8
import redis
from django.conf import settings
class _RedisWithoutprefixProxy(object):
_hacked_methods = set([
'get', 'mget', 'hget', 'hgetall', 'rpop'
])
def __getattribute__(self, name):
try:
return super(_RedisWithoutprefixProxy, self).__getattribute__(name)
except AttributeError:
f = getattr(self.redis, name)
if name in _RedisWithoutprefixProxy._hacked_methods:
def wrapper(k, *args, **kwargs):
data = f(k, *args, **kwargs)
# bug fix for py35, json.loads does accept bytes!
if type(data) == bytes:
data = data.decode()
return data
return wrapper
return f
def __init__(self, conf):
self.__pool = redis.ConnectionPool(**conf)
self.redis = redis.StrictRedis(connection_pool=self.__pool)
# Module-level shared caches used by the vest auto-request tasks to
# deduplicate click/reply/follow actions; one redis config per action kind.
reply_cache = _RedisWithoutprefixProxy(settings.REDIS_TOPIC['reply_cache'])
follow_cache = _RedisWithoutprefixProxy(settings.REDIS_TOPIC['follow_cache'])
click_cache = _RedisWithoutprefixProxy(settings.REDIS_TOPIC['click_cache'])
...@@ -42,7 +42,8 @@ def batch_handle(auto_click_list): ...@@ -42,7 +42,8 @@ def batch_handle(auto_click_list):
try: try:
cookies = login() cookies = login()
if cookies is not None: if cookies is not None:
click.apply_async(args=(cookies, topic_id), eta=get_rand_time()) time = get_rand_time()
click.apply_async(args=(cookies, topic_id), eta=time)
# click(cookies, topic_id) # click(cookies, topic_id)
except: except:
pass pass
......
...@@ -44,7 +44,8 @@ def batch_handle(auto_click_list): ...@@ -44,7 +44,8 @@ def batch_handle(auto_click_list):
cookies = login() cookies = login()
if cookies is not None: if cookies is not None:
# click(cookies, topic_id) # click(cookies, topic_id)
click.apply_async(args=(cookies, topic_id), eta=get_rand_time(hourup=1)) time = get_rand_time(hourup=0)
click.apply_async(args=(cookies, topic_id), eta=time)
except: except:
pass pass
...@@ -54,7 +55,7 @@ def auto_click_per_2h_by_post(): ...@@ -54,7 +55,7 @@ def auto_click_per_2h_by_post():
auto_click_list = [] auto_click_list = []
try: try:
# 发帖2小时内:[1-3]个点赞 # 发帖2小时内:[1-3]个点赞
numtime1, numtime2 = time_conv_hour(0, 2) numtime1, numtime2 = time_conv_hour(0, 1)
topic_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6) topic_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6)
for topic_id in topic_ids: for topic_id in topic_ids:
click_num = random.randint(1, 3) click_num = random.randint(1, 3)
......
...@@ -30,7 +30,8 @@ def batch_handle(auto_follow_list): ...@@ -30,7 +30,8 @@ def batch_handle(auto_follow_list):
cookies = login() cookies = login()
if cookies is not None: if cookies is not None:
# follow(cookies, user_id) # follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time()) time = get_rand_time()
follow.apply_async(args=(cookies, user_id), eta=time)
except: except:
pass pass
......
...@@ -29,7 +29,8 @@ def batch_handle(auto_follow_list): ...@@ -29,7 +29,8 @@ def batch_handle(auto_follow_list):
cookies = login() cookies = login()
if cookies is not None: if cookies is not None:
# follow(cookies, user_id) # follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time()) time = get_rand_time()
follow.apply_async(args=(cookies, user_id), eta=time)
except: except:
pass pass
......
...@@ -41,7 +41,8 @@ def batch_handle(auto_follow_list): ...@@ -41,7 +41,8 @@ def batch_handle(auto_follow_list):
cookies = login() cookies = login()
if cookies is not None: if cookies is not None:
# follow(cookies, user_id) # follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time(hourup=1)) time = get_rand_time(hourup=0)
follow.apply_async(args=(cookies, user_id), eta=time)
except: except:
pass pass
...@@ -52,7 +53,7 @@ def auto_follow_per_2h_by_post_and_regist(): ...@@ -52,7 +53,7 @@ def auto_follow_per_2h_by_post_and_regist():
auto_follow_list = [] auto_follow_list = []
try: try:
# 发帖,注册后2小时内:[1-3]个粉丝 # 发帖,注册后2小时内:[1-3]个粉丝
numtime1, numtime2 = time_conv_hour(0, 2) numtime1, numtime2 = time_conv_hour(0, 1)
user_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6) user_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6)
for user_id in user_ids: for user_id in user_ids:
follow_num = random.randint(1, 3) follow_num = random.randint(1, 3)
......
...@@ -57,7 +57,8 @@ def batch_handle(pictorial_id_list): ...@@ -57,7 +57,8 @@ def batch_handle(pictorial_id_list):
if cookies is not None: if cookies is not None:
comment = judge_pictorial_info_get_comment(pictorial_id) comment = judge_pictorial_info_get_comment(pictorial_id)
# pictorial_reply(cookies, pictorial_id, comment) # pictorial_reply(cookies, pictorial_id, comment)
pictorial_reply.apply_async(args=(cookies, pictorial_id, comment), eta=get_rand_time()) time = get_rand_time()
pictorial_reply.apply_async(args=(cookies, pictorial_id, comment), eta=time)
except: except:
pass pass
......
...@@ -35,14 +35,17 @@ def batch_handle(topic_id_list): ...@@ -35,14 +35,17 @@ def batch_handle(topic_id_list):
comment = judge_topic_info_get_comment(topic_id) comment = judge_topic_info_get_comment(topic_id)
if comment: if comment:
# reply(cookies, topic_id, comment) # reply(cookies, topic_id, comment)
reply.apply_async(args=(cookies, topic_id, comment), eta=get_rand_time()) time = get_rand_time()
reply.apply_async(args=(cookies, topic_id, comment), eta=time)
else: else:
comment1, comment2 = get_answer_data() comment1, comment2 = get_answer_data()
response = reply_answer(cookies, topic_id, comment1) response = reply_answer(cookies, topic_id, comment1)
response = json.loads(response) response = json.loads(response)
cookies = login() cookies = login()
reply_id = response["data"]["id"] reply_id = response["data"].get('id')
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=get_rand_time()) if reply_id:
time = get_rand_time()
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=time)
......
...@@ -34,14 +34,17 @@ def batch_handle(topic_id_list): ...@@ -34,14 +34,17 @@ def batch_handle(topic_id_list):
comment = judge_topic_info_get_comment(topic_id) comment = judge_topic_info_get_comment(topic_id)
if comment: if comment:
# reply(cookies, topic_id, comment) # reply(cookies, topic_id, comment)
reply.apply_async(args=(cookies, topic_id, comment), eta=get_rand_time(hourup=1)) time = get_rand_time(hourup=0)
reply.apply_async(args=(cookies, topic_id, comment), eta=time)
else: else:
comment1, comment2 = get_answer_data() comment1, comment2 = get_answer_data()
response = reply_answer(cookies, topic_id, comment1) response = reply_answer(cookies, topic_id, comment1)
response = json.loads(response) response = json.loads(response)
cookies = login() cookies = login()
reply_id = response["data"]["id"] reply_id = response["data"].get('id')
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=get_rand_time(hourup=1)) if reply_id:
time = get_rand_time(hourup=0)
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=time)
except: except:
logging_exception() logging_exception()
...@@ -51,7 +54,7 @@ def batch_handle(topic_id_list): ...@@ -51,7 +54,7 @@ def batch_handle(topic_id_list):
def auto_reply_per_2h_to_topic(): def auto_reply_per_2h_to_topic():
topic_id_list = [] topic_id_list = []
try: try:
numtime1, numtime2 = time_conv_hour(0, 2) numtime1, numtime2 = time_conv_hour(0, 1)
topic_ids = get_data(numtime1, numtime2) topic_ids = get_data(numtime1, numtime2)
for topic_id in topic_ids: for topic_id in topic_ids:
random_num = random.randint(1, 2) random_num = random.randint(1, 2)
......
import requests import requests
import time
import datetime import datetime
import random import random
import traceback import traceback
...@@ -8,12 +7,12 @@ import json ...@@ -8,12 +7,12 @@ import json
import redis import redis
import smtplib import smtplib
from celery import shared_task from celery import shared_task
from celery_once import QueueOnce
from libs.cache import redis_client from libs.cache import redis_client
from email.mime.text import MIMEText from email.mime.text import MIMEText
from email.utils import formataddr from email.utils import formataddr
from physical.settings_local import DATABASES from physical.settings_local import DATABASES
from physical.settings_local import REDIS_URL from physical.settings_local import REDIS_URL
from vest.cache.base import reply_cache, follow_cache, click_cache
from vest.data.topic_models import get_pictorial_tag_by_id, get_topic_product_info, get_edit_tag_id_list, \ from vest.data.topic_models import get_pictorial_tag_by_id, get_topic_product_info, get_edit_tag_id_list, \
get_category_tag_id, topic_has_image, get_tag_id_list get_category_tag_id, topic_has_image, get_tag_id_list
from django.conf import settings from django.conf import settings
...@@ -115,9 +114,16 @@ def logins(user_id): ...@@ -115,9 +114,16 @@ def logins(user_id):
return None return None
# @shared_task(retry_kwargs={'max_retries': 0}, base=QueueOnce, once={'graceful': True, 'unlock_before_run': True})
@shared_task(retry_kwargs={'max_retries': 0}) @shared_task(retry_kwargs={'max_retries': 0})
def click(cookies_get, id): def click(cookies_get, id):
click_key = 'click topic_id:%s, cookies_get: %s' % (str(cookies_get), str(id))
cache_count = click_cache.get(click_key)
if cache_count:
return
else:
click_cache.set(click_key, 1)
click_cache.expire(click_key, settings.CACHE_SECONDS)
# 点赞 # 点赞
try: try:
topic_id = id[0] topic_id = id[0]
...@@ -150,9 +156,15 @@ def click(cookies_get, id): ...@@ -150,9 +156,15 @@ def click(cookies_get, id):
logging.error("catch exception,logins:%s" % traceback.format_exc()) logging.error("catch exception,logins:%s" % traceback.format_exc())
# @shared_task(retry_kwargs={'max_retries': 0}, base=QueueOnce, once={'graceful': True, 'unlock_before_run': True})
@shared_task(retry_kwargs={'max_retries': 0}) @shared_task(retry_kwargs={'max_retries': 0})
def reply(cookies_get, id, content): def reply(cookies_get, id, content):
reply_key = 'click topic_id:%s, cookies_get: %s, content:%s' % (str(id), str(cookies_get), str(content))
cache_count = reply_cache.get(reply_key)
if cache_count:
return
else:
reply_cache.set(reply_key, 1)
reply_cache.expire(reply_key, settings.CACHE_SECONDS)
try: try:
post_dict = { post_dict = {
'topic_id': id, 'topic_id': id,
...@@ -275,9 +287,15 @@ def get_comments(): ...@@ -275,9 +287,15 @@ def get_comments():
return None return None
# @shared_task(retry_kwargs={'max_retries': 0}, base=QueueOnce, once={'graceful': True, 'unlock_before_run': True})
@shared_task(retry_kwargs={'max_retries': 0}) @shared_task(retry_kwargs={'max_retries': 0})
def follow(cookies_get, id): def follow(cookies_get, id):
follow_key = 'click user_id:%s, cookies_get: %s' % (str(id), str(cookies_get))
cache_count = follow_cache.get(follow_key)
if cache_count:
return
else:
follow_cache.set(follow_key, 1)
follow_cache.expire(follow_key, settings.CACHE_SECONDS)
try: try:
post_dict = { post_dict = {
'type': 1, 'type': 1,
...@@ -442,9 +460,16 @@ def set_reply_to_redis(): ...@@ -442,9 +460,16 @@ def set_reply_to_redis():
logging.error("catch exception,logins:%s" % traceback.format_exc()) logging.error("catch exception,logins:%s" % traceback.format_exc())
# @shared_task(retry_kwargs={'max_retries': 0}, base=QueueOnce, once={'graceful': True, 'unlock_before_run': True})
@shared_task(retry_kwargs={'max_retries': 0}) @shared_task(retry_kwargs={'max_retries': 0})
def reply2(cookies_get, id, content, replied_id): def reply2(cookies_get, id, content, replied_id):
reply_key = 'click topic_id:%s, cookies_get: %s, content:%s, replied_id: %s' % \
(str(id), str(cookies_get), str(content), str(replied_id))
cache_count = reply_cache.get(reply_key)
if cache_count:
return
else:
reply_cache.set(reply_key, 1)
reply_cache.expire(reply_key, settings.CACHE_SECONDS)
try: try:
post_dict = { post_dict = {
'topic_id': id, 'topic_id': id,
...@@ -464,9 +489,15 @@ def reply2(cookies_get, id, content, replied_id): ...@@ -464,9 +489,15 @@ def reply2(cookies_get, id, content, replied_id):
logging.error("catch exception,logins:%s" % traceback.format_exc()) logging.error("catch exception,logins:%s" % traceback.format_exc())
# @shared_task(retry_kwargs={'max_retries': 0}, base=QueueOnce, once={'graceful': True, 'unlock_before_run': True})
@shared_task(retry_kwargs={'max_retries': 0}) @shared_task(retry_kwargs={'max_retries': 0})
def pictorial_reply(cookies_get, id, content): def pictorial_reply(cookies_get, id, content):
reply_key = 'click topic_id:%s, cookies_get: %s, content:%s' % (str(id), str(cookies_get), str(content))
cache_count = reply_cache.get(reply_key)
if cache_count:
return
else:
reply_cache.set(reply_key, 1)
reply_cache.expire(reply_key, settings.CACHE_SECONDS)
try: try:
post_dict = { post_dict = {
'pictorial_id': id, 'pictorial_id': id,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment