Commit 5e3e16ab authored by zhanglu's avatar zhanglu

Merge branch 'master' into feature/sun_topic2es

parents 5dd3fb7e 85d71bf0
......@@ -20,8 +20,8 @@ class KafkaManager(object):
if not cls.consumser_obj:
topic_name = cls.topic_name if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(bootstrap_servers=cls.kafka_broker_list)
cls.consumser_obj.subscribe([topic_name])
cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=cls.kafka_broker_list)
# cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj
......@@ -55,8 +55,9 @@ class CollectData(object):
LinUCB.init_device_id_linucb_info(redis_client, self.linucb_matrix_redis_prefix,device_id,recommend_tag_list)
else:
user_feature = user_feature if user_feature else self.user_feature
recommend_tag_list = LinUCB.linucb_recommend_tag(redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
recommend_tag_list = LinUCB.linucb_recommend_tag(device_id,redis_linucb_tag_data_dict,user_feature,list(redis_linucb_tag_data_dict.keys()))
logging.info("duan add,device_id:%s,recommend_tag_list:%s" % (str(device_id), str(recommend_tag_list)))
if len(recommend_tag_list) > 0:
tag_recommend_redis_key = self.linucb_recommend_redis_prefix + str(device_id)
redis_client.set(tag_recommend_redis_key, json.dumps(recommend_tag_list))
......@@ -82,71 +83,83 @@ class CollectData(object):
user_feature = [1,1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
for ori_msg in kafka_consumer_obj:
try:
logging.info(ori_msg)
raw_val_dict = json.loads(ori_msg.value)
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"]
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(topic_id), str(tag_list)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
elif isinstance(raw_val_dict["params"]["exposure_cards"],list):
exposure_cards_list = raw_val_dict["params"]["exposure_cards"]
else:
exposure_cards_list = list()
device_id = raw_val_dict["device"]["device_id"]
for item in exposure_cards_list:
if "card_id" not in item:
continue
exposure_topic_id = item["card_id"]
logging.info(
"consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=exposure_topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 0
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("negative tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(exposure_topic_id), str(tag_list)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
while True:
msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
for ori_msg in consume_msg:
try:
logging.info(ori_msg)
raw_val_dict = json.loads(ori_msg.value)
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"]
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
is_click = 1
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
str(device_id), str(topic_id), str(tag_list)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
elif isinstance(raw_val_dict["params"]["exposure_cards"],list):
exposure_cards_list = raw_val_dict["params"]["exposure_cards"]
else:
exposure_cards_list = list()
device_id = raw_val_dict["device"]["device_id"]
exposure_topic_id_list = list()
for item in exposure_cards_list:
if "card_id" not in item:
continue
exposure_topic_id = item["card_id"]
logging.info(
"consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
exposure_topic_id_list.append(exposure_topic_id)
topic_tag_id_dict = dict()
tag_list = list()
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
if sql_item.topic_id not in topic_tag_id_dict:
topic_tag_id_dict[sql_item.topic_id] = list()
topic_tag_id_dict[sql_item.topic_id].append(sql_item.tag_id)
is_click = 0
is_vote = 0
reward = 1 if is_click or is_vote else 0
logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
str(device_id), str(topic_tag_id_dict)))
for tag_id in tag_list:
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
self.update_recommend_tag_list(device_id, user_feature)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True
except:
......
......@@ -33,7 +33,7 @@ class LinUCB:
return list()
@classmethod
def linucb_recommend_tag(cls,redis_linucb_tag_data_dict,user_features_list,tag_list):
def linucb_recommend_tag(cls,device_id,redis_linucb_tag_data_dict,user_features_list,tag_list):
"""
:remark 获取推荐标签
:param redis_linucb_tag_data_dict:
......@@ -91,6 +91,7 @@ class LinUCB:
if len(top_tag_set) >= 10:
break
logging.info("duan add,device_id:%s,sorted_np_score_list:%s,np_score_dict:%s" % (str(device_id), str(sorted_np_score_list), str(np_score_dict)))
return list(top_tag_set)
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
......@@ -68,7 +68,7 @@ def get_home_recommend_topic_ids(user_id,device_id,offset,size,query=None,query_
recommend_topic_ids = []
have_read_topic_id_list = list()
if redis_field_val_list[0]:
if redis_field_val_list[0] and query is None:
have_read_topic_id_list = list(json.loads(redis_field_val_list[0]))
user_similar_score_redis_key = "physical:user_similar_score:user_id:" + str(user_id)
......
......@@ -30,7 +30,7 @@ class Celebrity(models.Model):
def get_pick_id_list(self):
try:
pick_id_list = list()
query_results = PickCelebrity.objects.filter(celebrity_id=self.id,is_deleted=False)
query_results = PickCelebrity.objects.using(settings.SLAVE_DB_NAME).filter(celebrity_id=self.id,is_deleted=False)
for data_item in query_results:
pick_id_list.append(data_item.pick_id)
......
......@@ -83,7 +83,7 @@ class Topic(models.Model):
try:
has_image = False
query_list = TopicImage.objects.filter(topic_id=self.id, is_deleted=False, is_online=True)
query_list = TopicImage.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, is_deleted=False, is_online=True)
if len(query_list) > 0:
has_image = True
......@@ -95,7 +95,7 @@ class Topic(models.Model):
def get_pick_id_info(self):
try:
pick_id_list = list()
query_list = PickTopic.objects.filter(topic_id=self.id, is_deleted=False)
query_list = PickTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, is_deleted=False)
for item in query_list:
pick_id_list.append(item.pick_id)
......@@ -109,8 +109,8 @@ class Topic(models.Model):
topic_tag_id_list = list()
edit_tag_id_list = list()
tag_id_list = TopicTag.objects.filter(topic_id=self.id).values_list("tag_id", flat=True)
tag_query_results = Tag.objects.filter(id__in=tag_id_list)
tag_id_list = TopicTag.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id).values_list("tag_id", flat=True)
tag_query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(id__in=tag_id_list)
for tag_item in tag_query_results:
is_online=tag_item.is_online
is_deleted=tag_item.is_deleted
......@@ -131,7 +131,7 @@ class Topic(models.Model):
tag_name_list = list()
logging.info("get tag_id_list :%s" % tag_id_list)
for i in range(0, len(tag_name_list), 1000):
query_results = Tag.objects.filter(id__in=tag_id_list[i:i + 1000])
query_results = Tag.objects.using(settings.SLAVE_DB_NAME).filter(id__in=tag_id_list[i:i + 1000])
for item in query_results:
tag_name_list.append(item)
return tag_name_list
......@@ -145,7 +145,7 @@ class Topic(models.Model):
user_is_shadow = False
# 是否官方推荐用户
user_query_results = UserExtra.objects.filter(user_id=self.user_id)
user_query_results = UserExtra.objects.using(settings.SLAVE_DB_NAME).filter(user_id=self.user_id)
if user_query_results.count() > 0:
if user_query_results[0].is_recommend:
offline_score += 2.0
......@@ -164,9 +164,9 @@ class Topic(models.Model):
elif self.content_level == '3':
offline_score += 2.0
exposure_count = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.filter(topic_id=self.id, data_type=3).count()
exposure_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=1).count()
click_count = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=2).count()
uv_num = ActionSumAboutTopic.objects.using(settings.SLAVE_DB_NAME).filter(topic_id=self.id, data_type=3).count()
if exposure_count > 0:
offline_score += click_count / exposure_count
......
......@@ -37,7 +37,7 @@ class User(models.Model):
def get_is_recommend_flag(self):
is_shadow = False
is_recommend = False
query_sql = UserExtra.objects.filter(user_id=self.user_id, is_deleted=False, is_online=True)
query_sql = UserExtra.objects.using(settings.SLAVE_DB_NAME).filter(user_id=self.user_id, is_deleted=False, is_online=True)
for record in query_sql:
is_recommend = record.is_recommend
is_shadow = record.is_shadow
......@@ -48,7 +48,7 @@ class User(models.Model):
latest_topic_time_val = -1
# 获取该用户最新发帖时间
topic_records = Topic.objects.filter(user_id=self.user_id).order_by("-update_time").values_list("update_time",
topic_records = Topic.objects.using(settings.SLAVE_DB_NAME).filter(user_id=self.user_id).order_by("-update_time").values_list("update_time",
flat=True).first()
if topic_records:
tzlc_topic_update_time = tzlc(topic_records)
......@@ -65,7 +65,7 @@ class User(models.Model):
logging.info("get follow_user_id_list :%s" % follow_user_id_list)
for i in range(0, len(follow_user_id_list), 1000):
logging.info("get follow_user_id_list :%s" % follow_user_id_list[i:i + 1000])
sql_data_list = User.objects.filter(user_id__in=follow_user_id_list[i:i + 1000])
sql_data_list = User.objects.using(settings.SLAVE_DB_NAME).filter(user_id__in=follow_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
......@@ -78,7 +78,7 @@ class User(models.Model):
def get_attention_group_id_list(self):
try:
attention_group_id_list = list()
query_results = GroupUserRole.objects.filter(is_online=True, user_id=self.user_id)
query_results = GroupUserRole.objects.using(settings.SLAVE_DB_NAME).filter(is_online=True, user_id=self.user_id)
for item in query_results:
item_dict = {
"group_id": item.group_id,
......@@ -93,13 +93,13 @@ class User(models.Model):
def get_pick_user_id_list(self):
pick_topic_id_list = list()
user_picks = self.user_pick.filter(is_deleted=False, is_pick=True)
user_picks = self.user_pick.using(settings.SLAVE_DB_NAME).filter(is_deleted=False, is_pick=True)
for user_pick in user_picks:
pick_topic_id_list.append(user_pick.picktopic_id)
pick_user_id_list = []
for i in range(0, len(pick_topic_id_list), 1000):
topic_sql_list = Topic.objects.filter(id__in=pick_topic_id_list[i:i + 1000])
topic_sql_list = Topic.objects.using(settings.SLAVE_DB_NAME).filter(id__in=pick_topic_id_list[i:i + 1000])
for topic_data in topic_sql_list:
pick_user_id_list.append(topic_data.user_id)
......@@ -107,7 +107,7 @@ class User(models.Model):
pick_user_detail_list = list()
for i in range(0, len(pick_user_id_list), 1000):
sql_data_list = User.objects.filter(user_id__in=pick_user_id_list[i:i + 1000])
sql_data_list = User.objects.using(settings.SLAVE_DB_NAME).filter(user_id__in=pick_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
......@@ -119,16 +119,16 @@ class User(models.Model):
def get_same_group_user_id_list(self):
same_group_user_id_list = list()
group_items_list = GroupUserRole.objects.filter(user_id=self.user_id)
group_items_list = GroupUserRole.objects.using(settings.SLAVE_DB_NAME).filter(user_id=self.user_id)
for group_item in group_items_list:
group_id = group_item.group_id
user_items_list = GroupUserRole.objects.filter(group_id=group_id)
user_items_list = GroupUserRole.objects.using(settings.SLAVE_DB_NAME).filter(group_id=group_id)
for user_items_list in user_items_list:
same_group_user_id_list.append(user_items_list.user_id)
same_group_detail_list = list()
for i in range(0, len(same_group_user_id_list), 1000):
sql_data_list = User.objects.filter(user_id__in=same_group_user_id_list[i:i + 1000])
sql_data_list = User.objects.using(settings.SLAVE_DB_NAME).filter(user_id__in=same_group_user_id_list[i:i + 1000])
for detail_data in sql_data_list:
item = {
"user_id": detail_data.user_id,
......@@ -142,7 +142,7 @@ class User(models.Model):
try:
user_tag_id_list = list()
query_results = AccountUserTag.objects.filter(user=self.user_id, is_deleted=False)
query_results = AccountUserTag.objects.using(settings.SLAVE_DB_NAME).filter(user=self.user_id, is_deleted=False)
for item in query_results:
user_tag_id_list.append(item.tag_id)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment