Commit aaa065f1 authored by 李小芳's avatar 李小芳

Merge branch 'test' into 'master'

Test

See merge request !2
parents 3757ad26 8427993c
...@@ -48,20 +48,16 @@ def auto_click_user(card_info, after_day=False): ...@@ -48,20 +48,16 @@ def auto_click_user(card_info, after_day=False):
"all_follow_id" not in card_info and "all_push_time" not in card_info): "all_follow_id" not in card_info and "all_push_time" not in card_info):
###根据创建时间回答等级去获取对应的下发时间 ###根据创建时间回答等级去获取对应的下发时间
repeat_time = 1
while True: while True:
repeat_time = 1
if card_info['type'] == "get_write_answer_userinfo": if card_info['type'] == "get_write_answer_userinfo":
repeat_time = 0 repeat_time = 0
card_info['have_click_number'] = 0 card_info['have_click_number'] = 0
get_time, time_region = get_click_follow_time_by_create_time(create_time, content_level, get_time, time_region = get_click_follow_time_by_create_time(create_time, content_level,
action_type="click", action_type="click",
after_day=after_day, after_day=after_day,
card_info=card_info, card_info=card_info,
repeat_time=repeat_time) repeat_time=repeat_time)
if len(get_time) > 0 and time_region != 3: if len(get_time) > 0 and time_region != 3:
###拿到下发时间后根据下发的时间个数去拿对应个数的马甲账号 ###拿到下发时间后根据下发的时间个数去拿对应个数的马甲账号
userids = get_vest_userid(need_comment_num=len(get_time)) userids = get_vest_userid(need_comment_num=len(get_time))
...@@ -81,7 +77,6 @@ def auto_click_user(card_info, after_day=False): ...@@ -81,7 +77,6 @@ def auto_click_user(card_info, after_day=False):
repeat_time += 1 repeat_time += 1
else: # 代表还有push好的时间没有下发完成 需要继续使用这些 else: # 代表还有push好的时间没有下发完成 需要继续使用这些
current_user_id = card_info["current_user_id"] current_user_id = card_info["current_user_id"]
card_info["all_follow_id"].remove(current_user_id) card_info["all_follow_id"].remove(current_user_id)
current_push_time = card_info['current_push_time'] current_push_time = card_info['current_push_time']
......
...@@ -2,13 +2,35 @@ from libs.rpc import get_rpc_invoker ...@@ -2,13 +2,35 @@ from libs.rpc import get_rpc_invoker
import traceback import traceback
from libs.error import logging_exception from libs.error import logging_exception
import logging import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
def click(card_info): def click(card_info):
try: try:
rpc_invoker = get_rpc_invoker() rpc_invoker = get_rpc_invoker()
rpc_invoker['qa/irrigation/create_answer_vote'](user_id=card_info['current_user_id'], rpc_invoker['qa/irrigation/create_answer_vote'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id']).unwrap() answer_id=card_info['card_id']).unwrap()
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
if redis_data:
redis_data = json.loads(redis_data)
click_num = int(redis_data.get("click")) + 1
redis_data['click'] = click_num
redis_client.set(key, json.dumps(redis_data))
else:
redis_data = {"click": 1, "follow": 0, "comment": 0}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values())
s = [True for i in values if i > 0]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
return True return True
except: except:
......
...@@ -57,13 +57,11 @@ def auto_comment_user(card_info, after_day=False): ...@@ -57,13 +57,11 @@ def auto_comment_user(card_info, after_day=False):
"all_follow_id" not in card_info and "all_push_time" not in card_info and card_info[ "all_follow_id" not in card_info and "all_push_time" not in card_info and card_info[
'type'] == "get_write_answer_userinfo"): 'type'] == "get_write_answer_userinfo"):
repeat_time = 1
while True: while True:
repeat_time = 1
if card_info['type'] == "get_write_answer_userinfo": if card_info['type'] == "get_write_answer_userinfo":
repeat_time = 0 repeat_time = 0
card_info['have_comment_number'] = 0 card_info['have_comment_number'] = 0
get_time, time_region = get_content_time_by_create_time(create_time, content_level, get_time, time_region = get_content_time_by_create_time(create_time, content_level,
action_type="comment", action_type="comment",
after_day=after_day, after_day=after_day,
......
...@@ -2,6 +2,9 @@ from libs.rpc import get_rpc_invoker ...@@ -2,6 +2,9 @@ from libs.rpc import get_rpc_invoker
import traceback import traceback
from libs.error import logging_exception from libs.error import logging_exception
import logging import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
def comment(card_info): def comment(card_info):
...@@ -10,6 +13,26 @@ def comment(card_info): ...@@ -10,6 +13,26 @@ def comment(card_info):
rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'], rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id'], answer_id=card_info['card_id'],
content=card_info['comment_content']).unwrap() content=card_info['comment_content']).unwrap()
####
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
if redis_data:
redis_data = json.loads(redis_data)
click_num = int(redis_data.get("comment")) + 1
redis_data['comment'] = click_num
redis_client.set(key, json.dumps(redis_data))
else:
redis_data = {"click": 0, "follow": 0, "comment": 1}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values())
s = [True for i in values if i > 0]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
return True return True
except: except:
logging_exception() logging_exception()
......
...@@ -49,9 +49,8 @@ def auto_follow_user(card_info, after_day=False): ...@@ -49,9 +49,8 @@ def auto_follow_user(card_info, after_day=False):
"all_follow_id" not in card_info and "all_push_time" not in card_info): "all_follow_id" not in card_info and "all_push_time" not in card_info):
###根据创建时间回答等级去获取对应的下发时间 ###根据创建时间回答等级去获取对应的下发时间
repeat_time = 1
while True: while True:
repeat_time = 1
if card_info['type'] == "get_write_answer_userinfo": if card_info['type'] == "get_write_answer_userinfo":
repeat_time = 0 repeat_time = 0
card_info['have_follow_number'] = 0 card_info['have_follow_number'] = 0
...@@ -80,12 +79,7 @@ def auto_follow_user(card_info, after_day=False): ...@@ -80,12 +79,7 @@ def auto_follow_user(card_info, after_day=False):
if time_region == 3: if time_region == 3:
break break
repeat_time += 1 repeat_time += 1
logging.info("get have_get_after__time:%s" % card_info)
else: # 代表还有push好的时间没有下发完成 需要继续使用这些 else: # 代表还有push好的时间没有下发完成 需要继续使用这些
logging.info("get-----follow----card_info:%s" % card_info)
card_info["have_pust_num"] = card_info["have_pust_num"] + 1 card_info["have_pust_num"] = card_info["have_pust_num"] + 1
current_user_id = card_info["current_user_id"] current_user_id = card_info["current_user_id"]
card_info["all_follow_id"].remove(current_user_id) card_info["all_follow_id"].remove(current_user_id)
......
...@@ -2,6 +2,9 @@ from libs.rpc import get_rpc_invoker ...@@ -2,6 +2,9 @@ from libs.rpc import get_rpc_invoker
import traceback import traceback
from libs.error import logging_exception from libs.error import logging_exception
import logging import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
def follow(card_info): def follow(card_info):
...@@ -10,6 +13,24 @@ def follow(card_info): ...@@ -10,6 +13,24 @@ def follow(card_info):
rpc_invoker['api/irrigation/user_add_follow'](follow_user_id=card_info['current_user_id'], rpc_invoker['api/irrigation/user_add_follow'](follow_user_id=card_info['current_user_id'],
followed_user_id=card_info['card_user_id']).unwrap() followed_user_id=card_info['card_user_id']).unwrap()
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
if redis_data:
redis_data = json.loads(redis_data)
click_num = int(redis_data.get("follow")) + 1
redis_data['follow'] = click_num
redis_client.set(key, json.dumps(redis_data))
else:
redis_data = {"click": 0, "follow": 1, "comment": 0}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values())
s = [True for i in values if i > 0]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
return True return True
except: except:
logging_exception() logging_exception()
......
...@@ -17,24 +17,31 @@ from libs.error import logging_exception ...@@ -17,24 +17,31 @@ from libs.error import logging_exception
from click.views.click_fun import click from click.views.click_fun import click
from comment.views.comment_fun import comment from comment.views.comment_fun import comment
from follow.views.follow_fun import follow from follow.views.follow_fun import follow
from libs.cache import redis_client
from kafka.structs import TopicPartition
def kafka_consum(topic_name=None): def kafka_consum(topic_name=None):
topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name topic_name = settings.KAFKA_TOPIC_NAME if not topic_name else topic_name
consumser_obj = KafkaConsumer(topic_name, bootstrap_servers=[settings.KAFKA_BROKER_LIST], enable_auto_commit=True, consumser_obj = KafkaConsumer(topic_name,auto_offset_reset='largest',bootstrap_servers=[settings.KAFKA_BROKER_LIST], enable_auto_commit=True,
auto_commit_interval_ms=100, group_id="vest") auto_commit_interval_ms=1, group_id="vest")
consumser_obj.subscribe([topic_name, ]) consumser_obj.subscribe([topic_name, ])
try: try:
redis_topic_partition_name = "vest:topic_name:" + str(topic_name)
# topic_partition_info = redis_client.hgetall(redis_topic_partition_name)
# for partition_id in topic_partition_info:
# print (int(partition_id.decode()),topic_name)
# consumser_obj.seek(partition=TopicPartition(topic_name,int(partition_id.decode())),offset=int(topic_partition_info[partition_id]))
while True: while True:
msg_dict = consumser_obj.poll(timeout_ms=100, max_records=50) begin = time.time()
consumser_obj.commit_async() msg_dict = consumser_obj.poll(timeout_ms=100, max_records=30)
for msg_key in msg_dict: for msg_key in msg_dict:
consume_msg = msg_dict[msg_key] consume_msg = msg_dict[msg_key]
for msg in consume_msg: for msg in consume_msg:
card_info = json.loads(msg.value) card_info = json.loads(msg.value)
if card_info['card_type'] == "auto_vest": if card_info['card_type'] == "auto_vest":
logging.info("+++++++++++++++++++++++++新的数据进来了+++++++++++++++++++++++") logging.info("消费到新数据了[%s,%s,%s,%s],get card_info:%s" % (str(msg.topic),str(msg.partition),str(msg.offset),str(msg.key),card_info))
logging.info("get card_info:%s" % card_info) redis_client.hset(redis_topic_partition_name,msg.partition,msg.offset)
# 代表当天数据 # 代表当天数据
current_push_time = card_info['current_push_time'] current_push_time = card_info['current_push_time']
create_time = card_info['create_time'] create_time = card_info['create_time']
...@@ -60,7 +67,8 @@ def kafka_consum(topic_name=None): ...@@ -60,7 +67,8 @@ def kafka_consum(topic_name=None):
if card_info['have_pust_num'] == card_info['need_pust_num']: if card_info['have_pust_num'] == card_info['need_pust_num']:
if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的 if nowtime.day - push_time_date.day == 0: # 今日的已经下发完了,需要去取之后的
action_type = card_info['action_type'] action_type = card_info['action_type']
logging.info("get-------今天已经下发完了----------------") logging.info("今天已经下发完了[%s,%s,%s,%s]" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
if action_type == "comment": if action_type == "comment":
auto_comment_user(card_info, after_day=True) auto_comment_user(card_info, after_day=True)
elif action_type == "click": elif action_type == "click":
...@@ -81,8 +89,7 @@ def kafka_consum(topic_name=None): ...@@ -81,8 +89,7 @@ def kafka_consum(topic_name=None):
card_info['have_comment_number'] < 20: card_info['have_comment_number'] < 20:
card_info["have_comment_number"] += 1 card_info["have_comment_number"] += 1
is_success = comment(card_info) is_success = comment(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success)) logging.info("comment [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
logging.info("get------xiafa------------vestcomment:%s" % card_info)
# 调完接口后需要再次去拿新的push_time的时间 # 调完接口后需要再次去拿新的push_time的时间
auto_comment_user(card_info) auto_comment_user(card_info)
...@@ -91,8 +98,7 @@ def kafka_consum(topic_name=None): ...@@ -91,8 +98,7 @@ def kafka_consum(topic_name=None):
if 'have_click_number' in card_info and card_info['have_click_number'] < 20: if 'have_click_number' in card_info and card_info['have_click_number'] < 20:
card_info["have_click_number"] += 1 card_info["have_click_number"] += 1
is_success = click(card_info) is_success = click(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success)) logging.info("click [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
logging.info("get------xiafa------------vestclick:%s" % card_info)
auto_click_user(card_info) auto_click_user(card_info)
...@@ -102,16 +108,16 @@ def kafka_consum(topic_name=None): ...@@ -102,16 +108,16 @@ def kafka_consum(topic_name=None):
card_info['have_follow_number'] < 20: card_info['have_follow_number'] < 20:
card_info["have_follow_number"] += 1 card_info["have_follow_number"] += 1
is_success = follow(card_info) is_success = follow(card_info)
logging.info("当前ID:%s,下发状状态:%s" % (card_info["card_id"], is_success)) logging.info("follow [%s,%s,%s,%s],当前ID:%s,下发状状态:%s" % (str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key),card_info["card_id"], is_success))
logging.info("get------xiafa------------vestfollow:%s" % card_info)
auto_follow_user(card_info) auto_follow_user(card_info)
else: # push_time时间未到 需要等待 else: # push_time时间未到 需要等待
logging.info("push_time时间未到========================需要等待") logging.info("follow [%s,%s,%s,%s],push_time未到,需要等待" % (
str(msg.topic), str(msg.partition), str(msg.offset), str(msg.key)))
save_data_to_kafka(card_info) save_data_to_kafka(card_info)
pass pass
logging.info("消费处理耗时:%f" % (time.time()-begin))
except: except:
consumser_obj.close() consumser_obj.close()
logging_exception() logging_exception()
......
...@@ -15,6 +15,7 @@ import pymysql ...@@ -15,6 +15,7 @@ import pymysql
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
def strTimeProp(start, end, prop, frmt): def strTimeProp(start, end, prop, frmt):
...@@ -91,9 +92,10 @@ def get_one_six_days_random_time(frmt='%Y-%m-%d %H:%M:%S', num_days=0, action_ty ...@@ -91,9 +92,10 @@ def get_one_six_days_random_time(frmt='%Y-%m-%d %H:%M:%S', num_days=0, action_ty
return [] return []
def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', action_type=None, content_level=0, def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', content_level=0,
content_day_need_add_one_day=False): content_day_need_add_one_day=False, action_type=None, repeat_time=1):
try: try:
if num_days == None: if num_days == None:
return [] return []
##比较当前时间和最后一次创建时间的差 ##比较当前时间和最后一次创建时间的差
...@@ -126,14 +128,14 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', actio ...@@ -126,14 +128,14 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', actio
pass pass
if content_day_need_add_one_day == True: if content_day_need_add_one_day == True:
start_time = zeroday + datetime.timedelta(days=add_number + 1) start_time = zeroday + datetime.timedelta(days=add_number * repeat_time)
end_time = lastday + datetime.timedelta(days=add_number + 1) end_time = lastday + datetime.timedelta(days=add_number * repeat_time)
else: else:
start_time = zeroday + datetime.timedelta(days=add_number) start_time = zeroday + datetime.timedelta(days=add_number)
end_time = lastday + datetime.timedelta(days=add_number) end_time = lastday + datetime.timedelta(days=add_number)
random_times = [randomDate_six_one(start_time, end_time, frmt) for _ in range(action_num)] random_times = [randomDate_six_one(str(start_time), str(end_time), frmt) for _ in range(action_num)]
have_sort_times = sorted(random_times, key=lambda date: get_list(date)) have_sort_times = sorted(random_times, key=lambda date: get_list(date))
return have_sort_times return have_sort_times
...@@ -150,14 +152,12 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type ...@@ -150,14 +152,12 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type
card_info['have_comment_number'] = 0 card_info['have_comment_number'] = 0
now = datetime.datetime.now() now = datetime.datetime.now()
createt = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S') createt = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
# nowt = now.strftime('%Y-%m-%d %H:%M:%S') num_days = (now - createt).days
##获取创建时间和当前时间的相差秒数
# num = (now - createt).total_seconds()
num_days = now.day - createt.day
content_day_need_add_one_day = False content_day_need_add_one_day = False
if after_day == True and card_info is not None: if after_day == True and card_info is not None:
num_days += repeat_time num_days += repeat_time
content_day_need_add_one_day = True content_day_need_add_one_day = True
# 创建时间切换成分钟数便于比较 # 创建时间切换成分钟数便于比较
# mins = divmod(num, min)[0] # mins = divmod(num, min)[0]
##根据转换后的分钟数进行比较 ##根据转换后的分钟数进行比较
...@@ -176,7 +176,8 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type ...@@ -176,7 +176,8 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type
elif num_days > 6 and num_days <= 365: elif num_days > 6 and num_days <= 365:
get_time = get_ten_last_days_random_time(num_days=num_days, content_level=content_level, get_time = get_ten_last_days_random_time(num_days=num_days, content_level=content_level,
content_day_need_add_one_day=content_day_need_add_one_day) content_day_need_add_one_day=content_day_need_add_one_day,
action_type=action_type, repeat_time=repeat_time)
time_region = 2 time_region = 2
return get_time, time_region return get_time, time_region
...@@ -198,19 +199,15 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action ...@@ -198,19 +199,15 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action
createt = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S') createt = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
nowt = now.strftime('%Y-%m-%d %H:%M:%S') nowt = now.strftime('%Y-%m-%d %H:%M:%S')
##获取创建时间和当前时间的相差秒数 ##获取创建时间和当前时间的相差秒数
num = (now - createt).total_seconds() num_days = (now - createt).days
num_days = now.day - createt.day
content_day_need_add_one_day = False content_day_need_add_one_day = False
if after_day: if after_day:
num_days += repeat_time num_days += repeat_time
content_day_need_add_one_day = True content_day_need_add_one_day = True
# 创建时间切换成分钟数便于比较
# mins = divmod(num, min)[0]
##根据转换后的分钟数进行比较
##转化成分数后进行一层一层的比较
if num_days == 0: if num_days == 0:
get_time = randomDate(create_time=createt, action_type=action_type) get_time = randomDate(create_time=createt, action_type=action_type)
return get_time, 0 return get_time, 0
elif num_days == 1: elif num_days == 1:
...@@ -229,14 +226,15 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action ...@@ -229,14 +226,15 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action
elif num_days > 15 and num_days <= 365: elif num_days > 15 and num_days <= 365:
##需要删掉kafka的数据不再进行下发 ##需要删掉kafka的数据不再进行下发
get_time = get_ten_last_days_random_time(num_days=num_days, action_type=action_type, get_time = get_ten_last_days_random_time(num_days=num_days, action_type=action_type,
content_level=content_level, content_level=content_level,
content_day_need_add_one_day=content_day_need_add_one_day) content_day_need_add_one_day=content_day_need_add_one_day)
return get_time, 4 return get_time, 4
else: else:
return [], 3 return [], 3
except: except:
logger.error("catch exception,err_log:%s" % traceback.format_exc()) logger.error("catch exception,err_log:%s" % traceback.format_exc())
return [], 5 return [], 5
...@@ -244,11 +242,8 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action ...@@ -244,11 +242,8 @@ def get_click_follow_time_by_create_time(create_time="", content_level=0, action
def save_data_to_kafka(card_info): def save_data_to_kafka(card_info):
try: try:
producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
topic = settings.KAFKA_TOPIC_NAME topic = settings.KAFKA_TOPIC_NAME
producer.send(topic, json.dumps(card_info).encode()) producer.send(topic, json.dumps(card_info).encode())
producer.close()
except: except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc()) logging.error("catch exception,err_msg:%s" % traceback.format_exc())
......
import datetime
import traceback
import logging
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from django.conf import settings
my_sender = 'lixiaofang@igengmei.com'
my_pass = 'Wd3W9j5XDbcKQHiz'
my_user6 = "lixiaofang@igengmei.com"
def send_email_tome(stat_data):
try:
msg = MIMEText(stat_data, 'plain', 'utf-8')
msg['From'] = formataddr(["李小芳", my_sender])
msg["To"] = formataddr(["李小芳", my_user6])
msg['Subject'] = str(datetime.date.today()) + "马甲超过次数啦,赶紧看一下"
server = smtplib.SMTP_SSL("smtp.exmail.qq.com", 465)
server.login(my_sender, my_pass)
server.sendmail(my_sender, [my_user6], msg.as_string())
server.quit()
except Exception:
logging.error("catch exception,main:%s" % traceback.format_exc())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment