Commit 9a0d441a authored by lixiaofang

提升poll的数据

parents 5aa7237f fa2739c8
...@@ -19,6 +19,7 @@ from comment.views.comment_fun import comment ...@@ -19,6 +19,7 @@ from comment.views.comment_fun import comment
from follow.views.follow_fun import follow from follow.views.follow_fun import follow
from libs.cache import redis_client from libs.cache import redis_client
from kafka.structs import TopicPartition from kafka.structs import TopicPartition
from moment.views.process_time import judge_offset_partition_have_consum
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
...@@ -35,117 +36,124 @@ def kafka_consum(topic_name=None): ...@@ -35,117 +36,124 @@ def kafka_consum(topic_name=None):
while True: while True:
begin = time.time() begin = time.time()
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50) msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50)
for msg_key in msg_dict: for msg_value in msg_dict.values():
consume_msg = msg_dict[msg_key] for msg in msg_value:
for msg in consume_msg:
card_info = json.loads(str(msg.value, encoding="utf8")) card_info = json.loads(str(msg.value, encoding="utf8"))
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % ( ###在这里去判断一下当前的partition和offset是否已经消费过了 如果已经消费了需要直接去掉数据
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info)) bol_consum = judge_offset_partition_have_consum(offset=msg.offset, partition=msg.partition)
redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset)
# 代表当天数据
current_push_time = card_info['current_push_time']
create_time = card_info['create_time']
nowtime = datetime.datetime.now()
push_time_date = datetime.datetime.strptime(current_push_time, '%Y-%m-%d %H:%M:%S')
if push_time_date <= nowtime: # push_time已经到时间了 需要去下发
# 判断如果当前的push_time 和当前的创建时间一样 需要给push_time下发真的push时间
if current_push_time == create_time:
if card_info['action_type'] == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_click_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "follow": if bol_consum:
auto_follow_user(card_info, after_day=True) logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
logging.info("当前卡片ID:%s,auto_follow_user子函数消费处理耗时:%f" % ( str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
card_info['card_id'], time.time() - begin)) redis_client.hset(redis_topic_partition_name, msg.partition, msg.offset)
# 代表当天数据
current_push_time = card_info['current_push_time']
create_time = card_info['create_time']
nowtime = datetime.datetime.now()
push_time_date = datetime.datetime.strptime(current_push_time, '%Y-%m-%d %H:%M:%S')
if push_time_date <= nowtime: # push_time已经到时间了 需要去下发
# 判断如果当前的push_time 和当前的创建时间一样 需要给push_time下发真的push时间
if current_push_time == create_time:
if card_info['action_type'] == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_click_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "follow":
auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_follow_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
else:
pass
else: else:
pass # 当前已下发完 需要判断最新一次的下发时间是否是今天 是的话直接保存起来,l=轮询等待明天再下发
if card_info['have_pust_num'] == card_info['need_pust_num']:
if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的
action_type = card_info['action_type']
logging.info("今天已经下发完了[%s,%s,%s,%s]" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
if action_type == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "follow":
auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
else:
pass
else: else: # 还有一种是下发时间已到
# 当前已下发完 需要判断最新一次的下发时间是否是今天 是的话直接保存起来,l=轮询等待明天再下发
if card_info['have_pust_num'] == card_info['need_pust_num']:
if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的
action_type = card_info['action_type'] action_type = card_info['action_type']
logging.info("今天已经下发完了[%s,%s,%s,%s]" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
if action_type == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "follow":
auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
else:
pass
else: # 还有一种是下发时间已到
action_type = card_info['action_type']
if card_info['have_pust_num'] < card_info['need_pust_num'] and \ if card_info['have_pust_num'] < card_info['need_pust_num'] and \
push_time_date < nowtime: push_time_date < nowtime:
if action_type == "comment": # 在这里去调评论的接口 if action_type == "comment": # 在这里去调评论的接口
if 'have_comment_number' in card_info and \ if 'have_comment_number' in card_info and \
card_info['have_comment_number'] < 20: card_info['have_comment_number'] < 20:
card_info["have_comment_number"] += 1 card_info["have_comment_number"] += 1
logging.info("当前卡片ID:%s,comment1子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,comment1子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin)) card_info['card_id'], time.time() - begin))
is_success = comment(card_info) is_success = comment(card_info)
logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset),
card_info["card_id"], is_success)) str(msg.key),
logging.info("当前卡片ID:%s,comment2子函数消费处理耗时:%f" % ( card_info["card_id"], is_success))
card_info['card_id'], time.time() - begin)) logging.info("当前卡片ID:%s,comment2子函数消费处理耗时:%f" % (
# 调完接口后需要再次去拿新的push_time的时间 card_info['card_id'], time.time() - begin))
auto_comment_user(card_info) # 调完接口后需要再次去拿新的push_time的时间
auto_comment_user(card_info)
elif action_type == "click": # 在这里去调点赞的接口 elif action_type == "click": # 在这里去调点赞的接口
if 'have_click_number' in card_info and card_info['have_click_number'] < 20: if 'have_click_number' in card_info and card_info[
card_info["have_click_number"] += 1 'have_click_number'] < 20:
logging.info("当前卡片ID:%s,click1子函数消费处理耗时:%f" % ( card_info["have_click_number"] += 1
card_info["card_id"], time.time() - begin)) logging.info("当前卡片ID:%s,click1子函数消费处理耗时:%f" % (
is_success = click(card_info) card_info["card_id"], time.time() - begin))
logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( is_success = click(card_info)
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
card_info["card_id"], is_success)) str(msg.topic), str(msg.partition), str(msg.offset),
logging.info("当前卡片ID:%s,click2子函数消费处理耗时:%f" % ( str(msg.key),
card_info["card_id"], time.time() - begin)) card_info["card_id"], is_success))
auto_click_user(card_info) logging.info("当前卡片ID:%s,click2子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin))
auto_click_user(card_info)
elif action_type == "follow": # 在这里去调关注的接口 elif action_type == "follow": # 在这里去调关注的接口
if 'have_follow_number' in card_info and \ if 'have_follow_number' in card_info and \
card_info['have_follow_number'] < 20: card_info['have_follow_number'] < 20:
card_info["have_follow_number"] += 1 card_info["have_follow_number"] += 1
logging.info("当前卡片ID:%s,follow1子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,follow1子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin)) card_info["card_id"], time.time() - begin))
is_success = follow(card_info) is_success = follow(card_info)
logging.info("当前卡片ID:%s,follow2子函数消费处理耗时:%f" % ( logging.info("当前卡片ID:%s,follow2子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin)) card_info["card_id"], time.time() - begin))
logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % ( logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), str(msg.topic), str(msg.partition), str(msg.offset),
card_info["card_id"], is_success)) str(msg.key),
card_info["card_id"], is_success))
auto_follow_user(card_info) auto_follow_user(card_info)
else: # push_time时间未到 需要等待 else: # push_time时间未到 需要等待
logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % ( logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key))) str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass pass
logging.info("消费处理耗时:%f" % (time.time() - begin)) logging.info("消费处理耗时:%f" % (time.time() - begin))
except: except:
consumser_obj.close() consumser_obj.close()
......
...@@ -60,20 +60,20 @@ def get_one_six_days_random_time(frmt='%Y-%m-%d %H:%M:%S', num_days=0, action_ty ...@@ -60,20 +60,20 @@ def get_one_six_days_random_time(frmt='%Y-%m-%d %H:%M:%S', num_days=0, action_ty
action_num = random.randint(5, 10) action_num = random.randint(5, 10)
if num_days <= 15 and num_days > 1 and action_type in ("follow", "click") and int(content_level) < 3: if num_days <= 15 and num_days > 1 and action_type in ("follow", "click") and int(content_level) < 3:
action_num = random.randint(0, 1) action_num = random.randint(1, 2)
if num_days <= 15 and num_days > 1 and action_type in ("follow") and int(content_level) >= 3: if num_days <= 15 and num_days > 1 and action_type in ("follow") and int(content_level) >= 3:
action_num = random.randint(0, 5) action_num = random.randint(1, 5)
if num_days == 1 and action_type in ("click") and int(content_level) >= 3: if num_days == 1 and action_type in ("click") and int(content_level) >= 3:
action_num = random.randint(6, 12) action_num = random.randint(6, 12)
if num_days <= 15 and num_days > 1 and action_type in ("click") and int(content_level) >= 3: if num_days <= 15 and num_days > 1 and action_type in ("click") and int(content_level) >= 3:
action_num = random.randint(0, 6) action_num = random.randint(1, 6)
if num_days >= 1 and num_days <= 6 and action_type in ("comment"): if num_days >= 1 and num_days <= 6 and action_type in ("comment"):
if int(content_level) <= 3: if int(content_level) <= 3:
action_num = random.randint(0, 1) action_num = random.randint(1, 2)
else: else:
action_num = random.randint(2, 4) action_num = random.randint(2, 4)
...@@ -104,24 +104,24 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', conte ...@@ -104,24 +104,24 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', conte
lastday = datetime.datetime(now.year, now.month, now.day, 23, 0, 0) lastday = datetime.datetime(now.year, now.month, now.day, 23, 0, 0)
add_number = 0 add_number = 0
if num_days > 15 and action_type in ("follow"): if num_days > 15 and action_type in ("follow"):
action_num = random.randint(0, 2) action_num = random.randint(1, 2)
add_number = 10 add_number = 10
elif num_days > 15 and action_type in ("click"): elif num_days > 15 and action_type in ("click"):
if content_level < 3: if content_level < 3:
action_num = random.randint(0, 1) action_num = random.randint(1, 2)
add_number = 6 add_number = 6
else: else:
action_num = random.randint(0, 2) action_num = random.randint(1, 2)
add_number = 5 add_number = 5
elif num_days > 6 and action_type in ("comment"): elif num_days > 6 and action_type in ("comment"):
if content_level <= 3: if content_level <= 3:
action_num = random.randint(0, 1) action_num = 1
add_number = 10 add_number = 10
else: else:
action_num = random.randint(0, 2) action_num = random.randint(1, 2)
add_number = 10 add_number = 10
else: else:
...@@ -310,3 +310,26 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0): ...@@ -310,3 +310,26 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0):
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return [] return []
def judge_offset_partition_have_consum(offset=0, partition=0):
    """
    Decide, from the given offset and partition, whether the record has
    already been consumed.

    The offsets seen so far for a partition are stored in Redis as one JSON
    list under the key ``irrigation_partition_offset_have_consum:<partition>``.
    When the offset is not in that list yet, it is added and the list is
    written back.

    NOTE(review): the get / modify / set sequence is not atomic, so two
    consumers handling the same partition at the same moment could both get
    True for the same offset. The stored list also only ever grows. Confirm
    whether either matters for the deployment.

    :param offset: offset of the record inside its partition.
    :param partition: partition number the record was read from.
    :return: True if the offset had not been recorded before (it is recorded
        by this call); False if it was already recorded, or if the lookup or
        update failed for any reason.
    """
    try:
        key = "irrigation_partition_offset_have_consum:" + str(partition)
        redis_data = redis_client.get(key)
        # A missing or empty value means nothing has been recorded for this partition.
        seen_offsets = set(json.loads(redis_data)) if redis_data else set()

        if offset in seen_offsets:
            return False

        seen_offsets.add(offset)
        redis_client.set(key, json.dumps(list(seen_offsets)))
        return True
    except Exception:
        # Catch Exception rather than everything, so KeyboardInterrupt and
        # SystemExit still propagate and can stop the consumer loop.
        # Any other failure is reported as "do not process".
        logging.error("catch exception,err_msg:%s" % traceback.format_exc())
        return False
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment