Commit d203b389 authored by 张彦钊's avatar 张彦钊

change test file

parent 49029114
......@@ -37,12 +37,53 @@ def Json(x):
#rdd transform
def model(rdd):
try:
rdd = rdd.filter(lambda x:Json(x))
rdd = rdd.filter(lambda x:Json(x)).map(lambda x:get_data(x))
# .repartition(10)
return rdd
except:
print("fail")
def get_data(x):
try:
data = json.loads(x[1])
device_id = data['device']['device_id']
diary_id = data['params']['extra_param'][0]["card_id"]
return device_id,diary_id
except Exception as e:
print("get_data fail")
send_email("get_data", "get_data", e)
def write_redis(device_id,cid_list):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list))
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
tags = list(set([i[0] for i in result]))
if tags is not None:
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
"on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and a.content_level >= '3' " \
"and c.id in {} and c.tag_type = '3'".format(tuple(tags))
cursor.execute(sql)
result = cursor.fetchall()
if result is not None:
cids = list(set([i[0] for i in result]))
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
key = str(device_id) + "_dislike_diary"
if r.exists(key):
value = eval(r.get(key))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, json.dumps(cids))
else:
r.set(key, json.dumps(cids))
r.expire(key, 7*24*60*60)
except Exception as e:
print("insert redis fail")
print(e)
# Spark-Streaming-Kafka
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("filter").set("spark.io.compression.codec", "lzf"))
ssc=SQLContext(sc)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment