Commit 99b15e3a authored by 王志伟
parents 9d6cc727 36608059
......@@ -4,7 +4,7 @@ package com.gmei
import java.io.Serializable
import java.time.LocalDate
import org.apache.spark.sql.{SaveMode, SparkSession, TiContext}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, TiContext}
import org.apache.log4j.{Level, Logger}
import scopt.OptionParser
import com.gmei.lib.AbstractParams
......@@ -139,8 +139,9 @@ object EsmmData {
// println(cvr_data_filter.count())
val clk_data_filter =clk_data.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
val other_click = get_other_click(sc,stat_date_not)
val all_click = clk_data.union(other_click)
val clk_data_filter =all_click.except(cvr_data).withColumn("y",lit(1)).withColumn("z",lit(0))
// clk_data_filter.createOrReplaceTempView("clk_data_filter")
// clk_data_filter.show()
// println("clk_data_filter.count()")
......@@ -222,7 +223,6 @@ object EsmmData {
""".stripMargin
)
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id2, table="esmm_train_data",SaveMode.Append)
} else {
......@@ -233,6 +233,103 @@ object EsmmData {
}
}
def get_other_click(spark:SparkSession,yesterday:String): DataFrame ={
var result01 = spark.sql(
s"""
|select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
|device["device_id"] as device_id,channel as device_type,
|city_id,params['business_id'] as cid
|from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
|and action = 'on_click_diary_card' and params['tab_name'] != '精选'
|and params['page_name'] = 'home'
""".stripMargin
)
// println(result01.count())
// result01.show(6)
val recommend = spark.sql(
s"""
|select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
|device["device_id"] as device_id,channel as device_type,
|city_id,params["business_id"] as cid
|from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
|and action = 'diarybook_detail_click_recommend_block' and params["business_type"] = "diary"
""".stripMargin
)
// println("详情页推荐日记:")
// println(recommend.count())
// recommend.show(6)
val search_zonghe = spark.sql(
s"""
|select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
|device["device_id"] as device_id,channel as device_type,city_id,params["business_id"] as cid
|from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
|and action = 'search_result_click_infomation_item' and params["business_type"] = "diary"
""".stripMargin
)
// println("搜索综合:")
// println(search_zonghe.count())
// search_zonghe.show(6)
val non_home = spark.sql(
s"""
|select from_unixtime(unix_timestamp('${yesterday}', 'yyyyMMdd'), 'yyyy-MM-dd') as stat_date,
|device["device_id"] as device_id,channel as device_type,city_id,params["diary_id"] as cid
|from online.tl_hdfs_maidian_view where partition_date = '$yesterday'
|and action = 'on_click_diary_card' and params['page_name'] != 'home'
""".stripMargin
)
// println("non home:")
// println(non_home.count())
// non_home.show(6)
result01 = result01.union(recommend).union(search_zonghe).union(non_home)
// println(result01.count())
result01.createOrReplaceTempView("temp_result")
val result02 = spark.sql(
s"""
|select * from temp_result
|where device_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5')
| and device_id not in
| (SELECT cl_id
| FROM online.ml_hospital_spam_pv_day
| WHERE partition_date>='20180402' AND partition_date<'${yesterday}'
| AND pv_ratio>=0.95
| UNION ALL
| SELECT cl_id
| FROM online.ml_hospital_spam_pv_month
| WHERE partition_date>='20171101' AND partition_date<'${yesterday}'
| AND pv_ratio>=0.95
| )
""".stripMargin
)
result02.createOrReplaceTempView("temp_result02")
val result_dairy = spark.sql(
s"""
|select
| re.stat_date as stat_date,
| re.device_id as device_id,
| re.city_id as ucity_id,
| re.cid as cid_id,
| da.service_id as diary_service_id
|from temp_result02 re
|left join online.ml_community_diary_updates da
|on re.cid = da.diary_id
|where da.partition_date='${yesterday}'
""".stripMargin
).distinct()
result_dairy
}
}
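For readers following the Scala above, get_other_click boils down to: union click events from several entry points (home feed, detail-page recommend block, search results, non-home diary cards), drop promotion channels, drop devices flagged as spam, then join diary metadata to attach a service id. Below is a condensed PySpark sketch of that filtering core; the function name, the abbreviated channel blacklist, and the pre-built sources/spam_devices DataFrames are illustrative assumptions, the real job builds them from the Hive SQL shown above.

from pyspark.sql import functions as F

# Abbreviated channel blacklist -- the full list is in the SQL above.
CHANNEL_BLACKLIST = ["yqxiu1", "wanpu", "toufang1", "TF-toufang1"]

def union_other_clicks(sources, spam_devices):
    """sources: DataFrames with columns (stat_date, device_id, device_type, city_id, cid),
    one per click entry point; spam_devices: single-column DataFrame of cl_id values
    drawn from the ml_hospital_spam_pv_* tables."""
    clicks = sources[0]
    for extra in sources[1:]:
        clicks = clicks.union(extra)                      # same column order required
    clicks = clicks.filter(~F.col("device_type").isin(CHANNEL_BLACKLIST))
    # left_anti join plays the role of the `device_id not in (select cl_id ...)` subquery
    return clicks.join(spam_devices,
                       clicks.device_id == spam_devices.cl_id,
                       "left_anti").distinct()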
......@@ -310,7 +407,7 @@ object EsmmPredData {
}.filter(_._3!="").toDF("device_id","city_id","merge_queue")
println("nearby_device_count",raw_data1.count())
val start= LocalDate.now().minusDays(8).toString
val start= LocalDate.now().minusDays(14).toString
import sc.implicits._
val sql =
s"""
......@@ -542,9 +639,9 @@ object GetDiaryPortrait {
val diary_tag = sc.sql(
s"""
|select c.diary_id,
| concat_ws(';',collect_set(cast(c.level1_id as string))) as level1_ids,
| concat_ws(';',collect_set(cast(c.level2_id as string))) as level2_ids,
| concat_ws(';',collect_set(cast(c.level3_id as string))) as level3_ids from
| concat_ws(',',collect_set(cast(c.level1_id as string))) as level1_ids,
| concat_ws(',',collect_set(cast(c.level2_id as string))) as level2_ids,
| concat_ws(',',collect_set(cast(c.level3_id as string))) as level3_ids from
| (select a.diary_id,b.level1_id,b.level2_id,b.level3_id
| from online.tl_hdfs_diary_tags_view a
| left join online.bl_tag_hierarchy_detail b
......@@ -556,8 +653,9 @@ object GetDiaryPortrait {
)
diary_tag.show()
println(diary_tag.count())
val jdbc = "jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true"
GmeiConfig.writeToJDBCTable(diary_tag,"diary_feat",SaveMode.Overwrite)
GmeiConfig.writeToJDBCTable(jdbc,diary_tag,"diary_feat",SaveMode.Overwrite)
sc.stop()
......
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/data1/hadoop/data</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://gmei-hdfs</value>
</property>
<property>
<name>net.topology.node.switch.mapping.impl</name>
<value>org.apache.hadoop.net.ScriptBasedMapping</value>
</property>
<!--
<property>
<name>hadoop.security.group.mapping</name>
<value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
</property>
<property>
<name>net.topology.script.file.name</name>
<value>/opt/hadoop-2.5.1/bin/topology.py</value>
</property>
-->
<property>
<name>hadoop.security.authentication</name>
<value>simple</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.rpc.protection</name>
<value>authentication</value>
</property>
<property>
<name>io.compression.codec.lzo.buffersize</name>
<value>69976</value>
</property>
<property>
<name>hfile.compression</name>
<value>lzo</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>4194304</value>
</property>
<property>
<name>fs.inmemory.size.mb</name>
<value>1500</value>
</property>
<property>
<name>io.seqfile.compress.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>fs.trash.interval</name>
<value>1440</value>
</property>
<property>
<name>ipc.server.listen.queue.size</name>
<value>1024</value>
</property>
<property>
<name>ipc.server.read.threadpool.size</name>
<value>10</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.oozie.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.flume.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hive.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.httpfs.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hadoop.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.dm.groups</name>
<value>*</value>
</property>
</configuration>
......@@ -18,45 +18,65 @@ def con_sql(db,sql):
return df
def multi_hot(df,column,n):
df[column] = df[column].fillna("lost_na")
app_list_value = [i.split(",") for i in df[column].unique()]
app_list_unique = []
for i in app_list_value:
app_list_unique.extend(i)
app_list_unique = list(set(app_list_unique))
number = len(app_list_unique)
app_list_map = dict(zip(app_list_unique, list(range(n, number + n))))
df[column] = df[column].apply(app_list_func, args=(app_list_map,))
return number,app_list_map
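The two multi_hot calls in get_data chain their offsets so that app ids, level-2 tag ids, and the later categorical value_map each occupy a disjoint slice of one global id space (and therefore of one shared embedding table). A small self-contained illustration with toy data; the concrete ids are shown only as examples, since set() ordering is not deterministic.

import pandas as pd

# Illustrative only: how multi_hot's offsets keep feature groups in disjoint id ranges.
df = pd.DataFrame({"app_list": ["a,b", "b,c"], "clevel2_id": ["7", "7,9"]})
app_list_number, app_list_map = multi_hot(df, "app_list", 1)                   # e.g. {'a': 1, 'b': 2, 'c': 3}
level2_number, level2_map = multi_hot(df, "clevel2_id", 1 + app_list_number)   # e.g. {'7': 4, '9': 5}
# The categorical value_map built later starts at 1 + app_list_number + level2_number,
# so multi-hot ids and one-hot ids never collide in the shared embedding table.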
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from {}".format(train_data_set)
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=50)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=20)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,cut.time " \
sql = "select e.y,e.z,e.stat_date,e.ucity_id,feat.level2_ids,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,e.device_id,cut.time,dl.app_list " \
"from {} e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id " \
"left join diary_feat feat on e.cid_id = feat.diary_id " \
"where e.stat_date >= '{}'".format(train_data_set,start)
df = con_sql(db, sql)
# print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel2_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11: "l2",
12: "device_id", 13: "time"})
12: "device_id", 13: "time",14:"app_list"})
print("esmm data ok")
# print(df.head(2)
print("before")
print(df.shape)
print("after")
df = df.drop_duplicates()
df = df.drop_duplicates(["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1","l2", "time", "stat_date"])
# print(df.shape)
# print("exp numbers:")
# print(df[df["y"] == 0].shape)
# print("click numbers")
# print(df[(df["y"] == 1)&(df["z"] == 0)].shape)
# print("buy numbers")
# print(df[(df["y"] == 1) & (df["z"] == 1)].shape)
df = df.drop_duplicates(["ucity_id", "clevel2_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1","l2", "time", "stat_date","app_list"])
app_list_number,app_list_map = multi_hot(df,"app_list",1)
level2_number,level2_map = multi_hot(df,"clevel2_id",1+app_list_number)
# df["app_list"] = df["app_list"].fillna("lost_na")
# app_list_value = [i.split(",") for i in df["app_list"].unique()]
# app_list_unique = []
# for i in app_list_value:
# app_list_unique.extend(i)
# app_list_unique = list(set(app_list_unique))
# app_list_map = dict(zip(app_list_unique, list(range(1, len(app_list_unique) + 1))))
# df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
unique_values = []
features = ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
features = ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "time", "stat_date"]
for i in features:
df[i] = df[i].astype("str")
......@@ -75,13 +95,13 @@ def get_data():
print(len(unique_values))
print(df.head(2))
temp = list(range(1,len(unique_values)+1))
temp = list(range(1+app_list_number+level2_number, 1 + app_list_number+level2_number + len(unique_values)))
value_map = dict(zip(unique_values,temp))
df = df.drop("device_id", axis=1)
train = df[df["stat_date"] != validate_date+"stat_date"]
test = df[df["stat_date"] == validate_date+"stat_date"]
for i in ["ucity_id", "clevel1_id", "ccity_name", "device_type", "manufacturer",
for i in ["ucity_id", "ccity_name", "device_type", "manufacturer",
"channel", "top", "l1", "time", "stat_date","l2"]:
train[i] = train[i].map(value_map)
test[i] = test[i].map(value_map)
......@@ -94,7 +114,15 @@ def get_data():
write_csv(train, "tr",100000)
write_csv(test, "va",80000)
return validate_date,value_map
return validate_date,value_map,app_list_map
def app_list_func(x,l):
b = x.split(",")
e = []
for i in b:
e.append(l[i])
return ",".join([str(j) for j in e])
def write_csv(df,name,n):
......@@ -108,20 +136,23 @@ def write_csv(df,name,n):
temp.to_csv(path + name+ "/{}_{}.csv".format(name,i), index=False)
def get_predict(date,value_map):
def get_predict(date,value_map,app_list_map):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,e.cid_id,cut.time " \
"u.device_type,u.manufacturer,u.channel,c.top,cl.l1,cl.l2,e.device_id,e.cid_id,cut.time,dl.app_list " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id " \
"left join cid_level2 cl on e.cid_id = cl.cid " \
"left join cid_time_cut cut on e.cid_id = cut.cid limit 6"
"left join cid_time_cut cut on e.cid_id = cut.cid " \
"left join device_app_list dl on e.device_id = dl.device_id limit 6"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "l1",11:"l2",
12: "device_id", 13: "cid_id", 14: "time"})
12: "device_id", 13: "cid_id", 14: "time",15:"app_list"})
df["stat_date"] = date
df["app_list"] = df["app_list"].fillna("lost_na")
df["app_list"] = df["app_list"].apply(app_list_func,args=(app_list_map,))
print("predict shape")
print(df.shape)
......@@ -171,6 +202,6 @@ def get_predict(date,value_map):
if __name__ == '__main__':
train_data_set = "esmm_train_data"
path = "/data/esmm/"
date,value = get_data()
get_predict(date, value)
date,value,app_list = get_data()
get_predict(date, value,app_list)
......@@ -32,15 +32,15 @@ rm ${DATA_PATH}/nearby/nearby_*
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=2 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=200000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=200000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=2000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/train.py --ctr_task_wgt=0.9 --learning_rate=0.0001 --deep_layers=512,256,128,64,32 --dropout=0.5,0.5,0.5,0.5,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=200000 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/to_database.py
......@@ -33,10 +33,12 @@ def gen_tfrecords(in_file):
id = np.array([])
for j in feats:
id = np.append(id,df[j][i])
app_list = np.array(df["app_list"][i].split(","))
features = tf.train.Features(feature={
"y": tf.train.Feature(float_list=tf.train.FloatList(value=[df["y"][i]])),
"z": tf.train.Feature(float_list=tf.train.FloatList(value=[df["z"][i]])),
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int)))
"ids": tf.train.Feature(int64_list=tf.train.Int64List(value=id.astype(np.int))),
"app_list":tf.train.Feature(int64_list=tf.train.Int64List(value=app_list.astype(np.int)))
})
example = tf.train.Example(features = features)
......
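The gen_tfrecords change above writes app_list as a variable-length Int64List per example, alongside the fixed 11-wide "ids" vector. A self-contained sketch of that record layout (TF 1.x API as used in this repo; the literal id values are made up):

import numpy as np
import tensorflow as tf

# One example with a fixed-width "ids" feature and a ragged "app_list" feature.
ids = np.array([3, 17, 42, 5, 9, 11, 2, 8, 1, 6, 4])            # 11 one-hot ids
app_list = np.array("101,205,309".split(",")).astype(np.int64)  # any length per example
example = tf.train.Example(features=tf.train.Features(feature={
    "y": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
    "z": tf.train.Feature(float_list=tf.train.FloatList(value=[0.0])),
    "ids": tf.train.Feature(int64_list=tf.train.Int64List(value=ids.astype(np.int64))),
    "app_list": tf.train.Feature(int64_list=tf.train.Int64List(value=app_list)),
}))
serialized = example.SerializeToString()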
......@@ -53,7 +53,8 @@ def input_fn(filenames, batch_size=32, num_epochs=1, perform_shuffle=False):
features = {
"y": tf.FixedLenFeature([], tf.float32),
"z": tf.FixedLenFeature([], tf.float32),
"ids": tf.FixedLenFeature([11], tf.int64)
"ids": tf.FixedLenFeature([11], tf.int64),
"app_list": tf.VarLenFeature(tf.int64)
}
parsed = tf.parse_single_example(record, features)
......@@ -99,6 +100,7 @@ def model_fn(features, labels, mode, params):
Feat_Emb = tf.get_variable(name='embeddings', shape=[feature_size, embedding_size], initializer=tf.glorot_normal_initializer())
feat_ids = features['ids']
app_list = features['app_list']
if FLAGS.task_type != "infer":
y = labels['y']
......@@ -107,8 +109,10 @@ def model_fn(features, labels, mode, params):
#------build f(x)------
with tf.variable_scope("Shared-Embedding-layer"):
embedding_id = tf.nn.embedding_lookup(Feat_Emb,feat_ids)
app_id = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list, sp_weights=None, combiner="sum")
x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
# x_concat = tf.reshape(embedding_id,shape=[-1, common_dims]) # None * (F * K)
x_concat = tf.concat([tf.reshape(embedding_id,shape=[-1,common_dims]),app_id], axis=1)
with tf.name_scope("CVR_Task"):
if mode == tf.estimator.ModeKeys.TRAIN:
......
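On the model side, app_list is parsed with VarLenFeature (a SparseTensor, since every example has a different number of apps) and embedding_lookup_sparse with combiner="sum" pools those per-app embeddings into one fixed-size vector, which is then concatenated with the reshaped fixed-field embeddings. A toy, self-contained graph-construction sketch under TF 1.x; the variable name, sizes, and literal ids are illustrative only:

import tensorflow as tf  # TF 1.x API, matching the estimator code above

feature_size, K, F = 1000, 16, 11
Feat_Emb = tf.get_variable("emb_demo", [feature_size, K],
                           initializer=tf.glorot_normal_initializer())
feat_ids = tf.constant([[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]], tf.int64)      # [1, F]
app_list = tf.SparseTensor(indices=[[0, 0], [0, 1], [0, 2]],
                           values=tf.constant([20, 21, 22], tf.int64),
                           dense_shape=[1, 3])                               # ragged app ids
field_emb = tf.reshape(tf.nn.embedding_lookup(Feat_Emb, feat_ids), [-1, F * K])   # [1, F*K]
app_emb = tf.nn.embedding_lookup_sparse(Feat_Emb, sp_ids=app_list,
                                        sp_weights=None, combiner="sum")          # [1, K]
x_concat = tf.concat([field_emb, app_emb], axis=1)                                # [1, (F+1)*K]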
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>dfs.hosts.include</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
</property>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>gmei-hdfs</value>
</property>
<property>
<name>dfs.ha.namenodes.gmei-hdfs</name>
<value>namenode1,namenode2</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode1</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode1</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/data1/qjm/journaldata</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode1</name>
<value>datacenter01:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode1</name>
<value>datacenter01:50470</value>
</property>
<property>
<name>dfs.namenode.name.dir.gmei-hdfs.namenode2</name>
<value>file:///data1/dfs/nn</value>
</property>
<property>
<name>dfs.namenode.shared.edits.dir.gmei-hdfs.namenode2</name>
<value>qjournal://datacenter01:8485;datacenter02:8485;datacenter03:8485;datacenter04:8485;datacenter05:8485/gmei-hdfs</value>
</property>
<property>
<name>dfs.namenode.rpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.gmei-hdfs.namenode2</name>
<value>datacenter02:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50070</value>
</property>
<property>
<name>dfs.namenode.https-address.gmei-hdfs.namenode2</name>
<value>datacenter02:50470</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.gmei-hdfs</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.gmei-hdfs</name>
<value>true</value>
</property>
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/home/hadoop/.ssh/id_rsa</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>zk-kafka01:2181,zk-kafka02:2181,zk-kafka03:2181</value>
</property>
<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.namenode.handler.count</name>
<value>128</value>
</property>
<property>
<name>dfs.namenode.service.handler.count</name>
<value>640</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/data1/dfs/dn,/data2/dfs/dn</value>
</property>
<property>
<name>dfs.datanode.handler.count</name>
<value>64</value>
<description>The number of server threads for the datanode.</description>
</property>
<property>
<name>dfs.datanode.max.transfer.threads</name>
<value>8192</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>53687091200</value>
<description>reserve 150G per disk for mapreduce</description>
</property>
<property>
<name>dfs.read.prefetch.size</name>
<value>1342177280</value>
</property>
<property>
<name>dfs.hosts.exclude</name>
<value>${hadoop.home.dir}/etc/hadoop/exclude_slaves</value>
</property>
<property>
<name>dfs.hosts</name>
<value>${hadoop.home.dir}/etc/hadoop/slaves</value>
</property>
<property>
<name>dfs.client.block.write.retries</name>
<value>5</value>
</property>
<property>
<name>dfs.datanode.max.xcievers</name>
<value>4096</value>
</property>
<property>
<name>dfs.safemode.threshold.pct</name>
<value>0.999</value>
</property>
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>10800000</value>
</property>
<property>
<name>heartbeat.recheck.interval</name>
<value>600000</value>
</property>
<property>
<name>dfs.permissions.superusergroup</name>
<value>supergroup</value>
</property>
<property>
<name>dfs.namenode.name.dir.restore</name>
<value>true</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
</property>
<!--
<property>
<name>dfs.datanode.failed.volumes.tolerated</name>
<value>1</value>
</property>
-->
<property>
<name>dfs.datanode.balance.bandwidthPerSec</name>
<value>67108864</value>
</property>
<property>
<name>dfs.disk.balancer.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.http-address</name>
<value>nvwa:50070</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>nvwa02:50090</value>
</property>
</configuration>
import datetime
from pyspark.sql import HiveContext
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import
import pytispark.pytispark as pti
def test():
......@@ -12,5 +19,33 @@ def test():
and params["page_name"] = "diary_detail" and params["referrer"] = "home" limit 10 ''').show(6)
def esmm_pre():
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
print(yesterday)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# gw = SparkContext._gateway
#
# # Import TiExtensions
# java_import(gw.jvm, "org.apache.spark.sql.TiContext")
# Inject TiExtensions, and get a TiContext
# ti = gw.jvm.TiExtensions.getInstance(spark._jsparkSession).getOrCreateTiContext(spark._jsparkSession)
ti = pti.TiContext(spark)
ti.tidbMapDatabase("jerry_test")
# sql("use tpch_test")
spark.sql("select count(*) from esmm_pre_data").show(6)
# conf = SparkConf().setAppName("esmm_pre").set("spark.io.compression.codec", "lzf")
#
# spark.sql("""
# select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from (select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
# union select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
# union select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1 where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='{}'
# """.format(yesterday)).show(6)
if __name__ == '__main__':
test()
esmm_pre()