Commit 66f1efca authored by 郭羽

service embedding

parent 35f1f8a0
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.mllib.feature import Word2Vec
from pyspark.ml.linalg import Vectors
import random
from collections import defaultdict
import numpy as np
from pyspark.sql import functions as F
import sys
import time
from datetime import date, timedelta
import pandas as pd
from gensim.models import Word2Vec
import pickle
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.connUtils as connUtils
VERSION = "v1"
def getClickSql(start, end):
sql = """
......@@ -26,7 +23,7 @@ def getClickSql(start, end):
where action = 'page_view'
AND partition_date>='{startDay}' and partition_date<='{endDay}'
AND page_name='welfare_detail'
-- AND page_stay>=1
AND page_stay >= 10
AND cl_id is not null
AND cl_id != ''
AND business_id is not null
......@@ -63,179 +60,28 @@ def getClickSql(start, end):
print(sql)
return sql
def addDays(n, format="%Y%m%d"):
    """Return today's date shifted by ``n`` days, rendered with ``format``."""
    shifted = date.today() + timedelta(days=n)
    return shifted.strftime(format)
class UdfFunction:
    @staticmethod
    def sortF(movie_list, timestamp_list):
        """Sort ``movie_list`` by the corresponding timestamps and return it.

        eg:
            input:  movie_list: [1, 2, 3]
                    timestamp_list: [1112486027, 1212546032, 1012486033]
            return: [3, 1, 2]
        """
        # sorted() over zip replaces the manual pair-building loop; the sort
        # is stable, so ties keep their original relative order, matching the
        # previous implementation exactly.
        ordered = sorted(zip(movie_list, timestamp_list), key=lambda pair: pair[1])
        return [movie for movie, _ in ordered]
def processItemSequence(spark, rawSampleDataPath):
    """Build per-user watch sequences from the ratings CSV.

    Keeps only ratings >= 3.5, orders each user's movieIds by timestamp via
    the sortF UDF, and returns an RDD of movieId-string lists (one per user).
    """
    samples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    sortUdf = udf(UdfFunction.sortF, ArrayType(StringType()))
    userSeq = (
        samples
        .where(F.col("rating") >= 3.5)
        .groupBy("userId")
        .agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias("movieIds"))
        .withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
    )
    return userSeq.select("movieIdStr").rdd.map(lambda row: row[0].split(" "))
