Commit 05ec711d authored by 李小芳's avatar 李小芳

Merge branch 'save_have_xiafa' into 'master'

Save have xiafa

See merge request !22
parents 74bea2fb 1026edae
...@@ -35,7 +35,7 @@ def kafka_consum(topic_name=None): ...@@ -35,7 +35,7 @@ def kafka_consum(topic_name=None):
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id])) # consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
for msg_value in msg_dict.values(): for msg_value in msg_dict.values():
for msg in msg_value: for msg in msg_value:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
...@@ -70,7 +70,6 @@ def kafka_consum(topic_name=None): ...@@ -70,7 +70,6 @@ def kafka_consum(topic_name=None):
auto_follow_user(card_info, after_day=True) auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_follow_user子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,auto_follow_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin)) card_info['card_id'], time.time() - begin))
else: else:
pass pass
...@@ -155,14 +154,13 @@ def kafka_consum(topic_name=None): ...@@ -155,14 +154,13 @@ def kafka_consum(topic_name=None):
auto_follow_user(card_info) auto_follow_user(card_info)
else: else:
logging.info("这条数据将被paas:%s" % card_info) logging.info("这条数据将被paas:%s" % card_info)
else: # push_time时间未到 需要等待 else: # push_time时间未到 需要等待
logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % ( logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key))) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass
else: else:
logging.info("此条数据已经消费过了:%s" % card_info) logging.info("此条数据已经消费过了:%s" % card_info)
pass
logging.info("消费处理耗时:%f" % (time.time() - begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
except: except:
consumser_obj.close() consumser_obj.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment