Commit a1d73464 authored by 郭羽

service embedding

parent ad1dcd76
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.mllib.feature import Word2Vec
from pyspark.ml.linalg import Vectors
import random
from collections import defaultdict
import numpy as np
from pyspark.sql import functions as F
import sys
import time
from datetime import date, timedelta
import pandas as pd
def getClickSql(start, end):
    sql = """
          SELECT DISTINCT t1.partition_date, t1.cl_id device_id, t1.card_id, t1.time_stamp, t1.page_stay
          FROM
          (
              select partition_date, cl_id, business_id as card_id, time_stamp, page_stay
              from online.bl_hdfs_maidian_updates
              where action = 'page_view'
              AND partition_date>='{startDay}' and partition_date<='{endDay}'
              AND page_name='welfare_detail'
              -- AND page_stay>=1
              AND cl_id is not null
              AND cl_id != ''
              AND business_id is not null
              AND business_id != ''
              group by partition_date, cl_id, business_id, time_stamp, page_stay
          ) AS t1
          join
          (  -- channel filter: new/old active devices
              SELECT distinct device_id
              FROM online.ml_device_day_active_status
              where partition_date>='{startDay}' and partition_date<'{endDay}'
              AND active_type in ('1','2','4')
              and first_channel_source_type not in ('yqxiu1','yqxiu2','yqxiu3','yqxiu4','yqxiu5','mxyc1','mxyc2','mxyc3'
                  ,'wanpu','jinshan','jx','maimai','zhuoyi','huatian','suopingjingling','mocha','mizhe','meika','lamabang'
                  ,'js-az1','js-az2','js-az3','js-az4','js-az5','jfq-az1','jfq-az2','jfq-az3','jfq-az4','jfq-az5','toufang1'
                  ,'toufang2','toufang3','toufang4','toufang5','toufang6','TF-toufang1','TF-toufang2','TF-toufang3','TF-toufang4'
                  ,'TF-toufang5','tf-toufang1','tf-toufang2','tf-toufang3','tf-toufang4','tf-toufang5','benzhan','promotion_aso100'
                  ,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
                  ,'promotion_shike','promotion_julang_jl03','promotion_zuimei','','unknown')
              AND first_channel_source_type not like 'promotion\_jf\_%'
          ) t2
          on t1.cl_id = t2.device_id
          LEFT JOIN
          (  -- exclude blacklisted (abnormal) devices
              select distinct device_id
              from ML.ML_D_CT_DV_DEVICECLEAN_DIMEN_D
              where PARTITION_DAY = regexp_replace(DATE_SUB(current_date, 1), '-', '')
              AND is_abnormal_device = 'true'
          ) t3
          on t3.device_id = t2.device_id
          WHERE t3.device_id is null
          """.format(startDay=start, endDay=end)
    print(sql)
    return sql
class UdfFunction:
    @staticmethod
    def sortF(movie_list, timestamp_list):
        """
        sort by time and return the corresponding movie sequence
        eg:
            input: movie_list: [1, 2, 3]
                   timestamp_list: [1112486027, 1212546032, 1012486033]
            return [3, 1, 2]
        """
        pairs = []
        for m, t in zip(movie_list, timestamp_list):
            pairs.append((m, t))
        # sort by time
        pairs = sorted(pairs, key=lambda x: x[1])
        return [x[0] for x in pairs]
def processItemSequence(spark, rawSampleDataPath):
    # rating data
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    # ratingSamples.show(5)
    # ratingSamples.printSchema()
    sortUdf = udf(UdfFunction.sortF, ArrayType(StringType()))
    # keep positive ratings only, then sort each user's movies by timestamp
    userSeq = ratingSamples \
        .where(F.col("rating") >= 3.5) \
        .groupBy("userId") \
        .agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds')) \
        .withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
    # userSeq.select("userId", "movieIdStr").show(10, truncate=False)
    # return an RDD where each element is one user's watch sequence, e.g. ['858', '50', '593']
    return userSeq.select('movieIdStr').rdd.map(lambda x: x[0].split(' '))
def embeddingLSH(spark, movieEmbMap):
    movieEmbSeq = []
    for key, embedding_list in movieEmbMap.items():
        embedding_list = [np.float64(embedding) for embedding in embedding_list]
        movieEmbSeq.append((key, Vectors.dense(embedding_list)))
    movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
    bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId", bucketLength=0.1,
                                                      numHashTables=3)
    bucketModel = bucketProjectionLSH.fit(movieEmbDF)
    embBucketResult = bucketModel.transform(movieEmbDF)
    print("movieId, emb, bucketId schema:")
    embBucketResult.printSchema()
    print("movieId, emb, bucketId data result:")
    embBucketResult.show(10, truncate=False)
    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)
def trainItem2vec(spark, samples, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    # saveToRedis / redisKeyPrefix are accepted but not used in this version (see the sketch below)
    word2vec = Word2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
    model = word2vec.fit(samples)
    synonyms = model.findSynonyms("158", 20)
    for synonym, cosineSimilarity in synonyms:
        print(synonym, cosineSimilarity)
    embOutputDir = '/'.join(embOutputPath.split('/')[:-1])
    if not os.path.exists(embOutputDir):
        os.makedirs(embOutputDir)
    with open(embOutputPath, 'w') as f:
        for movie_id in model.getVectors():
            vectors = " ".join([str(emb) for emb in model.getVectors()[movie_id]])
            f.write(movie_id + ":" + vectors + "\n")
    embeddingLSH(spark, model.getVectors())
    return model
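# A minimal sketch of what honoring the (currently unused) saveToRedis / redisKeyPrefix
# parameters could look like, assuming the redis-py client and a reachable Redis instance;
# the host/port defaults and the key format below are assumptions, not taken from this repo.
# This helper is illustrative only and is not called anywhere in this script.
def saveEmbToRedis(embMap, redisKeyPrefix, host="localhost", port=6379):
    import redis  # assumed dependency (redis-py)
    r = redis.StrictRedis(host=host, port=port, db=0)
    for itemId, emb in embMap.items():
        # store each embedding as a space-separated string, e.g. "i2vEmb:158" -> "0.1 0.2 ..."
        key = "{}:{}".format(redisKeyPrefix, itemId)
        r.set(key, " ".join([str(x) for x in emb]))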
def generate_pair(x):
    # eg:
    #   watch sequence: ['858', '50', '593', '457']
    #   return: [('858', '50'), ('50', '593'), ('593', '457')]
    pairSeq = []
    previousItem = ''
    for item in x:
        if not previousItem:
            previousItem = item
        else:
            pairSeq.append((previousItem, item))
            previousItem = item
    return pairSeq
def generateTransitionMatrix(samples):
    pairSamples = samples.flatMap(lambda x: generate_pair(x))
    pairCountMap = pairSamples.countByValue()
    pairTotalCount = 0
    transitionCountMatrix = defaultdict(dict)
    itemCountMap = defaultdict(int)
    for key, cnt in pairCountMap.items():
        key1, key2 = key
        transitionCountMatrix[key1][key2] = cnt
        itemCountMap[key1] += cnt
        pairTotalCount += cnt
    transitionMatrix = defaultdict(dict)
    itemDistribution = defaultdict(dict)
    for key1, transitionMap in transitionCountMatrix.items():
        for key2, cnt in transitionMap.items():
            transitionMatrix[key1][key2] = cnt / itemCountMap[key1]
    for itemid, cnt in itemCountMap.items():
        itemDistribution[itemid] = cnt / pairTotalCount
    return transitionMatrix, itemDistribution
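# Worked example (illustrative only): for the two sequences [['A', 'B'], ['A', 'C']] the pair
# counts are {('A', 'B'): 1, ('A', 'C'): 1}, so transitionMatrix is {'A': {'B': 0.5, 'C': 0.5}}
# and itemDistribution is {'A': 1.0}.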
def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
    sample = []
    # pick the first element
    randomDouble = random.random()
    firstItem = ""
    accumulateProb = 0.0
    for item, prob in itemDistribution.items():
        accumulateProb += prob
        if accumulateProb >= randomDouble:
            firstItem = item
            break
    sample.append(firstItem)
    curElement = firstItem
    i = 1
    while i < sampleLength:
        if (curElement not in itemDistribution) or (curElement not in transitionMatrix):
            break
        probDistribution = transitionMatrix[curElement]
        randomDouble = random.random()
        accumulateProb = 0.0
        for item, prob in probDistribution.items():
            accumulateProb += prob
            if accumulateProb >= randomDouble:
                curElement = item
                break
        sample.append(curElement)
        i += 1
    return sample
def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
    samples = []
    for i in range(sampleCount):
        samples.append(oneRandomWalk(transitionMatrix, itemDistribution, sampleLength))
    return samples
def graphEmb(samples, spark, embLength, embOutputFilename, saveToRedis, redisKeyPrefix):
    transitionMatrix, itemDistribution = generateTransitionMatrix(samples)
    sampleCount = 20000
    sampleLength = 10
    newSamples = randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength)
    rddSamples = spark.sparkContext.parallelize(newSamples)
    trainItem2vec(spark, rddSamples, embLength, embOutputFilename, saveToRedis, redisKeyPrefix)
def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    Vectors_list = []
    for key, value in model.getVectors().items():
        Vectors_list.append((key, list(value)))
    fields = [
        StructField('movieId', StringType(), False),
        StructField('emb', ArrayType(FloatType()), False)
    ]
    schema = StructType(fields)
    Vectors_df = spark.createDataFrame(Vectors_list, schema=schema)
    ratingSamples = ratingSamples.join(Vectors_df, on='movieId', how='inner')
    # a user's embedding is the element-wise sum of the embeddings of the movies the user rated
    result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], x[1])) \
        .reduceByKey(lambda a, b: [a[i] + b[i] for i in range(len(a))]).collect()
    with open(embOutputPath, 'w') as f:
        for row in result:
            vectors = " ".join([str(emb) for emb in row[1]])
            f.write(row[0] + ":" + vectors + "\n")
def addDays(n, format="%Y%m%d"):
    return (date.today() + timedelta(days=n)).strftime(format)
def get_spark(appName):
    sparkConf = SparkConf()
    sparkConf.set("spark.sql.crossJoin.enabled", True)
    sparkConf.set("spark.debug.maxToStringFields", "100")
    sparkConf.set("spark.tispark.plan.allow_index_double_read", False)
    sparkConf.set("spark.tispark.plan.allow_index_read", True)
    sparkConf.set("spark.hive.mapred.supports.subdirectories", True)
    sparkConf.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", True)
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    sparkConf.set("mapreduce.output.fileoutputformat.compress", False)
    sparkConf.set("mapreduce.map.output.compress", False)
    spark = (SparkSession
             .builder
             .config(conf=sparkConf)
             .appName(appName)
             .enableHiveSupport()
             .getOrCreate())
    return spark
if __name__ == '__main__':
    start = time.time()
    # input argument: number of days of training data to pull
    trainDays = int(sys.argv[1])
    # spark = get_spark("embedding")
    print('trainDays:{}'.format(trainDays), flush=True)
    endDay = addDays(-1)
    startDay = addDays(-(1 + int(trainDays)))
    print("train_data start:{} end:{}".format(startDay, endDay))
    conf = SparkConf().setAppName('embedding').setMaster('local')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")
    # click (behavior) data
    clickSql = getClickSql(startDay, endDay)
    clickDF = spark.sql(clickSql)
    df = clickDF.toPandas()
    pd.DataFrame(df).to_csv("/tmp/service_click.csv", index=False)
    # # Change to your own filepath
    # file_path = 'file:///home/hadoop/SparrowRecSys/src/main/resources'
    # rawSampleDataPath = file_path + "/webroot/sampledata/ratings.csv"
    # embLength = 10
    # samples = processItemSequence(spark, rawSampleDataPath)
    # model = trainItem2vec(spark, samples, embLength,
    #                       embOutputPath=file_path[7:] + "/webroot/modeldata2/item2vecEmb.csv", saveToRedis=False,
    #                       redisKeyPrefix="i2vEmb")
    # graphEmb(samples, spark, embLength, embOutputFilename=file_path[7:] + "/webroot/modeldata2/itemGraphEmb.csv",
    #          saveToRedis=True, redisKeyPrefix="graphEmb")
    # generateUserEmb(spark, rawSampleDataPath, model, embLength,
    #                 embOutputPath=file_path[7:] + "/webroot/modeldata2/userEmb.csv", saveToRedis=False,
    #                 redisKeyPrefix="uEmb")
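    # Example invocation (a sketch; the script name, master setting and the value 7 are assumptions):
    #   spark-submit --master local[*] embedding.py 7
    # pulls the last 7 days of welfare_detail click logs (ending yesterday) via getClickSql and
    # dumps them to /tmp/service_click.csv; the MovieLens item2vec / graph-embedding pipeline
    # above stays commented out.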