Commit b3f07036 authored by 李小芳's avatar 李小芳

Merge branch 'hkx/feature/rpc-method-header' into 'test'

Hkx/feature/rpc method header

See merge request !17
parents 45321f83 1877d5da
......@@ -37,7 +37,7 @@ RUN apk add --no-cache --virtual .build-deps \
\
# 取消ssh第一次链接的确认
&& echo "StrictHostKeyChecking no" >> /etc/ssh/ssh_config \
&& apk add --no-cache mariadb-connector-c-dev libxml2-dev libxslt-dev librdkafka-dev \
&& apk add --no-cache build-base mariadb-connector-c-dev libxml2-dev libxslt-dev librdkafka-dev libexecinfo libexecinfo-dev snappy snappy-dev \
&& pip install --no-cache-dir -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com -r /tmp/requirements.txt \
&& mkdir -p /data/log/vest/app
......
......@@ -5,35 +5,52 @@ import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
import datetime
def click(card_info):
try:
rpc_invoker = get_rpc_invoker()
rpc_invoker['qa/irrigation/create_answer_vote'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id']).unwrap()
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
key = "auto_vest_one_user_action_answer:" + str(card_info['card_id'])
redis_data = redis_client.hget(key, str_today)
if redis_data:
redis_data = json.loads(redis_data)
redis_data = json.loads(str(redis_data, encoding="utf8"))
click_num = int(redis_data.get("click")) + 1
redis_data['click'] = click_num
redis_client.set(key, json.dumps(redis_data))
redis_client.hset(key, str_today, json.dumps(redis_data))
else:
##代表还没有存储或者是已经过去一天了 需要清掉数据 从新的一天开始
redis_client.delete(key)
redis_data = {"click": 1, "follow": 0, "comment": 0}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get redis_data:%s" % redis_data)
logging.info("get action:click,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
click_num = redis_data["click"]
values = list(redis_data.values())
s = [True for i in values if i > 10]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
if click_num > 12:
logging.info("今天已经消费到最大次数了,不能再消费")
return True, False
else:
rpc_invoker = get_rpc_invoker()
try:
status = rpc_invoker['qa/irrigation/create_answer_vote'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id']).unwrap()
return True
logging.info("get_card_info:%s,create_answer_vote:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0:
return True, True
else:
send_email_tome(str(card_info) + str(status))
return False, True
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False, True
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
return False, False
......@@ -62,6 +62,7 @@ def auto_comment_user(card_info, after_day=False):
if card_info['type'] == "get_write_answer_userinfo":
repeat_time = 0
card_info['have_comment_number'] = 0
get_time, time_region = get_content_time_by_create_time(create_time, content_level,
action_type="comment",
after_day=after_day,
......
......@@ -5,36 +5,160 @@ import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
import datetime
from moment.views.process_time import get_vest_userid_and_comment
def comment(card_info):
"""
在这里需要判断是帖子下发评论还是问答还是日记下发
:param card_info:
:return:
"""
try:
rpc_invoker = get_rpc_invoker()
rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id'],
content=card_info['comment_content']).unwrap()
####
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
if redis_data:
redis_data = json.loads(redis_data)
click_num = int(redis_data.get("comment")) + 1
redis_data['comment'] = click_num
redis_client.set(key, json.dumps(redis_data))
if card_info['card_status'] == 'tractate':
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
key = "auto_vest_one_user_action_tractate:" + str(card_info['card_id'])
redis_data = redis_client.hget(key, str_today)
if redis_data:
redis_data = json.loads(str(redis_data, encoding="utf8"))
click_num = int(redis_data.get("comment")) + 1
redis_data['comment'] = click_num
redis_client.hset(key, str_today, json.dumps(redis_data))
else:
##代表还没有存储或者是已经过去一天了 需要清掉数据 从新的一天开始
redis_client.delete(key)
redis_data = {"comment": 1}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
comment_num = redis_data["comment"]
if comment_num > 12:
logging.info("今天已经消费到最大次数了,不能再消费")
else:
try:
rpc_invoker = get_rpc_invoker()
status = rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0 and comment_num <= 11:
return True, True
elif error == 0 and comment_num > 11:
return True, False
else:
send_email_tome(str(card_info) + str(status))
return False, False
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False, False
elif card_info['card_status'] == 'diary':
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
key = "auto_vest_one_user_action_diary:" + str(card_info['card_id'])
redis_data = redis_client.hget(key, str_today)
if redis_data:
redis_data = json.loads(str(redis_data, encoding="utf8"))
click_num = int(redis_data.get("comment")) + 1
redis_data['comment'] = click_num
redis_client.hset(key, str_today, json.dumps(redis_data))
else:
redis_client.delete(key)
redis_data = {"comment": 1}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
comment_num = redis_data["comment"]
if comment_num > 12:
logging.info("今天已经消费到最大次数了,不能再消费")
else:
try:
rpc_invoker = get_rpc_invoker()
status = rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0:
return True
else:
send_email_tome(str(card_info) + str(status))
return False
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
else:
redis_data = {"click": 0, "follow": 0, "comment": 1}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
key = "auto_vest_one_user_action_answer:" + str(card_info['card_id'])
redis_data = redis_client.hget(key, str_today)
if redis_data:
redis_data = json.loads(str(redis_data, encoding="utf8"))
click_num = int(redis_data.get("comment")) + 1
redis_data['comment'] = click_num
redis_client.hset(key, str_today, json.dumps(redis_data))
else:
redis_client.delete(key)
redis_data = {"click": 0, "follow": 0, "comment": 1}
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:comment,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
comment_num = redis_data["comment"]
###判断一下如果评论为空就重新拿一个
if not card_info['comment_content']:
comment = get_vest_userid_and_comment(need_comment_num=1, tag_names=card_info['tag_names'])[0]
card_info['comment_content'] = comment
logging.info("get redis_data:%s" % redis_data)
##在这里加一个判断 如果当前的评论的user_id和评论内容已经在这个评论下了就不再下发给同一个回答ID
else:
key = 'have_reply_answer_comment:' + str(card_info['card_id'])
redis_data = redis_client.hget(key, card_info['current_user_id'])
if redis_data:
datas = json.loads(redis_data, encoding="utf-8")
if card_info['comment_content'] in datas:
logging.info("当前评论和当前的用户已经存在了")
pass
else:
datas.append(card_info['comment_content'])
redis_client.hset(key, card_info['current_user_id'], json.dumps(datas))
else:
conent = [card_info['comment_content']]
redis_client.hset(key, card_info['current_user_id'], json.dumps(conent))
values = list(redis_data.values())
s = [True for i in values if i > 10]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
if comment_num > 12:
logging.info("今天已经消费到最大次数了,不能再消费")
return True, False
return True
else:
rpc_invoker = get_rpc_invoker()
try:
status = rpc_invoker['qa/irrigation/create_answer_reply'](user_id=card_info['current_user_id'],
answer_id=card_info['card_id'],
content=card_info[
'comment_content']).unwrap()
logging.info("get_card_info:%s,have_answer_reply:%s" % (card_info, status))
error = status.get('error', 1)
if error == 0:
return True, True
else:
send_email_tome(str(card_info) + str(status))
return False, True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False, True
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
return False, False
......@@ -93,6 +93,7 @@ def auto_follow_user(card_info, after_day=False):
card_info['current_push_time'] = card_info['all_push_time'][0]
save_data_to_kafka(card_info) # 存储数据
logging.info("get-------follow---------------card_info:%s" % card_info)
except:
logging_exception()
......
......@@ -5,34 +5,50 @@ import logging
from libs.cache import redis_client
import json
from moment.views.send_email import send_email_tome
import datetime
def follow(card_info):
try:
rpc_invoker = get_rpc_invoker()
rpc_invoker['api/irrigation/user_add_follow'](follow_user_id=card_info['current_user_id'],
followed_user_id=card_info['card_user_id']).unwrap()
key = "auto_vest_one_user_action:" + str(card_info['card_id'])
redis_data = redis_client.get(key)
today = datetime.datetime.now()
str_today = str(today.year) + str(today.month) + str(today.day)
key = "auto_vest_one_user_action_answer:" + str(card_info['card_id'])
redis_data = redis_client.hget(key, str_today)
if redis_data:
redis_data = json.loads(redis_data)
redis_data = json.loads(str(redis_data, encoding="utf8"))
click_num = int(redis_data.get("follow")) + 1
redis_data['follow'] = click_num
redis_client.set(key, json.dumps(redis_data))
redis_client.hset(key, str_today, json.dumps(redis_data))
else:
redis_client.delete(key)
redis_data = {"click": 0, "follow": 1, "comment": 0}
redis_client.set(key, json.dumps(redis_data))
redis_client.expire(key, time=24 * 60 * 60)
redis_client.hset(key, str_today, json.dumps(redis_data))
logging.info("get action:follow,card_id:%s,redis_data:%s" % (card_info['card_id'], redis_data))
follow_num = redis_data["follow"]
logging.info("get redis_data:%s" % redis_data)
values = list(redis_data.values())
s = [True for i in values if i > 10]
if len(s) > 0:
send_email_tome(str(redis_data) + str(card_info))
if follow_num > 11:
logging.info("今天已经消费到最大次数了,不能再消费")
return True, False
else:
rpc_invoker = get_rpc_invoker()
try:
status = rpc_invoker['api/irrigation/user_add_follow'](follow_user_id=card_info['current_user_id'],
followed_user_id=card_info[
'card_user_id']).unwrap()
logging.info("get_card_info:%s,user_add_follow:%s" % (card_info, status))
return True
error = status.get('error', 1)
if error == 0:
return True, True
else:
send_email_tome(str(card_info) + str(status))
return False, True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False, True
except:
logging_exception()
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
return False, False
This diff is collapsed.
......@@ -14,6 +14,16 @@ from libs.error import logging_exception
@bind('vest/moment/vest_irrigation')
def vest_irrigation(card_id=0, card_type=None, card_user_id=None, create_time="", content_level=0, tag_names=[]):
"""
在这里把后端传的数据存进卡夫卡 日记和帖子只需要发评论不需要点赞和关注
:param card_id:
:param card_type:
:param card_user_id:
:param create_time:
:param content_level:
:param tag_names:
:return:
"""
try:
producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
......@@ -32,6 +42,7 @@ def vest_irrigation(card_id=0, card_type=None, card_user_id=None, create_time=""
comment_msg_dict = {
"card_id": card_id,
"card_type": "auto_vest",
'card_status': card_type,
"create_time": create_time,
"content_level": content_level,
"tag_names": tag_names,
......@@ -43,32 +54,35 @@ def vest_irrigation(card_id=0, card_type=None, card_user_id=None, create_time=""
logging.info("get comment_msg_dict:%s" % comment_msg_dict)
producer.send(topic, json.dumps(comment_msg_dict).encode())
follow_msg_dict = {
"card_id": card_id,
"card_type": "auto_vest",
"create_time": create_time,
"content_level": content_level,
"tag_names": tag_names,
"type": "get_write_answer_userinfo",
"current_push_time": create_time,
'action_type': 'follow',
'card_user_id': card_user_id
if card_type == 'answer':
follow_msg_dict = {
"card_id": card_id,
"card_type": "auto_vest",
'card_status': card_type,
"create_time": create_time,
"content_level": content_level,
"tag_names": tag_names,
"type": "get_write_answer_userinfo",
"current_push_time": create_time,
'action_type': 'follow',
'card_user_id': card_user_id
}
producer.send(topic, json.dumps(follow_msg_dict).encode())
}
producer.send(topic, json.dumps(follow_msg_dict).encode())
click_msg_dict = {
"card_id": card_id,
"card_type": "auto_vest",
"create_time": create_time,
"content_level": content_level,
"tag_names": tag_names,
"type": "get_write_answer_userinfo",
"current_push_time": create_time,
'action_type': 'click'
click_msg_dict = {
"card_id": card_id,
"card_type": "auto_vest",
'card_status': card_type,
"create_time": create_time,
"content_level": content_level,
"tag_names": tag_names,
"type": "get_write_answer_userinfo",
"current_push_time": create_time,
'action_type': 'click'
}
producer.send(topic, json.dumps(click_msg_dict).encode())
}
producer.send(topic, json.dumps(click_msg_dict).encode())
producer.close()
......
......@@ -17,6 +17,18 @@ from bs4 import BeautifulSoup
logger = logging.getLogger(__name__)
producer = KafkaProducer(bootstrap_servers=settings.KAFKA_BROKER_LIST)
db_mimas_eagle = pymysql.connect(host=settings.HOST, port=settings.PORT, user=settings.USER,
password=settings.PASSWORD,
db=settings.NAME)
mimas_cursor = db_mimas_eagle.cursor()
db_zhengxing_eagle = pymysql.connect(host=settings.HOST1, port=settings.PORT, user=settings.USER1,
password=settings.PASSWORD1,
db=settings.NAME1)
zhengxing_cursor = db_zhengxing_eagle.cursor()
def strTimeProp(start, end, prop, frmt):
stime = time.mktime(time.strptime(start, frmt))
......@@ -30,10 +42,10 @@ def randomDate(create_time, frmt='%Y-%m-%d %H:%M:%S', action_type=None):
action_num = random.randint(1, 3)
if action_type == "comment":
action_num = random.randint(0, 1)
action_num = random.randint(1, 2)
start = str(create_time + datetime.timedelta(hours=2))
end = str(create_time + datetime.timedelta(hours=4))
start = str(create_time + datetime.timedelta(minutes=30))
end = str(create_time + datetime.timedelta(hours=2))
random_times = [randomDate_six_one(start, end, frmt) for _ in range(action_num)]
have_sort_times = sorted(random_times, key=lambda date: get_list(date))
......@@ -60,20 +72,20 @@ def get_one_six_days_random_time(frmt='%Y-%m-%d %H:%M:%S', num_days=0, action_ty
action_num = random.randint(5, 10)
if num_days <= 15 and num_days > 1 and action_type in ("follow", "click") and int(content_level) < 3:
action_num = random.randint(0, 1)
action_num = random.randint(1, 2)
if num_days <= 15 and num_days > 1 and action_type in ("follow") and int(content_level) >= 3:
action_num = random.randint(0, 5)
action_num = random.randint(1, 5)
if num_days == 1 and action_type in ("click") and int(content_level) >= 3:
action_num = random.randint(6, 12)
if num_days <= 15 and num_days > 1 and action_type in ("click") and int(content_level) >= 3:
action_num = random.randint(0, 6)
action_num = random.randint(1, 6)
if num_days >= 1 and num_days <= 6 and action_type in ("comment"):
if int(content_level) <= 3:
action_num = random.randint(0, 1)
action_num = random.randint(1, 2)
else:
action_num = random.randint(2, 4)
......@@ -104,24 +116,24 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', conte
lastday = datetime.datetime(now.year, now.month, now.day, 23, 0, 0)
add_number = 0
if num_days > 15 and action_type in ("follow"):
action_num = random.randint(0, 2)
action_num = random.randint(1, 2)
add_number = 10
elif num_days > 15 and action_type in ("click"):
if content_level < 3:
action_num = random.randint(0, 1)
action_num = random.randint(1, 2)
add_number = 6
else:
action_num = random.randint(0, 2)
action_num = random.randint(1, 2)
add_number = 5
elif num_days > 6 and action_type in ("comment"):
if content_level <= 3:
action_num = random.randint(0, 1)
action_num = 1
add_number = 10
else:
action_num = random.randint(0, 2)
action_num = random.randint(1, 2)
add_number = 10
else:
......@@ -147,7 +159,10 @@ def get_ten_last_days_random_time(num_days=None, frmt='%Y-%m-%d %H:%M:%S', conte
def get_content_time_by_create_time(create_time="", content_level=0, action_type=None, after_day=False, card_info=None,
repeat_time=0):
try:
###在这个地方需要重新判断一下星级
content_level = get_current_card_content_level(card_info)
card_info['content_level'] = content_level
##
card_info['type'] = 'have_get_push_time'
card_info['have_comment_number'] = 0
now = datetime.datetime.now()
......@@ -193,6 +208,10 @@ def get_content_time_by_create_time(create_time="", content_level=0, action_type
def get_click_follow_time_by_create_time(create_time="", content_level=0, action_type=None, after_day=False,
card_info=None, repeat_time=0):
try:
######在这个地方需要重新判断一下星级
content_level = get_current_card_content_level(card_info)
card_info['content_level'] = content_level
####
card_info['have_click_number'] = 0
card_info['have_follow_number'] = 0
now = datetime.datetime.now()
......@@ -296,17 +315,76 @@ def get_vest_userid_and_comment(need_comment_num=0, tag_names=[], card_id=0):
for item in tag_names:
if item in all_keys_name:
service_closure_tags = redis_client.hget(redis_key, item)
closure_tags = json.loads(service_closure_tags)
closure_tags = json.loads(str(service_closure_tags, encoding="utf-8"))
all_comment_list.extend(closure_tags)
if all_comment_list:
content = random.sample(all_comment_list, need_comment_num)
else:
content = None
content = []
else:
content = None
content = []
return content
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return []
def judge_offset_partition_have_consum(card_info=None, offset=0, partition=0):
"""
根据当前的offset和分区去判断数据是否已经被消费
:param offset:
:param partition:
:return:
"""
try:
##先判断是不是2019-12-29的
create_time = card_info['create_time']
datetime_create_time = datetime.datetime.strptime(create_time, '%Y-%m-%d %H:%M:%S')
str_data = str(datetime_create_time.year) + str(datetime_create_time.month) + str(datetime_create_time.day)
if str_data in ['20191229', '20191230']:
logging.info("该日期的数据已经被删除啦")
return False
redis_list_data = 0
key = "irrigation_partition_offset_have_consum:" + str(partition)
redis_data = redis_client.get(key)
if offset < int(redis_data):
return False
redis_client.set(key, offset)
return True
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return False
def get_current_card_content_level(card_info=[]):
try:
# 判断当前卡片的等级
if card_info['card_status'] == 'answer':
sql = 'select level from api_answer where id = %s ' % (card_info['card_id'])
mimas_cursor.execute(sql)
data = list(mimas_cursor.fetchall())
if card_info['card_status'] == 'tractate':
sql = 'select content_level from api_tractate where id = %s ' % (card_info['card_id'])
mimas_cursor.execute(sql)
data = list(mimas_cursor.fetchall())
if card_info['card_status'] == 'diary':
sql = 'select content_level from api_diary where id = %s ' % (card_info['card_id'])
zhengxing_cursor.execute(sql)
data = list(zhengxing_cursor.fetchall())
if len(data) > 0:
if data[0][0]:
return int(data[0][0])
else:
return 0
else:
return 0
except:
logging.error("catch exception,err_msg:%s" % traceback.format_exc())
return 0
This diff is collapsed.
......@@ -24,3 +24,4 @@ def send_email_tome(stat_data):
# server.quit()
except Exception:
logging.error("catch exception,main:%s" % traceback.format_exc())
......@@ -19,8 +19,10 @@ urllib3==1.24.1
git+ssh://git@git.wanmeizhensuo.com/backend/helios.git@v0.7.2
virtualenv==15.1.0
gevent==1.2.1
git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@v0.2.2
git+ssh://git@git.wanmeizhensuo.com/backend/gm-rpcd.git@v0.2.3
jieba==0.39
gunicorn==19.7.1
wheel==0.24.0
bs4==0.0.1
\ No newline at end of file
bs4==0.0.1
python-snappy==0.5.4
lz4==2.2.1
......@@ -20,6 +20,12 @@ PASSWORD = 'Gengmei1'
HOST = 'bj-cdb-6slgqwlc.sql.tencentcdb.com'
PORT = 62120
ENGINE1 = 'django.db.backends.mysql', # 设置为mysql数据库
NAME1 = 'zhengxing_test'
USER1 = 'work'
PASSWORD1 = 'Gengmei1'
HOST1 = 'bj-cdb-6slgqwlc.sql.tencentcdb.com'
OPTIONS = {
"init_command": "SET foreign_key_checks = 0;",
"charset": "utf8mb4", # 为了支持emoji表情
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment