Commit f2a05193 authored by 王志伟's avatar 王志伟
parents 6b6b6a8c 6c231f61
......@@ -226,7 +226,6 @@ object EsmmData {
object EsmmPredData {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
......@@ -425,6 +424,77 @@ object EsmmPredData {
sc.stop()
}
}
}
object GetPortrait {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = GmeiConfig.getMinusNDate(1)
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String]("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
val diary_tag = sc.sql(
s"""
|select d.diary_id,
|(case when d.tag_type = '1' then d.level1_ids else "" end) level1_ids,
|(case when d.tag_type = '2' then d.level2_ids else "" end) level2_ids,
|(case when d.tag_type = '3' then d.level3_ids else "" end) level3_ids from
| (select c.diary_id,c.tag_type,
| concat_ws(c.level1_id) as level1_ids
| concat_ws(c.level2_id) as level2_ids
| concat_ws(c.level3_id) as level3_ids from
| (select a.diary_id,a.tag_id,b.tag_type,b.level1_id,b.level2_id,b.level3_id
| from tl_hdfs_diary_tags_view a
| left join bl_tag_hierarchy_detail b
| on a.tag_id = b.id
| where a.partition_date = '20181218'
| and b.partition_date = '20181218') c
| group by c.diary_id,c.tag_type) d
|group by d.diary_id
""".stripMargin
)
diary_tag.show()
sc.stop()
}
......
......@@ -142,7 +142,7 @@ def get_data():
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=18)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
......@@ -152,20 +152,26 @@ def get_data():
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name"})
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time"})
print("esmm data ok")
print(df.head(2))
ucity_id = list(set(df["ucity_id"].values.tolist()))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date"], axis=1).fillna(0.0)
print(df.head(2))
features = len(df["ucity_id"].unique())
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
features = features + len(df[i].unique())
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(features))
return df,validate_date,ucity_id
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
return df,validate_date,ucity_id,ccity_name
def transform(a,validate_date):
......@@ -193,28 +199,34 @@ def transform(a,validate_date):
return model
def get_predict_set(ucity_id,model):
def get_predict_set(ucity_id,model,ccity_name):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid = cid_time.cid_id"
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name"})
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
11:"device_id",12:"cid_id"})
print("before filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
print("after ucity filter:")
print(df.shape)
df = df[df["ccity_name"].isin(ccity_name)]
print("after ccity_name filter:")
print(df.shape)
df["cid_id"] = df["cid_id"].astype("str")
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["label"] = df["label"].astype("str")
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id"], axis=1).fillna(0.0)
df = df.drop(["z","label","device_id","cid_id"], axis=1).fillna(0.0)
print(df.head(2))
df = model.transform(df,n=160000, processes=22)
df = pd.DataFrame(df)
......@@ -247,9 +259,9 @@ def get_predict_set(ucity_id,model):
if __name__ == "__main__":
path = "/home/gmuser/ffm/"
a = time.time()
df, validate_date, ucity_id = get_data()
df, validate_date, ucity_id,ccity_name = get_data()
model = transform(df, validate_date)
# get_predict_set(ucity_id,model)
get_predict_set(ucity_id,model,ccity_name)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment