Commit 3fa19de3 authored by 高雅喆's avatar 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

GetPortrait
parents 58644fb0 27ae8fe8
......@@ -468,7 +468,7 @@ object Gini_coefficient {
|group by temp1.diary_id
""".stripMargin
)
GmeiConfig.writeToJDBCTable(diary_clk_num, "Gini_coefficient", SaveMode.Append)
GmeiConfig.writeToJDBCTable(diary_clk_num, "Gini_coefficient", SaveMode.Overwrite)
}
......
......@@ -182,3 +182,118 @@ object temp_analysis {
}
}
object ARPU_COM {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = "2018-08-01"
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("WeafareStat")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String] ("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
import sc.implicits._
val stat_date = GmeiConfig.getMinusNDate(1)
//println(param.date)
val partition_date = stat_date.replace("-","")
val agency_id = sc.sql(
s"""
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
""".stripMargin
)
agency_id.createOrReplaceTempView("agency_id")
val blacklist_id = sc.sql(
s"""
|SELECT device_id
|from blacklist
""".stripMargin
)
blacklist_id.createOrReplaceTempView("blacklist_id")
val final_id = sc.sql(
s"""
|select device_id
|from agency_id
|UNION ALL
|select device_id
|from blacklist_id
""".stripMargin
)
final_id.createOrReplaceTempView("final_id")
val diary_clk_all = sc.sql(
s"""
|select count(md.payment) as pay_all,count(distinct(md.device_id)) as pay_people,count(md.payment)/count(distinct(md.device_id))
|from online.ml_meigou_order_detail md left join final_id
|on md.device_id = final_id.device_id
|where md.status='2'
|and final_id.device_id is null
|and md.partition_date = '20181218'
|and md.pay_time is not null
|and md.pay_time >= '2018-01-01'
""".stripMargin
)
diary_clk_all.show(80)
}
}
}
......@@ -66,13 +66,13 @@ object testt {
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_day
|WHERE partition_date >= '20180402'
|AND partition_date <= '20181203'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
|UNION ALL
|SELECT DISTINCT(cl_id) as device_id
|FROM online.ml_hospital_spam_pv_month
|WHERE partition_date >= '20171101'
|AND partition_date <= '20181203'
|AND partition_date <= '${partition_date}'
|AND pv_ratio >= 0.95
""".stripMargin
)
......
......@@ -14,7 +14,7 @@ def con_sql(db,sql):
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
df = pd.DataFrame(list(result))
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
......@@ -138,38 +138,40 @@ class multiFFMFormatPandas:
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_test"
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=15)).strftime("%Y-%m-%d")
start = (temp - datetime.timedelta(days=30)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea " \
"from esmm_train_test e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id " \
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name"})
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time"})
print("esmm data ok")
print(df.head(2))
ucity_id = list(set(df["ucity_id"].values.tolist()))
cid = list(set(df["cid_id"].values.tolist()))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","device_id"], axis=1).fillna(0.0)
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date"], axis=1).fillna(0.0)
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
features = features + len(df[i].unique())
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(len(cid)))
return df,validate_date,ucity_id,cid
print("features:{}".format(features))
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
return df,validate_date,ucity_id,ccity_name
def transform(a,validate_date):
......@@ -177,13 +179,10 @@ def transform(a,validate_date):
df = model.fit_transform(a, y="y", n=160000, processes=22)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[1:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
......@@ -200,34 +199,34 @@ def transform(a,validate_date):
return model
def get_predict_set(ucity_id, cid,model):
def get_predict_set(ucity_id,model,ccity_name):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea,e.label " \
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id"
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name",26:"label"})
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
11:"device_id",12:"cid_id"})
print("before filter:")
print(df.shape)
df = df[df["cid_id"].isin(cid)]
print("after cid filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
print("after ucity filter:")
print(df.shape)
df["clevel1_id"] = df["clevel1_id"].astype("str")
df = df[df["ccity_name"].isin(ccity_name)]
print("after ccity_name filter:")
print(df.shape)
df["cid_id"] = df["cid_id"].astype("str")
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["label"] = df["label"].astype("str")
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id"], axis=1).fillna(0.0)
df = df.drop(["z","label","device_id","cid_id"], axis=1).fillna(0.0)
print(df.head(2))
df = model.transform(df,n=160000, processes=22)
df = pd.DataFrame(df)
......@@ -258,13 +257,12 @@ def get_predict_set(ucity_id, cid,model):
if __name__ == "__main__":
path = "/home/gaoyazhe/esmm/data/"
path = "/home/gmuser/ffm/"
a = time.time()
df, validate_date, ucity_id, cid = get_data()
df, validate_date, ucity_id,ccity_name = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id, cid,model)
get_predict_set(ucity_id,model,ccity_name)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment