Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
S
serviceRec
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
郭羽
serviceRec
Commits
e776b8b5
Commit
e776b8b5
authored
3 years ago
by
郭羽
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
service model 优化
parent
190731d8
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
861 additions
and
0 deletions
+861
-0
featureEng2.py
spark/featureEng2.py
+861
-0
No files found.
spark/featureEng2.py
0 → 100644
View file @
e776b8b5
import
sys
import
os
from
datetime
import
date
,
timedelta
from
elasticsearch
import
Elasticsearch
from
elasticsearch.helpers
import
scan
import
time
import
redis
from
pyspark
import
SparkContext
,
SparkConf
from
pyspark.sql
import
SparkSession
import
pyspark.sql
as
sql
from
pyspark.sql.functions
import
when
,
col
from
pyspark.sql.types
import
*
from
pyspark.sql
import
functions
as
F
from
pyspark.ml
import
Pipeline
from
pyspark.ml.feature
import
StringIndexer
,
QuantileDiscretizer
,
MinMaxScaler
from
collections
import
defaultdict
import
json
sys
.
path
.
append
(
os
.
path
.
dirname
(
os
.
path
.
abspath
(
os
.
path
.
dirname
(
__file__
))))
import
utils.configUtils
as
configUtils
# import utils.connUtils as connUtils
import
pandas
as
pd
# os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
"""
特征工程
"""
ITEM_MULTI_COLUMN_EXTRA_MAP
=
{
"first_demands"
:
1
,
"second_demands"
:
5
,
"first_solutions"
:
1
,
"second_solutions"
:
5
,
"first_positions"
:
1
,
"second_positions"
:
5
,
"tags_v3"
:
10
,
}
USER_MULTI_COLUMN_EXTRA_MAP
=
{
"first_demands"
:
1
,
"second_demands"
:
3
,
"first_solutions"
:
1
,
"second_solutions"
:
3
,
"first_positions"
:
1
,
"second_positions"
:
3
,
"tags_v3"
:
5
,
}
ITEM_NUMBER_COLUMNS
=
[
"lowest_price"
,
"smart_rank2"
,
"case_count"
,
"ordered_user_ids_count"
]
ITEM_CATE_COLUMNS
=
[
"service_type"
,
"merchant_id"
,
"doctor_type"
,
"doctor_id"
,
"doctor_famous"
,
"hospital_id"
,
"hospital_city_tag_id"
,
"hospital_type"
,
"hospital_is_high_quality"
]
NUMBER_PRECISION
=
2
VERSION
=
configUtils
.
SERVICE_VERSION
FEATURE_USER_KEY
=
"Strategy:rec:feature:service:"
+
VERSION
+
":user:"
FEATURE_ITEM_KEY
=
"Strategy:rec:feature:service:"
+
VERSION
+
":item:"
FEATURE_VOCAB_KEY
=
"Strategy:rec:vocab:service:"
+
VERSION
FEATURE_COLUMN_KEY
=
"Strategy:rec:column:service:"
+
VERSION
TRAIN_FILE_PATH
=
"service_feature_"
+
VERSION
ITEM_PREFIX
=
"item_"
def
getRedisConn
():
pool
=
redis
.
ConnectionPool
(
host
=
"172.16.50.145"
,
password
=
"XfkMCCdWDIU
%
ls$h"
,
port
=
6379
,
db
=
0
)
conn
=
redis
.
Redis
(
connection_pool
=
pool
)
# conn = redis.Redis(host="172.16.50.145", port=6379, password="XfkMCCdWDIU%ls$h",db=0)
# conn = redis.Redis(host="172.18.51.10", port=6379,db=0) #test
return
conn
def
addItemFeatures
(
samples
,
itemDF
,
dataVocab
,
multiVocab
):
itemDF
=
itemDF
.
withColumnRenamed
(
"id"
,
"itemid"
)
# 数据过滤:无医生
itemDF
=
itemDF
.
filter
(
col
(
"doctor_id"
)
!=
"-1"
)
# itemid
vocabList
=
collectColumnToVocab
(
itemDF
,
"itemid"
)
dataVocab
[
"itemid"
]
=
vocabList
# null处理
for
c
in
ITEM_NUMBER_COLUMNS
:
print
(
"null count:"
,
c
,
itemDF
.
filter
(
col
(
c
)
.
isNull
())
.
count
())
itemDF
=
itemDF
.
withColumn
(
ITEM_PREFIX
+
c
,
when
(
col
(
c
)
.
isNull
(),
0
)
.
otherwise
(
col
(
c
))
.
cast
(
"float"
))
.
drop
(
c
)
for
c
in
ITEM_CATE_COLUMNS
:
print
(
"null count:"
,
c
,
itemDF
.
filter
(
col
(
c
)
.
isNull
())
.
count
())
itemDF
=
itemDF
.
withColumn
(
ITEM_PREFIX
+
c
,
F
.
when
(
F
.
col
(
c
)
.
isNull
(),
"-1"
)
.
otherwise
(
F
.
col
(
c
)))
.
drop
(
c
)
# 字典添加
dataVocab
[
ITEM_PREFIX
+
c
]
=
collectColumnToVocab
(
itemDF
,
ITEM_PREFIX
+
c
)
# 离散特征处理
for
c
,
v
in
ITEM_MULTI_COLUMN_EXTRA_MAP
.
items
():
print
(
"null count:"
,
c
,
itemDF
.
filter
(
col
(
c
)
.
isNull
())
.
count
())
itemDF
=
itemDF
.
withColumn
(
c
,
F
.
when
(
F
.
col
(
c
)
.
isNull
(),
"-1"
)
.
otherwise
(
F
.
col
(
c
)))
multiVocab
[
c
]
=
collectMutiColumnToVocab
(
itemDF
,
c
)
for
i
in
range
(
1
,
v
+
1
):
new_c
=
ITEM_PREFIX
+
c
+
"__"
+
str
(
i
)
itemDF
=
itemDF
.
withColumn
(
new_c
,
F
.
split
(
F
.
col
(
c
),
","
)[
i
-
1
])
itemDF
=
itemDF
.
withColumn
(
new_c
,
F
.
when
(
F
.
col
(
new_c
)
.
isNull
(),
"-1"
)
.
otherwise
(
F
.
col
(
new_c
)))
dataVocab
[
new_c
]
=
multiVocab
[
c
]
samples
=
samples
.
join
(
itemDF
,
on
=
[
'itemid'
],
how
=
'inner'
)
# 统计特征处理
print
(
"统计特征处理..."
)
staticFeatures
=
samples
.
groupBy
(
'itemid'
)
.
agg
(
F
.
count
(
F
.
lit
(
1
))
.
alias
(
'itemRatingCount'
),
F
.
avg
(
F
.
col
(
'rating'
))
.
alias
(
'itemRatingAvg'
),
F
.
stddev
(
F
.
col
(
'rating'
))
.
alias
(
'itemRatingStddev'
))
.
fillna
(
0
)
\
.
withColumn
(
'itemRatingStddev'
,
F
.
format_number
(
F
.
col
(
'itemRatingStddev'
),
NUMBER_PRECISION
)
.
cast
(
"float"
))
\
.
withColumn
(
'itemRatingAvg'
,
F
.
format_number
(
F
.
col
(
'itemRatingAvg'
),
NUMBER_PRECISION
)
.
cast
(
"float"
))
# join item rating features
samples
=
samples
.
join
(
staticFeatures
,
on
=
[
'itemid'
],
how
=
'left'
)
print
(
"连续特征处理..."
)
# todo 分桶比较耗时,可以考虑做非线性转换
# 连续特征处理
pipelineStage
=
[]
# Normalization
# for c in ["itemRatingAvg","itemRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c+"Scale"))
# bucketing
bucketColumns
=
[
ITEM_PREFIX
+
"case_count"
,
ITEM_PREFIX
+
"ordered_user_ids_count"
,
ITEM_PREFIX
+
"lowest_price"
,
"itemRatingCount"
,
"itemRatingStddev"
,
"itemRatingAvg"
]
for
c
in
bucketColumns
:
pipelineStage
.
append
(
QuantileDiscretizer
(
numBuckets
=
10
,
inputCol
=
c
,
outputCol
=
c
+
"Bucket"
))
featurePipeline
=
Pipeline
(
stages
=
pipelineStage
)
samples
=
featurePipeline
.
fit
(
samples
)
.
transform
(
samples
)
# 转string
for
c
in
bucketColumns
:
samples
=
samples
.
withColumn
(
c
+
"Bucket"
,
F
.
col
(
c
+
"Bucket"
)
.
cast
(
"string"
))
.
drop
(
c
)
dataVocab
[
c
+
"Bucket"
]
=
[
str
(
float
(
i
))
for
i
in
range
(
11
)]
samples
.
printSchema
()
# samples.show(5, truncate=False)
return
samples
def
extractTags
(
genres_list
):
genres_dict
=
defaultdict
(
int
)
for
genres
in
genres_list
:
for
genre
in
genres
.
split
(
','
):
genres_dict
[
genre
]
+=
1
sortedGenres
=
sorted
(
genres_dict
.
items
(),
key
=
lambda
x
:
x
[
1
],
reverse
=
True
)
return
[
x
[
0
]
for
x
in
sortedGenres
]
# sql版本不支持F.reverse
def
arrayReverse
(
arr
):
arr
.
reverse
()
return
arr
def
addUserFeatures
(
samples
,
dataVocab
,
multiVocab
):
dataVocab
[
"userid"
]
=
collectColumnToVocab
(
samples
,
"userid"
)
dataVocab
[
"user_os"
]
=
[
"ios"
,
"android"
,
"-1"
]
extractTagsUdf
=
F
.
udf
(
extractTags
,
ArrayType
(
StringType
()))
arrayReverseUdf
=
F
.
udf
(
arrayReverse
,
ArrayType
(
StringType
()))
samples
=
samples
.
withColumnRenamed
(
"cl_id"
,
"userid"
)
print
(
"user历史数据处理..."
)
# user历史记录
samples
=
samples
\
.
withColumn
(
'userPositiveHistory'
,
F
.
collect_list
(
when
(
F
.
col
(
'label'
)
==
1
,
F
.
col
(
'itemid'
))
.
otherwise
(
F
.
lit
(
None
)))
.
over
(
sql
.
Window
.
partitionBy
(
"userid"
)
.
orderBy
(
F
.
col
(
"timestamp"
))
.
rowsBetween
(
-
100
,
-
1
)))
\
.
withColumn
(
"userPositiveHistory"
,
arrayReverseUdf
(
F
.
col
(
"userPositiveHistory"
)))
for
i
in
range
(
1
,
11
):
samples
=
samples
.
withColumn
(
"userRatedHistory"
+
str
(
i
),
F
.
when
(
F
.
col
(
"userPositiveHistory"
)[
i
-
1
]
.
isNotNull
(),
F
.
col
(
"userPositiveHistory"
)[
i
-
1
])
.
otherwise
(
"-1"
))
dataVocab
[
"userRatedHistory"
+
str
(
i
)]
=
dataVocab
[
"itemid"
]
samples
=
samples
.
drop
(
"userPositiveHistory"
)
# user历史点击分值统计
print
(
"统计特征处理..."
)
samples
=
samples
\
.
withColumn
(
'userRatingCount'
,
F
.
count
(
F
.
lit
(
1
))
.
over
(
sql
.
Window
.
partitionBy
(
'userid'
)
.
orderBy
(
'timestamp'
)
.
rowsBetween
(
-
100
,
-
1
)))
\
.
withColumn
(
"userRatingAvg"
,
F
.
format_number
(
F
.
avg
(
F
.
col
(
"rating"
))
.
over
(
sql
.
Window
.
partitionBy
(
'userid'
)
.
orderBy
(
'timestamp'
)
.
rowsBetween
(
-
100
,
-
1
)),
NUMBER_PRECISION
)
.
cast
(
"float"
))
\
.
withColumn
(
"userRatingStddev"
,
F
.
format_number
(
F
.
stddev
(
F
.
col
(
"rating"
))
.
over
(
sql
.
Window
.
partitionBy
(
'userid'
)
.
orderBy
(
'timestamp'
)
.
rowsBetween
(
-
100
,
-
1
)),
NUMBER_PRECISION
)
.
cast
(
"float"
))
\
.
filter
(
F
.
col
(
"userRatingCount"
)
>
1
)
# user偏好
for
c
,
v
in
USER_MULTI_COLUMN_EXTRA_MAP
.
items
():
new_col
=
"user"
+
"__"
+
c
samples
=
samples
.
withColumn
(
new_col
,
extractTagsUdf
(
F
.
collect_list
(
when
(
F
.
col
(
'label'
)
==
1
,
F
.
col
(
c
))
.
otherwise
(
F
.
lit
(
None
)))
.
over
(
sql
.
Window
.
partitionBy
(
'userid'
)
.
orderBy
(
'timestamp'
)
.
rowsBetween
(
-
100
,
-
1
))))
for
i
in
range
(
1
,
v
+
1
):
samples
=
samples
.
withColumn
(
new_col
+
"__"
+
str
(
i
),
F
.
when
(
F
.
col
(
new_col
)[
i
-
1
]
.
isNotNull
(),
F
.
col
(
new_col
)[
i
-
1
])
.
otherwise
(
"-1"
))
dataVocab
[
new_col
+
"__"
+
str
(
i
)]
=
multiVocab
[
c
]
samples
=
samples
.
drop
(
new_col
)
# .drop(c).drop(new_col)
print
(
"连续特征处理..."
)
pipelineStage
=
[]
# Normalization
# for c in ["userRatingAvg", "userRatingStddev"]:
# pipelineStage.append(MinMaxScaler(inputCol=c, outputCol=c + "Scale"))
# bucketing
bucketColumns
=
[
"userRatingCount"
,
"userRatingAvg"
,
"userRatingStddev"
]
for
c
in
bucketColumns
:
pipelineStage
.
append
(
QuantileDiscretizer
(
numBuckets
=
10
,
inputCol
=
c
,
outputCol
=
c
+
"Bucket"
))
featurePipeline
=
Pipeline
(
stages
=
pipelineStage
)
samples
=
featurePipeline
.
fit
(
samples
)
.
transform
(
samples
)
# 转string
for
c
in
bucketColumns
:
samples
=
samples
.
withColumn
(
c
+
"Bucket"
,
F
.
col
(
c
+
"Bucket"
)
.
cast
(
"string"
))
.
drop
(
c
)
dataVocab
[
c
+
"Bucket"
]
=
[
str
(
float
(
i
))
for
i
in
range
(
11
)]
samples
.
printSchema
()
# samples.show(5,truncate=False)
return
samples
def
addSampleLabel
(
ratingSamples
):
ratingSamples
=
ratingSamples
.
withColumn
(
'label'
,
when
(
F
.
col
(
'rating'
)
>=
1
,
1
)
.
otherwise
(
0
))
ratingSamples
.
show
(
5
,
truncate
=
False
)
ratingSamples
.
printSchema
()
return
ratingSamples
def
samplesNegAndUnion
(
samplesPos
,
samplesNeg
):
# 正负样本 1:4
pos_count
=
samplesPos
.
count
()
neg_count
=
samplesNeg
.
count
()
print
(
"before filter posSize:{},negSize:{}"
.
format
(
str
(
pos_count
),
str
(
neg_count
)))
samplesNeg
=
samplesNeg
.
sample
(
pos_count
*
4
/
neg_count
)
samples
=
samplesNeg
.
union
(
samplesPos
)
dataSize
=
samples
.
count
()
print
(
"dataSize:{}"
.
format
(
str
(
dataSize
)))
return
samples
def
splitAndSaveTrainingTestSamplesByTimeStamp
(
samples
,
splitTimestamp
,
file_path
):
samples
=
samples
.
withColumn
(
"timestampLong"
,
F
.
col
(
"timestamp"
)
.
cast
(
LongType
()))
# quantile = smallSamples.stat.approxQuantile("timestampLong", [0.8], 0.05)
# splitTimestamp = quantile[0]
train
=
samples
.
where
(
F
.
col
(
"timestampLong"
)
<=
splitTimestamp
)
.
drop
(
"timestampLong"
)
test
=
samples
.
where
(
F
.
col
(
"timestampLong"
)
>
splitTimestamp
)
.
drop
(
"timestampLong"
)
print
(
"split train size:{},test size:{}"
.
format
(
str
(
train
.
count
()),
str
(
test
.
count
())))
trainingSavePath
=
file_path
+
'_train'
testSavePath
=
file_path
+
'_test'
train
.
write
.
option
(
"header"
,
"true"
)
.
option
(
"delimiter"
,
"|"
)
.
mode
(
'overwrite'
)
.
csv
(
trainingSavePath
)
test
.
write
.
option
(
"header"
,
"true"
)
.
option
(
"delimiter"
,
"|"
)
.
mode
(
'overwrite'
)
.
csv
(
testSavePath
)
def
collectColumnToVocab
(
samples
,
column
):
datas
=
samples
.
select
(
column
)
.
distinct
()
.
collect
()
vocabSet
=
set
()
for
d
in
datas
:
if
d
[
column
]:
vocabSet
.
add
(
str
(
d
[
column
]))
vocabSet
.
add
(
"-1"
)
# 空值的默认
return
list
(
vocabSet
)
def
collectMutiColumnToVocab
(
samples
,
column
):
datas
=
samples
.
select
(
column
)
.
distinct
()
.
collect
()
tagSet
=
set
()
for
d
in
datas
:
if
d
[
column
]:
for
tag
in
d
[
column
]
.
split
(
","
):
tagSet
.
add
(
tag
)
tagSet
.
add
(
"-1"
)
# 空值默认
return
list
(
tagSet
)
def
getDataVocab
(
samples
,
model_columns
):
dataVocab
=
{}
multiVocab
=
{}
# 多值特征
for
c
in
ITEM_MULTI_COLUMN_EXTRA_MAP
.
keys
():
print
(
c
)
multiVocab
[
c
]
=
collectMutiColumnToVocab
(
samples
,
c
)
samples
=
samples
.
drop
(
c
)
# id类特征 和 类别特征
for
c
in
[
"userid"
]:
print
(
c
)
dataVocab
[
c
]
=
collectColumnToVocab
(
samples
,
c
)
for
c
in
model_columns
:
# 判断是否以Bucket结尾
if
c
.
endswith
(
"Bucket"
):
datas
=
samples
.
select
(
c
)
.
distinct
()
.
collect
()
vocabSet
=
set
()
for
d
in
datas
:
if
d
[
c
]:
vocabSet
.
add
(
str
(
d
[
c
]))
vocabSet
.
add
(
"-1"
)
# 空值的默认
dataVocab
[
c
]
=
list
(
vocabSet
)
# elif c.count("userRatedHistory") > 0:
# dataVocab[c] = dataVocab["itemid"]
else
:
# 判断是否多值离散列
for
cc
,
v
in
multiVocab
.
items
():
if
c
.
count
(
cc
)
>
0
:
dataVocab
[
c
]
=
v
return
dataVocab
def
dataVocabToRedis
(
dataVocab
):
conn
=
getRedisConn
()
conn
.
set
(
FEATURE_VOCAB_KEY
,
dataVocab
)
conn
.
expire
(
FEATURE_VOCAB_KEY
,
60
*
60
*
24
*
7
)
def
featureColumnsToRedis
(
columns
):
conn
=
getRedisConn
()
conn
.
set
(
FEATURE_COLUMN_KEY
,
json
.
dumps
(
columns
))
conn
.
expire
(
FEATURE_COLUMN_KEY
,
60
*
60
*
24
*
7
)
def
featureToRedis
(
key
,
datas
):
conn
=
getRedisConn
()
for
k
,
v
in
datas
.
items
():
newKey
=
key
+
k
conn
.
set
(
newKey
,
v
)
conn
.
expire
(
newKey
,
60
*
60
*
24
*
7
)
def
collectFeaturesToDict
(
samples
,
columns
,
prefix
):
idCol
=
prefix
+
"id"
timestampCol
=
idCol
+
"_timestamp"
#根据timestamp获取每个user最新的记录
prefixSamples
=
samples
.
groupBy
(
idCol
)
.
agg
(
F
.
max
(
"timestamp"
)
.
alias
(
timestampCol
))
resDatas
=
samples
.
join
(
prefixSamples
,
on
=
[
idCol
],
how
=
'left'
)
.
where
(
F
.
col
(
"timestamp"
)
==
F
.
col
(
timestampCol
))
resDatas
=
resDatas
.
select
(
*
columns
)
.
distinct
()
.
collect
()
print
(
prefix
,
len
(
resDatas
))
return
{
d
[
idCol
]:
json
.
dumps
(
d
.
asDict
(),
ensure_ascii
=
False
)
for
d
in
resDatas
}
def
featuresToRedis
(
samples
,
columns
,
prefix
,
redisKey
):
idCol
=
prefix
+
"id"
timestampCol
=
idCol
+
"_timestamp"
def
toRedis
(
datas
):
conn
=
getRedisConn
()
for
d
in
datas
:
k
=
d
[
idCol
]
v
=
json
.
dumps
(
d
.
asDict
(),
ensure_ascii
=
False
)
newKey
=
redisKey
+
k
conn
.
set
(
newKey
,
v
)
conn
.
expire
(
newKey
,
60
*
60
*
24
*
7
)
#根据timestamp获取每个user最新的记录
prefixSamples
=
samples
.
groupBy
(
idCol
)
.
agg
(
F
.
max
(
"timestamp"
)
.
alias
(
timestampCol
))
resDatas
=
samples
.
join
(
prefixSamples
,
on
=
[
idCol
],
how
=
'left'
)
.
where
(
F
.
col
(
"timestamp"
)
==
F
.
col
(
timestampCol
))
resDatas
=
resDatas
.
select
(
*
columns
)
.
distinct
()
print
(
prefix
,
resDatas
.
count
())
resDatas
.
repartition
(
8
)
.
foreachPartition
(
toRedis
)
"""
数据加载
"""
CONTENT_TYPE
=
"service"
SERVICE_HOSTS
=
[
{
'host'
:
"172.16.52.33"
,
'port'
:
9200
},
{
'host'
:
"172.16.52.19"
,
'port'
:
9200
},
{
'host'
:
"172.16.52.48"
,
'port'
:
9200
},
{
'host'
:
"172.16.52.27"
,
'port'
:
9200
},
{
'host'
:
"172.16.52.34"
,
'port'
:
9200
}
]
ES_INDEX
=
"gm-dbmw-service-read"
ES_INDEX_TEST
=
"gm_test-service-read"
ACTION_REG
=
r"""^\\d+$"""
def
getEsConn_test
():
host_config
=
[{
'host'
:
'172.18.52.14'
,
'port'
:
9200
},
{
'host'
:
'172.18.52.133'
,
'port'
:
9200
},
{
'host'
:
'172.18.52.7'
,
'port'
:
9200
}]
return
Elasticsearch
(
host_config
,
http_auth
=
(
'elastic'
,
'gm_test'
),
timeout
=
3600
)
def
getEsConn
():
return
Elasticsearch
(
SERVICE_HOSTS
,
http_auth
=
(
'elastic'
,
'gengmei!@#'
),
timeout
=
3600
)
def
getClickSql
(
start
,
end
):
sql
=
"""
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.card_id,t1.time_stamp,t1.page_stay,t1.cl_type as os,t1.city_id as user_city_id
FROM
(
select partition_date,city_id,cl_id,business_id as card_id,time_stamp,page_stay,cl_type
from online.bl_hdfs_maidian_updates
where action = 'page_view'
AND partition_date>='{startDay}' and partition_date<='{endDay}'
AND page_name='welfare_detail'
-- AND page_stay>=1
AND cl_id is not null
AND cl_id != ''
AND business_id is not null
AND business_id != ''
group by partition_date,city_id,cl_id,business_id,time_stamp,page_stay,cl_type
) AS t1
join
( --渠道,新老
SELECT distinct device_id
FROM online.ml_device_day_active_status
where partition_date>='{startDay}' and partition_date<='{endDay}'
AND active_type in ('1','2','4')
and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
AND first_channel_source_type not like 'promotion
\
_jf
\
_
%
'
) t2
on t1.cl_id = t2.device_id
LEFT JOIN
( --去除黑名单
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
"""
.
format
(
startDay
=
start
,
endDay
=
end
)
print
(
sql
)
return
sql
def
getExposureSql
(
start
,
end
):
sql
=
"""
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay,cl_type as os,t1.city_id as user_city_id
from
( --新首页卡片曝光
SELECT partition_date,city_id,cl_type,cl_id,card_id,max(time_stamp) as time_stamp
FROM online.ml_community_precise_exposure_detail
where partition_date>='{startDay}' and partition_date<='{endDay}'
and action in ('page_precise_exposure','home_choiceness_card_exposure')
and cl_id IS NOT NULL
and card_id IS NOT NULL
and is_exposure='1'
--and page_name='home'
--and tab_name='精选'
--and page_name in ('home','search_result_more')
and ((page_name='home' and tab_name='精选') or (page_name='category' and tab_name = '商品'))
and card_type in ('card','video')
and card_content_type in ('service')
and (get_json_object(exposure_card,'$.in_page_pos') is null or get_json_object(exposure_card,'$.in_page_pos') != 'seckill')
group by partition_date,city_id,cl_type,cl_id,card_id,app_session_id
) t1
join
( --渠道,新老
SELECT distinct device_id
FROM online.ml_device_day_active_status
where partition_date>='{startDay}' and partition_date<='{endDay}'
AND active_type in ('1','2','4')
and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
AND first_channel_source_type not like 'promotion
\
_jf
\
_
%
'
) t2
on t1.cl_id = t2.device_id
LEFT JOIN
( --去除黑名单
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY =regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)t3
on t3.device_id=t2.device_id
WHERE t3.device_id is null
"""
.
format
(
startDay
=
start
,
endDay
=
end
)
print
(
sql
)
return
sql
def
getClickSql2
(
start
,
end
):
sql
=
"""
SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.business_id card_id,t1.time_stamp time_stamp,t1.page_stay as page_stay
FROM
(select partition_date,cl_id,business_id,action,page_name,page_stay,time_stamp,page_stay
from online.bl_hdfs_maidian_updates
where action = 'page_view'
AND partition_date BETWEEN '{}' AND '{}'
AND page_name='welfare_detail'
AND page_stay>=1
AND cl_id is not null
AND cl_id != ''
AND business_id is not null
AND business_id != ''
AND business_id rlike '{}'
) AS t1
JOIN
(select partition_date,active_type,first_channel_source_type,device_id
from online.ml_device_day_active_status
where partition_date BETWEEN '{}' AND '{}'
AND active_type IN ('1', '2', '4')
AND first_channel_source_type not IN ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
AND first_channel_source_type not LIKE 'promotion
\\
_jf
\\
_
%
') as t2
ON t1.cl_id = t2.device_id
AND t1.partition_date = t2.partition_date
LEFT JOIN
(
select distinct device_id
from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
where PARTITION_DAY = regexp_replace(DATE_SUB(current_date,1) ,'-','')
AND is_abnormal_device = 'true'
)dev
on t1.cl_id=dev.device_id
WHERE dev.device_id is null
"""
.
format
(
start
,
end
,
ACTION_REG
,
start
,
end
)
print
(
sql
)
return
sql
def
getExposureSql2
(
start
,
end
):
sql
=
"""
SELECT DISTINCT t1.partition_date,t1.cl_id device_id,t1.card_id,t1.time_stamp, 0 as page_stay
FROM
(SELECT partition_date,cl_id,card_id,time_stamp
FROM online.ml_community_precise_exposure_detail
WHERE cl_id IS NOT NULL
AND card_id IS NOT NULL
AND card_id rlike '{}'
AND action='page_precise_exposure'
AND card_content_type = '{}'
AND is_exposure = 1 ) AS t1
LEFT JOIN online.ml_device_day_active_status AS t2 ON t1.cl_id = t2.device_id
AND t1.partition_date = t2.partition_date
LEFT JOIN
( SELECT DISTINCT device_id
FROM ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
WHERE PARTITION_DAY = regexp_replace(DATE_SUB(CURRENT_DATE,1),'-','')
AND is_abnormal_device = 'true' )dev
ON t1.cl_id=dev.device_id
WHERE dev.device_id IS NULL
AND t2.partition_date BETWEEN '{}' AND '{}'
AND t2.active_type IN ('1',
'2',
'4')
AND t2.first_channel_source_type NOT IN ('yqxiu1',
'yqxiu2',
'yqxiu3',
'yqxiu4',
'yqxiu5',
'mxyc1',
'mxyc2',
'mxyc3' ,
'wanpu',
'jinshan',
'jx',
'maimai',
'zhuoyi',
'huatian',
'suopingjingling',
'mocha',
'mizhe',
'meika',
'lamabang' ,
'js-az1',
'js-az2',
'js-az3',
'js-az4',
'js-az5',
'jfq-az1',
'jfq-az2',
'jfq-az3',
'jfq-az4',
'jfq-az5',
'toufang1' ,
'toufang2',
'toufang3',
'toufang4',
'toufang5',
'toufang6',
'TF-toufang1',
'TF-toufang2',
'TF-toufang3',
'TF-toufang4' ,
'TF-toufang5',
'tf-toufang1',
'tf-toufang2',
'tf-toufang3',
'tf-toufang4',
'tf-toufang5',
'benzhan',
'promotion_aso100' ,
'promotion_qianka',
'promotion_xiaoyu',
'promotion_dianru',
'promotion_malioaso',
'promotion_malioaso-shequ' ,
'promotion_shike',
'promotion_julang_jl03',
'promotion_zuimei')
AND t2.first_channel_source_type NOT LIKE 'promotion
\\
_jf
\\
_
%
'
"""
.
format
(
ACTION_REG
,
CONTENT_TYPE
,
start
,
end
)
print
(
sql
)
return
sql
def
connectDoris
(
spark
,
table
):
return
spark
.
read
\
.
format
(
"jdbc"
)
\
.
option
(
"driver"
,
"com.mysql.jdbc.Driver"
)
\
.
option
(
"url"
,
"jdbc:mysql://172.16.30.136:3306/doris_prod"
)
\
.
option
(
"dbtable"
,
table
)
\
.
option
(
"user"
,
"doris"
)
\
.
option
(
"password"
,
"o5gbA27hXHHm"
)
\
.
load
()
def
get_spark
(
appName
):
sparkConf
=
SparkConf
()
sparkConf
.
set
(
"spark.sql.crossJoin.enabled"
,
True
)
sparkConf
.
set
(
"spark.debug.maxToStringFields"
,
"100"
)
sparkConf
.
set
(
"spark.tispark.plan.allow_index_double_read"
,
False
)
sparkConf
.
set
(
"spark.tispark.plan.allow_index_read"
,
True
)
sparkConf
.
set
(
"spark.hive.mapred.supports.subdirectories"
,
True
)
sparkConf
.
set
(
"spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive"
,
True
)
sparkConf
.
set
(
"spark.serializer"
,
"org.apache.spark.serializer.KryoSerializer"
)
sparkConf
.
set
(
"mapreduce.output.fileoutputformat.compress"
,
False
)
sparkConf
.
set
(
"mapreduce.map.output.compress"
,
False
)
spark
=
(
SparkSession
.
builder
.
config
(
conf
=
sparkConf
)
.
appName
(
appName
)
.
enableHiveSupport
()
.
getOrCreate
())
return
spark
def
init_es_query
():
q
=
{
"_source"
:
{
"includes"
:[]
},
"query"
:
{
"bool"
:
{
"must"
:
[],
"must_not"
:
[],
"should"
:
[]
}
}
}
return
q
def
parseSource
(
_source
):
id
=
str
(
_source
.
setdefault
(
"id"
,
-
1
))
case_count
=
_source
.
setdefault
(
"case_count"
,
0
)
sales_count
=
_source
.
setdefault
(
"sales_count"
,
0
)
service_type
=
str
(
_source
.
setdefault
(
"service_type"
,
-
1
))
second_demands
=
','
.
join
(
_source
.
setdefault
(
"second_demands"
,[
"-1"
]))
second_solutions
=
','
.
join
(
_source
.
setdefault
(
"second_solutions"
,[
"-1"
]))
second_positions
=
','
.
join
(
_source
.
setdefault
(
"second_positions"
,[
"-1"
]))
# sku
sku_list
=
_source
.
setdefault
(
"sku_list"
,[])
sku_tags_list
=
[]
sku_show_tags_list
=
[]
sku_price_list
=
[
0.0
]
for
sku
in
sku_list
:
sku_tags_list
+=
sku
.
setdefault
(
"sku_tags_id"
,[])
sku_show_tags_list
.
append
(
sku
.
setdefault
(
"show_project_type_name"
,
""
))
sku_price_list
.
append
(
sku
.
setdefault
(
"price"
,
0.0
))
sku_tags
=
","
.
join
([
str
(
i
)
for
i
in
sku_tags_list
])
if
len
(
sku_tags_list
)
>
0
else
"-1"
sku_show_tags
=
","
.
join
(
sku_show_tags_list
)
if
len
(
sku_show_tags_list
)
>
0
else
"-1"
sku_price
=
min
(
sku_price_list
)
#merchant_id
merchant_id
=
str
(
_source
.
setdefault
(
"merchant_id"
,
"-1"
))
# doctor_type id famous_doctor
doctor
=
_source
.
setdefault
(
"doctor"
,{})
doctor_type
=
str
(
doctor
.
setdefault
(
"doctor_type"
,
"-1"
))
doctor_id
=
str
(
doctor
.
setdefault
(
"id"
,
"-1"
))
doctor_famous
=
str
(
int
(
doctor
.
setdefault
(
"famous_doctor"
,
False
)))
# hospital id city_tag_id hospital_type is_high_quality
hospital
=
doctor
.
setdefault
(
"hospital"
,
{})
hospital_id
=
str
(
hospital
.
setdefault
(
"id"
,
"-1"
))
hospital_city_tag_id
=
str
(
hospital
.
setdefault
(
"city_tag_id"
,
-
1
))
hospital_type
=
str
(
hospital
.
setdefault
(
"hospital_type"
,
"-1"
))
hospital_is_high_quality
=
str
(
int
(
hospital
.
setdefault
(
"is_high_quality"
,
False
)))
data
=
[
id
,
case_count
,
sales_count
,
service_type
,
merchant_id
,
doctor_type
,
doctor_id
,
doctor_famous
,
hospital_id
,
hospital_city_tag_id
,
hospital_type
,
hospital_is_high_quality
,
second_demands
,
second_solutions
,
second_positions
,
sku_tags
,
sku_show_tags
,
sku_price
]
return
data
# es中获取特征
def
get_service_feature_df
():
es_columns
=
[
"id"
,
"sales_count"
,
"doctor"
,
"case_count"
,
"service_type"
,
"merchant_id"
,
"second_demands"
,
"second_solutions"
,
"second_positions"
,
"sku_list"
]
query
=
init_es_query
()
query
[
"_source"
][
"includes"
]
=
es_columns
print
(
json
.
dumps
(
query
),
flush
=
True
)
es_cli
=
getEsConn
()
scan_re
=
scan
(
client
=
es_cli
,
index
=
ES_INDEX
,
query
=
query
,
scroll
=
'3m'
)
datas
=
[]
for
res
in
scan_re
:
_source
=
res
[
'_source'
]
data
=
parseSource
(
_source
)
datas
.
append
(
data
)
print
(
"item size:"
,
len
(
datas
))
itemColumns
=
[
'id'
,
'case_count'
,
'sales_count'
,
'service_type'
,
'merchant_id'
,
'doctor_type'
,
'doctor_id'
,
'doctor_famous'
,
'hospital_id'
,
'hospital_city_tag_id'
,
'hospital_type'
,
'hospital_is_high_quality'
,
'second_demands'
,
'second_solutions'
,
'second_positions'
,
'sku_tags'
,
'sku_show_tags'
,
'sku_price'
]
df
=
pd
.
DataFrame
(
datas
,
columns
=
itemColumns
)
return
df
def
addDays
(
n
,
format
=
"
%
Y
%
m
%
d"
):
return
(
date
.
today
()
+
timedelta
(
days
=
n
))
.
strftime
(
format
)
if
__name__
==
'__main__'
:
start
=
time
.
time
()
#入参
trainDays
=
int
(
sys
.
argv
[
1
])
print
(
'trainDays:{}'
.
format
(
trainDays
),
flush
=
True
)
endDay
=
addDays
(
0
)
startDay
=
addDays
(
-
int
(
trainDays
))
print
(
"train_data start:{} end:{}"
.
format
(
startDay
,
endDay
))
conf
=
SparkConf
()
.
setAppName
(
'featureEngineering'
)
.
setMaster
(
'local'
)
spark
=
SparkSession
.
builder
.
config
(
conf
=
conf
)
.
getOrCreate
()
spark
.
sparkContext
.
setLogLevel
(
"ERROR"
)
# 行为数据
clickSql
=
getClickSql
(
startDay
,
endDay
)
expSql
=
getExposureSql
(
startDay
,
endDay
)
clickDF
=
spark
.
sql
(
clickSql
)
expDF
=
spark
.
sql
(
expSql
)
# ratingDF = samplesNegAndUnion(clickDF,expDF)
ratingDF
=
clickDF
.
union
(
expDF
)
ratingDF
=
ratingDF
.
withColumnRenamed
(
"time_stamp"
,
"timestamp"
)
\
.
withColumnRenamed
(
"device_id"
,
"userid"
)
\
.
withColumnRenamed
(
"card_id"
,
"itemid"
)
\
.
withColumnRenamed
(
"page_stay"
,
"rating"
)
\
.
withColumnRenamed
(
"os"
,
"user_os"
)
print
(
ratingDF
.
columns
)
print
(
ratingDF
.
show
(
10
,
truncate
=
False
))
print
(
"添加label..."
)
ratingSamplesWithLabel
=
addSampleLabel
(
ratingDF
)
df
=
ratingSamplesWithLabel
.
toPandas
()
df
=
pd
.
DataFrame
(
df
)
posCount
=
df
.
loc
[
df
[
"label"
]
==
0
][
"label"
]
.
count
()
negCount
=
df
.
loc
[
df
[
"label"
]
==
1
][
"label"
]
.
count
()
print
(
"pos size:"
+
str
(
posCount
),
"neg size:"
+
str
(
negCount
))
itemDF
=
get_service_feature_df
(
spark
)
print
(
itemDF
.
columns
)
print
(
itemDF
.
show
(
10
,
truncate
=
False
))
itemDF
.
to_csv
(
"/tmp/service_{}.csv"
.
format
(
endDay
))
df
.
to_csv
(
"/tmp/service_train_{}.csv"
.
format
(
endDay
))
# # 数据字典
# dataVocab = {}
# multiVocab = {}
#
# print("处理item特征...")
# timestmp1 = int(round(time.time()))
# samplesWithItemFeatures = addItemFeatures(ratingSamplesWithLabel, itemDF, dataVocab,multiVocab)
# timestmp2 = int(round(time.time()))
# print("处理item特征, 耗时s:{}".format(timestmp2 - timestmp1))
# print("multiVocab:")
# print(multiVocab.keys())
#
# print("处理user特征...")
# samplesWithUserFeatures = addUserFeatures(samplesWithItemFeatures,dataVocab,multiVocab)
# timestmp3 = int(round(time.time()))
# print("处理user特征, 耗时s:{}".format(timestmp3 - timestmp2))
#
# # user columns
# user_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("user")]
# print("collect feature for user:{}".format(str(user_columns)))
# # item columns
# item_columns = [c for c in samplesWithUserFeatures.columns if c.startswith("item")]
# print("collect feature for item:{}".format(str(item_columns)))
# # model columns
# print("model columns to redis...")
# model_columns = user_columns + item_columns
# featureColumnsToRedis(model_columns)
#
# print("数据字典save...")
# print("dataVocab:", str(dataVocab.keys()))
# vocab_path = "../vocab/{}_vocab.json".format(VERSION)
# dataVocabStr = json.dumps(dataVocab, ensure_ascii=False)
# open(configUtils.VOCAB_PATH, mode='w', encoding='utf-8').write(dataVocabStr)
#
# """特征数据存入redis======================================"""
# # user特征数据存入redis
# featuresToRedis(samplesWithUserFeatures, user_columns, "user", FEATURE_USER_KEY)
# timestmp5 = int(round(time.time()))
# print("user feature to redis 耗时s:{}".format(timestmp5 - timestmp3))
# # userDatas = collectFeaturesToDict(samplesWithUserFeatures, user_columns, "user")
# # featureToRedis(FEATURE_USER_KEY, userDatas)
# # itemDatas = collectFeaturesToDict(samplesWithUserFeatures, item_columns, "item")
# # featureToRedis(FEATURE_ITEM_KEY, itemDatas)
#
# # item特征数据存入redis
# # todo 添加最近一个月有行为的item,待优化:扩大item范围
# featuresToRedis(samplesWithUserFeatures, item_columns, "item", FEATURE_ITEM_KEY)
# timestmp6 = int(round(time.time()))
# print("item feature to redis 耗时s:{}".format(timestmp6 - timestmp5))
#
# """训练数据保存 ======================================"""
# timestmp3 = int(round(time.time()))
# train_columns = model_columns + ["label", "timestamp", "rating"]
# trainSamples = samplesWithUserFeatures.select(*train_columns)
# print("write to hdfs start...")
# splitTimestamp = int(time.mktime(time.strptime(addDays(0), "%Y%m%d")))
# splitAndSaveTrainingTestSamplesByTimeStamp(trainSamples, splitTimestamp, TRAIN_FILE_PATH)
# print("write to hdfs success...")
# timestmp4 = int(round(time.time()))
# print("数据写入hdfs 耗时s:{}".format(timestmp4 - timestmp3))
#
# print("总耗时m:{}".format((timestmp4 - start)/60))
#
# spark.stop()
\ No newline at end of file
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment