Commit 9ab9f978 authored by 张彦钊's avatar 张彦钊

change test file

parent d577dbc1
...@@ -33,12 +33,19 @@ def Json(x): ...@@ -33,12 +33,19 @@ def Json(x):
return False return False
else: else:
return False return False
#queue
#rdd transform
def filter_na(x):
if x[0] == "0" or x[1] is None:
return False
else:
return True
def model(rdd): def model(rdd):
try: try:
rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x).na.drop().groupByKey())\ rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
.map(lambda x:write_redis(x[0],x[1])) .filter(lambda x:filter_na(x)).map(lambda x:write_redis(x[0],x[1]))
return rdd return rdd
except: except:
print("fail") print("fail")
...@@ -57,7 +64,7 @@ def write_redis(device_id,cid_list): ...@@ -57,7 +64,7 @@ def write_redis(device_id,cid_list):
try: try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \ sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list)) "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid_list)
cursor = db.cursor() cursor = db.cursor()
cursor.execute(sql) cursor.execute(sql)
result = cursor.fetchall() result = cursor.fetchall()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment