Commit 23677020 authored by 张彦钊's avatar 张彦钊

change tests

parent aee31959
import time
from prepareData import fetch_data
from read_filter import fetch_data
from utils import *
import pandas as pd
from config import *
......
import pymysql
import pandas as pd
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# 从数据库获取数据,并将数据转化成DataFrame
def get_data(sql):
    """Run *sql* on the module-level ``db`` connection and return the rows
    as a DataFrame with NaN-containing rows dropped.

    The cursor is closed after fetching (the original leaked it on every
    call); the shared connection itself stays open for the next query.
    """
    cursor = db.cursor()
    try:
        cursor.execute(sql)
        rows = cursor.fetchall()
    finally:
        cursor.close()
    return pd.DataFrame(list(rows)).dropna()
# ---- top-level script: build per-city diary top-2000 test sets ----

# Nationwide top-2000 diaries by click count.
# FIX: the original query had no FROM clause; data_feed_click is the table
# every other query in this script reads.
sql = "select city_id,cid from data_feed_click " \
      "where cid_type = 'diary' order by click_count_choice desc limit 2000"
allCitiesTop2000 = get_data(sql)
allCitiesTop2000 = allCitiesTop2000.rename(columns={0: "city_id", 1: "cid"})
# FIX: forward slashes — the original backslash literals produced literal
# backslashes (and "\a" is a BEL character) in the filename on Linux.
allCitiesTop2000.to_csv("/home/zhangyanzhao/diaryTestSet/allCitiesTop2000.csv")
print("成功获取全国日记点击量TOP2000")

# Distinct city ids seen in the click log.
sql = "select distinct city_id from data_feed_click"
cityList = get_data(sql)
cityList.to_csv("/home/zhangyanzhao/diaryTestSet/cityList.csv")
cityList = cityList[0].values.tolist()
print("成功获取城市列表")

# Per-city click-count TOP2000; cities with fewer than 2000 diaries are
# padded from the nationwide list, excluding that city's own entries.
for i in cityList:
    sql = "select city_id,cid from data_feed_click " \
          "where cid_type = 'diary' and city_id = {0} " \
          "order by click_count_choice desc limit 2000".format(i)
    data = get_data(sql)
    data = data.rename(columns={0: "city_id", 1: "cid"})
    if data.shape[0] < 2000:
        n = 2000 - data.shape[0]
        # FIX: iloc (positional) — the filtered frame keeps its original
        # label index with gaps, so loc[:n-1] did not reliably take n rows.
        temp = allCitiesTop2000[allCitiesTop2000["city_id"] != i].iloc[:n]
        data = data.append(temp)
    file_name = "/home/zhangyanzhao/diaryTestSet/{0}DiaryTop2000.csv".format(i)
    data.to_csv(file_name)

print("end")
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import redis
import sys
import os
import json
import pymysql
import numpy as np
import pandas as pd
import time
import datetime
def Json(x):
    """Kafka-record filter: keep only feedback clicks from the 精选 tab.

    *x* is a (key, value) Kafka pair; x[1] is the JSON payload. Returns
    True for 'on_click_button' events on the home page's 精选 tab whose
    'user_feedback_type' button targets a diary/qa/user_post card with
    feedback type "1" or "2"; False otherwise — including payloads that
    fail to parse (the original implicitly returned None there).
    """
    try:
        data = json.loads(x[1])
        if 'type' in data and 'device' in data:
            if data['type'] == 'on_click_button' \
                    and data['params']['page_name'] == 'home' \
                    and data['params']['tab_name'] == '精选' \
                    and data['params']['button_name'] == 'user_feedback_type' \
                    and data['params']['extra_param'][0]["card_content_type"] in ("diary", "qa", "user_post") \
                    and ("1" in data['params']['extra_param'][0]["feedback_type"]
                         or "2" in data['params']['extra_param'][0]["feedback_type"]):
                return True
            else:
                return False
        else:
            return False
    except Exception as e:
        print("filter fail")
        print(e)
        # A record that cannot be parsed must not pass the filter.
        return False
def model(rdd):
    """Per-batch pipeline: keep feedback events, parse them, and write each
    (device_id, cid, card) dislike into redis.

    Returns the transformed RDD, or None if building the pipeline raises.
    """
    try:
        return (rdd.filter(Json)
                .repartition(10)
                .map(get_data)
                .map(lambda t: write_redis(t[0], t[1], t[2])))
    except Exception as e:
        # Narrowed from a bare `except:`, which also swallowed
        # KeyboardInterrupt/SystemExit.
        print("fail")
        print(e)
def get_data(x):
    """Pull (device_id, card_id, card_content_type) out of a Kafka pair.

    *x* is a (key, value) pair whose value is the JSON event payload.
    Returns None (after logging) when the payload lacks the expected keys.
    """
    try:
        payload = json.loads(x[1])
        first_extra = payload['params']['extra_param'][0]
        return (payload['device']['device_id'],
                first_extra["card_id"],
                first_extra["card_content_type"])
    except Exception as e:
        print("get_data fail")
        # send_email("get_data", "get_data", e)
def write_redis(device_id, cid, card):
    """Route a disliked card to the writer matching its content type.

    Unknown card types are ignored, exactly as in the if/elif original.
    """
    writers = {
        "diary": diary_write,
        "qa": question_write,
        "user_post": tractate_write,
    }
    handler = writers.get(card)
    if handler is not None:
        handler(device_id, cid)
def tractate_write(device_id, cid):
    """Record the device's dislike of tractate *cid* in redis.

    Expands *cid* to every online tractate sharing its first tag_type='3'
    tag, then unions those ids into the device's "_dislike_tractate" set
    (7-day TTL applied when the key is first created).
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
        cids = []
        try:
            cursor = db.cursor()
            # NOTE: cid is interpolated directly; acceptable for internal
            # integer ids, but parameterize if input can be untrusted.
            sql = "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b " \
                  "on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = {}".format(cid)
            cursor.execute(sql)
            result = cursor.fetchall()
            if len(result) > 0:
                tags = result[0][0]
                if tags is not None:
                    sql = "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b " \
                          "on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                          "where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
                    cursor.execute(sql)
                    result = cursor.fetchall()
                    if len(result) > 0:
                        cids = [str(i[0]) for i in result]
        finally:
            # FIX: always release the connection — the original leaked it
            # whenever the first query returned no rows or the tag was None.
            db.close()
        if cids:
            r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
            key = str(device_id) + "_dislike_tractate"
            if r.exists(key):
                value = json.loads(r.get(key).decode('utf-8'))
                value.extend(cids)
                r.set(key, json.dumps(list(set(value))))
                print("cunza")
            else:
                r.set(key, json.dumps(cids))
                r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("tractate insert redis fail")
        print(e)
def question_write(device_id, cid):
    """Record the device's dislike of question *cid* in redis.

    Expands *cid* to every online question sharing its first tag_type='3'
    tag, then unions those ids into the device's "_dislike_qa" set
    (7-day TTL applied when the key is first created).
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
        cids = []
        try:
            cursor = db.cursor()
            sql = "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b " \
                  "on a.tag_id = b.id where b.tag_type = '3' and a.question_id = {}".format(cid)
            cursor.execute(sql)
            result = cursor.fetchall()
            if len(result) > 0:
                tags = result[0][0]
                if tags is not None:
                    sql = "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b " \
                          "on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                          "where a.is_online = 1 and c.tag_type = '3' and c.id = {}".format(tags)
                    cursor.execute(sql)
                    result = cursor.fetchall()
                    if len(result) > 0:
                        cids = [str(i[0]) for i in result]
        finally:
            # FIX: always release the connection — the original leaked it
            # whenever the first query returned no rows or the tag was None.
            db.close()
        if cids:
            r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
            key = str(device_id) + "_dislike_qa"
            if r.exists(key):
                value = json.loads(r.get(key).decode('utf-8'))
                value.extend(cids)
                r.set(key, json.dumps(list(set(value))))
                print("cunza")
            else:
                r.set(key, json.dumps(cids))
                r.expire(key, 7 * 24 * 60 * 60)
                # NOTE(review): in the flattened source "bucunza"/the return
                # sat right after expire(); confirm they belong to the
                # new-key path only.
                print("bucunza")
                return "question good"
    except Exception as e:
        print("question insert redis fail")
        print(e)
def diary_write(device_id, cid):
    """Record the device's dislike of diary *cid* in redis.

    Expands *cid* to every online, content_level >= '3' diary sharing its
    first tag_type='3' tag, then unions those ids into the device's
    "_dislike_diary" set (7-day TTL applied when the key is first created).
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='eagle')
        cids = []
        try:
            cursor = db.cursor()
            sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
                  "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid)
            cursor.execute(sql)
            result = cursor.fetchall()
            if len(result) > 0:
                tags = result[0][0]
                if tags is not None:
                    sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                          "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                          "where a.is_online = 1 and a.content_level >= '3' " \
                          "and c.id = {} and c.tag_type = '3'".format(tags)
                    cursor.execute(sql)
                    result = cursor.fetchall()
                    if len(result) > 0:
                        cids = [str(i[0]) for i in result]
        finally:
            # FIX: always release the connection — the original leaked it
            # whenever the first query returned no rows or the tag was None.
            db.close()
        if cids:
            r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
            key = str(device_id) + "_dislike_diary"
            if r.exists(key):
                value = json.loads(r.get(key).decode('utf-8'))
                value.extend(cids)
                r.set(key, json.dumps(list(set(value))))
            else:
                r.set(key, json.dumps(cids))
                r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("diary insert redis fail")
        print(e)
# ---- Spark Streaming job wiring ----
# Standalone master; lzf codec trades compression ratio for speed.
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# 4-second micro-batches.
ssc = StreamingContext(sc,4)
sc.setLogLevel("WARN")
# Direct (receiver-less) Kafka stream. "largest" starts at the newest
# offsets, so events published while the job is down are skipped.
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
               "group.id": "dislike",
               "socket.timeout.ms": "600000",
               "auto.offset.reset": "largest"}
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream = stream.transform(lambda x:model(x))
# pprint() forces evaluation of each batch and logs a sample of results.
transformstream.pprint()
ssc.start()
ssc.awaitTermination()
......@@ -32,8 +32,8 @@ def gbk_decoder(s):
def maidian(x):
try:
if 'type' in x[1] and 'device' in x[1]:
data = x[1]
data = json.loads(x[1])
if 'type' in data and 'device' in data:
if data['type'] == 'on_click_button' \
and data['params']['page_name'] == 'home' and data['params']['tab_name'] == '精选' \
and data['params']['button_name'] == 'user_feedback_type' \
......
from utils import con_sql
import datetime
import time
import pymysql
def fetch_data(start_date, end_date):
    """Load click and exposure data for [start_date, end_date].

    Returns (exposure, click, click_device_id) where exposure/click are
    DataFrames with cid/device_id/stat_date plus derived hour/minute
    columns, and click_device_id is the distinct device_id list from the
    click table.
    """
    def _connect():
        # con_sql appears to close the connection handed to it, so each
        # query gets a fresh one (the original repeated this literal 3x).
        return pymysql.connect(host='10.66.157.22', port=4000, user='root',
                               passwd='3SYz54LS9#^9sBvC', db='jerry_test')

    def _split_time(df):
        # Replace the epoch-seconds 'time_date' column with hour/minute.
        df["hour"] = df["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
        df["minute"] = df["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
        return df.drop("time_date", axis=1)

    # Distinct device ids from the click table.
    sql = "select distinct device_id from data_feed_click"
    click_device_id = con_sql(_connect(), sql)[0].values.tolist()
    print("成功获取点击表里的device_id")

    # Click rows for the date range.
    sql = "select cid,device_id,time,stat_date from data_feed_click " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    click = con_sql(_connect(), sql)
    click = click.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取点击表里的数据")
    click = _split_time(click)

    # Exposure rows for the date range (timed — this table is large).
    sql = "select cid,device_id,time,stat_date from data_feed_exposure " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    start = time.time()
    exposure = con_sql(_connect(), sql)
    end = time.time()
    print("获取曝光表耗时{}分".format((end - start) / 60))
    exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取曝光表里的数据")
    exposure = _split_time(exposure)

    return exposure, click, click_device_id
import redis
import datetime
import json
def filter_history(device_id, cid_list):
    """Add *cid_list* to today's recommended-tractate set and fold today's
    members into the rolling all-time set, retiring entries recorded 14
    days ago."""
    conn = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/1")
    all_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
    stale_day = (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d")
    old_key = "TS:recommend_tractate_set:device_id:'{}':'{}'".format(device_id, stale_day)
    today_key = "TS:recommend_tractate_set:device_id:'{}':'{}'".format(
        device_id, datetime.date.today().strftime("%Y-%m-%d"))
    # Both branches of the original performed the sadd; only a brand-new
    # daily key needs its TTL set.
    is_new_day = not conn.exists(today_key)
    conn.sadd(today_key, *cid_list)
    if is_new_day:
        conn.expire(today_key, 15 * 24 * 60 * 60)
    # Drop the 14-day-old slice from the cumulative set, then merge today's.
    if conn.exists(all_key) and conn.exists(old_key):
        conn.sdiffstore(all_key, all_key, old_key)
        conn.delete(old_key)
        conn.expire(all_key, time=13 * 24 * 60 * 60)
    conn.sadd(all_key, *conn.smembers(today_key))
def get_data():
    """Seed a fixed test device's redis queues with known cid ranges so the
    downstream merge logic can be verified by eye (manual fixture writer)."""
    device_id = "868080041007173"
    r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/0")
    # Dislike set for diaries.
    dislike_key = str(device_id) + "_dislike_diary"
    # NOTE(review): stored as ints here while production writers store
    # stringified ids — confirm readers tolerate both.
    dislike_cids = [2,20,40,61,81,101,121]
    r.sadd(dislike_key,*dislike_cids)
    print("不喜欢")
    print(r.smembers(dislike_key))
    # User-portrait diary queue: hash with a JSON id list plus two cursors.
    user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % \
                              (device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
    user_cids = list(range(2,20))
    user_cids = [str(i) for i in user_cids]
    r.hset(user_portrait_diary_key,'diary_queue',json.dumps(user_cids))
    r.hset(user_portrait_diary_key, 'cursor', "0")
    r.hset(user_portrait_diary_key, 'len_cursor', "0")
    print("画像")
    print(r.hgetall(user_portrait_diary_key))
    # Search-driven diary recommendation queue.
    search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
    serach_cids = list(range(20,39))
    serach_cids = [str(i) for i in serach_cids]
    r.hset(search_diary_recommend_key, 'diary_queue', json.dumps(serach_cids))
    print("search")
    print(r.hgetall(search_diary_recommend_key))
    # Offline (TS) diary recommendation queue.
    diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
    ts_cids = list(range(40,60))
    ts_cids = [str(i) for i in ts_cids]
    r.hset(diary_recommend_key, 'diary_queue', json.dumps(ts_cids))
    print("ts")
    print(r.hgetall(diary_recommend_key))
    # Per-city personal queues: comma-joined id strings, one field each.
    use_city_id = "beijing"
    personal_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
    native_quue = ",".join([str(i) for i in range(60,80)])
    nearby_quue = ",".join([str(i) for i in range(80,100)])
    mea_queue = ",".join([str(i) for i in range(100,120)])
    nation_queue = ",".join([str(i) for i in range(120,130)])
    r.hset(personal_key,"native_queue",native_quue)
    r.hset(personal_key, "nearby_queue", nearby_quue)
    r.hset(personal_key, "nation_queue", nation_queue)
    r.hset(personal_key, "megacity_queue", mea_queue)
    print("personnal ")
    print(r.hgetall(personal_key))
if __name__ == "__main__":
    # Manual fixture run; the filter_history smoke check stays disabled.
    # cid = [16,18,20]
    # filter_history("hello",cid)
    get_data()
def get_esmm_users():
    """Return distinct (device_id, city_id) pairs from yesterday's precise
    exposure log (LIMIT 2 — test-sized sample).

    The commented-out __main__ pipeline below iterates over this function's
    result; the original printed it but returned None, and it also never
    closed the connection.
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                         passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    try:
        cursor = db.cursor()
        stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        print(stat_date)
        sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
              "where stat_date = '{}' limit 2".format(stat_date)
        cursor.execute(sql)
        result = cursor.fetchall()
        print(result)
        return result
    finally:
        db.close()
def get_user_profile(device_id):
    """TODO: return the device's tag->score portrait (stub; see the
    commented pipeline in __main__ below)."""
    pass


def transform_to_tag(user_profile):
    """TODO: convert a raw user profile into tag scores (stub)."""
    pass


def get_queues(device_id,city_id):
    """TODO: fetch the native/nearby/megacity/nation diary queues for the
    device and city (stub)."""
    pass


def tag_boost(cid_list, tag_score):
    """TODO: re-rank *cid_list* using the device's tag scores (stub)."""
    pass


def to_data_base(df,table_name = "tag_boost_device_diary_queue"):
    """TODO: persist *df* into *table_name* (stub)."""
    pass


def make_sample(uid,city_id,native_queue,nearby_queue,megacity_queue,nation_queue):
    """TODO: assemble one output row from the boosted queues (stub)."""
    pass
if __name__ == "__main__":
    # Planned tag-boost pipeline; everything below stays disabled until the
    # stub helpers above are implemented. Only the user fetch runs today.
    # users_list = get_esmm_users()
    # total_samples = list()
    # for i in users_list:
    #     tag_score = get_user_profile(i[0])
    #     native,nearby,megacity,nation = get_queues(i[0],i[1])
    #
    #     native_sort_list = tag_boost(native, tag_score)
    #     nearby_sort_list = tag_boost(nearby, tag_score)
    #
    #     sample = make_sample(uid,city_id,native_queue,nearby_queue,megacity_queue,nation_queue)
    #     total_samples.append(sample)
    #
    # total_samples.todf
    # to_data_base(df)
    get_esmm_users()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment