Commit 251baa05 authored by 段英荣's avatar 段英荣

Merge branch 'similar_sort' into 'master'

modify

See merge request alpha/physical!162
parents a3516dc3 fe34b457
...@@ -20,8 +20,8 @@ class KafkaManager(object): ...@@ -20,8 +20,8 @@ class KafkaManager(object):
if not cls.consumser_obj: if not cls.consumser_obj:
topic_name = cls.topic_name if not topic_name else topic_name topic_name = cls.topic_name if not topic_name else topic_name
cls.consumser_obj = KafkaConsumer(bootstrap_servers=cls.kafka_broker_list) cls.consumser_obj = KafkaConsumer(topic_name,bootstrap_servers=cls.kafka_broker_list)
cls.consumser_obj.subscribe([topic_name]) # cls.consumser_obj.subscribe([topic_name])
return cls.consumser_obj return cls.consumser_obj
...@@ -82,79 +82,83 @@ class CollectData(object): ...@@ -82,79 +82,83 @@ class CollectData(object):
user_feature = [1,1] user_feature = [1,1]
kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name) kafka_consumer_obj = KafkaManager.get_kafka_consumer_ins(topic_name)
for ori_msg in kafka_consumer_obj: while True:
try: msg_dict = kafka_consumer_obj.poll(timeout_ms=100)
logging.info(ori_msg) for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
raw_val_dict = json.loads(ori_msg.value) for ori_msg in consume_msg:
try:
if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]: logging.info(ori_msg)
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
device_id = raw_val_dict["device"]["device_id"] raw_val_dict = json.loads(ori_msg.value)
logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id))) if "type" in raw_val_dict and "on_click_feed_topic_card" == raw_val_dict["type"]:
topic_id = raw_val_dict["params"]["business_id"] or raw_val_dict["params"]["topic_id"]
tag_list = list() device_id = raw_val_dict["device"]["device_id"]
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
for sql_item in sql_query_results: logging.info("consume topic_id:%s,device_id:%s" % (str(topic_id), str(device_id)))
tag_list.append(sql_item.tag_id)
tag_list = list()
is_click = 1 sql_query_results = TopicTag.objects.filter(is_online=True, topic_id=topic_id)
is_vote = 0 for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id)
reward = 1 if is_click or is_vote else 0
is_click = 1
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % ( is_vote = 0
str(device_id), str(topic_id), str(tag_list)))
for tag_id in tag_list: reward = 1 if is_click or is_vote else 0
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
logging.info("positive tag_list,device_id:%s,topic_id:%s,tag_list:%s" % (
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 str(device_id), str(topic_id), str(tag_list)))
self.update_recommend_tag_list(device_id, user_feature) for tag_id in tag_list:
elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]: self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"]) # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
elif isinstance(raw_val_dict["params"]["exposure_cards"],list): self.update_recommend_tag_list(device_id, user_feature)
exposure_cards_list = raw_val_dict["params"]["exposure_cards"] elif "type" in raw_val_dict and "page_precise_exposure" == raw_val_dict["type"]:
else: if isinstance(raw_val_dict["params"]["exposure_cards"],str):
exposure_cards_list = list() exposure_cards_list = json.loads(raw_val_dict["params"]["exposure_cards"])
device_id = raw_val_dict["device"]["device_id"] elif isinstance(raw_val_dict["params"]["exposure_cards"],list):
exposure_cards_list = raw_val_dict["params"]["exposure_cards"]
exposure_topic_id_list = list() else:
for item in exposure_cards_list: exposure_cards_list = list()
if "card_id" not in item: device_id = raw_val_dict["device"]["device_id"]
continue
exposure_topic_id = item["card_id"] exposure_topic_id_list = list()
logging.info( for item in exposure_cards_list:
"consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id))) if "card_id" not in item:
exposure_topic_id_list.append(exposure_topic_id) continue
exposure_topic_id = item["card_id"]
topic_tag_id_dict = dict() logging.info(
tag_list = list() "consume exposure topic_id:%s,device_id:%s" % (str(exposure_topic_id), str(device_id)))
sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list) exposure_topic_id_list.append(exposure_topic_id)
for sql_item in sql_query_results:
tag_list.append(sql_item.tag_id) topic_tag_id_dict = dict()
tag_list = list()
if sql_item.topic_id not in topic_tag_id_dict: sql_query_results = TopicTag.objects.filter(is_online=True, topic_id__in=exposure_topic_id_list)
topic_tag_id_dict[sql_item.topic_id] = list() for sql_item in sql_query_results:
topic_tag_id_dict[sql_item.topic_id].append(sql_item.tag_id) tag_list.append(sql_item.tag_id)
is_click = 0 if sql_item.topic_id not in topic_tag_id_dict:
is_vote = 0 topic_tag_id_dict[sql_item.topic_id] = list()
topic_tag_id_dict[sql_item.topic_id].append(sql_item.tag_id)
reward = 1 if is_click or is_vote else 0
is_click = 0
logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % ( is_vote = 0
str(device_id), str(topic_tag_id_dict)))
for tag_id in tag_list: reward = 1 if is_click or is_vote else 0
self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
logging.info("negative tag_list,device_id:%s,topic_tag_id_dict:%s" % (
# 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后 str(device_id), str(topic_tag_id_dict)))
self.update_recommend_tag_list(device_id, user_feature) for tag_id in tag_list:
else: self.update_user_linucb_tag_info(reward, device_id, tag_id, user_feature)
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except: # 更新该用户的推荐tag数据,放在 更新完成user tag行为信息之后
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) self.update_recommend_tag_list(device_id, user_feature)
else:
logging.warning("unknown type msg:%s" % raw_val_dict.get("type", "missing type"))
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return True return True
except: except:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment