Commit beef3be5 authored by lixiaofang's avatar lixiaofang

add;

parent 2f28de8e
......@@ -69,7 +69,6 @@ def auto_click_user(card_info, after_day=False):
card_info['all_push_time'] = get_time
card_info["need_pust_num"] = len(userids)
card_info["have_pust_num"] = 0
logging.info("get card_info:%s" % card_info)
save_data_to_kafka(card_info)
break
else:
......@@ -91,7 +90,6 @@ def auto_click_user(card_info, after_day=False):
card_info["have_pust_num"] = card_info["have_pust_num"] + 1
save_data_to_kafka(card_info) # 存储数据
logging.info("get--------click--------------card_info:%s" % card_info)
except:
logging_exception()
......
......@@ -15,7 +15,6 @@ def click(card_info):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['current_user_id'] in datas:
logging.info("当前需要去点赞的用户已经存在了:%s" % card_info)
return False
else:
......@@ -36,13 +35,11 @@ def click(card_info):
have_pust_num = int(redis_data.get('click_have_pust_num', card_info['have_pust_num']))
need_pust_num = int(redis_data.get('click_need_pust_num', card_info['need_pust_num']))
if have_pust_num > need_pust_num:
logging.info("当前下发次数:%s已经达上限%s次,不能再下发:%s" % (have_pust_num, need_pust_num, card_info))
return True
else:
redis_data['click_have_pust_num'] = have_pust_num + 1
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("当前下发次数:%s,上限是%s次,可以下发:%s" % (have_pust_num, need_pust_num, card_info))
else:
redis_data = {"click_have_pust_num": card_info['have_pust_num'],
......@@ -50,7 +47,6 @@ def click(card_info):
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:click,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
rpc_invoker = get_rpc_invoker()
try:
......
......@@ -93,7 +93,6 @@ def auto_comment_user(card_info, after_day=False):
break
repeat_time += 1
logging.info("get comment_have_get_after__time:%s" % card_info)
else: # 代表还有push好的时间没有下发完成 需要继续使用这些
current_user_id = card_info["current_user_id"]
card_info["all_follow_id"].remove(current_user_id)
......@@ -114,7 +113,6 @@ def auto_comment_user(card_info, after_day=False):
card_info['current_push_time'] = card_info['all_push_time'][0]
save_data_to_kafka(card_info) # 存储数据
logging.info("get-------comment---------------card_info:%s" % card_info)
except:
logging_exception()
......
......@@ -33,7 +33,6 @@ def comment(card_info):
redis_data = {"comment": 1}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
comment_num = redis_data["comment"]
if comment_num > 12:
......@@ -45,7 +44,6 @@ def comment(card_info):
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0 and comment_num <= 11:
return True, True
......@@ -74,7 +72,6 @@ def comment(card_info):
redis_data = {"comment": 1}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
comment_num = redis_data["comment"]
if comment_num > 12:
......@@ -86,7 +83,6 @@ def comment(card_info):
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0:
return True
......@@ -111,7 +107,6 @@ def comment(card_info):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['comment_content'] in datas:
logging.info("当前评论和当前的用户已经存在了:%s" % card_info)
return False
else:
......@@ -134,13 +129,11 @@ def comment(card_info):
need_pust_num = int(redis_data.get('comment_need_pust_num', card_info['need_pust_num']))
if have_pust_num > need_pust_num:
logging.info("当前下发次数:%s已经达上限%s次,不能再下发:%s" % (have_pust_num, need_pust_num, card_info))
return True
else:
redis_data['comment_have_pust_num'] = have_pust_num + 1
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("当前下发次数:%s,上限是%s次,可以下发:%s" % (have_pust_num, need_pust_num, card_info))
else:
redis_data = {"comment_have_pust_num": card_info['have_pust_num'],
......@@ -148,8 +141,6 @@ def comment(card_info):
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
rpc_invoker = get_rpc_invoker()
try:
......@@ -157,7 +148,8 @@ def comment(card_info):
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
logging.info("get_card_info:%s,create_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0:
return True
......
......@@ -73,8 +73,6 @@ def auto_follow_user(card_info, after_day=False):
card_info['all_push_time'] = get_time
card_info["need_pust_num"] = len(userids)
card_info["have_pust_num"] = 0
logging.info("get card_info:%s" % card_info)
save_data_to_kafka(card_info)
break
else:
......@@ -95,7 +93,6 @@ def auto_follow_user(card_info, after_day=False):
card_info['current_push_time'] = card_info['all_push_time'][0]
save_data_to_kafka(card_info) # 存储数据
logging.info("get-------follow---------------card_info:%s" % card_info)
except:
logging_exception()
......
......@@ -15,7 +15,6 @@ def follow(card_info):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['current_user_id'] in datas:
logging.info("当前需要去关注的用户已经存在了:%s" % card_info)
return False
else:
......@@ -37,20 +36,17 @@ def follow(card_info):
need_pust_num = int(redis_data.get('follow_need_pust_num', card_info['need_pust_num']))
if have_pust_num > need_pust_num:
logging.info("当前下发次数:%s已经达上限%s次,不能再下发:%s" % (have_pust_num, need_pust_num, card_info))
return True
else:
redis_data['follow_have_pust_num'] = have_pust_num + 1
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("当前下发次数:%s,上限是%s次,可以下发:%s" % (have_pust_num, need_pust_num, card_info))
else:
redis_data = {"follow_have_pust_num": card_info['have_pust_num'],
"follow_need_pust_num": card_info['need_pust_num']}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:follow,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
rpc_invoker = get_rpc_invoker()
try:
status = rpc_invoker['api/irrigation/user_add_follow'](follow_user_id=card_info['current_user_id'],
......
......@@ -23,15 +23,14 @@ from moment.views.process_time import judge_offset_partition_have_consum, judge_
import msgpack
from kafka import TopicPartition
clientid = "0"
clientid = "6"
def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=settings.KAFKA_BROKER_LIST, enable_auto_commit=True,
auto_commit_interval_ms=1000, group_id="vest", value_deserializer=msgpack.unpackb)
consumser_obj = KafkaConsumer(bootstrap_servers=settings.KAFKA_BROKER_LIST, group_id="vest", client_id=clientid)
partition = TopicPartition(topic_name, int(clientid))
consumser_obj.assignment([partition])
consumser_obj.assign([partition])
try:
# redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
......@@ -50,8 +49,8 @@ def kafka_consum(topic_name=None):
# bol_consum = judge_offset_partition_have_consum(card_info=card_info, offset=msg.offset,
# partition=msg.partition)
##在这里先判断当前的数据是否已经存在,存在的话直接PaaS,不存在的话再做下边的处理
bol_consum = judge_data_have_in_redis(card_info)
# bol_consum = True
# bol_consum = judge_data_have_in_redis(card_info)
bol_consum = True
if bol_consum:
logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key), card_info))
......@@ -67,17 +66,10 @@ def kafka_consum(topic_name=None):
if current_push_time == create_time:
if card_info['action_type'] == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_click_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif card_info['action_type'] == "follow":
auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_follow_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
else:
pass
......@@ -86,20 +78,12 @@ def kafka_consum(topic_name=None):
if card_info['have_pust_num'] == card_info['need_pust_num']:
if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的
action_type = card_info['action_type']
logging.info("今天已经下发完了[%s,%s,%s,%s]" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
if action_type == "comment":
auto_comment_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "click":
auto_click_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
elif action_type == "follow":
auto_follow_user(card_info, after_day=True)
logging.info("当前卡片ID:%s,auto_comment_user子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
else:
pass
......@@ -112,15 +96,11 @@ def kafka_consum(topic_name=None):
if 'have_comment_number' in card_info and \
card_info['have_comment_number'] < 20:
card_info["have_comment_number"] += 1
logging.info("当前卡片ID:%s,comment1子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
is_success = comment(card_info)
logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
card_info["card_id"], is_success))
logging.info("当前卡片ID:%s,comment2子函数消费处理耗时:%f" % (
card_info['card_id'], time.time() - begin))
# 调完接口后需要再次去拿新的push_time的时间
if is_success:
auto_comment_user(card_info)
......@@ -130,15 +110,11 @@ def kafka_consum(topic_name=None):
elif action_type == "click": # 在这里去调点赞的接口
if 'have_click_number' in card_info:
card_info["have_click_number"] += 1
logging.info("当前卡片ID:%s,click1子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin))
is_success = click(card_info)
logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
card_info["card_id"], is_success))
logging.info("当前卡片ID:%s,click2子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin))
if is_success:
auto_click_user(card_info)
else:
......@@ -148,12 +124,7 @@ def kafka_consum(topic_name=None):
elif action_type == "follow": # 在这里去调关注的接口
if 'have_follow_number' in card_info:
card_info["have_follow_number"] += 1
logging.info("当前卡片ID:%s,follow1子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin))
is_success = follow(card_info)
logging.info("当前卡片ID:%s,follow2子函数消费处理耗时:%f" % (
card_info["card_id"], time.time() - begin))
logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (
str(msg.topic), str(msg.partition), str(msg.offset),
str(msg.key),
......
......@@ -352,7 +352,6 @@ def judge_offset_partition_have_consum(card_info=None, offset=0, partition=0):
datetime_create_time = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
str_data = str(datetime_create_time.year) + str(datetime_create_time.month) + str(datetime_create_time.day)
if str_data in ['20191229', '20191230']:
logging.info("该日期的数据已经被删除啦")
return False
redis_list_data = 0
......@@ -377,7 +376,6 @@ def judge_data_have_in_redis(card_info=None):
str_push_datetime = str(push_datetime.year) + str(push_datetime.month) + str(push_datetime.day)
if str_today != str_push_datetime:
logging.info("当前数据可以保留继续执行")
return True
if card_info['action_type'] == 'comment':
......@@ -387,7 +385,6 @@ def judge_data_have_in_redis(card_info=None):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['comment_content'] in datas:
logging.info("当前评论和当前的用户已经存在了:%s" % card_info)
return False
else:
......@@ -407,7 +404,6 @@ def judge_data_have_in_redis(card_info=None):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['current_user_id'] in datas:
logging.info("当前需要去关注的用户已经存在了:%s" % card_info)
return False
else:
......@@ -426,7 +422,6 @@ def judge_data_have_in_redis(card_info=None):
if redis_data:
datas = json.loads(str(redis_data, encoding="utf-8"))
if card_info['current_user_id'] in datas:
logging.info("c:%s" % card_info)
return False
else:
......
......@@ -30,7 +30,7 @@ class Command(BaseCommand):
def handle(self, *args, **options):
try:
if len(options["sync_type"]) and options["sync_type"] == "auto_vest":
kafka_consum()
kafka_consum(clientid)
logging.info("add a log >>>>> auto_vest")
except:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment