Commit 96c25e9a authored by lixiaofang's avatar lixiaofang

auto_vest

parent 8427993c
...@@ -28,7 +28,7 @@ def click(card_info): ...@@ -28,7 +28,7 @@ def click(card_info):
logging.info("get redis_data:%s" % redis_data) logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values()) values = list(redis_data.values())
s = [True for i in values if i > 0] s = [True for i in values if i > 10]
if len(s) > 0: if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info)) send_email_tome(str(redis_data) + str(card_info))
......
...@@ -29,7 +29,7 @@ def comment(card_info): ...@@ -29,7 +29,7 @@ def comment(card_info):
logging.info("get redis_data:%s" % redis_data) logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values()) values = list(redis_data.values())
s = [True for i in values if i > 0] s = [True for i in values if i > 10]
if len(s) > 0: if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info)) send_email_tome(str(redis_data) + str(card_info))
......
...@@ -27,7 +27,7 @@ def follow(card_info): ...@@ -27,7 +27,7 @@ def follow(card_info):
logging.info("get redis_data:%s" % redis_data) logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values()) values = list(redis_data.values())
s = [True for i in values if i > 0] s = [True for i in values if i > 10]
if len(s) > 0: if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info)) send_email_tome(str(redis_data) + str(card_info))
......
...@@ -23,7 +23,8 @@ from kafka.structs import TopicPartition ...@@ -23,7 +23,8 @@ from kafka.structs import TopicPartition
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name,auto_offset_reset='largest',bootstrap_servers=[settings.KAFKA_BROKER_LIST], enable_auto_commit=True, consumser_obj = KafkaConsumer(topic_name, auto_offset_reset='largest',
bootstrap_servers=[settings.KAFKA_BROKER_LIST], enable_auto_commit=True,
auto_commit_interval_ms=1, group_id="vest") auto_commit_interval_ms=1, group_id="vest")
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
...@@ -40,8 +41,9 @@ def kafka_consum(topic_name=None): ...@@ -40,8 +41,9 @@ def kafka_consum(topic_name=None):
for msg in consume_msg: for msg in consume_msg:
card_info = json.loads(msg.value) card_info = json.loads(msg.value)
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (str(msg.topic),str(msg.partition),str(msg.offset),str(msg.key),card_info)) logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
redis_client.hset(redis_topic_partition_name,msg.partition,msg.offset) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset)
# 代表当天数据 # 代表当天数据
current_push_time = card_info['current_push_time'] current_push_time = card_info['current_push_time']
create_time = card_info['create_time'] create_time = card_info['create_time']
...@@ -89,7 +91,9 @@ def kafka_consum(topic_name=None): ...@@ -89,7 +91,9 @@ def kafka_consum(topic_name=None):
card_info['have_comment_number'] < 20: card_info['have_comment_number'] < 20:
card_info["have_comment_number"] += 1 card_info["have_comment_number"] += 1
is_success = comment(card_info) is_success = comment(card_info)
logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success)) logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
card_info["card_id"], is_success))
# 调完接口后需要再次去拿新的push_time的时间 # 调完接口后需要再次去拿新的push_time的时间
auto_comment_user(card_info) auto_comment_user(card_info)
...@@ -98,7 +102,9 @@ def kafka_consum(topic_name=None): ...@@ -98,7 +102,9 @@ def kafka_consum(topic_name=None):
if 'have_click_number' in card_info and card_info['have_click_number'] < 20: if 'have_click_number' in card_info and card_info['have_click_number'] < 20:
card_info["have_click_number"] += 1 card_info["have_click_number"] += 1
is_success = click(card_info) is_success = click(card_info)
logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success)) logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
card_info["card_id"], is_success))
auto_click_user(card_info) auto_click_user(card_info)
...@@ -108,7 +114,9 @@ def kafka_consum(topic_name=None): ...@@ -108,7 +114,9 @@ def kafka_consum(topic_name=None):
card_info['have_follow_number'] < 20: card_info['have_follow_number'] < 20:
card_info["have_follow_number"] += 1 card_info["have_follow_number"] += 1
is_success = follow(card_info) is_success = follow(card_info)
logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success)) logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),
card_info["card_id"], is_success))
auto_follow_user(card_info) auto_follow_user(card_info)
...@@ -117,7 +125,7 @@ def kafka_consum(topic_name=None): ...@@ -117,7 +125,7 @@ def kafka_consum(topic_name=None):
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key))) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass pass
logging.info("消费处理耗时:%f" % (time.time()-begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
except: except:
consumser_obj.close() consumser_obj.close()
logging_exception() logging_exception()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment