Commit 223a1260 authored by 张彦钊's avatar 张彦钊

change test file

parent d592d3f2
...@@ -20,14 +20,7 @@ import datetime ...@@ -20,14 +20,7 @@ import datetime
def Json(x): def Json(x):
data = json.loads(x[1]) data = json.loads(x[1])
if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']: if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']:
if data['type'] == 'on_click_card'and data['params']['card_content_type'] =='diary' \ if data['type'] == 'on_click_card' and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6":
and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6":
return True
# elif data['type'] == 'on_click_button'and 'params' in data:
# if 'page_name' in data['params'] and 'button_name' in data['params'] and 'extra_params' in data['params']:
# if data['params']['page_name'] =='page_choose_interest' and data['params']['button_name']=='next':
# return True
return True return True
else: else:
return False return False
...@@ -35,36 +28,149 @@ def Json(x): ...@@ -35,36 +28,149 @@ def Json(x):
return False return False
def filter_na(x):
if x[0] != "0" and x[1] is not None:
return True
else:
return False
def model(rdd): def model(rdd):
try: try:
rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x)) rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
# .map(lambda x:write_redis(x[0],x[1])) .map(lambda x:write_redis(x[0],x[1],x[2]))
return rdd return rdd
except: except:
print("fail") print("fail")
def get_data(x): def get_data(x):
try: try:
data = json.loads(x[1]) data = json.loads(x[1])
device_id = data['device']['device_id'] device_id = data['device']['device_id']
diary_id = data['params']["card_id"] diary_id = data['params']["card_id"]
return device_id,diary_id card = data['params']['card_content_type']
return device_id,diary_id,card
except Exception as e: except Exception as e:
print("get_data fail") print("get_data fail")
# send_email("get_data", "get_data", e) # send_email("get_data", "get_data", e)
def write_redis(device_id,cid_list): def write_redis(device_id,cid,card):
if card == "diary":
diary_write(device_id, cid)
elif card == "question":
question_write(device_id, cid)
elif card == "answer":
answer_write(device_id, cid)
elif card == "user_post":
tractate_write(device_id, cid)
def tractate_write(device_id,cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b " \
"on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
cids = list(set([i[0] for i in result]))
if len(cids) != 0:
r = redis.Redis(host="172.16.40.135", port=5379, password="")
key = str(device_id) + "_dislike_tractate"
if r.exists(key):
value = json.loads(r.get(key))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids))
else:
r.set(key, json.dumps(cids))
r.expire(key, 7*24*60*60)
return "tractate good"
except Exception as e:
print("tractate insert redis fail")
print(e)
def answer_write(device_id,cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select c.id from src_mimas_prod_api_answer a left join src_mimas_prod_api_questiontag b " \
"on a.question_id = b.question_id left join src_zhengxing_api_tag c " \
"on b.tag_id = c.id where c.tag_type = '3' and a.id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_answer a left join src_mimas_prod_api_questiontag b " \
"on a.question_id = b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
cids = list(set([i[0] for i in result]))
if len(cids) != 0:
r = redis.Redis(host="172.16.40.135", port=5379, password="")
key = str(device_id) + "_dislike_answer"
if r.exists(key):
value = json.loads(r.get(key))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids))
else:
r.set(key, json.dumps(cids))
r.expire(key, 7*24*60*60)
return "answer good"
except Exception as e:
print("answer insert redis fail")
print(e)
def question_write(device_id,cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.question_id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b " \
"on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
cids = list(set([i[0] for i in result]))
if len(cids) != 0:
r = redis.Redis(host="172.16.40.135", port=5379, password="")
key = str(device_id) + "_dislike_question"
if r.exists(key):
value = json.loads(r.get(key))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids))
else:
r.set(key, json.dumps(cids))
r.expire(key, 7*24*60*60)
return "question good"
except Exception as e:
print("question insert redis fail")
print(e)
def diary_write(device_id,cid):
try: try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \ sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid_list) "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid)
cursor = db.cursor() cursor = db.cursor()
cursor.execute(sql) cursor.execute(sql)
result = cursor.fetchall() result = cursor.fetchall()
...@@ -80,75 +186,39 @@ def write_redis(device_id,cid_list): ...@@ -80,75 +186,39 @@ def write_redis(device_id,cid_list):
cids = list(set([i[0] for i in result])) cids = list(set([i[0] for i in result]))
if len(cids) != 0: if len(cids) != 0:
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379') r = redis.Redis(host="172.16.40.135", port=5379, password="")
key = str(device_id) + "_dislike_diary" key = str(device_id) + "_dislike_diary"
if r.exists(key): if r.exists(key):
value = eval(r.get(key)) value = json.loads(r.get(key))
value.extend(cids) value.extend(cids)
cids = json.dumps(list(set(value))) cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids)) r.set(key, json.dumps(cids))
else: else:
r.set(key, json.dumps(cids)) r.set(key, json.dumps(cids))
r.expire(key, 7*24*60*60) r.expire(key, 7*24*60*60)
return "good" return "diary good"
except Exception as e: except Exception as e:
print("insert redis fail") print("diary insert redis fail")
print(e) print(e)
def group_redis(x):
device_id = x[0] # sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("filter").set("spark.io.compression.codec", "lzf"))
cid_list = x[1] # ssc = StreamingContext(sc,4)
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle') # sc.setLogLevel("WARN")
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \ # kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list)) # "group.id": "filter",
cursor = db.cursor() # "socket.timeout.ms": "600000",
cursor.execute(sql) # "auto.offset.reset": "largest"}
result = cursor.fetchall() #
db.close() #
tags = list(set([i[0] for i in result])) # stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
if len(tags) != 0: # transformstream = stream.transform(lambda x:model(x))
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
"on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and a.content_level >= '3' " \
"and c.id in {} and c.tag_type = '3'".format(tuple(tags))
cursor.execute(sql)
result = cursor.fetchall()
cids = list(set([i[0] for i in result]))
if len(cids) >1 :
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
key = str(device_id) + "_dislike_diary"
if r.exists(key):
value = eval(r.get(key))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids))
else:
r.set(key, json.dumps(cids))
r.expire(key, 60*60)
# def group_write(rdd):
# rdd.groupByKey().foreachPartition(lambda x:x.map())
# return "good"
# Spark-Streaming-Kafka
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("filter").set("spark.io.compression.codec", "lzf"))
ssc=SQLContext(sc)
ssc = StreamingContext(sc,4)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id": "filter",
"socket.timeout.ms": "600000",
"auto.offset.reset": "largest"}
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream = stream.transform(lambda x:model(x)).foreachRDD(lambda x:x.groupByKey().
foreach(group_redis))
# transformstream.pprint() # transformstream.pprint()
# print(transformstream) #
ssc.start() # ssc.start()
ssc.awaitTermination() # ssc.awaitTermination()
\ No newline at end of file print(tractate_write("hello","10078"))
print(tractate_write("hello",37975))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment