Commit 6d090386 authored by 王志伟
parents d2a571ec a6cbd3c8
@@ -146,14 +146,14 @@ def get_data():
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time"})
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
print("esmm data ok")
print(df.head(2))
@@ -161,8 +161,8 @@ def get_data():
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date"], axis=1).fillna(0.0)
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date","device_id"], axis=1).fillna(0.0)
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
@@ -171,7 +171,9 @@ def get_data():
print("features:{}".format(features))
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
return df,validate_date,ucity_id,ccity_name
manufacturer = list(set(df["manufacturer"].values.tolist()))
channel = list(set(df["channel"].values.tolist()))
return df,validate_date,ucity_id,ccity_name,manufacturer,channel
def transform(a,validate_date):
@@ -179,10 +181,13 @@ def transform(a,validate_date):
df = model.fit_transform(a, y="y", n=160000, processes=22)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["y"] = df[0].apply(lambda x: x.split(",")[2])
df["z"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[1:]))
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[2:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
@@ -199,7 +204,7 @@ def transform(a,validate_date):
return model
def get_predict_set(ucity_id,model,ccity_name):
def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
@@ -215,6 +220,8 @@ def get_predict_set(ucity_id,model,ccity_name):
print("after ucity filter:")
print(df.shape)
df = df[df["ccity_name"].isin(ccity_name)]
df = df[df["manufacturer"].isin(manufacturer)]
df = df[df["channel"].isin(channel)]
print("after ccity_name filter:")
print(df.shape)
df["cid_id"] = df["cid_id"].astype("str")
@@ -259,9 +266,9 @@ def get_predict_set(ucity_id,model,ccity_name):
if __name__ == "__main__":
path = "/home/gaoyazhe/data/"
a = time.time()
df, validate_date, ucity_id,ccity_name = get_data()
df, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id,model,ccity_name)
get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
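Context for the device_id changes above: get_data() smuggles extra fields through the FFM transform by packing them into the y column as one comma-separated string, and transform() splits them back out by position. A minimal sketch of that round trip, assuming the transform hands the packed y string back unchanged as the first column of its output; the sample values are made up.

import pandas as pd

df = pd.DataFrame({"stat_date": ["2019-01-10"], "device_id": ["d1"], "y": ["1"], "z": ["0"]})
# pack: stat_date,device_id,y,z -> one string kept in "y", as in get_data()
df["y"] = df["stat_date"].str.cat([df["device_id"], df["y"], df["z"]], sep=",")
# ... model.fit_transform(df, y="y", ...) would run here ...
# unpack by position after the transform, mirroring transform()
df["stat_date"] = df["y"].apply(lambda x: x.split(",")[0])
df["device_id"] = df["y"].apply(lambda x: x.split(",")[1])
df["z"] = df["y"].apply(lambda x: x.split(",")[3])
df["y"] = df["y"].apply(lambda x: x.split(",")[2])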
@@ -30,9 +30,7 @@ def main():
df1 = pd.read_csv("/home/gaoyazhe/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
ctime = int(time.time())
df3["time"] = ctime
df3.columns = ["device_id","city_id","native_queue","time"]
df3.columns = ["device_id","city_id","native_queue"]
print("native_device_count",df3.shape)
@@ -48,6 +46,8 @@ def main():
#union
df_all = pd.merge(df3,df4,on=['device_id','city_id'],how='outer').fillna("")
ctime = int(time.time())
df_all["time"] = ctime
print("union_device_count",df_all.shape)
@@ -62,7 +62,14 @@ def main():
engine = create_engine(str(r"mysql+mysqldb://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
try:
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='replace',index=False)
df_merge = df_all[['device_id','city_id']].apply(lambda x: ''.join(x),axis=1)
df_merge_str = (str(list(df_merge.values))).strip('[]')
delete_str = 'delete from esmm_device_diary_queue where concat(device_id,city_id) in ({0})'.format(df_merge_str)
con = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
cur = con.cursor()
cur.execute(delete_str)
con.commit()
df_all.to_sql('esmm_device_diary_queue',con=engine,if_exists='append',index=False)
except Exception as e:
print(e)
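The try block above replaces the old if_exists='replace' write with a delete-then-append: rows whose concat(device_id, city_id) is about to be rewritten are deleted first, then the fresh rows are appended, so the table is never dropped and rows for other keys survive. A minimal sketch of the same pattern using a parameterized IN clause instead of a hand-built literal string; the table name and connection objects follow the script above, everything else is illustrative.

import pandas as pd

def upsert_queue(df, engine, con, table="esmm_device_diary_queue"):
    # keys about to be rewritten: concat(device_id, city_id), as in the delete above
    keys = (df["device_id"] + df["city_id"]).tolist()
    if keys:
        placeholders = ",".join(["%s"] * len(keys))
        sql = "delete from {} where concat(device_id, city_id) in ({})".format(table, placeholders)
        cur = con.cursor()
        cur.execute(sql, keys)
        con.commit()
    # append fresh rows instead of replacing the whole table
    df.to_sql(table, con=engine, if_exists="append", index=False)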
......
@@ -14,7 +14,7 @@ rm ${DATA_PATH}/tr/*
rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
rm ${DATA_PATH}/model_ckpt/DeepCvrMTL/2018*
rm -r ${DATA_PATH}/model_ckpt/DeepCvrMTL/2018*
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py > ${DATA_PATH}/infer.log
......
@@ -272,18 +272,18 @@ object EsmmPredData {
import sc.implicits._
val yesteday_have_seq = GmeiConfig.getMinusNDate(7)
val yesteday_have_seq = GmeiConfig.getMinusNDate(1)
//nearby_data
val raw_data = sc.sql(
s"""
|select concat(tmp1.device_id,",",tmp1.city_id) as device_city, tmp1.merge_queue from
|(select device_id,city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
|(select device_id,if(city_id='world','worldwide',city_id) city_id,similarity_cid as merge_queue from nd_device_cid_similarity_matrix
|union
|select device_id,city_id,native_queue as merge_queue from ffm_diary_queue
|select device_id,if(city_id='world','worldwide',city_id) city_id,native_queue as merge_queue from ffm_diary_queue
|union
|select device_id,city_id,search_queue as merge_queue from search_queue) as tmp1
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date>'${yesteday_have_seq}')
|where tmp1.device_id in (select distinct device_id from data_feed_click where stat_date='${yesteday_have_seq}')
""".stripMargin
)
raw_data.show()
@@ -312,8 +312,9 @@ object EsmmPredData {
val native_data = sc.sql(
s"""
|select distinct a.device_id,a.city_id,b.native_queue from data_feed_click a
|left join biz_feed_diary_queue b on a.city_id = b.city_id
|where a.stat_date>'${yesteday_have_seq}' and b.native_queue != ""
|left join (select if(city_id='world','worldwide',city_id) city_id,native_queue from biz_feed_diary_queue) b
|on a.city_id = b.city_id
|where a.stat_date='${yesteday_have_seq}' and b.native_queue != ""
""".stripMargin
)
native_data.createOrReplaceTempView("native_data")
@@ -432,7 +433,7 @@ object EsmmPredData {
}
object GetPortrait {
object GetDiaryPortrait {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
@@ -477,9 +478,9 @@ object GetPortrait {
val diary_tag = sc.sql(
s"""
|select c.diary_id,
| concat_ws(',',collect_set(cast(c.level1_id as string))) as level1_ids,
| concat_ws(',',collect_set(cast(c.level2_id as string))) as level2_ids,
| concat_ws(',',collect_set(cast(c.level3_id as string))) as level3_ids from
| concat_ws(';',collect_set(cast(c.level1_id as string))) as level1_ids,
| concat_ws(';',collect_set(cast(c.level2_id as string))) as level2_ids,
| concat_ws(';',collect_set(cast(c.level3_id as string))) as level3_ids from
| (select a.diary_id,b.level1_id,b.level2_id,b.level3_id
| from online.tl_hdfs_diary_tags_view a
| left join online.bl_tag_hierarchy_detail b
@@ -495,6 +496,80 @@ object GetPortrait {
GmeiConfig.writeToJDBCTable(diary_tag,"diary_feat",SaveMode.Overwrite)
sc.stop()
}
}
}
object GetDevicePortrait {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)
case class Params(env: String = "dev",
date: String = GmeiConfig.getMinusNDate(1)
) extends AbstractParams[Params] with Serializable
val defaultParams = Params()
val parser = new OptionParser[Params]("Feed_EDA") {
head("EsmmData")
opt[String]("env")
.text(s"the databases environment you used")
.action((x, c) => c.copy(env = x))
opt[String]("date")
.text(s"the date you used")
.action((x,c) => c.copy(date = x))
note(
"""
|For example, the following command runs this app on a tidb dataset:
|
| spark-submit --class com.gmei.EsmmData ./target/scala-2.11/feededa-assembly-0.1.jar \
""".stripMargin +
s"| --env ${defaultParams.env}"
)
}
def main(args: Array[String]): Unit = {
parser.parse(args, defaultParams).map { param =>
GmeiConfig.setup(param.env)
val spark_env = GmeiConfig.getSparkSession()
val sc = spark_env._2
val ti = new TiContext(sc)
ti.tidbMapTable(dbName = "jerry_prod",tableName = "data_feed_click")
ti.tidbMapTable(dbName = "jerry_prod",tableName = "diary_feat")
val stat_date = param.date.replace("-","")
val device_search_tag = sc.sql(
s"""
|select a.cl_id as device_id,COALESCE(a.params['diary_id'], a.params['business_id'], 0) as cid_id,split(c.level1_ids,',')[0] as level1_id
|from online.tl_hdfs_maidian_view as a
|left join
| (select cl_id as device_id,max(partition_date) as stat_date
| from online.tl_hdfs_maidian_view
| where action = 'on_click_diary_card'
| and (params['page_name']='search_result_diary' or params['page_name']='search_result_more')
| and partition_date > '20180601' group by cl_id) b
|on a.cl_id = b.device_id and a.partition_date=b.stat_date
|left join diary_feat c
|on COALESCE(a.params['diary_id'], a.params['business_id'], 0) = c.diary_id
|where a.partition_date > '20180601'
|and COALESCE(a.params['diary_id'], a.params['business_id'], 0) != 0
|and c.level1_ids != ""
""".stripMargin
)
device_search_tag.show()
println(device_search_tag.count())
// GmeiConfig.writeToJDBCTable(diary_tag,"diary_feat",SaveMode.Overwrite)
sc.stop()
}
......
import pandas as pd
import pymysql
import json
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result))
    except Exception as e:
        print("exception occurred:", e)
df = pd.DataFrame()
finally:
db.close()
return df
def is_json(myjson):
try:
json.loads(myjson)
except ValueError:
return False
return True
def json_format(x):
a = json.loads(x)
return set([x["appName"] for x in a])
def sort_app():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
sql = "select device_id,app_list from device_id_applist"
df = con_sql(db, sql).dropna()
df = df.rename(columns={0: "device_id", 1: "app_list"})
df = df.loc[df["app_list"].apply(is_json)]
category = {"competitor":{"新氧美容"},
"dianshang":{"京东","淘宝","唯品会","天猫","苏宁易购","国美","当当","亚马逊","网易严选","小米有品"},
"kuajing_dianshang": {"小红书", "网易考拉", "洋码头", "达令全球好货", "海狐海淘",
"HIG0", "豌豆公主", "尚品网", "丰趣海淘", "比呀比海外购"},
"zhibo": {"YY直播", "映客直播", "花椒直播", "NOW直播", "小米直播", "一直播", "KK直播", "酷狗直播",
"来疯直播", "喵播"},
"youxizhibo": {"虎牙直播", "斗鱼直播", "熊猫直播", "触手直播", "企鹅电竞", "龙珠直播", "战旗直播",
"全民直播", "CC直播", "火猫直播"},
"short_video": {"抖音短视频", "快手", "西瓜视频", "火山小视频", "秒拍", "快视频", "小影", "蛙趣视频",
"最右", "小咖秀"},
"meitu": {"美图秀秀", "美颜相机", "天天P图", "Faceu激萌", "B612咔叽", "in", "相机360", "潮自拍",
"玩图", "nice","轻颜相机","无他相机"},
"tiyu": {"直播吧", "腾讯体育", "新浪体育", "虎扑体育", "懂球帝", "CCTV5", "疯狂体育",
"球探体育比分", "PP体育", "A8体育直播"},
"read":{"掌阅", "QQ阅读", "咪咕阅读", "书旗小说", "多看阅读", "追书神器", "搜狗阅读", "微信读书",
"起点小说", "宜搜小说"},
"finance": {"21财经", "华尔街见闻", "新浪财经", "时代财经", "和讯财经", "第一财经", "FT中文网", "财经杂志", "财新", "央视财经"},
"fashion_clothes": {"蘑菇街", "聚美优品", "美丽说", "楚楚街", "穿衣助手", "有货", "优品惠", "优购时尚商城", "走秀奢侈品"},
"muying": {"贝贝网", "蜜芽", "孩子王", "妈妈100", "大V店", "宝贝格子", "乐友", "母婴之家", "国际妈咪海淘母婴商城", "美囤妈妈",
"妈妈网孕育", "宝宝树孕育", "辣妈帮", "亲宝宝", "宝宝知道", "妈妈社区", "妈妈帮", "柚宝宝", "育儿宝"},
"fresh": {"每日优鲜", "京东到家", "天天果园", "中粮我买网", "本来生活", "手机惠农", "盒马", "顺丰优选", "百果园", "易果生鲜"},
"bijia": {"美团", "拼多多", "折800", "返利网", "卷皮折扣", "淘粉吧", "聚划算", "一淘", "网购联盟", "返利淘联盟", "什么值得买", "花生日记"},
"travel": {"携程旅行", "去哪儿旅行", "同程旅游", "途牛旅游", "飞猪", "马蜂窝旅游", "艺龙旅行", "驴妈妈旅游",
"TripAdvisor猫途鹰", "美团旅行"},
"airplane": {"航班管家", "飞常准", "航旅纵横", "春秋航空", "南方航空", "中国国航", "东方航空", "海南航空", "深圳航空", "四川航空"},
"love": {"百合婚恋", "世纪佳缘", "珍爱网", "牵手婚恋", "探探", "热恋", "有缘网", "约会吧", "约爱", "快约爱"},
"stock": {"同花顺炒股票", "大智慧", "涨乐财富通", "腾讯自选股", "广发证券易淘金", "金太阳", "国泰君安君弘", "海通e海通财", "平安证券", "同花顺"},
"car": {"平安好车主", "途虎养车", "车主无忧", "汽车超人", "车e族", "汽修宝", "车点点", "汽车大师", "乐车邦", "车享家"},
"child": {"小伴龙", "儿歌多多", "宝宝巴士奇妙屋", "智慧树", "贝瓦儿歌", "儿歌点点", "宝贝听听", "宝宝小厨房", "宝宝游乐园", "叽里呱啦"},
"homework": {"作业帮", "小猿搜题", "一起作业学生端", "学霸君", "互动作业", "猿题库", "纳米盒", "阿凡题", "洋葱数学"},
"work": {"钉钉", "企业微信", "移动彩云", "云之家", "今目标", "口袋助理", "推事本", "奇鱼微办公", "工作圈", "明道"},
"home": {"最美装修", "齐家网", "土巴兔装修", "装修头条", "装修管家", "窝牛装修", "丽芙家居", "酷家乐装修", "惠装装修", "房天下装修"},
"job": {"智联招聘", "前程无忧", "斗米", "拉勾", "Boss直聘", "猎聘同道", "智联招聘"}
}
df["app_list"] = df["app_list"].apply(json_format)
for i in category.keys():
df[i] = df["app_list"].apply(lambda x: 1 if len(x & category[i]) > 0 else 0)
print(i)
print(df[i].unique())
df = df.drop("app_list",axis=1)
yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@10.66.157.22:4000/jerry_test?charset=utf8')
print(df.shape)
n = 200000
for i in range(0,df.shape[0],n):
print(i)
if i == 0:
temp = df.loc[0:n]
elif i+n > df.shape[0]:
temp = df.loc[i+1:]
else:
temp = df.loc[i+1:i+n]
pd.io.sql.to_sql(temp, "app_list_sort", yconnect, schema='jerry_test', if_exists='append', index=False)
print("insert done")
if __name__ == "__main__":
sort_app()
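A tiny worked example of the category flags built in sort_app(), assuming an app_list that has already gone through json_format; the category sets are the ones defined above, the sample device is made up.

apps = {"美图秀秀", "淘宝"}
print(1 if len(apps & {"京东", "淘宝", "唯品会", "天猫"}) > 0 else 0)   # dianshang -> 1
print(1 if len(apps & {"YY直播", "映客直播", "花椒直播"}) > 0 else 0)    # zhibo -> 0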
#! -*- coding: utf8 -*-
#coding=utf-8
import pymysql
import pandas as pd
@@ -146,14 +146,14 @@ def get_data():
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.stat_date,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time " \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join cid_type_top c on e.device_id = c.device_id left join cid_time on e.cid_id = cid_time.cid_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
print(df.shape)
df = df.rename(columns={0: "y", 1: "z", 2: "stat_date", 3: "ucity_id",4: "clevel1_id", 5: "ccity_name",
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time"})
6:"device_type",7:"manufacturer",8:"channel",9:"top",10:"time",11:"device_id"})
print("esmm data ok")
print(df.head(2))
@@ -161,8 +161,8 @@ def get_data():
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["top"] = df["top"].astype("str")
df["y"] = df["stat_date"].str.cat([df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date"], axis=1).fillna(0.0)
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","stat_date","device_id"], axis=1).fillna(0.0)
print(df.head(2))
features = 0
for i in ["ucity_id","clevel1_id","ccity_name","device_type","manufacturer","channel"]:
@@ -171,7 +171,10 @@ def get_data():
print("features:{}".format(features))
ccity_name = list(set(df["ccity_name"].values.tolist()))
ucity_id = list(set(df["ucity_id"].values.tolist()))
return df,validate_date,ucity_id,ccity_name
manufacturer = list(set(df["manufacturer"].values.tolist()))
channel = list(set(df["channel"].values.tolist()))
return df,validate_date,ucity_id,ccity_name,manufacturer,channel
def transform(a,validate_date):
@@ -179,10 +182,13 @@ def transform(a,validate_date):
df = model.fit_transform(a, y="y", n=160000, processes=22)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["y"] = df[0].apply(lambda x: x.split(",")[2])
df["z"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[1:]))
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[2:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
@@ -199,7 +205,7 @@ def transform(a,validate_date):
return model
def get_predict_set(ucity_id,model,ccity_name):
def get_predict_set(ucity_id,model,ccity_name,manufacturer,channel):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.y,e.z,e.label,e.ucity_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel,c.top,cid_time.time,e.device_id,e.cid_id " \
@@ -209,6 +215,7 @@ def get_predict_set(ucity_id,model,ccity_name):
df = df.rename(columns={0: "y", 1: "z", 2: "label", 3: "ucity_id", 4: "clevel1_id", 5: "ccity_name",
6: "device_type", 7: "manufacturer", 8: "channel", 9: "top", 10: "time",
11:"device_id",12:"cid_id"})
print("before filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
@@ -216,6 +223,8 @@ def get_predict_set(ucity_id,model,ccity_name):
print(df.shape)
df = df[df["ccity_name"].isin(ccity_name)]
print("after ccity_name filter:")
df = df[df["manufacturer"].isin(manufacturer)]
df = df[df["channel"].isin(channel)]
print(df.shape)
df["cid_id"] = df["cid_id"].astype("str")
df["clevel1_id"] = df["clevel1_id"].astype("str")
@@ -259,10 +268,9 @@ def get_predict_set(ucity_id,model,ccity_name):
if __name__ == "__main__":
path = "/home/gmuser/ffm/"
a = time.time()
df, validate_date, ucity_id,ccity_name = get_data()
df, validate_date, ucity_id,ccity_name,manufacturer,channel = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id,model,ccity_name)
get_predict_set(ucity_id,model,ccity_name,manufacturer,channel)
b = time.time()
print("cost(分钟)")
print((b-a)/60)