def embeddingLSH(spark, movieEmbMap):
    """Bucket the movie embeddings with random-projection LSH and demo an
    approximate nearest-neighbour lookup for a hard-coded sample vector.
    """
    rows = [
        (movie_id, Vectors.dense([np.float64(component) for component in emb]))
        for movie_id, emb in movieEmbMap.items()
    ]
    movieEmbDF = spark.createDataFrame(rows).toDF("movieId", "emb")
    lsh = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId",
                                      bucketLength=0.1, numHashTables=3)
    lshModel = lsh.fit(movieEmbDF)
    bucketed = lshModel.transform(movieEmbDF)
    print("movieId, emb, bucketId schema:")
    bucketed.printSchema()
    print("movieId, emb, bucketId data result:")
    bucketed.show(10, truncate=False)
    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    # NOTE: this probe assumes a 10-dimensional embedding space.
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    lshModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)
def trainItem2vec(spark, samples, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    """Train an item2vec model on the sequence RDD and dump its vectors.

    Writes one "movieId:v1 v2 ..." line per item to ``embOutputPath`` and
    runs the LSH demo over the resulting vectors.  Returns the fitted model.
    (``saveToRedis``/``redisKeyPrefix`` are accepted for interface
    compatibility but unused here.)
    """
    # BUG FIX: at module level, `from gensim.models import Word2Vec` shadows
    # the pyspark.mllib Word2Vec imported earlier; the fluent setter API used
    # below belongs to pyspark, so re-import it explicitly under an alias.
    from pyspark.mllib.feature import Word2Vec as SparkWord2Vec
    word2vec = SparkWord2Vec().setVectorSize(embLength).setWindowSize(5).setNumIterations(10)
    model = word2vec.fit(samples)
    # Sanity check: print the 20 items most similar to item "158".
    synonyms = model.findSynonyms("158", 20)
    for synonym, cosineSimilarity in synonyms:
        print(synonym, cosineSimilarity)
    embOutputDir = '/'.join(embOutputPath.split('/')[:-1])
    if embOutputDir:
        # exist_ok avoids the check-then-create race of the old
        # os.path.exists()/os.makedirs() pair.
        os.makedirs(embOutputDir, exist_ok=True)
    # Fetch the vector map once instead of calling getVectors() per iteration.
    vectorsById = model.getVectors()
    with open(embOutputPath, 'w') as f:
        for movie_id in vectorsById:
            vectors = " ".join([str(emb) for emb in vectorsById[movie_id]])
            f.write(movie_id + ":" + vectors + "\n")
    embeddingLSH(spark, vectorsById)
    return model
def generate_pair(x):
    """Turn a watch sequence into adjacent (previous, next) item pairs.

    eg:
        watch sequence: ['858', '50', '593', '457']
        return: [('858', '50'), ('50', '593'), ('593', '457')]
    """
    pairSeq = []
    previousItem = ''
    for item in x:
        if not previousItem:
            previousItem = item
        else:
            pairSeq.append((previousItem, item))
            previousItem = item
    return pairSeq


def reverseCol(x):
    """Split one device's click stream into sessions of card ids.

    ``x.time_card`` holds "timestamp_cardid" strings.  Entries are sorted
    (lexicographically — assumes equal-width timestamps, TODO confirm), and
    a gap of more than one hour between consecutive clicks starts a new
    session.  Only sessions containing at least two cards are kept.

    NOTE(review): reconstructed from diff fragments that were interleaved
    with generate_pair; ``last_time_stamp`` is updated on every click (the
    conventional sessionisation reading) — confirm against the original
    commit.  Card ids containing "_" would be truncated by the split.
    """
    res = []
    datas = sorted(list(x.time_card))
    last_time_stamp = int(datas[0].split("_")[0])
    res_line = []
    for d in datas:
        time_stamp = int(d.split("_")[0])
        cart_id = d.split("_")[1]
        if (time_stamp - last_time_stamp) > 60 * 60:
            if len(res_line) > 1:
                res.append(res_line)
            res_line = [cart_id]
        else:
            res_line.append(cart_id)
        last_time_stamp = time_stamp
    if len(res_line) > 1:
        res.append(res_line)
    return res
def generateTransitionMatrix(samples):
    """Count adjacent item pairs over all sequences and normalise them.

    Returns:
        transitionMatrix: {item1: {item2: P(item2 | item1)}}
        itemDistribution: {item: fraction of all pairs starting at item}

    (The trailing lines of the original block were displaced diff fragments
    belonging to reverseCol and have been removed.)
    """
    pairSamples = samples.flatMap(lambda x: generate_pair(x))
    # countByValue -> {(item1, item2): occurrence count}
    pairCountMap = pairSamples.countByValue()
    pairTotalCount = 0
    transitionCountMatrix = defaultdict(dict)
    itemCountMap = defaultdict(int)
    for (key1, key2), cnt in pairCountMap.items():
        transitionCountMatrix[key1][key2] = cnt
        itemCountMap[key1] += cnt
        pairTotalCount += cnt
    transitionMatrix = defaultdict(dict)
    itemDistribution = defaultdict(dict)
    for key1, transitionMap in transitionCountMatrix.items():
        for key2, cnt in transitionMap.items():
            # use the already-bound count instead of a second double lookup
            transitionMatrix[key1][key2] = cnt / itemCountMap[key1]
    for itemid, cnt in itemCountMap.items():
        itemDistribution[itemid] = cnt / pairTotalCount
    return transitionMatrix, itemDistribution
def oneRandomWalk(transitionMatrix, itemDistribution, sampleLength):
    """Sample one random walk of at most ``sampleLength`` items.

    The first item is drawn from ``itemDistribution``; each following item is
    drawn from the current item's transition probabilities.  The walk stops
    early when the current item has no entry in either mapping.
    """
    def _weighted_pick(distribution, fallback):
        # Inverse-CDF sampling: one random.random() call, then an
        # accumulate-and-compare scan.  Returns `fallback` when the scan
        # never reaches the threshold (mirrors the original behaviour).
        threshold = random.random()
        cumulative = 0.0
        for candidate, weight in distribution.items():
            cumulative += weight
            if cumulative >= threshold:
                return candidate
        return fallback

    walk = [_weighted_pick(itemDistribution, "")]
    current = walk[0]
    step = 1
    while step < sampleLength:
        if (current not in itemDistribution) or (current not in transitionMatrix):
            break
        current = _weighted_pick(transitionMatrix[current], current)
        walk.append(current)
        step += 1
    return walk
def randomWalk(transitionMatrix, itemDistribution, sampleCount, sampleLength):
    """Generate ``sampleCount`` random-walk sequences of at most
    ``sampleLength`` items each."""
    return [
        oneRandomWalk(transitionMatrix, itemDistribution, sampleLength)
        for _ in range(sampleCount)
    ]
def graphEmb(samples, spark, embLength, embOutputFilename, saveToRedis, redisKeyPrefix):
    """Build graph-based embeddings: estimate the item transition matrix from
    the sequences, sample random walks over it, and train item2vec on the
    sampled walks."""
    matrix, distribution = generateTransitionMatrix(samples)
    walkCount = 20000
    walkLength = 10
    walks = randomWalk(matrix, distribution, walkCount, walkLength)
    walkRDD = spark.sparkContext.parallelize(walks)
    trainItem2vec(spark, walkRDD, embLength, embOutputFilename, saveToRedis, redisKeyPrefix)
def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, saveToRedis, redisKeyPrefix):
    """Derive one embedding per user by element-wise summing the embeddings
    of the movies the user rated, then write "userId:v1 v2 ..." lines.

    NOTE(review): the sum is not normalised by the number of rated movies —
    confirm a sum (rather than a mean) is the intended user representation.
    (The duplicate addDays definition and stray `return res` that trailed
    this block were displaced diff fragments and have been removed.)
    """
    ratingSamples = spark.read.format("csv").option("header", "true").load(rawSampleDataPath)
    # (movieId, [float, ...]) rows taken from the trained item2vec model
    Vectors_list = []
    for key, value in model.getVectors().items():
        Vectors_list.append((key, list(value)))
    fields = [
        StructField('movieId', StringType(), False),
        StructField('emb', ArrayType(FloatType()), False)
    ]
    schema = StructType(fields)
    Vectors_df = spark.createDataFrame(Vectors_list, schema=schema)
    # attach each rating row's movie embedding, then sum element-wise per user
    ratingSamples = ratingSamples.join(Vectors_df, on='movieId', how='inner')
    result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], x[1])) \
        .reduceByKey(lambda a, b: [a[i] + b[i] for i in range(len(a))]).collect()
    with open(embOutputPath, 'w') as f:
        for row in result:
            vectors = " ".join([str(emb) for emb in row[1]])
            f.write(row[0] + ":" + vectors + "\n")
def get_spark(appName):
sparkConf = SparkConf()
......@@ -260,7 +106,7 @@ if __name__ == '__main__':
# Wall-clock timer for the whole job.
start = time.time()
# input args: number of training days, passed as the first CLI argument
trainDays = int(sys.argv[1])
# NOTE(review): the next line is a leftover of the old diff side; the
# assignment immediately after it overwrites the session name.
spark = get_spark("embedding")
spark = get_spark("service_embedding")
print('trainDays:{}'.format(trainDays), flush=True)
......@@ -269,27 +115,37 @@ if __name__ == '__main__':
print("train_data start:{} end:{}".format(startDay, endDay))
# conf = SparkConf().setAppName('embedding').setMaster('local')
# spark = SparkSession.builder.config(conf=conf).getOrCreate()
# spark.sparkContext.setLogLevel("ERROR")
# behaviour (click) data: pull the click log via Hive SQL for the window
clickSql = getClickSql(startDay, endDay)
clickDF = spark.sql(clickSql)
# NOTE(review): toPandas() collects the whole result to the driver —
# assumes the windowed click log fits in driver memory.
df = clickDF.toPandas()
pd.DataFrame(df).to_csv("/tmp/service_click.csv",index=False)
# # Change to your own filepath
# file_path = 'file:///home/hadoop/SparrowRecSys/src/main/resources'
# rawSampleDataPath = file_path + "/webroot/sampledata/ratings.csv"
# embLength = 10
# samples = processItemSequence(spark, rawSampleDataPath)
# model = trainItem2vec(spark, samples, embLength,
# embOutputPath=file_path[7:] + "/webroot/modeldata2/item2vecEmb.csv", saveToRedis=False,
# redisKeyPrefix="i2vEmb")
# graphEmb(samples, spark, embLength, embOutputFilename=file_path[7:] + "/webroot/modeldata2/itemGraphEmb.csv",
# saveToRedis=True, redisKeyPrefix="graphEmb")
# generateUserEmb(spark, rawSampleDataPath, model, embLength,
# embOutputPath=file_path[7:] + "/webroot/modeldata2/userEmb.csv", saveToRedis=False,
# redisKeyPrefix="uEmb")
df = pd.DataFrame(df)
# pd.DataFrame(df).to_csv("/tmp/service_click.csv",index=False)
# NOTE(review): pandas DataFrame.count() returns per-column non-null counts,
# not a single row count — presumably len(df) was intended here.
print("count",df.count())
# "timestamp_cardid" key used by reverseCol to order and sessionise clicks
df["time_card"] = df["time_stamp"].map(str) + "_" + df["card_id"].map(str)
# sessionise each device's clicks; card_seq is a list of sessions per device
new_df = df.groupby(["device_id"]).apply(reverseCol).to_frame("card_seq").reset_index()
# keep only devices that produced more than one session
df1 = new_df.loc[new_df["card_seq"].map(len) > 1]
print("user seq size:",df1.count())
datas = df1["card_seq"].tolist()
# flatten the per-device session lists into one training corpus of sequences
train_datas = []
for d in datas:
train_datas.extend(d)
print("train size:",len(train_datas))
# gensim 4.x skip-gram (sg=1) word2vec over card-id sessions
model = Word2Vec(train_datas, sg=1, vector_size=16, window=5, epochs=50)
# serialise the whole gensim model; consumers must unpickle it from redis
s = pickle.dumps(model)
# persist the model
conn = connUtils.getRedisConn()
model_key = "strategy:word2vec:{}:{}".format("service",VERSION)
model_status_key = "strategy:word2vec:status:{}:{}".format("service",VERSION)
conn.set(model_key,s)
# 30-day TTL on both the model blob and its status flag
conn.expire(model_key,60*60*24*30)
# persist the model-refresh status flag ("1" = freshly updated)
conn.set(model_status_key,"1")
conn.expire(model_status_key,60*60*24*30)
conn.close()
\ No newline at end of file
......@@ -8,7 +8,6 @@ from datetime import date, timedelta
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.connUtils as connUtils
import utils.configUtils as configUtils
ITEM_NUMBER_COLUMNS = ["item_"+c for c in ["smart_rank2"]]
......@@ -34,21 +33,6 @@ def is_float(s):
except ValueError:
return False
# data vocabulary
def getDataVocabFromRedis(version):
    """Fetch the feature data vocabulary for ``version`` from redis.

    Returns the decoded dict (feature name -> value list), or None when the
    key is absent.  Prints the size of each vocabulary entry when found.
    """
    conn = connUtils.getRedisConn()
    key = "Strategy:rec:vocab:service:"+version
    dataVocabStr = conn.get(key)
    if dataVocabStr:
        # BUG FIX: json.loads() no longer accepts an `encoding` keyword
        # (deprecated since 3.1, removed in Python 3.9) — decode the bytes
        # first and pass plain text.
        dataVocab = json.loads(str(dataVocabStr, encoding="utf-8"))
        print("-----data_vocab-----")
        for k, v in dataVocab.items():
            print(k, len(v))
    else:
        dataVocab = None
    return dataVocab
# 数据类型转换
def csvTypeConvert(columns,df,data_vocab):
df["label"] = df["label"].astype("int")
......
import redis
def getRedisConn4():
    """Return a Redis client for the 172.16.50.145 instance (db 0).

    NOTE(review): the stray duplicate `def getRedisConn():` line that the
    diff left above this definition has been removed.  Credentials are
    hard-coded in source — move them to config/environment.
    """
    pool = redis.ConnectionPool(host="172.16.50.145",password="XfkMCCdWDIU%ls$h",port=6379,db=0)
    conn = redis.Redis(connection_pool=pool)
    return conn
def getRedisConn():
    """Return a Redis client for the 172.16.40.133 instance (db 0).

    NOTE(review): credentials are hard-coded in source — move them to
    config/environment.  A new connection pool is created on every call.
    """
    connection_pool = redis.ConnectionPool(host="172.16.40.133",password="ReDis!GmTx*0aN6",port=6379,db=0)
    client = redis.Redis(connection_pool=connection_pool)
    return client
if __name__ == '__main__':
# REDIS_URL = 'redis://:@172.18.51.10:6379'
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment