Commit ad1dcd76 authored by 郭羽's avatar 郭羽

update feature

parent 94fa10a8
...@@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch ...@@ -7,7 +7,7 @@ from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan from elasticsearch.helpers import scan
import time import time
import redis
from pyspark import SparkContext, SparkConf from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
import pyspark.sql as sql import pyspark.sql as sql
...@@ -21,7 +21,7 @@ from collections import defaultdict ...@@ -21,7 +21,7 @@ from collections import defaultdict
import json import json
sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))) sys.path.append(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))
import utils.configUtils as configUtils import utils.configUtils as configUtils
import utils.connUtils as connUtils # import utils.connUtils as connUtils
...@@ -65,6 +65,13 @@ TRAIN_FILE_PATH = "service_feature_" + VERSION ...@@ -65,6 +65,13 @@ TRAIN_FILE_PATH = "service_feature_" + VERSION
ITEM_PREFIX = "item_" ITEM_PREFIX = "item_"
def getRedisConn():
pool = redis.ConnectionPool(host="172.16.50.145",password="XfkMCCdWDIU%ls$h",port=6379,db=0)
conn = redis.Redis(connection_pool=pool)
# conn = redis.Redis(host="172.16.50.145", port=6379, password="XfkMCCdWDIU%ls$h",db=0)
# conn = redis.Redis(host="172.18.51.10", port=6379,db=0) #test
return conn
def addItemFeatures(samples,itemDF,dataVocab,multiVocab): def addItemFeatures(samples,itemDF,dataVocab,multiVocab):
itemDF = itemDF.withColumnRenamed("id", "itemid") itemDF = itemDF.withColumnRenamed("id", "itemid")
...@@ -302,17 +309,17 @@ def getDataVocab(samples,model_columns): ...@@ -302,17 +309,17 @@ def getDataVocab(samples,model_columns):
return dataVocab return dataVocab
def dataVocabToRedis(dataVocab): def dataVocabToRedis(dataVocab):
conn = connUtils.getRedisConn() conn = getRedisConn()
conn.set(FEATURE_VOCAB_KEY,dataVocab) conn.set(FEATURE_VOCAB_KEY,dataVocab)
conn.expire(FEATURE_VOCAB_KEY,60 * 60 * 24 * 7) conn.expire(FEATURE_VOCAB_KEY,60 * 60 * 24 * 7)
def featureColumnsToRedis(columns): def featureColumnsToRedis(columns):
conn = connUtils.getRedisConn() conn = getRedisConn()
conn.set(FEATURE_COLUMN_KEY, json.dumps(columns)) conn.set(FEATURE_COLUMN_KEY, json.dumps(columns))
conn.expire(FEATURE_COLUMN_KEY, 60 * 60 * 24 * 7) conn.expire(FEATURE_COLUMN_KEY, 60 * 60 * 24 * 7)
def featureToRedis(key,datas): def featureToRedis(key,datas):
conn = connUtils.getRedisConn() conn = getRedisConn()
for k,v in datas.items(): for k,v in datas.items():
newKey = key+k newKey = key+k
conn.set(newKey,v) conn.set(newKey,v)
...@@ -334,7 +341,7 @@ def featuresToRedis(samples,columns,prefix,redisKey): ...@@ -334,7 +341,7 @@ def featuresToRedis(samples,columns,prefix,redisKey):
timestampCol = idCol+"_timestamp" timestampCol = idCol+"_timestamp"
def toRedis(datas): def toRedis(datas):
conn = connUtils.getRedisConn() conn = getRedisConn()
for d in datas: for d in datas:
k = d[idCol] k = d[idCol]
v = json.dumps(d.asDict(), ensure_ascii=False) v = json.dumps(d.asDict(), ensure_ascii=False)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment