Commit fc8ca6d8 authored by 赵威

update sql function

parent fb1baad7
import datetime
import functools
import operator
from collections import Counter
-from tool import get_tag3_user_log
+from pyspark import SparkConf
+from pyspark.sql import SparkSession
+from tool import get_tag3_user_log, get_jerry_test
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}]
# {'手术': 5.8475846946146195, '激光': 1.949194898204873}
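The two comments above trace the aggregation step: a list of single-key {tag: score} dicts is collapsed into one dict that sums scores per tag, which is what the functools/operator/Counter imports are for. A minimal sketch of that fold (merge_tag_scores is a hypothetical name for illustration, not part of this commit):

import functools
import operator
from collections import Counter

def merge_tag_scores(score_dicts):
    # Fold single-key {tag: score} dicts into one {tag: total} mapping,
    # summing scores for repeated tags.
    return dict(functools.reduce(operator.add, map(Counter, score_dicts), Counter()))

# Mirrors the comment pair above: '手术' appears three times, so its scores sum.
print(merge_tag_scores([{"激光": 1.9492}, {"手术": 1.9492}, {"手术": 1.9492}, {"手术": 1.9492}]))
# => {'激光': 1.9492, '手术': 5.8476} (up to float rounding)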
@@ -72,7 +74,43 @@ def update_tag3_user_portrait(cl_id):
    return res
# def consume_kafka():
#     # TODO 30
#     sql = "select distinct cl_id from kafka_tag3_log where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 5 day))"
#     db, cursor = get_jerry_test()
#     cursor.execute(sql)
#     device_ids_lst = [i[0] for i in cursor.fetchall()]
#     cursor.close()
#     db.close()
#
#     sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
#         .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
#         .set("spark.tispark.plan.allow_index_double_read", "false") \
#         .set("spark.tispark.plan.allow_index_read", "true") \
#         .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
#         .set("spark.tispark.pd.addresses", "172.16.40.170:2379") \
#         .set("spark.io.compression.codec", "lzf") \
#         .set("spark.driver.maxResultSize", "8g") \
#         .set("spark.sql.avro.compression.codec", "snappy")
#     spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
#     spark.sparkContext.setLogLevel("WARN")
#     spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
#
#     device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
#     result = device_ids_lst_rdd.repartition(100).map(lambda x: update_tag3_user_portrait(x))
#     # result.foreach(print)
#     result.collect()  # collect() forces the lazy map to run
#     print("done")
#     spark.stop()
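For orientation, the commented block above follows the standard driver-side pattern: fetch device ids from TiDB, parallelize them, map a per-device portrait function, and force execution with collect(). A minimal local-mode sketch of the same pattern (the local[2] master, app name, and direct use of update_tag3_user_portrait are assumptions for illustration, not part of this commit):

def run_portraits_locally(device_ids):
    # Distribute device ids, build one portrait per id, and gather the results.
    conf = SparkConf().setMaster("local[2]").setAppName("tag3_portrait_sketch")  # hypothetical local config
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    try:
        rdd = spark.sparkContext.parallelize(device_ids, numSlices=4)
        # map() is lazy; collect() is what actually runs the job on the executors.
        return rdd.map(update_tag3_user_portrait).collect()
    finally:
        spark.stop()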
if __name__ == "__main__":
    # start = datetime.datetime.now()
    # consume_kafka()
    # end = datetime.datetime.now()
    # print(end - start)
    cl_id = "866017030837899"
    res = update_tag3_user_portrait(cl_id)
    print(res)
@@ -355,14 +355,14 @@ def get_user_log(cl_id, all_word_tags, pay_time=0, debug=0):
    return user_df_service
-def get_jerry_test_cursor():
+def get_jerry_test():
    db = pymysql.connect(host="172.16.40.158",
                         port=4000,
                         user="st_user",
                         passwd="aqpuBLYzEV7tML5RPsN1pntUzFy",
                         db="jerry_test",
                         charset="utf8")
-    return db.cursor()
+    return db, db.cursor()
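Because get_jerry_test now returns the connection alongside its cursor, the caller owns both handles and must close them. A minimal sketch of the intended call pattern (fetch_cl_ids is a hypothetical helper and the try/finally is an assumption about safe usage, not part of this commit):

def fetch_cl_ids(sql):
    # Run a query against jerry_test and return the first column of every row,
    # closing the cursor before the connection that produced it.
    db, cursor = get_jerry_test()
    try:
        cursor.execute(sql)
        return [row[0] for row in cursor.fetchall()]
    finally:
        cursor.close()
        db.close()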
def compute_tag3_score(x):
@@ -387,9 +387,12 @@ def get_tag3_user_log(cl_id):
                second_demands, first_positions, second_positions, projects
                from kafka_tag3_log where cl_id = '{}'""".format(cl_id)
-    cursor = get_jerry_test_cursor()
+    db, cursor = get_jerry_test()
    cursor.execute(sql)
    data = list(cursor.fetchall())
+    cursor.close()
+    db.close()
    if data:
        user_df = pd.DataFrame(data)
        user_df.columns = columns