Commit 13f4ccb4 authored by 张彦钊's avatar 张彦钊

Merge branch 'master' of git.wanmeizhensuo.com:ML/ffm-baseline

change path
parents 1d4172ba 53c0bf7d
#coding=utf-8
import pymysql
import pandas as pd
from multiprocessing import Pool
import numpy as np
import datetime
import time
from sqlalchemy import create_engine
def con_sql(db,sql):
cursor = db.cursor()
try:
cursor.execute(sql)
result = cursor.fetchall()
df = pd.DataFrame(list(result)).dropna()
except Exception:
print("发生异常", Exception)
df = pd.DataFrame()
finally:
db.close()
return df
# def test():
# sql = "select max(update_time) from ffm_diary_queue"
# db = pymysql.connect(host='192.168.15.12', port=4000, user='root', db='eagle')
# cursor = db.cursor()
# cursor.execute(sql)
# result = cursor.fetchone()[0]
# db.close()
# print(result)
class multiFFMFormatPandas:
def __init__(self):
self.field_index_ = None
self.feature_index_ = None
self.y = None
def fit(self, df, y=None):
self.y = y
df_ffm = df[df.columns.difference([self.y])]
if self.field_index_ is None:
self.field_index_ = {col: i for i, col in enumerate(df_ffm)}
if self.feature_index_ is not None:
last_idx = max(list(self.feature_index_.values()))
if self.feature_index_ is None:
self.feature_index_ = dict()
for col in df.columns:
self.feature_index_[col] = 1
last_idx = 1
vals = df[col].unique()
for val in vals:
if pd.isnull(val):
continue
name = '{}_{}'.format(col, val)
if name not in self.feature_index_:
self.feature_index_[name] = last_idx
last_idx += 1
return self
def fit_transform(self, df, y=None,n=50000,processes=4):
# n是每个线程运行最大的数据条数,processes是线程数
self.fit(df, y)
n = n
processes = processes
return self.transform(df,n,processes)
def transform_row_(self, row, t):
ffm = []
for col, val in row.loc[row.index != self.y].to_dict().items():
col_type = t[col]
name = '{}_{}'.format(col, val)
if col_type.kind == 'O':
ffm.append('{}:{}:1'.format(self.field_index_[col]+1, self.feature_index_[name]))
elif col_type.kind != 'O':
ffm.append('{}:{}:{}'.format(self.field_index_[col]+1, self.feature_index_[col], val))
result = ' '.join(ffm)
if self.y is not None:
result = str(row.loc[row.index == self.y][0]) + "," + result
if self.y is None:
result = str(0) + "," + result
return result
def transform(self, df,n=1500,processes=2):
# n是每个线程运行最大的数据条数,processes是线程数
t = df.dtypes.to_dict()
data_list = self.data_split_line(df,n)
# 设置进程的数量
pool = Pool(processes)
print("总进度: " + str(len(data_list)))
for i in range(len(data_list)):
data_list[i] = pool.apply_async(self.pool_function, (data_list[i], t,))
result_map = {}
for i in data_list:
result_map.update(i.get())
pool.close()
pool.join()
return pd.Series(result_map)
# 多进程计算方法
def pool_function(self, df, t):
return {idx: self.transform_row_(row, t) for idx, row in df.iterrows()}
# 切分数据方法,传人dataframe和切分条数的步长,返回dataframe的集合,每个dataframe中含有若干条数据
def data_split_line(self, data, step):
data_list = []
x = 0
while True:
if x + step < data.__len__():
data_list.append(data.loc[x:x + step])
x = x + step + 1
else:
data_list.append(data.loc[x:data.__len__()])
break
return data_list
# 原生转化方法,不需要多进程
def native_transform(self, df):
t = df.dtypes.to_dict()
return pd.Series({idx: self.transform_row_(row, t) for idx, row in df.iterrows()})
# 下面这个方法不是这个类原有的方法,是新增的。目的是用来判断这个用户是不是在训练数据集中存在
def is_feature_index_exist(self, name):
if name in self.feature_index_:
return True
else:
return False
def get_data():
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select max(stat_date) from esmm_train_data"
validate_date = con_sql(db, sql)[0].values.tolist()[0]
print("validate_date:" + validate_date)
temp = datetime.datetime.strptime(validate_date, "%Y-%m-%d")
start = (temp - datetime.timedelta(days=15)).strftime("%Y-%m-%d")
print(start)
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea " \
"from esmm_train_data e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id " \
"where e.stat_date >= '{}'".format(start)
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name"})
print("esmm data ok")
print(df.head(2))
ucity_id = list(set(df["ucity_id"].values.tolist()))
cid = list(set(df["cid_id"].values.tolist()))
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["y"] = df["stat_date"].str.cat([df["device_id"].values.tolist(),df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(),df["z"].values.tolist()], sep=",")
df = df.drop(["z","device_id"], axis=1).fillna(0.0)
print(df.head(2))
print("fields:{}".format(df.shape[1]-1))
print("features:{}".format(len(cid)))
return df,validate_date,ucity_id,cid
def transform(a,validate_date):
model = multiFFMFormatPandas()
df = model.fit_transform(a, y="y", n=160000, processes=26)
df = pd.DataFrame(df)
df["stat_date"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0,"seq"], axis=1)
print(df.head(2))
train = df[df["stat_date"] != validate_date]
train = train.drop("stat_date",axis=1)
test = df[df["stat_date"] == validate_date]
test = test.drop("stat_date",axis=1)
print("train shape")
print(train.shape)
train.to_csv(path + "tr.csv", sep="\t", index=False)
test.to_csv(path + "va.csv", sep="\t", index=False)
return model
def get_predict_set(ucity_id, cid,model):
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
sql = "select e.device_id,e.y,e.z,e.stat_date,e.ucity_id,e.cid_id,e.clevel1_id,e.ccity_name," \
"u.device_type,u.manufacturer,u.channel," \
"home.jingxuan,home.zhibo,home.nose,home.eyes,home.weizheng,home.teeth,home.lunkuo," \
"home.meifu,home.xizhi,home.zhifang,home.longxiong,home.simi,home.maofa,home.gongli,home.korea,e.label " \
"from esmm_pre_data e left join user_feature u on e.device_id = u.device_id " \
"left join home_tab_click home on e.device_id = home.device_id"
df = con_sql(db, sql)
df = df.rename(columns={0: "device_id", 1: "y", 2: "z", 3: "stat_date", 4: "ucity_id", 5: "cid_id",
6: "clevel1_id", 7: "ccity_name",26:"label"})
print("before filter:")
print(df.shape)
df = df[df["cid_id"].isin(cid)]
print("after cid filter:")
print(df.shape)
df = df[df["ucity_id"].isin(ucity_id)]
print("after ucity filter:")
print(df.shape)
df["clevel1_id"] = df["clevel1_id"].astype("str")
df["cid_id"] = df["cid_id"].astype("str")
df["y"] = df["y"].astype("str")
df["z"] = df["z"].astype("str")
df["label"] = df["label"].astype("str")
df["y"] = df["label"].str.cat(
[df["device_id"].values.tolist(), df["ucity_id"].values.tolist(), df["cid_id"].values.tolist(),
df["y"].values.tolist(), df["z"].values.tolist()], sep=",")
df = df.drop(["z","label","device_id"], axis=1).fillna(0.0)
print(df.head(2))
df = model.transform(df,n=160000, processes=22)
df = pd.DataFrame(df)
df["label"] = df[0].apply(lambda x: x.split(",")[0])
df["device_id"] = df[0].apply(lambda x: x.split(",")[1])
df["city_id"] = df[0].apply(lambda x: x.split(",")[2])
df["cid"] = df[0].apply(lambda x: x.split(",")[3])
df["number"] = np.random.randint(1, 2147483647, df.shape[0])
df["seq"] = list(range(df.shape[0]))
df["seq"] = df["seq"].astype("str")
df["data"] = df[0].apply(lambda x: ",".join(x.split(",")[4:]))
df["data"] = df["seq"].str.cat(df["data"], sep=",")
df = df.drop([0, "seq"], axis=1)
print(df.head())
native_pre = df[df["label"] == "0"]
native_pre = native_pre.drop("label", axis=1)
native_pre.to_csv(path+"native.csv",sep="\t",index=False)
# print("native_pre shape")
# print(native_pre.shape)
nearby_pre = df[df["label"] == "1"]
nearby_pre = nearby_pre.drop("label", axis=1)
nearby_pre.to_csv(path + "nearby.csv", sep="\t", index=False)
# print("nearby_pre shape")
# print(nearby_pre.shape)
if __name__ == "__main__":
path = "/home/gaoyazhe/data/"
a = time.time()
df, validate_date, ucity_id, cid = get_data()
model = transform(df, validate_date)
get_predict_set(ucity_id, cid,model)
b = time.time()
print("cost(分钟)")
print((b-a)/60)
\ No newline at end of file
#!/usr/bin/env python
#coding=utf-8
from __future__ import absolute_import
......@@ -26,10 +25,10 @@ tf.app.flags.DEFINE_integer("threads", 16, "threads num")
#User_Fileds = set(['101','109_14','110_14','127_14','150_14','121','122','124','125','126','127','128','129'])
#Ad_Fileds = set(['205','206','207','210','216'])
#Context_Fileds = set(['508','509','702','853','301'])
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23','24':'24','25':'25','26':'26','27':'27','28':'28','29':'29','30':'30'}
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11','12':'12','13':'13','14':'14','15':'15','16':'16','17':'17','18':'18','19':'19','20':'20','21':'21','22':'22','23':'23'}
#Common_Fileds = {'1':'1','2':'2','3':'3','4':'4','5':'5','6':'6','7':'7','8':'8','9':'9','10':'10','11':'11'}
UMH_Fileds = {'109_14':('u_cat','12'),'110_14':('u_shop','13'),'127_14':('u_brand','14'),'150_14':('u_int','15')} #user multi-hot feature
Ad_Fileds = {'206':('a_cat','16'),'207':('a_shop','17'),'210':('a_int','18'),'216':('a_brand','19')} #ad feature for DIN
#40362692,0,0,216:9342395:1.0 301:9351665:1.0 205:7702673:1.0 206:8317829:1.0 207:8967741:1.0 508:9356012:2.30259 210:9059239:1.0 210:9042796:1.0 210:9076972:1.0 210:9103884:1.0 210:9063064:1.0 127_14:3529789:2.3979 127_14:3806412:2.70805
def gen_tfrecords(in_file):
......
#!/usr/bin/env python
#coding=utf-8
#from __future__ import absolute_import
......@@ -346,7 +345,7 @@ def main(_):
print("-"*100)
with open(FLAGS.data_dir + "/pred.txt", "w") as fo:
for prob in preds:
fo.write("%f\t%f\n" % (prob['pctr'], prob['pcvr']))
fo.write("%f\t%f\t%f\n" % (prob['pctr'], prob['pcvr'], prob['pctcvr']))
elif FLAGS.task_type == 'export':
print("Not Implemented, Do It Yourself!")
#feature_spec = tf.feature_column.make_parse_example_spec(feature_columns)
......
# -*- coding: utf-8 -*-
#coding=utf-8
import smtplib
from email.mime.text import MIMEText
......
#coding=utf-8
from sqlalchemy import create_engine
import pandas as pd
import pymysql
......@@ -17,39 +19,30 @@ def con_sql(sql):
return result
def set_join(lst):
return ','.join(set(lst))
return ','.join([str(i) for i in set(lst)])
def main():
sql = "select device_id,city_id,cid from esmm_data2ffm_infer_native"
result = con_sql(sql)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
df1 = pd.read_csv("/home/gaoyazhe/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
# native queue
df2 = pd.read_csv('/home/gaoyazhe/data/native.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype('object')
df1 = pd.read_csv("/home/gaoyazhe/data/native/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"],df2["cvr"],df2["ctcvr"] = df1["ctr"],df1["cvr"],df1["ctcvr"]
df3 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
ctime = int(time.time())
df3["time"] = ctime
df3.columns = ["device_id","city_id","native_queue","time"]
print("native_device_count",df3.shape)
sql_nearby = "select device_id,city_id,cid from esmm_data2ffm_infer_nearby"
result = con_sql(sql_nearby)
dct = {"uid":[],"city":[],"cid_id":[]}
for i in result:
dct["uid"].append(i[0])
dct["city"].append(i[1])
dct["cid_id"].append(i[2])
# nearby queue
df2 = pd.read_csv('/home/gaoyazhe/data/nearby.csv',usecols=[0,1,2],header=0,names=['uid','city','cid_id'],sep='\t')
df2['cid_id'] = df2['cid_id'].astype('object')
df1 = pd.read_csv("/home/gaoyazhe/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr"])
df2 = pd.DataFrame(dct)
df2["ctr"],df2["cvr"] = df1["ctr"],df1["cvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="cvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df1 = pd.read_csv("/home/gaoyazhe/data/nearby/pred.txt",sep='\t',header=None,names=["ctr","cvr","ctcvr"])
df2["ctr"], df2["cvr"], df2["ctcvr"] = df1["ctr"], df1["cvr"], df1["ctcvr"]
df4 = df2.groupby(by=["uid","city"]).apply(lambda x: x.sort_values(by="ctcvr",ascending=False)).reset_index(drop=True).groupby(by=["uid","city"]).agg({'cid_id':set_join}).reset_index(drop=False)
df4.columns = ["device_id","city_id","nearby_queue"]
print("nearby_device_count",df4.shape)
......
......@@ -15,11 +15,8 @@ rm ${DATA_PATH}/va/*
rm ${DATA_PATH}/native/*
rm ${DATA_PATH}/nearby/*
echo "mysql to csv"
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_train" > ${DATA_PATH}/tr.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_cv" > ${DATA_PATH}/va.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_native" > ${DATA_PATH}/native.csv
mysql -u root -p3SYz54LS9#^9sBvC -h 10.66.157.22 -P 4000 -D jerry_test -e "select number,data from esmm_data2ffm_infer_nearby" > ${DATA_PATH}/nearby.csv
echo "data2ffm"
${PYTHON_PATH} ${MODEL_PATH}/Feature_pipline/data2ffm.py > ${DATA_PATH}/infer.log
echo "split data"
split -l $((`wc -l < ${DATA_PATH}/tr.csv`/15)) ${DATA_PATH}/tr.csv -d -a 4 ${DATA_PATH}/tr/tr_ --additional-suffix=.csv
......@@ -50,7 +47,7 @@ currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "train..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}" --task_type="train"
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=23 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH} --task_type=train
echo "train time"
current=$(date "+%Y-%m-%d %H:%M:%S")
......@@ -59,11 +56,11 @@ currentTimeStamp=$((timeStamp*1000+`date "+%N"`/1000000))
echo $current
echo "infer native..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}/native" --task_type="infer" > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/native --task_type=infer > ${DATA_PATH}/infer.log
echo "infer nearby..."
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir="${DATA_PATH}/nearby" --task_type="infer" > ${DATA_PATH}/infer.log
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/DeepCvrMTL.py --ctr_task_wgt=0.3 --learning_rate=0.0001 --deep_layers=256,128 --dropout=0.8,0.5 --optimizer=Adam --num_epochs=1 --embedding_size=16 --batch_size=1024 --field_size=11 --feature_size=354332 --l2_reg=0.005 --log_steps=100 --num_threads=36 --model_dir=${DATA_PATH}/model_ckpt/DeepCvrMTL/ --data_dir=${DATA_PATH}/nearby --task_type=infer > ${DATA_PATH}/infer.log
echo "sort and 2sql"
${PYTHON_PATH} ${MODEL_PATH}/Model_pipline/sort_and_2sql.py
......
......@@ -69,13 +69,24 @@ object EsmmData {
if (max_stat_date_str != param.date){
val stat_date = param.date
println(stat_date)
// val imp_data = sc.sql(
// s"""
// |select distinct stat_date,device_id,city_id as ucity_id,
// | cid_id,diary_service_id
// |from data_feed_exposure
// |where cid_type = 'diary'
// |and stat_date ='${stat_date}'
// """.stripMargin
// )
val imp_data = sc.sql(
s"""
|select distinct stat_date,device_id,city_id as ucity_id,
| cid_id,diary_service_id
|select * from
|(select stat_date,device_id,city_id as ucity_id,cid_id,diary_service_id
|from data_feed_exposure
|where cid_type = 'diary'
|and stat_date ='${stat_date}'
|group by stat_date,device_id,city_id,cid_id,diary_service_id having count(*) > 1) a
""".stripMargin
)
// imp_data.show()
......@@ -200,7 +211,7 @@ object EsmmData {
)
// union_data_scity_id.createOrReplaceTempView("union_data_scity_id")
union_data_scity_id.show()
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_data",SaveMode.Append)
GmeiConfig.writeToJDBCTable("jdbc:mysql://10.66.157.22:4000/jerry_test?user=root&password=3SYz54LS9#^9sBvC&rewriteBatchedStatements=true",union_data_scity_id, table="esmm_train_test",SaveMode.Append)
} else {
println("esmm_train_data already have param.date data")
......
......@@ -76,23 +76,23 @@ object temp_analysis {
agency_id.createOrReplaceTempView("agency_id")
//每日新用户
val device_id_newUser = sc.sql(
s"""
|select distinct(device_id) as device_id
|from online.ml_device_day_active_status
|where active_type != '4'
|and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
| ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
| ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
| ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
| ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
| ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
| ,'promotion_shike','promotion_julang_jl03')
|and partition_date ='${partition_date}'
""".stripMargin
)
device_id_newUser.createOrReplaceTempView("device_id_new")
// //每日新用户
// val device_id_newUser = sc.sql(
// s"""
// |select distinct(device_id) as device_id
// |from online.ml_device_day_active_status
// |where active_type != '4'
// |and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
// | ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
// | ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
// | ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
// | ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
// | ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
// | ,'promotion_shike','promotion_julang_jl03')
// |and partition_date ='${partition_date}'
// """.stripMargin
// )
// device_id_newUser.createOrReplaceTempView("device_id_new")
val blacklist_id = sc.sql(
s"""
......@@ -108,16 +108,34 @@ object temp_analysis {
|from agency_id
|UNION ALL
|select device_id
|from device_id_new
|UNION ALL
|select device_id
|from blacklist_id
""".stripMargin
)
final_id.createOrReplaceTempView("final_id")
val diary_clk_all = sc.sql(
s"""
|select ov.partition_date,count(ov.cl_id) as clk_num,count(distinct(ov.cl_id)),count(ov.cl_id)/count(distinct(ov.cl_id))
|from online.tl_hdfs_maidian_view ov left join final_id
|on ov.cl_id = final_id.device_id
|where ov.action = "page_view"
|and params['page_name']="diary_detail"
|and ov.cl_id != "NULL"
|and ov.partition_date >='20181201'
|and final_id.device_id is null
|group by ov.partition_date
|order by ov.partition_date
""".stripMargin
)
diary_clk_all.show(80)
//日记本点击
val referrer=List("all_case_service_comment","all_cases","diary_detail","diary_list","diary_listof_related_service",
val referrer=List("about_me_message_list","all_case_service_comment","all_cases","diary_detail","diary_list"
,"diary_listof_related_service","answer_detail","community_home","conversation_detail","create_diary_title","diary_listof_related_service",
"doctor_all_cases","hospital_all_cases","my_favor","my_order","order_detail","personal_store_diary_list","received_votes",
"topic_detail","welfare_detail","welfare_list","welfare_special","wiki_detail","zone_detail",
"expert_detail","free_activity_detail","home","message_home","my_diary","organization_detail","other_homepage","question_detail",
"search_result_diary","search_result_more","welfare_detail","zone_v3")
for( a <- referrer ){
......@@ -130,7 +148,7 @@ object temp_analysis {
|and params['page_name']="diary_detail"
|and params['referrer']='${a}'
|and ov.cl_id != "NULL"
|and ov.partition_date >='20181101'
|and ov.partition_date >='20181201'
|and final_id.device_id is null
|group by ov.partition_date
|order by ov.partition_date
......@@ -141,6 +159,8 @@ object temp_analysis {
}
//5.登录人数
val log_device_temp = sc.sql(
s"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment