Commit 467123ae authored by 高雅喆

Remove kv and TiDB storage for history order tags

parent 73331e88
@@ -52,7 +52,7 @@ def send_email(app,id,e):
print('error')
def get_user_history_order_service_tag(user_id, stat_date):
def get_user_history_order_service_tag(user_id):
try:
if user_id:
db_zhengxing = pymysql.connect(host="172.16.30.141", port=3306, user="work",
@@ -66,30 +66,29 @@ def get_user_history_order_service_tag(user_id, stat_date):
cur_zhengxing.execute(sql)
tags_dict = cur_zhengxing.fetchall()
tags_list = [i["tag_id"] for i in tags_dict]
# write gmkv
gm_kv_cli = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
db_zhengxing.close()
cur_zhengxing.close()
# # write gmkv
# write redis
user_history_order_tags_key = "user:history_order:tags:user_id:" + str(user_id)
tags_list_json = json.dumps(tags_list)
gm_kv_cli.set(user_history_order_tags_key, tags_list_json)
gm_kv_cli.expire(user_history_order_tags_key, time=30 * 24 * 60 * 60)
# write redis
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
redis_client.set(user_history_order_tags_key, tags_list_json)
redis_client.expire(user_history_order_tags_key, time=30 * 24 * 60 * 60)
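# Note: the set() + expire() pair above takes two round trips; redis-py's
# setex writes the value and the 30-day TTL in a single command. A minimal
# sketch against the same client; the helper name is illustrative and not
# part of this commit:
def cache_user_history_order_tags(redis_client, user_id, tags_list):
    key = "user:history_order:tags:user_id:" + str(user_id)
    # json-encode the tag ids and set value + TTL in one call
    redis_client.setex(key, 30 * 24 * 60 * 60, json.dumps(tags_list))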
# write tidb
sql_cl_id = "select device_id from statistic_device where id = (select max(device_id) from statistic_device_user where user_id = %d) " % (int(user_id))
cur_zhengxing.execute(sql_cl_id)
cl_id = cur_zhengxing.fetchall()[0]["device_id"]
db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
replace_sql = """replace into user_history_order_tags (stat_date, cl_id, user_id, tag_list) values("{stat_date}","{cl_id}",{user_id},"{tag_list}")"""\
.format(stat_date=stat_date, cl_id=cl_id, user_id=user_id, tag_list=tags_list)
cur_jerry_test.execute(replace_sql)
db_jerry_test.commit()
db_jerry_test.close()
return 'success'
# # write tidb
# sql_cl_id = "select device_id from statistic_device where id = (select max(device_id) from statistic_device_user where user_id = %d) " % (int(user_id))
# cur_zhengxing.execute(sql_cl_id)
# cl_id = cur_zhengxing.fetchall()[0]["device_id"]
#
# db_jerry_test = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
# db='jerry_test', charset='utf8')
# cur_jerry_test = db_jerry_test.cursor()
# replace_sql = """replace into user_history_order_tags (stat_date, cl_id, user_id, tag_list) values("{stat_date}","{cl_id}",{user_id},"{tag_list}")"""\
# .format(stat_date=stat_date, cl_id=cl_id, user_id=user_id, tag_list=tags_list)
# cur_jerry_test.execute(replace_sql)
# db_jerry_test.commit()
# db_jerry_test.close()
return user_id
except Exception as e:
return 'pass'
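The removed TiDB write interpolated values with str.format, so a quote inside tag_list would break the statement and the tag list was written into SQL as a Python repr. If that path ever comes back, a parameterized version along these lines would be safer; a minimal sketch reusing the connection settings from the diff, with an illustrative helper name:

import json
import pymysql

def replace_user_history_order_tags(stat_date, cl_id, user_id, tags_list):
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_test', charset='utf8')
    try:
        with db.cursor() as cur:
            # %s placeholders let the driver escape every value; the tag
            # list is serialized to JSON instead of a Python repr.
            cur.execute(
                "replace into user_history_order_tags "
                "(stat_date, cl_id, user_id, tag_list) values (%s, %s, %s, %s)",
                (stat_date, cl_id, user_id, json.dumps(tags_list)))
        db.commit()
    finally:
        db.close()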
@@ -105,7 +104,9 @@ if __name__ == '__main__':
sql_device_ids = "select distinct user_id from api_order where status=1 and pay_time>'2015-08-16'"
cur_zhengxing.execute(sql_device_ids)
device_ids_lst = [i["user_id"] for i in cur_zhengxing.fetchall()]
stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
# stat_date = datetime.datetime.today().strftime('%Y-%m-%d')
db_zhengxing.close()
cur_zhengxing.close()
# rdd
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
@@ -120,7 +121,7 @@ if __name__ == '__main__':
spark.sparkContext.setLogLevel("WARN")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_history_order_service_tag(x, stat_date))
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_history_order_service_tag(x))
result.collect()
except Exception as e:
......
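Each user_id is processed independently, so the driver simply parallelizes the ID list and maps the per-user function over it; repartition(100) spreads the MySQL and Redis round trips across executors, and collect() forces the lazy map to run. A minimal standalone sketch of that pattern, with a hypothetical stand-in for the real per-user function:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("history_order_tags_sketch").getOrCreate()

def tag_one_user(user_id):
    # Hypothetical stand-in for get_user_history_order_service_tag, which
    # reads the user's paid-order tags from MySQL and caches them in Redis.
    return user_id

ids = spark.sparkContext.parallelize([1, 2, 3])
results = ids.repartition(100).map(tag_one_user).collect()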
@@ -6,7 +6,7 @@ mysql -u root -p3SYz54LS9#^9sBvC -h 172.16.40.158 -P 4000 -D jerry_test -e "dele
mysql -u root -p3SYz54LS9#^9sBvC -h 172.16.40.158 -P 4000 -D jerry_test -e "delete from jerry_test.user_new_tag_log where id in (select a.id from jerry_test.user_new_tag_log a left join eagle.src_zhengxing_api_tag b on a.tag_id=b.id where b.tag_type+0 > '3'+0)"
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait.py
#/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait.py
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.spark --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 30 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --archives /srv/apps/ftrl/bandits.zip --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/ffm-baseline_git/eda/smart_rank/dist_update_user_portrait_service.py
......