Commit fefe9fc1 authored by 谢祁峰's avatar 谢祁峰

fix

parents f20817d3 01f2957b
......@@ -4,7 +4,7 @@ ontime_list = [
"0 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontab.py",
"10 9 * * * source /srv/envs/physical/bin/activate && python /data/log/physical/app/crontabs.py",
"0 9 * * * sh /data/log/cybertron/app/statistics_query.sh > /data/log/cybertron/app/statistics_query.log",
"54 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_2h_by_post",
"54 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_2h_by_post",
# "*/5 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_one",
# "02,12,22,32,42,52 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es-m true_click_two",
# "00,10,20,30,40,50 * * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m true_click_three",
......@@ -16,11 +16,11 @@ ontime_list = [
"0 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app",
"30 10 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_lunch_app2",
# "*/5 * * * 1 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_5m_by_followed",
"1 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_2h_by_post_and_regist",
"1 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_2h_by_post_and_regist",
"0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_login_session",
"0 0 * * 3 source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m get_user_id",
# "0 14,18,22 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m principal_online_comment1",
"25 */2 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_reply_per_2h_to_topic",
"25 */1 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_reply_per_2h_to_topic",
"0 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_click_per_1d_by_post",
"1 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_regist",
"2 9 * * * source /srv/envs/physical/bin/activate && cd /srv/apps/physical && python manage.py trans2es_mapping2es -m auto_follow_per_1d_by_post",
......
......@@ -27,7 +27,7 @@ def get_rand_time(hourlow=0, hourup=13, minutelow=0, minuteup=60):
hours = random.randint(hourlow, hourup)
minutes = random.randint(minutelow, minuteup)
# todo redis会自动给加8个小时,所以这边先写死减少8小时
now_time = NOW + timedelta(hours=hours, minutes=minutes) - timedelta(hours=8)
now_time = NOW + timedelta(hours=hours, minutes=minutes)
time = eta_2_push_time(now_time.strftime("%Y-%m-%d %H:%M:%S"))
print(datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai')))
return datetime.fromtimestamp(time, pytz.timezone('Asia/Shanghai'))
......
# -*- coding: UTF-8 -*-
# !/usr/bin/env python
from kafka import KafkaConsumer
import random
from libs.cache import redis_client
import logging
from linucb.views.linucb import LinUCB
import json
import os
from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend
import traceback
import msgpack
from django.conf import settings
from kafka import KafkaConsumer
import libs.tools as Tools
from libs.cache import redis_client
from libs.error import logging_exception
from libs.es import ESPerform
from linucb.views.linucb import LinUCB
from search.utils.common import *
from search.views.tag import get_same_tagset_ids
import libs.tools as Tools
from trans2es.models.pictorial import CommunityPictorialHomeFeed
from trans2es.models.portrait_stat import LikeDeviceTagStat
from trans2es.models.tag import TopicTag, Tag
from trans2es.models.topic import TopicHomeRecommend
from libs.error import logging_exception
import os
from search.views.tag import get_same_tagset_ids
import msgpack
def loads_data(data):
......@@ -252,6 +252,40 @@ class CollectData(object):
logging.error("get_device_tag_ctr error!")
return 0.001
# 用户打标签加分
# 新增四种用户兴趣分行为
# 四种日志均为后端埋点日志
def transfer_update_recommend_tag_list(self, device_id, user_feature, user_id, tag_list, score_loop=1):
if len(tag_list) > 0:
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
for i in range(score_loop):
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature_ctr, self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=tag_list,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
def consume_data_from_kafka(self, topic_name=None):
try:
user_feature = [1, 1]
......@@ -268,7 +302,6 @@ class CollectData(object):
logging.info(ori_msg.value)
if "type" in raw_val_dict and \
(raw_val_dict["type"] in ("on_click_feed_topic_card", "on_click_button")):
# 标签处理
click_topic_tag_list = list()
device_id = ""
if "on_click_feed_topic_card" == raw_val_dict["type"]:
......@@ -283,17 +316,12 @@ class CollectData(object):
# if collection and is_ai:
# click_topic_tag_list.append(id)
topic_tag_list = list()
# 从mysql表community_topictag中获取数据
click_results = TopicTag.objects.using(settings.SLAVE1_DB_NAME).filter(
topic_id=topic_id, is_online=True).values_list("tag_id", "is_collection")
for tag_id, is_collection in click_results:
# topic_tag_list.append(tag_id)
if is_collection:
topic_tag_list.append(tag_id)
# 从mysql表community_tag中获取数据
# 标签共有1000w
# select id from communty_tag where id in (select tag_id from community_topictag)
# 关联得到100w标签
tag_query_results = Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=topic_tag_list, is_online=True, is_deleted=False,
is_category=False).values_list("id",
......@@ -303,16 +331,6 @@ class CollectData(object):
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(topic_id), str(click_topic_tag_list)))
# 更新不同策略的lin标签参数信息
# 点击信息流卡片(问题卡片和回答卡片)对应的标签权重为1
for tag_id in click_topic_tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
elif raw_val_dict["type"] == "on_click_button" and "page_name" in \
raw_val_dict["params"] and "button_name" in raw_val_dict["params"] \
and "extra_param" in raw_val_dict["params"]:
......@@ -330,31 +348,27 @@ class CollectData(object):
logging.info(
"query tag attention,positive tag_list,device_id:%s,query_name:%s,tag_list:%s" % (
str(device_id), tag_name, str(click_topic_tag_list)))
# 更新不同策略的lin标签参数信息
# 注册页和搜索页关注的标签权重为10
for i in range[10]:
for tag_id in click_topic_tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
logging.info("click_topic_tag_list:%s" % (str(click_topic_tag_list)))
is_click = 1
is_vote = 0
# 如果点击和点赞满足一个回报即为1
reward = 1 if is_click or is_vote else 0
# 用户处理
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
# 更新不同策略的lin标签参数信息
for tag_id in click_topic_tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature_ctr,
self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
if len(click_topic_tag_list) > 0:
self.update_recommend_tag_list(device_id, user_feature, user_id,
......@@ -539,6 +553,7 @@ class CollectData(object):
reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
for i in range(5):
for tag_id in tag_query_results_multi:
self.update_user_linucb_tag_info(reward, device_id, tag_id,
......@@ -613,21 +628,16 @@ class CollectData(object):
(str(data['SYS']['action']), str(device_id),
str(tag_query_results_multi)))
# 首页搜索精准匹配标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data[
'SYS'] and "api/v1/cards/topic" in \
data['SYS'][
'action']:
tag_name = data["APP"].get("query", "")
tag_id = data["APP"].get("tag_id", "")
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
"api/v1/cards/topic" in data['SYS']['action']:
tag_name = data["APP"].get("query", [])
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
tag=tag_name).values_list("id"))
tag_list.append(tag_id)
logging.info("action=home_page_search,tagid_list:%s" % tag_list)
device_id = data['SYS']['cl_id']
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id, tag_list,
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
5)
logging.info("action=home_page_search,update lintag success")
# (客户端创建回答,后台创建回答或修改回答关联标签关键字进linucb
elif 'SYS' in data and 'APP' in data and 'action' in data['SYS'] and \
("venus/community/topic/create" in data['SYS'][
......@@ -636,7 +646,7 @@ class CollectData(object):
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data['SYS']['cl_id']
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
......@@ -648,14 +658,15 @@ class CollectData(object):
tag_list = list(Tag.objects.using(settings.SLAVE1_DB_NAME).filter(
id__in=tag_ids, is_online=True, is_deleted=False,
is_category=False).values_list("id"))
device_id = data['SYS']['cl_id']
device_id = data["SYS"]["cl_id"]
user_id = data['SYS'].get('user_id', None)
self.transfer_update_recommend_tag_list(device_id, user_feature, user_id,
tag_list,
20)
else:
if msg:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
logging.warning(
"unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......@@ -666,37 +677,3 @@ class CollectData(object):
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
# 用户打标签加分
# 新增四种用户兴趣分行为
# 四种日志均为后端埋点日志
def transfer_update_recommend_tag_list(self, device_id, user_feature, user_id, tag_list, score_loop=1):
if len(tag_list) > 0:
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
# 移植老用户的lin信息到ctr特征策略
self.transfer_old_info2ctr_feature_key(device_id)
for i in range(score_loop):
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature,
self.linucb_matrix_redis_prefix)
# 获取tag的ctr信息
device_tag_ctr = self.get_device_tag_ctr(device_id, tag_id)
user_feature_ctr = [device_tag_ctr, device_tag_ctr]
self.update_user_linucb_tag_info(reward, device_id, tag_id,
user_feature_ctr, self.ctr_linucb_matrix_redis_prefix)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=tag_list,
linucb_matrix_prefix=self.linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.linucb_recommend_pictorial_id_prefix)
self.update_recommend_tag_list(device_id, user_feature, user_id,
click_topic_tag_list=tag_list,
linucb_matrix_prefix=self.ctr_linucb_matrix_redis_prefix,
linucb_recommend_tag_prefix=self.ctr_linucb_recommend_redis_prefix,
linucb_topic_ids_prefix=self.ctr_linucb_recommend_topic_id_prefix,
linucb_pictorial_ids_prefix=self.ctr_linucb_recommend_pictorial_id_prefix)
......@@ -9,6 +9,8 @@ class CeleryTaskRouter(object):
queue_task_map = {
"tapir-alpha": [
'injection.data_sync.tasks.write_to_es',
],
"vest": [
'vest.request.auto_request.click',
'vest.request.auto_request.reply',
'vest.request.auto_request.follow',
......
......@@ -42,7 +42,8 @@ def batch_handle(auto_click_list):
try:
cookies = login()
if cookies is not None:
click.apply_async(args=(cookies, topic_id), eta=get_rand_time())
time = get_rand_time()
click.apply_async(args=(cookies, topic_id), eta=time)
# click(cookies, topic_id)
except:
pass
......
......@@ -44,7 +44,8 @@ def batch_handle(auto_click_list):
cookies = login()
if cookies is not None:
# click(cookies, topic_id)
click.apply_async(args=(cookies, topic_id), eta=get_rand_time(hourup=1))
time = get_rand_time(hourup=0)
click.apply_async(args=(cookies, topic_id), eta=time)
except:
pass
......@@ -54,7 +55,7 @@ def auto_click_per_2h_by_post():
auto_click_list = []
try:
# 发帖2小时内:[1-3]个点赞
numtime1, numtime2 = time_conv_hour(0, 2)
numtime1, numtime2 = time_conv_hour(0, 1)
topic_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6)
for topic_id in topic_ids:
click_num = random.randint(1, 3)
......
......@@ -30,7 +30,8 @@ def batch_handle(auto_follow_list):
cookies = login()
if cookies is not None:
# follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time())
time = get_rand_time()
follow.apply_async(args=(cookies, user_id), eta=time)
except:
pass
......
......@@ -29,7 +29,8 @@ def batch_handle(auto_follow_list):
cookies = login()
if cookies is not None:
# follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time())
time = get_rand_time()
follow.apply_async(args=(cookies, user_id), eta=time)
except:
pass
......
......@@ -41,7 +41,8 @@ def batch_handle(auto_follow_list):
cookies = login()
if cookies is not None:
# follow(cookies, user_id)
follow.apply_async(args=(cookies, user_id), eta=get_rand_time(hourup=1))
time = get_rand_time(hourup=0)
follow.apply_async(args=(cookies, user_id), eta=time)
except:
pass
......@@ -52,7 +53,7 @@ def auto_follow_per_2h_by_post_and_regist():
auto_follow_list = []
try:
# 发帖,注册后2小时内:[1-3]个粉丝
numtime1, numtime2 = time_conv_hour(0, 2)
numtime1, numtime2 = time_conv_hour(0, 1)
user_ids = get_commnet_id(numtime2, numtime1, content_level_low=0, content_level_top=6)
for user_id in user_ids:
follow_num = random.randint(1, 3)
......
......@@ -57,7 +57,8 @@ def batch_handle(pictorial_id_list):
if cookies is not None:
comment = judge_pictorial_info_get_comment(pictorial_id)
# pictorial_reply(cookies, pictorial_id, comment)
pictorial_reply.apply_async(args=(cookies, pictorial_id, comment), eta=get_rand_time())
time = get_rand_time()
pictorial_reply.apply_async(args=(cookies, pictorial_id, comment), eta=time)
except:
pass
......
......@@ -35,14 +35,16 @@ def batch_handle(topic_id_list):
comment = judge_topic_info_get_comment(topic_id)
if comment:
# reply(cookies, topic_id, comment)
reply.apply_async(args=(cookies, topic_id, comment), eta=get_rand_time())
time = get_rand_time()
reply.apply_async(args=(cookies, topic_id, comment), eta=time)
else:
comment1, comment2 = get_answer_data()
response = reply_answer(cookies, topic_id, comment1)
response = json.loads(response)
cookies = login()
reply_id = response["data"]["id"]
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=get_rand_time())
time = get_rand_time()
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=time)
......
......@@ -34,14 +34,16 @@ def batch_handle(topic_id_list):
comment = judge_topic_info_get_comment(topic_id)
if comment:
# reply(cookies, topic_id, comment)
reply.apply_async(args=(cookies, topic_id, comment), eta=get_rand_time(hourup=1))
time = get_rand_time(hourup=0)
reply.apply_async(args=(cookies, topic_id, comment), eta=time)
else:
comment1, comment2 = get_answer_data()
response = reply_answer(cookies, topic_id, comment1)
response = json.loads(response)
cookies = login()
reply_id = response["data"]["id"]
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=get_rand_time(hourup=1))
time = get_rand_time(hourup=0)
reply2.apply_async(args=(cookies, topic_id, comment2, reply_id), eta=time)
except:
logging_exception()
......@@ -51,7 +53,7 @@ def batch_handle(topic_id_list):
def auto_reply_per_2h_to_topic():
topic_id_list = []
try:
numtime1, numtime2 = time_conv_hour(0, 2)
numtime1, numtime2 = time_conv_hour(0, 1)
topic_ids = get_data(numtime1, numtime2)
for topic_id in topic_ids:
random_num = random.randint(1, 2)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment