Commit a8a64ccf authored by lixiaofang's avatar lixiaofang

判断等级

parent 51fb4d05
...@@ -28,7 +28,7 @@ def kafka_consum(topic_name=None): ...@@ -28,7 +28,7 @@ def kafka_consum(topic_name=None):
auto_commit_interval_ms=1, group_id="vest") auto_commit_interval_ms=1, group_id="vest")
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
redis_topic_partition_name = "vest:topic_name:" + str(topic_name) # redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name) # topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
# for partition_id in topic_partition_info: # for partition_id in topic_partition_info:
# print (int(partition_id.decode()),topic_name) # print (int(partition_id.decode()),topic_name)
...@@ -46,7 +46,7 @@ def kafka_consum(topic_name=None): ...@@ -46,7 +46,7 @@ def kafka_consum(topic_name=None):
if bol_consum: if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % ( logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info)) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset) # redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset)
# 代表当天数据 # 代表当天数据
current_push_time = card_info['current_push_time'] current_push_time = card_info['current_push_time']
create_time = card_info['create_time'] create_time = card_info['create_time']
...@@ -154,6 +154,9 @@ def kafka_consum(topic_name=None): ...@@ -154,6 +154,9 @@ def kafka_consum(topic_name=None):
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key))) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass pass
else:
logging.info("此条数据已经消费过了")
pass
logging.info("消费处理耗时:%f" % (time.time() - begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
except: except:
consumser_obj.close() consumser_obj.close()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment