Commit 90482418 authored by 王志伟
parents 65e952af a56d35ff
......@@ -225,7 +225,6 @@ object EsmmData {
}
object EsmmPredData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
......@@ -432,7 +431,6 @@ object EsmmPredData {
}
object GetDiaryPortrait {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
......@@ -543,6 +541,7 @@ object GetDevicePortrait {
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
import sc.implicits._
val stat_date = param.date.replace("-","")
......@@ -570,19 +569,94 @@ object GetDevicePortrait {
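// self-join tag_count against each device's maximum click count so that only the
// most-clicked level1 tag(s) per device survive; ties produce several rows per
// device, which the dedup step below collapses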
val max_count_tag = sc.sql(
s"""
-|select a.device_id,a.stat_date,a.level1_id,a.level1_count
+|select a.device_id,a.stat_date,a.level1_id as max_level1_id,a.level1_count as max_level1_count
|from tag_count a
|inner join
|(select device_id,max(level1_count) as max_count from tag_count group by device_id) b
|on a.level1_count = b.max_count and a.device_id = b.device_id
""".stripMargin
)
max_count_tag.show()
println(max_count_tag.count())
// .rdd.map(x => (x(0).toString,x(1).toString,x(2).toString,x(3).toString))
// max_count_tag.foreachPartition(GmeiConfig.updateDeviceFeat)
//
// max_count_tag.take(10).foreach(println)
// println(max_count_tag.count())
// drop duplicates: if a device has several tags tied at the max count, the join
// above returns one row per tied tag; keep a single row per device_id
val max_count_tag_rdd = max_count_tag.rdd.groupBy(_.getAs[String]("device_id")).map {
case (device_id,data) =>
val stat_date = data.map(_.getAs[String]("stat_date")).head
val max_level1_id = data.map(_.getAs[String]("max_level1_id")).head.toString
val max_level1_count = data.map(_.getAs[Long]("max_level1_count")).head.toString
(device_id,stat_date,max_level1_id,max_level1_count)
}.filter(_._1!=null)
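// foreachPartition opens one JDBC connection per partition inside updateDeviceFeat, rather than one per row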
max_count_tag_rdd.foreachPartition(GmeiConfig.updateDeviceFeat)
max_count_tag_rdd.take(10).foreach(println)
println(max_count_tag_rdd.count())
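// a minimal sketch (not part of this commit) of the same dedup via reduceByKey,
// which avoids materializing a device's whole row group the way groupBy does;
// like the .head calls above, it keeps an arbitrary winner among tied tags
val deduped = max_count_tag.rdd
.filter(_.getAs[String]("device_id") != null)
.map(r => (r.getAs[String]("device_id"),
(r.getAs[String]("stat_date"), r.getAs[String]("max_level1_id"), r.getAs[Long]("max_level1_count").toString)))
.reduceByKey((first, _) => first)
.map { case (id, (date, tag, count)) => (id, date, tag, count) }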
sc.stop()
}
}
}
object GetLevelCount {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
// GmeiConfig.writeToJDBCTable(diary_tag,"diary_feat",SaveMode.Overwrite)
case class Params(env: String = "dev",
path: String = null
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String]("path")
.text(s"the path you used")
.action((x,c) => c.copy(path = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.GetLevelCount ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
// the diary ids are joined into a bare comma-separated string for the IN (...) clause
// below; this assumes numeric diary_id values (string ids would need quoting)
val diary_queue = sc.read.json(param.path).rdd.map(x => x(0).toString).distinct().collect().toList.mkString(",")
val diary_level1 = sc.sql(
s"""
|select diary_id,explode(split(level1_ids,';')) from diary_feat
|where diary_id in (${diary_queue})
""".stripMargin
)
diary_level1.show()
println(diary_level1.count())
// note: sortByKey orders by the level1 tag id (the key), not by the count;
// to rank tags by frequency use .sortBy(_._2, ascending = false) instead
val level1_count = diary_level1.rdd.map(x => x(1).toString).map(level1 => (level1,1)).reduceByKey((a,b) => a+b).sortByKey(false,2)
println(level1_count.collect().mkString("\n"))
sc.stop()
......
......@@ -82,7 +82,7 @@ object GmeiConfig extends Serializable {
def updateDeviceFeat(iterator: Iterator[(String,String,String,String)]): Unit ={
var conn: Connection= null
var ps:java.sql.PreparedStatement=null
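// REPLACE INTO is an upsert: assuming device_feat has a unique key on (device_id, stat_date),
// re-running a date overwrites the old rows instead of duplicating them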
val sql=s"replace into device_feat(device_id,stat_date,level1_id,level1_count) values(?,?,?,?)"
val sql=s"replace into device_feat(device_id,stat_date,max_level1_id,max_level1_count) values(?,?,?,?)"
conn=DriverManager.getConnection("jdbc:mysql://10.66.157.22:4000/jerry_prod","root","3SYz54LS9#^9sBvC")
ps = conn.prepareStatement(sql)
try{
......
......@@ -29,11 +29,17 @@ def json_format(x):
def sort_app():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select device_id,app_list from device_id_applist"
sql = "select device_id,app_list,stat_date from device_id_applist"
df = con_sql(db, sql).dropna()
df = df.rename(columns={0: "device_id", 1: "app_list"})
df = df.rename(columns={0: "device_id", 1: "app_list",2:"stat_date"})
print(df.shape)
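# keep only the newest app_list per device: sort by stat_date descending, then
# drop_duplicates keeps the first (most recent) row per device_id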
df = df.sort_values(by="stat_date",ascending=False)
df = df.drop("stat_date",axis=1)
df = df.drop_duplicates("device_id")
print(df.shape)
df = df.loc[df["app_list"].apply(is_json)]
category = {"competitor":{"新氧美容","悦美","美呗整形","悦美微整形","如丽美容","医美咖","整形去哪儿","美黛拉","整形思密达","美芽"},
"dianshang":{"京东","淘宝","唯品会","天猫","苏宁易购","国美","当当","亚马逊","网易严选","小米有品"},
"kuajing_dianshang": {"小红书", "网易考拉", "洋码头", "达令全球好货", "海狐海淘",
......@@ -88,11 +94,11 @@ def sort_app():
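# write df to MySQL in chunks of n rows; each slice is appended to jerry_test.app_list_sort via pandas to_sql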
for i in range(0,df.shape[0],n):
print(i)
if i == 0:
-temp = df.loc[0:n]
+temp = df.iloc[0:n]
elif i+n > df.shape[0]:
-temp = df.loc[i+1:]
+temp = df.iloc[i:]
else:
-temp = df.loc[i+1:i+n]
+temp = df.loc[i:i+n]
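# note: this branch still uses .loc; with an integer index, loc[i:i+n] includes row i+n,
# so df.iloc[i:i+n] (matching the other branches) is probably what was intended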
pd.io.sql.to_sql(temp, "app_list_sort", yconnect, schema='jerry_test', if_exists='append', index=False)
print("insert done")
......
......@@ -152,9 +152,11 @@ def get_data():
# "a.read,a.finance,a.fashion_clothes,a.muying,a.fresh,a.bijia,a.travel,a.airplane," \
# "a.love,a.stock,a.car,a.child,a.homework,a.work,a.job " \
# "from esmm_train_data e left join user_feature_clean u on e.device_id = u.device_id " \
# "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
# "left join cid_type_top c on e.device_id = c.device_id " \
# "left join cid_time on e.cid_id = cid_time.cid_id " \
# "left join app_list_sort a on e.device_id = a.device_id " \
# "where e.stat_date >= '{}'".format(start)
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
"from esmm_train_data e left join user_feature_clean u on e.device_id = u.device_id " \
......@@ -162,28 +164,26 @@ def get_data():
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time", 11: "device_id"})
print("esmm data ok")
print(df.head(2))
-for i in range(12,35):
-df[i] = df[i].astype("str")
-df[i] = df[i].fillna(0)
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date","device_id"], axis=1)
df = df.fillna(0)
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel","top"]:
features = features + len(df[i].unique())
df[i] = df[i].fillna(0)
df["time"] = df["time"].fillna(df["time"].mode()[0])
print(df.head(2))
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(features+46))
print("features:{}".format(features))
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
manufacturer = list(set(df["manufacturer"].values.tolist()))
......