Commit d905ce69 authored by 高雅喆

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

add GetDevicePortrait
parents 1ec526a7 b3671933
@@ -114,10 +114,10 @@ class multiFFMFormatPandas:
         x = 0
         while True:
             if x + step < data.__len__():
-                data_list.append(data.loc[x:x + step])
-                x = x + step + 1
+                data_list.append(data.iloc[x:x + step])
+                x = x + step
             else:
-                data_list.append(data.loc[x:data.__len__()])
+                data_list.append(data.iloc[x:data.__len__()])
                 break
         return data_list
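The switch from `loc` to `iloc` above matters because `loc` slices by label and is end-inclusive, while `iloc` slices by position and is end-exclusive; that is also why the cursor no longer needs the `+ 1`. A minimal sketch of the positional chunking, on toy data rather than the project's:

```python
import pandas as pd

# A frame whose index no longer matches row positions, as happens
# after upstream filtering or shuffling.
data = pd.DataFrame({"v": range(7)}).sample(frac=1, random_state=0)

step = 3
data_list = []
x = 0
while x < len(data):
    # iloc is end-exclusive: rows x .. x+step-1, so advance by step.
    data_list.append(data.iloc[x:x + step])
    x += step

# Every row lands in exactly one chunk, regardless of index labels.
assert sum(len(c) for c in data_list) == len(data)
```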
@@ -147,7 +147,7 @@ def get_data():
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
     sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
-          "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
+          "from esmm_train_data e left join user_feature_clean u on e.device_id = u.device_id " \
           "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
           "where e.stat_date >= '{}'".format(start)
     df = con_sql(db, sql)
@@ -208,7 +208,7 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
     sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
-          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
+          "from esmm_pre_data e left join user_feature_clean u on e.device_id = u.device_id " \
           "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
     df = con_sql(db, sql)
     df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
......
@@ -526,3 +526,119 @@ object meigou_xiaofei_renshu {
 }
+
+object smart_rank_count {
+
+  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
+  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
+
+  case class Params(env: String = "dev",
+                    date: String = "2018-08-01"
+                   ) extends AbstractParams[Params] with Serializable
+
+  val defaultParams = Params()
+
+  val parser = new OptionParser[Params]("Feed_EDA") {
+    head("WeafareStat")
+    opt[String]("env")
+      .text(s"the databases environment you used")
+      .action((x, c) => c.copy(env = x))
+    opt[String]("date")
+      .text(s"the date you used")
+      .action((x, c) => c.copy(date = x))
+    note(
+      """
+        |For example, the following command runs this app on a tidb dataset:
+        |
+        | spark-submit --class com.gmei.WeafareStat ./target/scala-2.11/feededa-assembly-0.1.jar \
+      """.stripMargin +
+        s"| --env ${defaultParams.env}"
+    )
+  }
+
+  def main(args: Array[String]): Unit = {
+    parser.parse(args, defaultParams).map { param =>
+      GmeiConfig.setup(param.env)
+      val spark_env = GmeiConfig.getSparkSession()
+      val sc = spark_env._2
+
+      val ti = new TiContext(sc)
+      ti.tidbMapTable(dbName = "jerry_prod", tableName = "diary_video")
+      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_click")
+      ti.tidbMapTable(dbName = "jerry_prod", tableName = "blacklist")
+      ti.tidbMapTable(dbName = "jerry_test", tableName = "bl_device_list")
+      ti.tidbMapTable(dbName = "jerry_prod", tableName = "data_feed_exposure")
+      ti.tidbMapTable(dbName = "jerry_prod", tableName = "merge_queue_table")
+
+      import sc.implicits._
+
+      val stat_date = GmeiConfig.getMinusNDate(1)
+      //println(param.date)
+      val partition_date = stat_date.replace("-", "")
+
+      val agency_id = sc.sql(
+        s"""
+           |SELECT DISTINCT(cl_id) as device_id
+           |FROM online.ml_hospital_spam_pv_day
+           |WHERE partition_date >= '20180402'
+           |AND partition_date <= '${partition_date}'
+           |AND pv_ratio >= 0.95
+           |UNION ALL
+           |SELECT DISTINCT(cl_id) as device_id
+           |FROM online.ml_hospital_spam_pv_month
+           |WHERE partition_date >= '20171101'
+           |AND partition_date <= '${partition_date}'
+           |AND pv_ratio >= 0.95
+         """.stripMargin
+      )
+      agency_id.createOrReplaceTempView("agency_id")
+
+      val blacklist_id = sc.sql(
+        s"""
+           |SELECT device_id
+           |from blacklist
+         """.stripMargin
+      )
+      blacklist_id.createOrReplaceTempView("blacklist_id")
+
+      val final_id = sc.sql(
+        s"""
+           |select device_id
+           |from agency_id
+           |UNION ALL
+           |select device_id
+           |from blacklist_id
+         """.stripMargin
+      )
+      final_id.createOrReplaceTempView("final_id")
+
+      val meigou_price = sc.sql(
+        s"""
+           |select cl_id,city_id,params['business_id'] as meigou_id
+           |from online.tl_hdfs_maidian_view
+           |where action = "page_view"
+           |and params['page_name']="welfare_detail"
+           |and partition_date ='20181201'
+           |LIMIT 10
+         """.stripMargin
+      )
+      meigou_price.show(80)
+      // GmeiConfig.writeToJDBCTable(meigou_price, "meigou_price", SaveMode.Overwrite)
+    }
+  }
+}
@@ -68,12 +68,20 @@ def sort_app():
         "job": {"智联招聘", "前程无忧", "斗米", "拉勾", "Boss直聘", "猎聘同道", "智联招聘"}
     }
     df["app_list"] = df["app_list"].apply(json_format)
+    n = df.shape[0]
+    df["sum"] = 0
     for i in category.keys():
         df[i] = df["app_list"].apply(lambda x: 1 if len(x & category[i]) > 0 else 0)
-        print(i)
-        print(df[i].value_counts())
+        df["sum"] = df["sum"] + df[i]
+        # print(i)
+        # print(df.loc[df[i]==1].shape[0]/n)
     df = df.drop("app_list", axis=1)
+    # for i in df["sum"].unique():
+    #     print(i)
+    #     a = df.loc[df["sum"] == i].shape[0]/n
+    #     print(a)
     yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
     print(df.shape)
     n = 200000
......
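For context on the hunk above: each category column is a 0/1 flag set by intersecting the device's app set with a category's app set, and the new `sum` column counts how many categories a device hits. A minimal sketch, assuming `json_format` parses `app_list` into a Python set of app names:

```python
category = {"job": {"智联招聘", "前程无忧"}, "chat": {"微信", "QQ"}}
app_list = {"前程无忧", "微信", "抖音"}  # one device's parsed app set

# 1 if the device has any app from the category, else 0.
flags = {k: int(bool(app_list & apps)) for k, apps in category.items()}
print(flags)                # {'job': 1, 'chat': 1}
print(sum(flags.values()))  # the per-device "sum" column
```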
import pandas as pd
import pymysql


def con_sql(db, sql):
    # Run a query and return the rows as a DataFrame; the connection is
    # closed in all cases.
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        result = cursor.fetchall()
        df = pd.DataFrame(list(result))
    except Exception as e:
        print("exception occurred:", e)
        df = pd.DataFrame()
    finally:
        db.close()
    return df


def exp():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select manufacturer,channel from user_feature"
    df = con_sql(db, sql)
    n = df.shape[0]
    manufacturer = df[0].unique()
    manufacturer_map = {}
    print("manufacturer unique")
    print(len(manufacturer))
    for i in manufacturer:
        manufacturer_map[i] = df.loc[df[0] == i].shape[0] / n
    print(sorted(manufacturer_map.items(), key=lambda x: x[1]))
    channel = df[1].unique()
    channel_map = {}
    print("channel unique")
    print(len(channel))
    for i in channel:
        channel_map[i] = df.loc[df[1] == i].shape[0] / n
    print(sorted(channel_map.items(), key=lambda x: x[1]))


def clean():
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select device_id,device_type,manufacturer,channel,city_id from user_feature"
    df = con_sql(db, sql)
    df = df.rename(columns={0: "device_id", 1: "device_type", 2: "manufacturer", 3: "channel", 4: "city_id"})
    n = df.shape[0]
    # Collapse manufacturers seen on fewer than 0.05% of devices into "other".
    manufacturer = df["manufacturer"].unique()
    for i in manufacturer:
        if df.loc[df["manufacturer"] == i].shape[0] / n < 0.0005:
            df.loc[df["manufacturer"] == i, ["manufacturer"]] = "other"
    # Collapse channels seen on fewer than 0.01% of devices into "other".
    channel = df["channel"].unique()
    for i in channel:
        if df.loc[df["channel"] == i].shape[0] / n < 0.0001:
            df.loc[df["channel"] == i, ["channel"]] = "other"
    from sqlalchemy import create_engine
    yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
    # Write in chunks of 200000 rows; .loc slicing is end-inclusive, and the
    # default RangeIndex makes the chunks [0:n], [i+1:i+n], ... tile the frame.
    n = 200000
    for i in range(0, df.shape[0], n):
        print(i)
        if i == 0:
            temp = df.loc[0:n]
        elif i + n > df.shape[0]:
            temp = df.loc[i + 1:]
        else:
            temp = df.loc[i + 1:i + n]
        pd.io.sql.to_sql(temp, "user_feature_clean", yconnect, schema='jerry_test', if_exists='append', index=False)
    print("insert done")


if __name__ == "__main__":
    clean()
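As a side note, the hand-rolled frequency maps in `exp()` and the rare-value scan in `clean()` can be expressed with pandas' `value_counts`; a minimal equivalent sketch on toy data:

```python
import pandas as pd

s = pd.Series(["华为", "小米", "华为", "apple"])

# Share of each distinct value, equivalent to the manual
# df.loc[df[col] == v].shape[0] / n loop above.
freq = s.value_counts(normalize=True)
print(sorted(freq.items(), key=lambda x: x[1]))

# Values below a threshold get collapsed to "other", as in clean().
rare = freq[freq < 0.3].index
print(s.where(~s.isin(rare), "other").tolist())
```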
@@ -98,12 +98,7 @@ class multiFFMFormatPandas:
         result_map = {}
         for i in data_list:
-            print("before:total")
-            print(len(result_map))
-            print(len(i.get()))
             result_map.update(i.get())
-            print("result_map")
-            print(len(result_map))
         pool.close()
         pool.join()

@@ -120,10 +115,10 @@ class multiFFMFormatPandas:
         x = 0
         while True:
             if x + step < data.__len__():
-                data_list.append(data.loc[x:x + step])
-                x = x + step + 1
+                data_list.append(data.iloc[x:x + step])
+                x = x + step
             else:
-                data_list.append(data.loc[x:data.__len__()])
+                data_list.append(data.iloc[x:data.__len__()])
                 break
         return data_list
@@ -153,7 +148,7 @@ def get_data():
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
     sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
-          "from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
+          "from esmm_train_data e left join user_feature_clean u on e.device_id = u.device_id " \
          "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
          "where e.stat_date >= '{}'".format(start)
     df = con_sql(db, sql)
@@ -180,6 +175,7 @@ def get_data():
     manufacturer = list(set(df["manufacturer"].values.tolist()))
     channel = list(set(df["channel"].values.tolist()))
     return df,validate_date,ucity_id,ccity_name,manufacturer,channel
@@ -203,10 +199,10 @@ def transform(a,validate_date):
     train = train.drop("stat_date", axis=1)
     test = df[df["stat_date"] == validate_date]
     test = test.drop("stat_date", axis=1)
-    # print("train shape")
-    # print(train.shape)
-    # train.to_csv(path + "tr.csv", sep="\t", index=False)
-    # test.to_csv(path + "va.csv", sep="\t", index=False)
+    print("train shape")
+    print(train.shape)
+    train.to_csv(path + "tr.csv", sep="\t", index=False)
+    test.to_csv(path + "va.csv", sep="\t", index=False)
     return model
@@ -215,7 +211,7 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
     db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
     sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
           "u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
-          "from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
+          "from esmm_pre_data e left join user_feature_clean u on e.device_id = u.device_id " \
           "left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id"
     df = con_sql(db, sql)
     df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
@@ -224,23 +220,23 @@ def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
     print("before filter:")
     print(df.shape)
+    print(df.loc[df["device_id"] == "358035085192742"].shape)
     df = df[df["ucity_id"].isin(ucity_id)]
     print("after ucity filter:")
     print(df.shape)
+    print(df.loc[df["device_id"] == "358035085192742"].shape)
     df = df[df["ccity_name"].isin(ccity_name)]
     print("after ccity_name filter:")
     print(df.shape)
+    print(df.loc[df["device_id"] == "358035085192742"].shape)
     df = df[df["manufacturer"].isin(manufacturer)]
     print("after manufacturer filter:")
     print(df.shape)
+    print(df.loc[df["device_id"] == "358035085192742"].shape)
     df = df[df["channel"].isin(channel)]
     print("after channel filter:")
     print(df.shape)
+    print(df.loc[df["device_id"] == "358035085192742"].shape)
     df["cid_id"] = df["cid_id"].astype("str")
     df["clevel1_id"] = df["clevel1_id"].astype("str")
     df["top"] = df["top"].astype("str")
......
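The repeated filter-then-print blocks in the hunk above follow one pattern per column; a loop-based equivalent, sketched with toy data standing in for the whitelists built in `get_data()`:

```python
import pandas as pd

# Toy stand-ins; the real frame and whitelists come from get_data().
df = pd.DataFrame({"ucity_id": ["a", "b"], "channel": ["c1", "c2"],
                   "device_id": ["358035085192742", "x"]})
filters = {"ucity_id": {"a"}, "channel": {"c1", "c2"}}

for col, allowed in filters.items():
    df = df[df[col].isin(allowed)]
    print("after {} filter:".format(col))
    print(df.shape)
    print(df.loc[df["device_id"] == "358035085192742"].shape)
```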
-import time
-from pyspark.context import SparkContext
-from pyspark.conf import SparkConf
-
-conf = SparkConf().setMaster("spark://10.30.181.88:7077").setAppName("My app")
-sc = SparkContext(conf=conf)
-sc.setLogLevel("WARN")
-for i in range(1, 100):
-    print(i)
-    time.sleep(5)
\ No newline at end of file
+import pandas as pd
+import pymysql
+
+
+def con_sql(db, sql):
+    # Run a query and return the rows as a DataFrame; the connection is
+    # closed in all cases.
+    cursor = db.cursor()
+    try:
+        cursor.execute(sql)
+        result = cursor.fetchall()
+        df = pd.DataFrame(list(result))
+    except Exception as e:
+        print("exception occurred:", e)
+        df = pd.DataFrame()
+    finally:
+        db.close()
+    return df
+
+
+def exp():
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
+    sql = "select native_queue from esmm_device_diary_queue where device_id = '358035085192742'"
+    cursor = db.cursor()
+    cursor.execute(sql)
+    result = cursor.fetchone()[0]
+    native = tuple(result.split(","))
+    print("total")
+    print(len(native))
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
+    sql = "select diary_id,level1_ids,level2_ids,level3_ids from diary_feat where diary_id in {}".format(native)
+    df = con_sql(db, sql)
+    n = df.shape[0]
+    # Share of each level1/level2 tag among the device's native queue diaries.
+    one = df[1].unique()
+    one_map = {}
+    for i in one:
+        one_map[i] = df.loc[df[1] == i].shape[0] / n
+    print(sorted(one_map.items(), key=lambda x: x[1]))
+    two = df[2].unique()
+    two_map = {}
+    print("separator")
+    for i in two:
+        two_map[i] = df.loc[df[2] == i].shape[0] / n
+    print(sorted(two_map.items(), key=lambda x: x[1]))
+
+
+def click():
+    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
+    sql = "select d.cid_id,f.level1_ids,f.level2_ids from data_feed_click d left join diary_feat f " \
+          "on d.cid_id = f.diary_id where d.device_id = '358035085192742' " \
+          "and (d.cid_type = 'diary' or d.cid_type = 'diary_video') and d.stat_date > '2018-12-20'"
+    df = con_sql(db, sql)
+    n = df.shape[0]
+    print(n)
+    # Share of each level1/level2 tag among the device's clicked diaries.
+    one = df[1].unique()
+    one_map = {}
+    for i in one:
+        one_map[i] = df.loc[df[1] == i].shape[0] / n
+    print(sorted(one_map.items(), key=lambda x: x[1], reverse=True))
+    two = df[2].unique()
+    two_map = {}
+    print("separator")
+    for i in two:
+        two_map[i] = df.loc[df[2] == i].shape[0] / n
+    print(sorted(two_map.items(), key=lambda x: x[1], reverse=True))
+
+
+if __name__ == "__main__":
+    click()
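One caveat on the `in {}`.format(native) query in `exp()` above: interpolating a Python tuple into SQL happens to work for multi-element string tuples, but a one-element tuple renders with a trailing comma and breaks the statement, and interpolation in general invites injection. A parameterized sketch with pymysql, using toy ids and the table and columns from the diff:

```python
ids = ("101",)  # a one-element queue would break "in {}".format(ids)
placeholders = ",".join(["%s"] * len(ids))
sql = ("select diary_id,level1_ids,level2_ids,level3_ids "
       "from diary_feat where diary_id in ({})").format(placeholders)
# cursor.execute(sql, ids)  # pymysql escapes each id safely
print(sql)
```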