Commit 6a5e4f63 authored by lixiaofang's avatar lixiaofang

增加日志信息

parent cb8b142e
......@@ -22,19 +22,18 @@ from follow.views.follow_fun import follow
def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=[settings.KAFKA_BROKER_LIST], enable_auto_commit=True,
auto_commit_interval_ms=100, group_id="vest")
auto_commit_interval_ms=1, group_id="vest")
consumser_obj.subscribe([topic_name, ])
try:
while True:
begin = datetime.datetime.now()
begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
for msg_key in msg_dict:
consume_msg = msg_dict[msg_key]
for msg in consume_msg:
card_info = json.loads(msg.value)
if card_info['card_type'] == "auto_vest":
logging.info("+++++++++++++++++++++++++新的数据进来了+++++++++++++++++++++++")
logging.info("get card_info:%s" % card_info)
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (str(msg.topic),str(msg.partition),str(msg.offset),str(msg.key),card_info))
# 代表当天数据
current_push_time = card_info['current_push_time']
create_time = card_info['create_time']
......@@ -60,7 +59,8 @@ def kafka_consum(topic_name=None):
if card_info['have_pust_num'] == card_info['need_pust_num']:
if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的
action_type = card_info['action_type']
logging.info("get-------今天已经下发完了----------------")
logging.info("今天已经下发完了[%s,%s,%s,%s]" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
if action_type == "comment":
auto_comment_user(card_info, after_day=True)
elif action_type == "click":
......@@ -81,8 +81,7 @@ def kafka_consum(topic_name=None):
card_info['have_comment_number'] < 20:
card_info["have_comment_number"] += 1
is_success = comment(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success))
logging.info("get------xiafa------------vestcomment:%s" % card_info)
logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
# 调完接口后需要再次去拿新的push_time的时间
auto_comment_user(card_info)
......@@ -91,8 +90,7 @@ def kafka_consum(topic_name=None):
if 'have_click_number' in card_info and card_info['have_click_number'] < 20:
card_info["have_click_number"] += 1
is_success = click(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success))
logging.info("get------xiafa------------vestclick:%s" % card_info)
logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
auto_click_user(card_info)
......@@ -102,17 +100,16 @@ def kafka_consum(topic_name=None):
card_info['have_follow_number'] < 20:
card_info["have_follow_number"] += 1
is_success = follow(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success))
logging.info("get------xiafa------------vestfollow:%s" % card_info)
logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
auto_follow_user(card_info)
else: # push_time时间未到 需要等待
logging.info("push_time时间未到========================需要等待")
logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info)
pass
print((datetime.datetime.now() - begin).seconds)
logging.info("消费处理耗时:%f" % (time.time()-begin))
except:
consumser_obj.close()
logging_exception()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment