Commit d713224f authored by 高雅喆

update

parent 2c0620c3
@@ -112,67 +112,56 @@ def get_user_service_portrait(cl_id, all_word_tags, all_tag_tag_type, all_3tag_2tag,
db_jerry_test.commit()
cur_jerry_test.close()
db_jerry_test.close()
# # Write user-tier marketing data to TiDB
# # TODO: inaccurate, because after aggregation one tag can have multiple sources, i.e. multiple pay_type values
# score_result = tag_score_sum[["tag2", "cl_id", "tag_score", "weight", "pay_type"]]
# score_result.rename(columns={"tag2": "tag_id", "cl_id": "device_id", "tag_score": "score"}, inplace=True)
# delete_sql = "delete from api_market_personas where device_id='{}'".format(cl_id)
# cur_jerry_test.execute(delete_sql)
# db_jerry_test.commit()
#
# for index, row in score_result.iterrows():
# insert_sql = "insert into api_market_personas values (null, {}, '{}', {}, {}, {})".format(
# row['tag_id'], row['device_id'], row['score'], row['weight'], row['pay_type'])
# cur_jerry_test.execute(insert_sql)
# db_jerry_test.commit()
# db_jerry_test.close()
return cl_id
if __name__ == '__main__':
db_jerry_test = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
# Device ids of users active in the last 30 days
sql_device_ids = "select distinct cl_id from user_new_tag_log " \
"where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
cur_jerry_test.execute(sql_device_ids)
device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()]
cur_jerry_test.close()
db_jerry_test.close()
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
# Tags for search words and their synonyms
all_word_tags = get_all_word_tags()
all_tag_tag_type = get_all_tag_tag_type()
# Mapping from level-3 tags to their level-2 tags
all_3tag_2tag = get_all_3tag_2tag()
# Chinese display name for each tag id
all_tags_name = get_all_tags_name()
# Search-word tags
search_words_synonym_tags_key = "search:words:synonym:tags"
search_words_synonym_tags_json = json.dumps(all_word_tags)
# gm_kv_cli.set(search_words_synonym_tags_key, search_words_synonym_tags_json)
redis_client.set(search_words_synonym_tags_key, search_words_synonym_tags_json)
# Build the Spark session and distribute the computation as an RDD job
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name))
# result.foreach(print)
result.collect()
spark.stop()
try:
db_jerry_test = pymysql.connect(host='172.16.40.170', port=4000, user='root', passwd='3SYz54LS9#^9sBvC',
db='jerry_test', charset='utf8')
cur_jerry_test = db_jerry_test.cursor()
# Device ids of users active in the last 30 days
sql_device_ids = "select distinct cl_id from user_new_tag_log " \
"where time > UNIX_TIMESTAMP(DATE_SUB(NOW(), INTERVAL 30 day))"
cur_jerry_test.execute(sql_device_ids)
device_ids_lst = [i[0] for i in cur_jerry_test.fetchall()]
cur_jerry_test.close()
db_jerry_test.close()
redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN9@172.16.40.173:6379')
# Tags for search words and their synonyms
all_word_tags = get_all_word_tags()
all_tag_tag_type = get_all_tag_tag_type()
# Mapping from level-3 tags to their level-2 tags
all_3tag_2tag = get_all_3tag_2tag()
# Chinese display name for each tag id
all_tags_name = get_all_tags_name()
# Search-word tags
search_words_synonym_tags_key = "search:words:synonym:tags"
search_words_synonym_tags_json = json.dumps(all_word_tags)
# gm_kv_cli.set(search_words_synonym_tags_key, search_words_synonym_tags_json)
redis_client.set(search_words_synonym_tags_key, search_words_synonym_tags_json)
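# The mapping above is cached in Redis as a JSON blob under "search:words:synonym:tags" for online lookup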
# Build the Spark session and distribute the computation as an RDD job
sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
.set("spark.tispark.plan.allow_index_double_read", "false") \
.set("spark.tispark.plan.allow_index_read", "true") \
.set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
.set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
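# Parallelize the 30-day device id list and compute each user's service portrait on the executors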
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
result = device_ids_lst_rdd.repartition(100).map(lambda x: get_user_service_portrait(x, all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name))
# result.foreach(print)
result.collect()
spark.stop()
except Exception as e:
# Print the exception instead of failing silently, so scheduled-job errors are visible in the logs
print(e)
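A side note on the map step above: the lookup dictionaries (all_word_tags, all_tag_tag_type, all_3tag_2tag, all_tags_name) are captured in the lambda's closure and re-serialized with every task. A minimal sketch of shipping them once per executor with Spark broadcast variables instead; the bc_* names are hypothetical, everything else follows the code above:

# Sketch: broadcast the shared lookup tables once per executor instead of
# re-serializing them in the lambda's closure for every task.
bc_word_tags = spark.sparkContext.broadcast(all_word_tags)
bc_tag_tag_type = spark.sparkContext.broadcast(all_tag_tag_type)
bc_3tag_2tag = spark.sparkContext.broadcast(all_3tag_2tag)
bc_tags_name = spark.sparkContext.broadcast(all_tags_name)
result = device_ids_lst_rdd.repartition(100).map(
    lambda x: get_user_service_portrait(x,
                                        bc_word_tags.value,  # .value reads the executor-local copy
                                        bc_tag_tag_type.value,
                                        bc_3tag_2tag.value,
                                        bc_tags_name.value))
result.collect()

Broadcast variables are sent to each executor once and reused across tasks, which matters here because the job fans out into 100 partitions.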