Commit a458bb53 authored by 王志伟's avatar 王志伟

迁移到腾讯

parents 25b39fd1 6bb8533b
...@@ -6,15 +6,10 @@ import datetime ...@@ -6,15 +6,10 @@ import datetime
def con_sql(db,sql): def con_sql(db,sql):
cursor = db.cursor() cursor = db.cursor()
try: cursor.execute(sql)
cursor.execute(sql) result = cursor.fetchall()
result = cursor.fetchall() df = pd.DataFrame(list(result))
df = pd.DataFrame(list(result)) db.close()
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df return df
...@@ -32,14 +27,14 @@ def multi_hot(df,column,n): ...@@ -32,14 +27,14 @@ def multi_hot(df,column,n):
def get_data(): def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from {}".format(train_data_set) sql = "select max(stat_date) from {}".format(train_data_set)
validate_date = con_sql(db, sql)[0].values.tolist()[0] validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date) print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d") temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=300)).strftime("%Y-%m-%d") start = (temp - datetime.timedelta(days=3)).strftime("%Y-%m-%d")
print(start) print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \ sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name,u.device_type,u.manufacturer," \
"u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids,feat.level2 " \ "u.channel,c.top,e.device_id,cut.time,dl.app_list,e.diary_service_id,feat.level3_ids,feat.level2 " \
"from {} e left join user_feature u on e.device_id = u.device_id " \ "from {} e left join user_feature u on e.device_id = u.device_id " \
...@@ -55,7 +50,7 @@ def get_data(): ...@@ -55,7 +50,7 @@ def get_data():
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "time", 12: "app_list", 13: "service_id", 14: "level3_ids", 15: "level2"}) 11: "time", 12: "app_list", 13: "service_id", 14: "level3_ids", 15: "level2"})
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \ sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
"from train_Knowledge_network_data" "from train_Knowledge_network_data"
knowledge = con_sql(db, sql) knowledge = con_sql(db, sql)
...@@ -67,7 +62,7 @@ def get_data(): ...@@ -67,7 +62,7 @@ def get_data():
df = df.drop("level2", axis=1) df = df.drop("level2", axis=1)
service_id = tuple(df["service_id"].unique()) service_id = tuple(df["service_id"].unique())
db = pymysql.connect(host='rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com', port=3306, user='work', db = pymysql.connect(host='172.16.30.143', port=3306, user='work',
passwd='BJQaT9VzDcuPBqkd', db='zhengxing') passwd='BJQaT9VzDcuPBqkd', db='zhengxing')
sql = "select s.id,d.hospital_id from api_service s left join api_doctor d on s.doctor_id = d.id " \ sql = "select s.id,d.hospital_id from api_service s left join api_doctor d on s.doctor_id = d.id " \
"where s.id in {}".format(service_id) "where s.id in {}".format(service_id)
...@@ -152,7 +147,7 @@ def write_csv(df,name,n): ...@@ -152,7 +147,7 @@ def write_csv(df,name,n):
def get_predict(date,value_map,app_list_map,level2_map,level3_map): def get_predict(date,value_map,app_list_map,level2_map,level3_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \ sql = "select e.y,e.z,e.label,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \ "u.device_type,u.manufacturer,u.channel,c.top,e.device_id,e.cid_id,cut.time," \
"dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \ "dl.app_list,e.hospital_id,feat.level3_ids,feat.level2 " \
...@@ -160,14 +155,14 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map): ...@@ -160,14 +155,14 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
"left join cid_type_top c on e.device_id = c.device_id " \ "left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \ "left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \ "left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id" "left join diary_feat feat on e.cid_id = feat.diary_id limit 600"
df = con_sql(db, sql) df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name", df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id", 6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "device_id",
11: "cid_id", 12: "time", 13: "app_list", 14: "hospital_id", 15: "level3_ids", 11: "cid_id", 12: "time", 13: "app_list", 14: "hospital_id", 15: "level3_ids",
16: "level2"}) 16: "level2"})
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \ sql = "select level2_id,treatment_method,price_min,price_max,treatment_time,maintain_time,recover_time " \
"from train_Knowledge_network_data" "from train_Knowledge_network_data"
knowledge = con_sql(db, sql) knowledge = con_sql(db, sql)
...@@ -232,7 +227,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map): ...@@ -232,7 +227,7 @@ def get_predict(date,value_map,app_list_map,level2_map,level3_map):
if __name__ == '__main__': if __name__ == '__main__':
train_data_set = "esmm_train_data" train_data_set = "esmm_train_data"
path = "/data/esmm/" path = "/home/gmuser/esmm/"
date, value, app_list, level2, level3 = get_data() date, value, app_list, level2, level3 = get_data()
get_predict(date, value, app_list, level2, level3) get_predict(date, value, app_list, level2, level3)
......
#! /bin/bash #! /bin/bash
git checkout master git checkout master
PYTHON_PATH=/home/gaoyazhe/miniconda3/bin/python PYTHON_PATH=/opt/anaconda3/envs/esmm/bin/python
MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline MODEL_PATH=/srv/apps/ffm-baseline/eda/esmm/Model_pipline
DATA_PATH=/data/esmm DATA_PATH=/home/gmuser/esmm
echo "rm leave tfrecord" echo "rm leave tfrecord"
rm ${DATA_PATH}/tr/* rm ${DATA_PATH}/tr/*
......
...@@ -3,14 +3,14 @@ ...@@ -3,14 +3,14 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
import pandas as pd import pandas as pd
import pymysql import pymysql
import time import datetime
def con_sql(sql): def con_sql(sql):
""" """
:type sql : str :type sql : str
:rtype : tuple :rtype : tuple
""" """
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cursor = db.cursor() cursor = db.cursor()
cursor.execute(sql) cursor.execute(sql)
result = cursor.fetchall() result = cursor.fetchall()
...@@ -36,10 +36,10 @@ def native_set_join(lst): ...@@ -36,10 +36,10 @@ def native_set_join(lst):
def main(): def main():
# native queue # native queue
df2 = pd.read_csv('/data/esmm/native.csv') df2 = pd.read_csv(path+'/native.csv')
df2['cid_id'] = df2['cid_id'].astype(str) df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/data/esmm/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"]) df1 = pd.read_csv(path+"/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"] df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False) df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':native_set_join}).reset_index(drop=False)
df3.columns = ["device_id","city_id","native_queue"] df3.columns = ["device_id","city_id","native_queue"]
...@@ -47,10 +47,10 @@ def main(): ...@@ -47,10 +47,10 @@ def main():
# nearby queue # nearby queue
df2 = pd.read_csv('/data/esmm/nearby.csv') df2 = pd.read_csv(path+'/nearby.csv')
df2['cid_id'] = df2['cid_id'].astype(str) df2['cid_id'] = df2['cid_id'].astype(str)
df1 = pd.read_csv("/data/esmm/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"]) df1 = pd.read_csv(path+"/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"] df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False) df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':nearby_set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"] df4.columns = ["device_id","city_id","nearby_queue"]
...@@ -60,11 +60,10 @@ def main(): ...@@ -60,11 +60,10 @@ def main():
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("") df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str) df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str) df_all['city_id'] = df_all['city_id'].astype(str)
ctime = int(time.time()) df_all["time"] = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
df_all["time"] = ctime
print("union_device_count",df_all.shape) print("union_device_count",df_all.shape)
host='10.66.157.22' host='172.16.40.158'
port=4000 port=4000
user='root' user='root'
password='3SYz54LS9#^9sBvC' password='3SYz54LS9#^9sBvC'
...@@ -78,7 +77,7 @@ def main(): ...@@ -78,7 +77,7 @@ def main():
# df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1) # df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str) delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test') con = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor() cur = con.cursor()
cur.execute(delete_str) cur.execute(delete_str)
con.commit() con.commit()
...@@ -88,5 +87,7 @@ def main(): ...@@ -88,5 +87,7 @@ def main():
print("done") print("done")
if __name__ == '__main__': if __name__ == '__main__':
path = "/home/gmuser/esmm"
main() main()
\ No newline at end of file
...@@ -4,13 +4,11 @@ from __future__ import absolute_import ...@@ -4,13 +4,11 @@ from __future__ import absolute_import
from __future__ import division from __future__ import division
from __future__ import print_function from __future__ import print_function
import pandas as pd import pandas as pd
import sys
import os import os
import glob import glob
import tensorflow as tf import tensorflow as tf
import numpy as np import numpy as np
import re
from multiprocessing import Pool as ThreadPool from multiprocessing import Pool as ThreadPool
flags = tf.app.flags flags = tf.app.flags
......
...@@ -6,12 +6,10 @@ ...@@ -6,12 +6,10 @@
#import argparse #import argparse
import shutil import shutil
#import sys
import os import os
import json import json
import glob import glob
from datetime import date, timedelta from datetime import date, timedelta
from time import time
import random import random
import tensorflow as tf import tensorflow as tf
......
dev.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true dev.tidb.jdbcuri=jdbc:mysql://192.168.15.12:4000/eagle?user=root&password=&rewriteBatchedStatements=true
dev.tispark.pd.addresses=10.66.157.22:2379 dev.tispark.pd.addresses=192.168.15.11:2379
dev.mimas.jdbcuri= jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/mimas_test?user=work&password=workwork&rewriteBatchedStatements=true dev.mimas.jdbcuri= jdbc:mysql://rm-2zenowgrn4i5p0j7txo.mysql.rds.aliyuncs.com/mimas_test?user=work&password=Gengmei1&rewriteBatchedStatements=true
dev.gaia.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/zhengxing_test?user=work&password=workwork&rewriteBatchedStatements=true dev.gaia.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/zhengxing_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.gold.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/doris_test?user=work&password=workwork&rewriteBatchedStatements=true dev.gold.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/doris_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.redis.host=10.30.50.58 dev.jerry.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com/jerry_test?user=work&password=workwork&rewriteBatchedStatements=true
dev.redis.port=6379 dev.test.jdbcuri= jdbc:mysql://rm-2ze0v6uua2hl9he8edo.mysql.rds.aliyuncs.com/mimas_test?user=work&password=Gengmei1&rewriteBatchedStatements=true
pre.tidb.jdbcuri=jdbc:mysql://192.168.16.11:4000/eagle?user=root&password=&rewriteBatchedStatements=true pre.tidb.jdbcuri=jdbc:mysql://192.168.16.11:4000/eagle?user=root&password=&rewriteBatchedStatements=true
pre.tispark.pd.addresses=192.168.16.11:2379 pre.tispark.pd.addresses=192.168.16.11:2379
pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas_prod?user=mimas&password=workwork&rewriteBatchedStatements=true pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas_prod?user=mimas&password=workwork&rewriteBatchedStatements=true
<<<<<<< HEAD
#prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true #prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.gold.jdbcuri=jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true #prod.gold.jdbcuri=jdbc:mysql://rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
#prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true #prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
...@@ -19,6 +19,22 @@ pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas ...@@ -19,6 +19,22 @@ pre.mimas.jdbcuri=jdbc:mysql://rdsmaqevmuzj6jy.mysql.rds.aliyuncs.com:3308/mimas
#prod.redis.host=10.30.50.58 #prod.redis.host=10.30.50.58
#prod.redis.port=6379 #prod.redis.port=6379
=======
#阿里云线上配置
#prod.tidb.jdbcuri=jdbc:mysql://10.66.157.22:4000/eagle?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.gold.jdbcuri=jdbc:mysql://rm-m5ey2s823bq0lc616.mysql.rds.aliyuncs.com/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
#prod.mimas.jdbcuri=jdbc:mysql://rm-m5emg41za2w7l6au3.mysql.rds.aliyuncs.com/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
#prod.gaia.jdbcuri=jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true
#prod.jerry.jdbcuri=jdbc:mysql://10.66.157.22:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.tispark.pd.addresses=10.66.157.22:2379
#
#prod.tidb.jdbcuri_new=jdbc:mysql://152.136.44.138:4000/eagle?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#prod.jerry.jdbcuri_new=jdbc:mysql://152.136.44.138:4000/jerry_prod?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true
#腾讯云线上配置
>>>>>>> 6bb8533b68efef7c647251ef08479560d5e1216a
prod.gold.jdbcuri=jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true prod.gold.jdbcuri=jdbc:mysql://172.16.30.136/doris_prod?user=doris&password=o5gbA27hXHHm&rewriteBatchedStatements=true
prod.mimas.jdbcuri=jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true prod.mimas.jdbcuri=jdbc:mysql://172.16.30.138/mimas_prod?user=mimas&password=GJL3UJe1Ck9ggL6aKnZCq4cRvM&rewriteBatchedStatements=true
prod.gaia.jdbcuri=jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true prod.gaia.jdbcuri=jdbc:mysql://172.16.30.143/zhengxing?user=work&password=BJQaT9VzDcuPBqkd&rewriteBatchedStatements=true
......
...@@ -37,27 +37,17 @@ object GmeiConfig extends Serializable { ...@@ -37,27 +37,17 @@ object GmeiConfig extends Serializable {
sparkConf.set("spark.debug.maxToStringFields", "130") sparkConf.set("spark.debug.maxToStringFields", "130")
sparkConf.set("spark.sql.broadcastTimeout", "6000") sparkConf.set("spark.sql.broadcastTimeout", "6000")
if (!sparkConf.contains("""spark.master""")) {
sparkConf.setMaster("local[3]")
}
if (!sparkConf.contains("spark.tispark.pd.addresses")) {
sparkConf.set("spark.tispark.pd.addresses", this.config.getString("tispark.pd.addresses"))
}
println(sparkConf.get("spark.tispark.pd.addresses"))
val spark = SparkSession val spark = SparkSession
.builder() .builder()
// .config(sparkConf) .config(sparkConf)
.appName("feededa")
.enableHiveSupport()
.config("spark.tispark.pd.addresses","172.16.40.158:2379") .config("spark.tispark.pd.addresses","172.16.40.158:2379")
.config("spark.sql.extensions","org.apache.spark.sql.TiExtensions") .config("spark.sql.extensions","org.apache.spark.sql.TiExtensions")
.appName("feededa")
.enableHiveSupport()
.getOrCreate() .getOrCreate()
spark.sql("SET mapreduce.job.queuename=data") spark.sql("use online")
spark.sql("SET mapred.input.dir.recursive=true")
spark.sql("SET hive.mapred.supports.subdirectories=true")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar") spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar") spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'") spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
......
...@@ -52,7 +52,7 @@ object Recommendation_strategy_all { ...@@ -52,7 +52,7 @@ object Recommendation_strategy_all {
val stat_date = GmeiConfig.getMinusNDate(1) val stat_date = GmeiConfig.getMinusNDate(1)
// val stat_date = param.date // val stat_date = param.date
//println(param.date) //println(param.date)
val partition_date = stat_date.replace("-","") val partition_date = stat_date.replace("-","")
val decive_id_oldUser = sc.sql( val decive_id_oldUser = sc.sql(
...@@ -83,7 +83,7 @@ object Recommendation_strategy_all { ...@@ -83,7 +83,7 @@ object Recommendation_strategy_all {
|and jd.device_id not in (select device_id from blacklist) |and jd.device_id not in (select device_id from blacklist)
|and jd.stat_date ='${stat_date}' |and jd.stat_date ='${stat_date}'
""".stripMargin """.stripMargin
) )
val imp_count_oldUser_Contrast = sc.sql( val imp_count_oldUser_Contrast = sc.sql(
s""" s"""
...@@ -95,7 +95,7 @@ object Recommendation_strategy_all { ...@@ -95,7 +95,7 @@ object Recommendation_strategy_all {
|and je.device_id not in (select device_id from blacklist) |and je.device_id not in (select device_id from blacklist)
|and je.stat_date ='${stat_date}' |and je.stat_date ='${stat_date}'
""".stripMargin """.stripMargin
) )
val clk_count_oldUser_all = sc.sql( val clk_count_oldUser_all = sc.sql(
s""" s"""
...@@ -119,7 +119,7 @@ object Recommendation_strategy_all { ...@@ -119,7 +119,7 @@ object Recommendation_strategy_all {
""".stripMargin """.stripMargin
) )
//获取策略命中用户device_id //获取策略命中用户device_id
val device_id_cover = sc.sql( val device_id_cover = sc.sql(
s""" s"""
|select distinct(device_id) as device_id |select distinct(device_id) as device_id
...@@ -183,13 +183,13 @@ object Recommendation_strategy_all { ...@@ -183,13 +183,13 @@ object Recommendation_strategy_all {
""".stripMargin """.stripMargin
) )
val result1 = clk_count_oldUser_Contrast.join(imp_count_oldUser_Contrast,"stat_date") val result1 = clk_count_oldUser_Contrast.join(imp_count_oldUser_Contrast,"stat_date")
.join(clk_count_oldUser_all,"stat_date") .join(clk_count_oldUser_all,"stat_date")
.join(imp_count_oldUser_all,"stat_date") .join(imp_count_oldUser_all,"stat_date")
.join(clk_count_oldUser_Cover,"stat_date") .join(clk_count_oldUser_Cover,"stat_date")
.join(imp_count_oldUser_Cover,"stat_date") .join(imp_count_oldUser_Cover,"stat_date")
.join(device_num_cover,"stat_date") .join(device_num_cover,"stat_date")
.join(device_num_1_hit,"stat_date") .join(device_num_1_hit,"stat_date")
.join(device_num_hit,"stat_date") .join(device_num_hit,"stat_date")
result1.show() result1.show()
GmeiConfig.writeToJDBCTable(result1, "Recommendation_strategy_temp", SaveMode.Append) GmeiConfig.writeToJDBCTable(result1, "Recommendation_strategy_temp", SaveMode.Append)
...@@ -287,7 +287,7 @@ object Recommendation_strategy_all { ...@@ -287,7 +287,7 @@ object Recommendation_strategy_all {
GmeiConfig.writeToJDBCTable(result2, "strategy_other", SaveMode.Append) GmeiConfig.writeToJDBCTable(result2, "strategy_other", SaveMode.Append)
//统计新用户点击率 //统计新用户点击率
val devicee_id_newUser = sc.sql( val devicee_id_newUser = sc.sql(
s""" s"""
|select distinct(device_id) as device_id |select distinct(device_id) as device_id
...@@ -442,7 +442,7 @@ object Gini_coefficient { ...@@ -442,7 +442,7 @@ object Gini_coefficient {
""".stripMargin """.stripMargin
) )
agency_id.createOrReplaceTempView("agency_id") agency_id.createOrReplaceTempView("agency_id")
//统计次数 //统计次数
val diary_clk_num = sc.sql( val diary_clk_num = sc.sql(
s""" s"""
|select temp1.diary_id as diary_id,count(ov.cl_id) as diary_clk_num |select temp1.diary_id as diary_id,count(ov.cl_id) as diary_clk_num
...@@ -468,3 +468,4 @@ object Gini_coefficient { ...@@ -468,3 +468,4 @@ object Gini_coefficient {
...@@ -6,7 +6,7 @@ import java.time.LocalDate ...@@ -6,7 +6,7 @@ import java.time.LocalDate
import com.gmei.lib.AbstractParams import com.gmei.lib.AbstractParams
import org.apache.log4j.{Level, Logger} import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext} import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import scopt.OptionParser import scopt.OptionParser
import scala.util.parsing.json.JSON import scala.util.parsing.json.JSON
...@@ -46,9 +46,6 @@ object esmm_feature { ...@@ -46,9 +46,6 @@ object esmm_feature {
GmeiConfig.setup(param.env) GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession() val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2 val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_test",tableName = "device_app_list")
ti.tidbMapTable(dbName = "jerry_test",tableName = "user_feature")
user_feature(sc) user_feature(sc)
get_applist(sc) get_applist(sc)
...@@ -67,7 +64,7 @@ object esmm_feature { ...@@ -67,7 +64,7 @@ object esmm_feature {
""".stripMargin).dropDuplicates("device_id") """.stripMargin).dropDuplicates("device_id")
df.persist() df.persist()
val old = spark.sql("select device_id from device_app_list").collect().map(x => x(0).toString) val old = spark.sql("select device_id from jerry_test.device_app_list").collect().map(x => x(0).toString)
import spark.implicits._ import spark.implicits._
val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString)) val android = df.rdd.map(x => (x(0).toString,x(1).toString,x(2).toString))
...@@ -81,8 +78,6 @@ object esmm_feature { ...@@ -81,8 +78,6 @@ object esmm_feature {
val new_user = rdd.filter(x => old.indexOf(x._1)== -1) val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","os","app_list","update_date") .toDF("device_id","os","app_list","update_date")
if (new_user.take(1).nonEmpty){ if (new_user.take(1).nonEmpty){
val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbc, new_user,"device_app_list", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append) GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
...@@ -114,7 +109,7 @@ object esmm_feature { ...@@ -114,7 +109,7 @@ object esmm_feature {
def user_feature(spark:SparkSession): Unit ={ def user_feature(spark:SparkSession): Unit ={
val yesterday = LocalDate.now().minusDays(1).toString.replace("-","") val yesterday = LocalDate.now().minusDays(1).toString.replace("-","")
println(yesterday) println(yesterday)
val sql_exist = "select device_id from user_feature" val sql_exist = "select device_id from jerry_test.user_feature"
val old = spark.sql(sql_exist) val old = spark.sql(sql_exist)
.collect().map(x => x(0).toString) .collect().map(x => x(0).toString)
val sql_yesterday = val sql_yesterday =
...@@ -130,12 +125,8 @@ object esmm_feature { ...@@ -130,12 +125,8 @@ object esmm_feature {
val df_new = rdd.filter(x => old.indexOf(x._1)== -1) val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
.toDF("device_id","device_type","manufacturer","city_id","channel","date") .toDF("device_id","device_type","manufacturer","city_id","channel","date")
if (df_new.take(1).nonEmpty){ if (df_new.take(1).nonEmpty){
df_new.persist()
val jdbcuri = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(jdbcuri, df_new, "user_feature", SaveMode.Append)
val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true" val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append) GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
df_new.unpersist()
}else { }else {
println("no need to insert into user feature") println("no need to insert into user feature")
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment