Commit 39735dfb authored by 张彦钊's avatar 张彦钊

deleye

parent abb9f06b
......@@ -40,8 +40,7 @@ if __name__ == '__main__':
"where action = 'page_precise_exposure' and page_name = 'search_result_welfare' " \
"AND partition_date='20190926'"
df = spark.sql(sql).rdd
# # df.show(6)
# # params['exposure_cards'],
df.persist()
total = []
rdd = df.map(lambda x:("a",position(eval(x[0]),10))).reduceByKey(lambda x,y:x+y).map(lambda x:x[1])
......
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import redis
import sys
import os
import json
import pymysql
import numpy as np
import pandas as pd
import time
import datetime
def Json(x):
data = json.loads(x[1])
if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']:
if data['type'] == 'on_click_card' and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6":
return True
else:
return False
else:
return False
def model(rdd):
try:
rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
.map(lambda x:write_redis(x[0],x[1],x[2]))
return rdd
except:
print("fail")
def get_data(x):
try:
data = json.loads(x[1])
device_id = data['device']['device_id']
diary_id = data['params']["card_id"]
card = data['params']['card_content_type']
return device_id,diary_id,card
except Exception as e:
print("get_data fail")
# send_email("get_data", "get_data", e)
def write_redis(device_id,cid,card):
if card == "diary":
diary_write(device_id, cid)
elif card == "qa":
question_write(device_id, cid)
elif card == "user_post":
tractate_write(device_id, cid)
def tractate_write(device_id, cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
if len(result) > 0:
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b " \
"on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
if len(result) > 0:
cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="",db = 2)
key = str(device_id) + "_dislike_tractate"
if r.exists(key):
value = json.loads(r.get(key).decode('utf-8'))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, cids)
print("cunza")
else:
r.set(key, json.dumps(cids))
r.expire(key, 7 * 24 * 60 * 60)
except Exception as e:
print("tractate insert redis fail")
print(e)
def question_write(device_id,cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.question_id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
if len(result) > 0:
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b " \
"on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and c.tag_type = '3' and c.id = {}".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
if len(result) > 0:
cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
key = str(device_id) + "_dislike_qa"
if r.exists(key):
value = json.loads(r.get(key).decode('utf-8'))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, cids)
print("cunza")
else:
r.set(key, json.dumps(cids))
r.expire(key, 7 * 24 * 60 * 60)
print("bucunza")
return "question good"
except Exception as e:
print("question insert redis fail")
print(e)
def diary_write(device_id,cid):
try:
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
"on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid)
cursor = db.cursor()
cursor.execute(sql)
result = cursor.fetchall()
if len(result) > 0:
tags = result[0][0]
if tags is not None:
sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
"on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
"where a.is_online = 1 and a.content_level >= '3' " \
"and c.id = {} and c.tag_type = '3'".format(tags)
cursor.execute(sql)
result = cursor.fetchall()
db.close()
if len(result) > 0:
cids = [str(i[0]) for i in result]
r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
key = str(device_id) + "_dislike_diary"
if r.exists(key):
value = json.loads(r.get(key).decode('utf-8'))
value.extend(cids)
cids = json.dumps(list(set(value)))
r.set(key, cids)
else:
r.set(key, json.dumps(cids))
r.expire(key, 7 * 24 * 60 * 60)
except Exception as e:
print("diary insert redis fail")
print(e)
# sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# ssc = StreamingContext(sc,4)
# sc.setLogLevel("WARN")
# kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
# "group.id": "dislike",
# "socket.timeout.ms": "600000",
# "auto.offset.reset": "largest"}
#
#
# stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
# transformstream = stream.transform(lambda x:model(x))
# transformstream.pprint()
#
# ssc.start()
# ssc.awaitTermination()
def make_data():
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
device_id = "868240031404281"
city_id = "beijing"
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
key = key_head + device_id + ":city_id:" + city_id
native = ",".join([str(i) for i in (range(100, 130))])
nearby = ",".join([str(i) for i in (range(130, 150))])
nation = ",".join([str(i) for i in (range(150, 180))])
megacity = ",".join([str(i) for i in (range(180, 200))])
r.hset(name=key, key="native_queue", value=native)
r.hset(name=key, key="nearby_queue", value=nearby)
r.hset(name=key, key="nation_queue", value=nation)
r.hset(name=key, key="megacity_queue", value=megacity)
print(r.hgetall(key))
# key_head = "device_diary_queue_rerank:device_id:"
# make_data(device_id, "beijing", key_head)
# device_id = "868663038800476"
def topic():
device_id = "78687687"
dislike_key = str(device_id) + "_dislike_tractate"
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
r.sadd(dislike_key, *[1,2])
print(r.smembers(dislike_key))
search = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
a = [1]
a.extend(list(range(36, 50)))
r.hset(search, 'tractate_queue',json.dumps(a))
print(r.hgetall(search))
def delete_key():
r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
keys = r.keys("TS:recommend_diary_set:device_id:*")
sum = 0
for i in keys:
if r.ttl(i) == -1:
sum = sum +1
r.delete(i)
print(sum)
keys = r.keys("TS:recommend_tractate_set:device_id:*")
for i in keys:
if r.ttl(i) == -1:
r.delete(i)
sum = sum + 1
print(sum)
keys = r.keys("TS:recommend_answer_set:device_id:*")
for i in keys:
if r.ttl(i) == -1:
r.delete(i)
sum = sum + 1
print(sum)
if __name__ == "__main__":
make_data()
delete_key()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment