Commit 288911ea authored by 王志伟
parents 6e9fda44 bc68a743
@@ -215,7 +215,7 @@ object EsmmData {
       """.stripMargin
     )
-    GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
+    GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
     } else {
       println("jerry_test.esmm_train_data already have param.date data")
@@ -549,7 +549,7 @@ object EsmmPredData {
     val jdbcDF = sc.read
       .format("jdbc")
       .option("driver", "com.mysql.jdbc.Driver")
-      .option("url", "jdbc:mysql://rdsfewzdmf0jfjp9un8xj.mysql.rds.aliyuncs.com:3306/zhengxing")
+      .option("url", "jdbc:mysql://172.16.30.143:3306/zhengxing")
       .option("dbtable", "api_punishment")
       .option("user", "work")
       .option("password", "BJQaT9VzDcuPBqkd")
@@ -595,7 +595,7 @@ object EsmmPredData {
 //    union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
 //    println(union_data_scity_id2.count())
-    GmeiConfig.writeToJDBCTable("jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
+    GmeiConfig.writeToJDBCTable("jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_pre_data",SaveMode.Overwrite)
@@ -669,7 +669,7 @@ object GetDiaryPortrait {
         |select diary_id,level1_ids,level2_ids,level3_ids,split(level2_ids,",")[0] as level2 from t
       """.stripMargin
     )
-    val jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
+    val jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
     GmeiConfig.writeToJDBCTable(jdbc,result,"diary_feat",SaveMode.Overwrite)
...
@@ -32,7 +32,9 @@ object GmeiConfig extends Serializable {
   }

   def getSparkSession():(SparkContext, SparkSession) = {
-    val sparkConf = new SparkConf
+    val sparkConf = new SparkConf().set("spark.tispark.plan.allow_index_read", "false")
+      .set("spark.hive.mapred.supports.subdirectories","true")
+      .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true")
     sparkConf.set("spark.sql.crossJoin.enabled", "true")
     sparkConf.set("spark.debug.maxToStringFields", "130")
     sparkConf.set("spark.sql.broadcastTimeout", "6000")
...
@@ -78,7 +78,7 @@ object esmm_feature {
     val new_user = rdd.filter(x => old.indexOf(x._1)== -1)
       .toDF("device_id","os","app_list","update_date")
     if (new_user.take(1).nonEmpty){
-      val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
+      val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
       GmeiConfig.writeToJDBCTable(tecent_jdbc, new_user,"device_app_list", SaveMode.Append)
     }else{
@@ -125,7 +125,7 @@ object esmm_feature {
     val df_new = rdd.filter(x => old.indexOf(x._1)== -1)
       .toDF("device_id","device_type","manufacturer","city_id","channel","date")
     if (df_new.take(1).nonEmpty){
-      val tecent_jdbc = "jdbc:mysql://152.136.44.138:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
+      val tecent_jdbc = "jdbc:mysql://172.16.40.158:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
       GmeiConfig.writeToJDBCTable(tecent_jdbc, df_new, "user_feature", SaveMode.Append)
     }else {
       println("no need to insert into user feature")
...
@@ -3,16 +3,33 @@ from pyspark.sql import HiveContext
 from pyspark.context import SparkContext
 from pyspark.conf import SparkConf
 import pytispark.pytispark as pti
-from pyspark.sql import SQLContext
+# from pyspark.sql import SQLContext
 from pyspark.sql import SparkSession
 import datetime

 def test():
-    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
-    ti = pti.TiContext(spark)
-    ti.tidbMapDatabase("jerry_test")
-    df = spark.sql("select max(stat_date) from esmm_train_data")
+    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true")\
+        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")\
+        .set("spark.tispark.plan.allow_index_double_read", "false") \
+        .set("spark.tispark.plan.allow_index_read", "true")\
+        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")\
+        .set("spark.tispark.pd.addresses", "172.16.40.158:2379")
+    spark = SparkSession.builder.config(conf= sparkConf).enableHiveSupport().getOrCreate()
+    # spark.sql("use online")
+    # spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
+    # spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
+    # spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
+    # spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
+    spark.sparkContext.setLogLevel("WARN")
+    # ti = pti.TiContext(spark)
+    # ti.tidbMapDatabase("jerry_test")
+    df = spark.sql("select max(stat_date) from jerry_test.esmm_train_data")
     df.show()
     t = df.rdd.map(lambda x: str(x[0])).collect()
     print(t.count())
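The Python change replaces the explicit pti.TiContext / tidbMapDatabase mapping with TiSpark wired in through spark.sql.extensions and the PD address, after which the TiDB table is addressed by its database-qualified name (jerry_test.esmm_train_data). For reference, a roughly equivalent session setup in Scala, the project's other language, is sketched below; the extension class, PD address, and query come from the diff, the rest is assumed:

```scala
// Sketch: TiSpark enabled via SQL extensions instead of an explicit TiContext mapping.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .set("spark.tispark.pd.addresses", "172.16.40.158:2379")

val spark = SparkSession.builder()
  .config(conf)
  .enableHiveSupport()
  .getOrCreate()

// With TiExtensions registered, TiDB tables are queried by database-qualified name,
// so no tidbMapDatabase call is needed.
val df = spark.sql("select max(stat_date) from jerry_test.esmm_train_data")
df.show()
```

One caveat in the unchanged tail of the Python script: collect() returns a plain list, so t.count() with no argument raises a TypeError; len(t) is presumably what was intended.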
...