Commit 06a45e9d authored by 张彦钊's avatar 张彦钊

Merge branch 'zhao' into 'master'

新增把esmm排序结果重排

See merge request !32
parents 25637f3e 3bfe07d5
This diff is collapsed.
# -*- coding: UTF-8 -*- # -*- coding: UTF-8 -*-
import pymysql import redis
import datetime import datetime
import pandas as pd import json
def get_yesterday_date(): if __name__ == "__main__":
today = datetime.date.today() device_id = "D17A3770-1CC7-4AFB-A9EA-6E667EE051FF"
yesterday = today - datetime.timedelta(days=1) search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
yesterday = yesterday.strftime("%Y-%m-%d") r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/1")
print(yesterday) cids = list(range(529405,529408))
return yesterday cids = [str(i) for i in cids]
def get_black_user():
    """Return the distinct blacklisted device_ids from jerry_prod.blacklist."""
    connection = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    cur = connection.cursor()
    cur.execute("select distinct device_id from blacklist")
    rows = cur.fetchall()
    # first (only) column of the result set, as a plain Python list
    blacklisted = pd.DataFrame(list(rows))[0].values.tolist()
    cur.close()
    connection.close()
    return blacklisted
def get_data():
    """Return the experiment devices: distinct device_ids ending in 5 or 6
    from eagle.ffm_diary_queue_temp, minus the module-global `black` list.

    Returns a tuple (used later for SQL "in {}" formatting).
    """
    conn2db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    cursor = conn2db.cursor()
    sql = "select distinct device_id from ffm_diary_queue_temp where device_id regexp '[5|6]$'"
    cursor.execute(sql)
    result = cursor.fetchall()
    device = pd.DataFrame(list(result))[0].values.tolist()
    cursor.close()
    conn2db.close()
    # drop blacklisted devices (global `black` set in __main__)
    device = tuple(set(device)-set(black))
    return device
def ctr_all():
    """Coverage stats for yesterday's active users with tail digit 5/6.

    Reads module globals `date`, `black`, `device_id`.
    Returns (active_count, covered_count, coverage_ratio_str).
    """
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    cursor = db.cursor()
    # BUG FIX: the original concatenated "...'diary'" + "and ..." without a
    # separating space, producing "'diary'and" in the SQL text.
    sql_active = "select distinct device_id from data_feed_exposure " \
                 "where cid_type = 'diary' " \
                 "and device_id regexp'[5|6]$' and stat_date = '{}';".format(date)
    cursor.execute(sql_active)
    result = cursor.fetchall()
    tail56 = pd.DataFrame(list(result))[0].values.tolist()
    tail56 = set(tail56) - set(black)
    print("当天尾号5或6活跃用户总数:")
    print(len(tail56))
    cover = len(tail56 & set(device_id))
    print("当天尾号5或6活跃用户覆盖数:")
    print(cover)
    cover_percent = format(cover / len(tail56), ".6f")
    print("当天尾号5或6活跃用户覆盖率:")
    print(cover_percent)
    # BUG FIX: close the connection (was leaked), and drop two stray statements
    # that were fused onto the return line by the collapsed side-by-side diff
    # (they belong to a different script):
    #   value = json.dumps(cids)
    #   r.hset(search_qa_recommend_key, 'answer_queue', value)
    cursor.close()
    db.close()
    return len(tail56), cover, cover_percent
def ctr():
    """Click count, exposure count and CTR for the experiment devices on `date`.

    Reads module globals `date` and `device_id` (a tuple of device ids).
    Returns (click_count, exposure_count, ctr_str).
    """
    sql_click = "select count(cid) from data_feed_click " \
                "where (cid_type = 'diary' or cid_type = 'diary_video') " \
                "and stat_date = '{}' and device_id in {};".format(date,device_id)
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_prod')
    cursor = db.cursor()
    cursor.execute(sql_click)
    click = cursor.fetchone()[0]
    print("实验用户点击数:"+str(click))
    # BUG FIX: "...'diary'" + "and ..." was concatenated without a space.
    sql_exp = "select count(cid) from data_feed_exposure " \
              "where cid_type = 'diary' " \
              "and stat_date = '{}' and device_id in {};".format(date,device_id)
    cursor.execute(sql_exp)
    exp = cursor.fetchone()[0]
    print("实验用户曝光数:"+str(exp))
    print("实验用户点击率:"+str(click/exp))
    # BUG FIX: close the connection; also removed a stray "print(1)" fused onto
    # the return line by the collapsed side-by-side diff.
    cursor.close()
    db.close()
    return click, exp, format(click / exp, ".6f")
def rate2file():
    """Append one CSV row (yyyymmdd, experiment click/exp/ctr, overall stats)
    to <DIRECTORY_PATH>/56ctr.csv.

    Reads module globals DIRECTORY_PATH, date, temp_data, data.
    """
    output_path = DIRECTORY_PATH + "56ctr.csv"
    with open(output_path,'a+') as f:
        line = date.replace('-', '')+','+str(temp_data[0])+','+str(temp_data[1])+','+str(temp_data[2])+\
               ","+str(data[0])+","+str(data[1])+","+str(data[2])+'\n'
        f.write(line)

if __name__ == "__main__":
    DIRECTORY_PATH = "/data/ffm/"
    date = get_yesterday_date()   # yesterday, "YYYY-MM-DD"
    black = get_black_user()      # blacklisted device ids
    device_id = get_data()        # experiment devices (tail 5/6, not blacklisted)
    temp_data = ctr()             # experiment click/exposure/ctr
    data = ctr_all()              # overall tail-5/6 coverage stats
    rate2file()
This diff is collapsed.
import time import time
from prepareData import fetch_data from read_filter import fetch_data
from utils import * from utils import *
import pandas as pd import pandas as pd
from config import * from config import *
......
import pymysql
import pandas as pd
db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
# 从数据库获取数据,并将数据转化成DataFrame
def get_data(sql):
    """Run `sql` on the module-level connection `db` and return the rows as a
    DataFrame (integer column labels) with NaN-containing rows dropped."""
    cursor = db.cursor()
    cursor.execute(sql)
    data = cursor.fetchall()
    data = pd.DataFrame(list(data)).dropna()
    return data
# Nationwide TOP-2000 diaries by click count.
# BUG FIX: the original SQL had no FROM clause ("select city_id,cid where ...");
# restored "from data_feed_click" — TODO confirm table name, it matches the
# per-city query below.
sql = "select city_id,cid from data_feed_click where cid_type = 'diary' order by click_count_choice desc limit 2000"
allCitiesTop2000 = get_data(sql)
allCitiesTop2000 = allCitiesTop2000.rename(columns={0:"city_id",1:"cid"})
# BUG FIX: paths used backslashes ("\home\..."), which are literal backslash
# sequences in Python strings and not valid POSIX paths.
allCitiesTop2000.to_csv("/home/zhangyanzhao/diaryTestSet/allCitiesTop2000.csv")
print("成功获取全国日记点击量TOP2000")
# List of all cities seen in the click log.
sql = "select distinct city_id from data_feed_click"
cityList = get_data(sql)
cityList.to_csv("/home/zhangyanzhao/diaryTestSet/cityList.csv")
cityList = cityList[0].values.tolist()
print("成功获取城市列表")
# Per-city TOP-2000; pad with nationwide TOP-2000 when a city has fewer than 2000.
for i in cityList:
    sql = "select city_id,cid from data_feed_click " \
          "where cid_type = 'diary' and city_id = {0} " \
          "order by click_count_choice desc limit 2000".format(i)
    data = get_data(sql)
    data = data.rename(columns={0:"city_id",1:"cid"})
    if data.shape[0] < 2000:
        n = 2000 - data.shape[0]
        # Take the first n nationwide entries that are not from this city.
        # BUG FIX: the original used .loc[:n-1], which is label-based on the
        # unreset index of a filtered frame and does not select "first n rows".
        temp = allCitiesTop2000[allCitiesTop2000["city_id"] != i].head(n)
        data = data.append(temp)
    file_name = "/home/zhangyanzhao/diaryTestSet/{0}DiaryTop2000.csv".format(i)
    data.to_csv(file_name)
print("end")
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import redis
import sys
import os
import json
import pymysql
import numpy as np
import pandas as pd
import time
import datetime
def Json(x):
    """Predicate for the Kafka filter: True for home-page '精选' tab feedback
    clicks (feedback type 1 or 2) on diary/qa/user_post cards."""
    try:
        payload = json.loads(x[1])
        if 'type' not in payload or 'device' not in payload:
            return False
        matched = (payload['type'] == 'on_click_button'
                   and payload['params']['page_name'] == 'home'
                   and payload['params']['tab_name'] == '精选'
                   and payload['params']['button_name'] == 'user_feedback_type'
                   and payload['params']['extra_param'][0]["card_content_type"] in ("diary","qa","user_post")
                   and ("1" in payload['params']['extra_param'][0]["feedback_type"]
                        or "2" in payload['params']['extra_param'][0]["feedback_type"]))
        return matched
    except Exception as e:
        print("filter fail")
        print(e)
def model(rdd):
    """Transform one micro-batch: keep feedback events (Json), extract
    (device_id, cid, card) and write each to redis via write_redis."""
    try:
        rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
            .map(lambda x:write_redis(x[0],x[1],x[2]))
        return rdd
    except:
        # NOTE(review): bare except; failures in the driver are only printed.
        print("fail")
def get_data(x):
    """Extract (device_id, card_id, card_content_type) from a raw JSON message
    tuple; prints and returns None on any parse/shape failure."""
    try:
        msg = json.loads(x[1])
        extra = msg['params']['extra_param'][0]
        return msg['device']['device_id'], extra["card_id"], extra["card_content_type"]
    except Exception as e:
        print("get_data fail")
        # send_email("get_data", "get_data", e)
def write_redis(device_id,cid,card):
    """Dispatch one disliked card to the writer for its content type.
    Unknown card types are silently ignored."""
    if card == "diary":
        diary_write(device_id, cid)
    elif card == "qa":
        question_write(device_id, cid)
    elif card == "user_post":
        tractate_write(device_id, cid)
def tractate_write(device_id, cid):
    """Record all online tractates sharing the clicked tractate's first
    tag_type='3' tag as disliked for `device_id`.

    The id set is stored as JSON under "<device_id>_dislike_tractate" with a
    7-day TTL on first write; on later writes it is unioned with the existing set.
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        try:
            cursor = db.cursor()
            sql = "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b " \
                  "on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = {}".format(cid)
            cursor.execute(sql)
            result = cursor.fetchall()
            if len(result) > 0:
                tags = result[0][0]  # only the first matching tag is used
                if tags is not None:
                    sql = "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b " \
                          "on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                          "where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
                    cursor.execute(sql)
                    result = cursor.fetchall()
                    if len(result) > 0:
                        cids = [str(i[0]) for i in result]
                        r = redis.Redis(host="172.16.40.135", port=5379, password="",db = 2)
                        key = str(device_id) + "_dislike_tractate"
                        if r.exists(key):
                            value = json.loads(r.get(key).decode('utf-8'))
                            value.extend(cids)
                            cids = json.dumps(list(set(value)))
                            r.set(key, cids)
                            print("cunza")
                        else:
                            r.set(key, json.dumps(cids))
                            r.expire(key, 7 * 24 * 60 * 60)
        finally:
            # BUG FIX: the original only closed the connection on one branch,
            # leaking it whenever no tag row was found.
            db.close()
    except Exception as e:
        print("tractate insert redis fail")
        print(e)
def question_write(device_id,cid):
    """Record all online questions sharing the clicked question's first
    tag_type='3' tag as disliked for `device_id` (JSON set in redis,
    7-day TTL on first write)."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.question_id = {}".format(cid)
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        if len(result) > 0:
            tags = result[0][0]  # only the first matching tag is used
            if tags is not None:
                sql = "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b " \
                      "on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                      "where a.is_online = 1 and c.tag_type = '3' and c.id = {}".format(tags)
                cursor.execute(sql)
                result = cursor.fetchall()
                # NOTE(review): db.close() is only reached on this branch;
                # the connection leaks when no tag row is found.
                db.close()
                if len(result) > 0:
                    cids = [str(i[0]) for i in result]
                    r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
                    key = str(device_id) + "_dislike_qa"
                    if r.exists(key):
                        value = json.loads(r.get(key).decode('utf-8'))
                        value.extend(cids)
                        cids = json.dumps(list(set(value)))
                        r.set(key, cids)
                        print("cunza")
                    else:
                        r.set(key, json.dumps(cids))
                        r.expire(key, 7 * 24 * 60 * 60)
                        print("bucunza")
                    return "question good"
    except Exception as e:
        print("question insert redis fail")
        print(e)

def diary_write(device_id,cid):
    """Record all online, content_level >= 3 diaries sharing the clicked
    diary's first tag_type='3' tag as disliked for `device_id` (JSON set in
    redis, 7-day TTL on first write)."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid)
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        if len(result) > 0:
            tags = result[0][0]  # only the first matching tag is used
            if tags is not None:
                sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                      "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                      "where a.is_online = 1 and a.content_level >= '3' " \
                      "and c.id = {} and c.tag_type = '3'".format(tags)
                cursor.execute(sql)
                result = cursor.fetchall()
                # NOTE(review): same connection-leak pattern as question_write.
                db.close()
                if len(result) > 0:
                    cids = [str(i[0]) for i in result]
                    r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
                    key = str(device_id) + "_dislike_diary"
                    if r.exists(key):
                        value = json.loads(r.get(key).decode('utf-8'))
                        value.extend(cids)
                        cids = json.dumps(list(set(value)))
                        r.set(key, cids)
                    else:
                        r.set(key, json.dumps(cids))
                        r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("diary insert redis fail")
        print(e)
# --- Spark streaming wiring (module level) ---
# 4-second micro-batches over the "gm-maidian-data" Kafka topic; each batch is
# passed through model() which writes dislikes to redis.
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
ssc = StreamingContext(sc,4)
sc.setLogLevel("WARN")
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
               "group.id": "dislike",
               "socket.timeout.ms": "600000",
               "auto.offset.reset": "largest"}
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream = stream.transform(lambda x:model(x))
transformstream.pprint()
ssc.start()
ssc.awaitTermination()
This diff is collapsed.
import pymysql
import datetime
import json
import redis
import pandas as pd
from sqlalchemy import create_engine
def get_mysql_data(host,port,user,passwd,db,sql):
    """Run `sql` on the given MySQL/TiDB instance and return cursor.fetchall().

    NOTE: the `db` parameter (schema name) is immediately shadowed by the
    connection object.
    """
    db = pymysql.connect(host=host, port=port, user=user, passwd=passwd,db=db)
    cursor = db.cursor()
    cursor.execute(sql)
    result = cursor.fetchall()
    db.close()
    return result

def get_esmm_users():
    """Return [(device_id, city_id), ...] exposed yesterday, or [] on any error."""
    try:
        stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
              "where stat_date = '{}'".format(stat_date)
        result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC','jerry_prod',sql)
        result = list(result)
        return result
    except:
        # NOTE(review): bare except silently hides all failures.
        return []

def get_user_profile(device_id,top_k = 5):
    """Return the device's top-`top_k` portrait tag identifiers, best score first.

    Entries of type "tag" contribute their content directly; other entries are
    mapped through the module-level `name_tag` dict (tag name -> tag id).
    Returns [] when the redis key is missing or on any error.
    """
    try:
        r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
        key = "user:portrait_tags:cl_id:" + str(device_id)
        if r.exists(key):
            tmp = json.loads(r.get(key).decode('utf-8'))
            tag_score = {}
            for i in tmp:
                if i["type"] == "tag":
                    tag_score[i["content"]] = i["score"]
                elif i["content"] in name_tag.keys():
                    tag_score[name_tag[i["content"]]] = i["score"]
            # highest score first
            tag_sort = sorted(tag_score.items(), key=lambda x: x[1], reverse=True)
            tags = []
            if len(tag_sort) > top_k:
                for i in range(top_k):
                    tags.append(tag_sort[i][0])
            else:
                for i in tag_sort:
                    tags.append(i[0])
            return tags
        else:
            return []
    except:
        return []

def get_searchworlds_to_tagid():
    """Return {tag name: tag id} for online tags with tag_type < 4; {} on error."""
    try:
        sql = 'select id, name from api_tag where is_online = 1 and tag_type < 4'
        tag_id = get_mysql_data('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql)
        searchworlds_to_tagid = {}
        for i in tag_id:
            searchworlds_to_tagid[i[1]] = i[0]
        return searchworlds_to_tagid
    except Exception as e:
        return {}

def get_queues(device_id,city_id):
    """Fetch the four esmm diary queue strings for (device, city).
    Returns [native, nearby, nation, megacity] or [] when absent / on error."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        cursor = db.cursor()
        sql = "select native_queue, nearby_queue, nation_queue, megacity_queue from esmm_device_diary_queue " \
              "where device_id = '{}' and city_id = '{}'".format(device_id, city_id)
        cursor.execute(sql)
        result = cursor.fetchone()
        db.close()
        if result is not None:
            return list(result)
        else:
            return []
    except:
        return []
def tag_boost(cid_str, tag_list):
    """Re-order a comma-separated diary-id queue so diaries carrying the user's
    portrait tags come first, round-robin across tags in `tag_list` order;
    ids matching no tag (the "right" bucket) are appended last.

    Returns the re-ordered comma-separated string, or `cid_str` unchanged when
    the queue has <= 6 ids, `tag_list` is empty, nothing matches, or any error
    occurs. NOTE: mutates `tag_list` by appending "right".
    """
    if cid_str is not None and cid_str != "":
        cids = cid_str.split(",")
        try:
            if len(cids) > 6 and len(tag_list) > 0:
                # group the queue's diary ids by portrait tag id (tag_type < 4)
                sql = "select id,group_concat(diary_id) from " \
                      "(select a.diary_id,b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
                      "on a.tag_id = b.id where b.tag_type < '4' and a.diary_id in {}) tmp " \
                      "where id in {} group by id".format(tuple(cids), tuple(tag_list))
                result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC','eagle',sql)
                if len(result) > 0:
                    tag_cids = {}    # tag id -> queue ids for that tag (queue order kept)
                    left_cids = []   # every id that matched at least one tag
                    for i in result:
                        tmp = i[1].split(",")
                        # keep the queue's original relative order inside each bucket
                        tmp = [i for i in cids if i in tmp]
                        tag_cids[i[0]] = tmp
                        left_cids.extend(tmp)
                    left_cids = list(set(left_cids))
                    # ids matching no tag at all
                    right_cids = [i for i in cids if i not in left_cids]
                    tag_cids["right"] = right_cids
                    tag_list.append("right")
                    sort_cids = []
                    n = 0
                    # round-robin: take one id per tag per pass until every real
                    # tag bucket (all but "right") is drained
                    while n != len(tag_cids) - 1:
                        for i in tag_list:
                            if i in tag_cids.keys():
                                if len(tag_cids[i]) > 0:
                                    sort_cids.append(tag_cids[i][0])
                                    value = tag_cids[i]
                                    value.pop(0)
                                    tag_cids[i] = value
                                    if len(value) == 0 and i != "right":
                                        n = n + 1
                    # append whatever is left of the unmatched ids
                    if len(tag_cids["right"]) > 0:
                        sort_cids.extend(tag_cids["right"])
                    # de-duplicate while preserving first occurrence
                    news_ids = []
                    for id in sort_cids:
                        if id not in news_ids:
                            news_ids.append(id)
                    new_str = ",".join([str(i) for i in news_ids])
                    return new_str
                else:
                    return cid_str
            else:
                return cid_str
        except:
            #TODO report to sentry and also log locally (original note, translated)
            return cid_str
    else:
        return cid_str
def to_data_base(df):
    """Replace rows for df's devices in jerry_test.esmm_resort_diary_queue,
    then append df via SQLAlchemy."""
    sql = "select distinct device_id from esmm_resort_diary_queue"
    result = get_mysql_data('172.16.40.158', 4000, 'root','3SYz54LS9#^9sBvC', 'jerry_test',sql)
    old_uid = [i[0] for i in result]
    if len(old_uid) > 0:
        # devices present both in df and in the table must be deleted first
        old_uid = set(df["device_id"].values)&set(old_uid)
        old_number = len(old_uid)
        if old_number > 0:
            db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                                 passwd='3SYz54LS9#^9sBvC', db='jerry_test')
            # NOTE(review): tuple(old_uid) with a single element renders as
            # "('x',)" whose trailing comma is invalid SQL — confirm upstream
            # always yields more than one overlapping device.
            sql = "delete from esmm_resort_diary_queue where device_id in {}".format(tuple(old_uid))
            cursor = db.cursor()
            cursor.execute(sql)
            db.commit()
            cursor.close()
            db.close()
    yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.158:4000/jerry_test?charset=utf8')
    pd.io.sql.to_sql(df, "esmm_resort_diary_queue", yconnect, schema='jerry_test', if_exists='append', index=False,
                     chunksize=200)
    print("insert done")

if __name__ == "__main__":
    # Re-rank yesterday's exposed users' esmm diary queues by their portrait
    # tags, in batches of 500 users.
    users_list = get_esmm_users()
    print("user number")
    print(len(users_list))
    name_tag = get_searchworlds_to_tagid()
    n = 500
    split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
    for child_users_list in split_users_list:
        total_samples = list()
        for uid_city in child_users_list:
            tag_list = get_user_profile(uid_city[0])
            queues = get_queues(uid_city[0], uid_city[1])
            if len(queues) > 0 and len(tag_list) > 0:
                # only the native and nearby queues are re-ranked
                new_native = tag_boost(queues[0], tag_list)
                new_nearby = tag_boost(queues[1], tag_list)
                insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
                sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
                total_samples.append(sample)
        if len(total_samples) > 0:
            df = pd.DataFrame(total_samples)
            df = df.rename(columns={0: "device_id", 1: "city_id",2:"native_queue",
                                    3:"nearby_queue",4:"nation_queue",5:"megacity_queue",6:"time"})
            to_data_base(df)
This diff is collapsed.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import json
import msgpack
import pymysql
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import redis
import datetime
# filter logging
def gbk_decoder(s):
    """Kafka key/value decoder: try msgpack first, fall back to JSON.
    None passes through unchanged (Kafka keys may be absent)."""
    if s is None:
        return None
    try:
        data = msgpack.loads(s,encoding='utf-8')
        return data
    except:
        # not msgpack — assume a JSON payload
        data = json.loads(s)
        return data
def maidian(x):
    """Predicate: True for home-page '精选' tab type-1/2 feedback clicks on
    diary cards; False otherwise (None after printing on parse failure)."""
    try:
        event = json.loads(x[1])
        if 'type' not in event or 'device' not in event:
            return False
        matched = (event['type'] == 'on_click_button'
                   and event['params']['page_name'] == 'home'
                   and event['params']['tab_name'] == '精选'
                   and event['params']['button_name'] == 'user_feedback_type'
                   and event['params']['extra_param'][0]["card_content_type"] == "diary"
                   and ("1" in event['params']['extra_param'][0]["feedback_type"]
                        or "2" in event['params']['extra_param'][0]["feedback_type"]))
        return matched
    except Exception as e:
        print("filter fail")
        print(e)
def get_data(x):
    """Extract (device_id, diary_id) from an already-decoded message.

    NOTE: in this job x[1] is a dict (the stream uses gbk_decoder as
    valueDecoder), unlike the sibling job that json.loads the raw value.
    """
    try:
        device_id = x[1]['device']['device_id']
        diary_id = x[1]['params']['extra_param'][0]["card_id"]
        return device_id,diary_id
    except Exception as e:
        print("get_data fail")
        # NOTE(review): send_email is not defined in this chunk — confirm it is
        # imported elsewhere in the file.
        send_email("get_data", "get_data", e)
def write_redis(device_id,cid_list):
    """Union all online, content_level >= 3 diaries sharing the given diaries'
    tag_type='3' tags into the device's dislike set.

    Stores the id set as a JSON array under "<device_id>_dislike_diary" with a
    7-day TTL on first write.
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in {}".format(tuple(cid_list))
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        tags = list(set([i[0] for i in result]))
        # BUG FIX: `tags` is a list and was compared against None (always true);
        # an empty result produced an "in ()" SQL error. Guard on emptiness.
        if tags:
            sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                  "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                  "where a.is_online = 1 and a.content_level >= '3' " \
                  "and c.id in {} and c.tag_type = '3'".format(tuple(tags))
            cursor.execute(sql)
            result = cursor.fetchall()
            if result is not None:
                cids = list(set([i[0] for i in result]))
                r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
                key = str(device_id) + "_dislike_diary"
                if r.exists(key):
                    # BUG FIX: parse the stored JSON instead of eval() — the
                    # value is our own JSON and eval on redis content is unsafe.
                    value = json.loads(r.get(key).decode('utf-8'))
                    value.extend(cids)
                    cids = json.dumps(list(set(value)))
                    # BUG FIX: `cids` is already a JSON string here; the original
                    # json.dumps()'d it again, double-encoding the list.
                    r.set(key, cids)
                else:
                    r.set(key, json.dumps(cids))
                    r.expire(key, 7*24*60*60)
        # BUG FIX: close the connection (was leaked).
        db.close()
    except Exception as e:
        print("insert redis fail")
        print(e)
def model(rdd):
    """Transform one micro-batch: keep feedback events, extract
    (device_id, diary_id), group diary ids per device, and write each group
    to redis.

    BUG FIX: the original chained `.na.drop().groupByKey()` onto the tuple
    returned by get_data (DataFrame API on a tuple — a runtime error) and never
    returned the RDD, which DStream.transform requires. Parse failures
    (get_data -> None) are now filtered out instead.
    """
    try:
        return rdd.filter(lambda x: maidian(x)) \
                  .map(lambda x: get_data(x)) \
                  .filter(lambda x: x is not None) \
                  .groupByKey() \
                  .map(lambda x: write_redis(x[0], list(x[1])))
    except Exception as e:
        print("fail")
        print(e)
if __name__ == '__main__':
    # 10-second micro-batches; both Kafka key and value are decoded with
    # gbk_decoder (msgpack with JSON fallback).
    sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
        "spark.io.compression.codec", "lzf"))
    ssc = StreamingContext(sc, 10)
    sc.setLogLevel("WARN")
    kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
                   "group.id": "dislike",
                   "socket.timeout.ms": "600000",
                   "auto.offset.reset": "largest"}
    try:
        stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
                                               valueDecoder=gbk_decoder)
        transformstream = stream.transform(lambda x: model(x))
        transformstream.pprint()
        ssc.start()
        ssc.awaitTermination()
    except Exception as e:
        print(e)
        # send_email(sc.appName, sc.applicationId, e)
import os
import time
def check():
    """Return 2 when a 'python diaryQueueUpdate.py' process shows up in
    `ps aux`, else 1 (i.e. 1 means the updater is not running)."""
    listing = os.popen("ps aux | grep diaryQueueUpdate.py").read()
    status = 1
    for row in listing.splitlines():
        if 'python diaryQueueUpdate.py' in row:
            status = 2
    return status
if __name__ == "__main__":
#TODO 正式上线后,把下面的循环和time.sleep打开
# while True:
if check() == 1:
os.popen('python diaryQueueUpdate.py')
print("成功重启diaryQueueUpdate")
# time.sleep(300)
\ No newline at end of file
This diff is collapsed.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
import redis
import sys
import os
import json
import pymysql
import numpy as np
import pandas as pd
import time
import datetime
def Json(x):
    """Predicate: True only for on_click_card events from one hard-coded debug
    device.

    BUG FIX: the original had no exception handling, so a malformed Kafka
    message (non-JSON value) would crash the Spark filter stage; this version
    returns False instead, matching the guarded filters in the companion jobs.
    """
    try:
        data = json.loads(x[1])
        if 'type' in data and 'device' in data and 'params' in data and 'card_content_type' in data['params']:
            return data['type'] == 'on_click_card' and data["device"]["device_id"] == "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
        return False
    except Exception as e:
        print("filter fail")
        print(e)
        return False
def model(rdd):
    """Transform one micro-batch: keep debug-device click events (Json),
    extract (device_id, cid, card) and fan each out to redis via write_redis."""
    try:
        rdd = rdd.filter(lambda x:Json(x)).repartition(10).map(lambda x:get_data(x))\
            .map(lambda x:write_redis(x[0],x[1],x[2]))
        return rdd
    except:
        # NOTE(review): bare except; failures are only printed.
        print("fail")
def get_data(x):
    """Extract (device_id, card_id, card_content_type) from an on_click_card
    JSON message; prints and returns None on any parse/shape failure."""
    try:
        event = json.loads(x[1])
        return (event['device']['device_id'],
                event['params']["card_id"],
                event['params']['card_content_type'])
    except Exception as e:
        print("get_data fail")
        # send_email("get_data", "get_data", e)
def write_redis(device_id,cid,card):
    """Dispatch one clicked card to the writer for its content type.
    Unknown card types are silently ignored."""
    if card == "diary":
        diary_write(device_id, cid)
    elif card == "qa":
        question_write(device_id, cid)
    elif card == "user_post":
        tractate_write(device_id, cid)
def tractate_write(device_id, cid):
    """Record all online tractates sharing the clicked tractate's first
    tag_type='3' tag as disliked for `device_id` (JSON set in redis,
    7-day TTL on first write)."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = {}".format(cid)
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        if len(result) > 0:
            tags = result[0][0]  # only the first matching tag is used
            if tags is not None:
                sql = "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b " \
                      "on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                      "where a.is_online = 1 and c.id = {} and c.tag_type = '3'".format(tags)
                cursor.execute(sql)
                result = cursor.fetchall()
                # NOTE(review): db.close() is only reached on this branch; the
                # connection leaks when no tag row is found.
                db.close()
                if len(result) > 0:
                    cids = [str(i[0]) for i in result]
                    r = redis.Redis(host="172.16.40.135", port=5379, password="",db = 2)
                    key = str(device_id) + "_dislike_tractate"
                    if r.exists(key):
                        value = json.loads(r.get(key).decode('utf-8'))
                        value.extend(cids)
                        cids = json.dumps(list(set(value)))
                        r.set(key, cids)
                        print("cunza")
                    else:
                        r.set(key, json.dumps(cids))
                        r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("tractate insert redis fail")
        print(e)

def question_write(device_id,cid):
    """Record all online questions sharing the clicked question's first
    tag_type='3' tag as disliked for `device_id` (JSON set in redis,
    7-day TTL on first write)."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.question_id = {}".format(cid)
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        if len(result) > 0:
            tags = result[0][0]  # only the first matching tag is used
            if tags is not None:
                sql = "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b " \
                      "on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                      "where a.is_online = 1 and c.tag_type = '3' and c.id = {}".format(tags)
                cursor.execute(sql)
                result = cursor.fetchall()
                # NOTE(review): same conditional db.close() leak as tractate_write.
                db.close()
                if len(result) > 0:
                    cids = [str(i[0]) for i in result]
                    r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
                    key = str(device_id) + "_dislike_qa"
                    if r.exists(key):
                        value = json.loads(r.get(key).decode('utf-8'))
                        value.extend(cids)
                        cids = json.dumps(list(set(value)))
                        r.set(key, cids)
                        print("cunza")
                    else:
                        r.set(key, json.dumps(cids))
                        r.expire(key, 7 * 24 * 60 * 60)
                        print("bucunza")
                    return "question good"
    except Exception as e:
        print("question insert redis fail")
        print(e)

def diary_write(device_id,cid):
    """Record all online, content_level >= 3 diaries sharing the clicked
    diary's first tag_type='3' tag as disliked for `device_id` (JSON set in
    redis, 7-day TTL on first write)."""
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = {}".format(cid)
        cursor = db.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
        if len(result) > 0:
            tags = result[0][0]  # only the first matching tag is used
            if tags is not None:
                sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                      "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                      "where a.is_online = 1 and a.content_level >= '3' " \
                      "and c.id = {} and c.tag_type = '3'".format(tags)
                cursor.execute(sql)
                result = cursor.fetchall()
                # NOTE(review): same conditional db.close() leak as above.
                db.close()
                if len(result) > 0:
                    cids = [str(i[0]) for i in result]
                    r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
                    key = str(device_id) + "_dislike_diary"
                    if r.exists(key):
                        value = json.loads(r.get(key).decode('utf-8'))
                        value.extend(cids)
                        cids = json.dumps(list(set(value)))
                        r.set(key, cids)
                    else:
                        r.set(key, json.dumps(cids))
                        r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("diary insert redis fail")
        print(e)
# sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# ssc = StreamingContext(sc,4)
# sc.setLogLevel("WARN")
# kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
# "group.id": "dislike",
# "socket.timeout.ms": "600000",
# "auto.offset.reset": "largest"}
#
#
# stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
# transformstream = stream.transform(lambda x:model(x))
# transformstream.pprint()
#
# ssc.start()
# ssc.awaitTermination()
def make_data(device_id,city_id,key_head):
    """Write the four fixture queues (module-level globals native/nearby/
    nation/megacity) into the test redis hash for (device, city), then echo it."""
    r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
    key = key_head + device_id + ":city_id:" + city_id
    r.hset(name=key, key="native_queue", value=native)
    r.hset(name=key, key="nearby_queue", value=nearby)
    r.hset(name=key, key="nation_queue", value=nation)
    r.hset(name=key, key="megacity_queue", value=megacity)
    print(r.hgetall(key))

# Fixture data: fabricated comma-separated diary-id ranges for each queue.
native = ",".join([str(i) for i in (range(100, 102))])
nearby = ",".join([str(i) for i in (range(102, 106))])
nation = ",".join([str(i) for i in (range(106, 110))])
megacity = ",".join([str(i) for i in (range(110, 118))])
key_head = "device_diary_queue_rerank:device_id:"
# key_head = "device_diary_queue:device_id:"
device_id = "868663038800471"
# make_data(device_id, "beijing", key_head)
# device_id = "868663038800476"
city_id = "beijing"

if __name__ == "__main__":
    # Smoke test of the batching logic only; the real re-rank calls below are
    # commented out.
    users_list = list(range(1,90))
    n = 3
    split_users_list = [users_list[i:i + n] for i in range(0, len(users_list), n)]
    for child_users_list in split_users_list:
        total_samples = list()
        for uid_city in child_users_list:
            # tag_list = get_user_profile(uid_city[0])
            # queues = get_queues(uid_city[0], uid_city[1])
            # if len(queues) > 0 and len(tag_list) > 0:
            #     new_native = tag_boost(queues[0], tag_list)
            #     new_nearby = tag_boost(queues[1], tag_list)
            #
            #     insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
            #     sample = [uid_city[0], uid_city[1], new_native, new_nearby, queues[2], queues[3], insert_time]
            total_samples.append(uid_city)
        if len(total_samples) > 0:
            df = pd.DataFrame(total_samples)
            df = df.rename(columns={0: "device_id"})
            print("df numbers")
            print(df.shape[0])
            # to_data_base(df)
from utils import con_sql
import datetime
import time
import pymysql
def fetch_data(start_date, end_date):
    """Load click and exposure logs for the inclusive date range.

    Returns (exposure_df, click_df, click_device_id_list); both frames carry
    hour/minute columns derived from the epoch `time` field and drop it.
    """
    # device_ids seen in the click table
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    sql = "select distinct device_id from data_feed_click"
    click_device_id = con_sql(db,sql)[0].values.tolist()
    print("成功获取点击表里的device_id")
    # click rows in the date range
    sql = "select cid,device_id,time,stat_date from data_feed_click " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    # the previous connection was already closed, so reconnect
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    click = con_sql(db,sql)
    click = click.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取点击表里的数据")
    # derive hour/minute from the epoch timestamp
    click["hour"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
    click["minute"] = click["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
    click = click.drop("time_date", axis=1)
    # exposure rows in the date range
    sql = "select cid,device_id,time,stat_date from data_feed_exposure " \
          "where stat_date >= '{0}' and stat_date <= '{1}'".format(start_date, end_date)
    start = time.time()
    # reconnect again for the same reason
    db = pymysql.connect(host='10.66.157.22', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
    exposure = con_sql(db,sql)
    end = time.time()
    print("获取曝光表耗时{}分".format((end-start)/60))
    exposure = exposure.rename(columns={0: "cid", 1: "device_id", 2: "time_date", 3: "stat_date"})
    print("成功获取曝光表里的数据")
    # derive hour/minute from the epoch timestamp
    exposure["hour"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).hour)
    exposure["minute"] = exposure["time_date"].apply(lambda x: datetime.datetime.fromtimestamp(x).minute)
    exposure = exposure.drop("time_date", axis=1)
    return exposure, click, click_device_id
import redis
import datetime
import json
def filter_history(device_id,cid_list):
    """Add cid_list to today's per-device recommend set and maintain the
    rolling aggregate set: drop 14-day-old members, then union in today's.

    Keys: all_key (aggregate), per-day keys with a 15-day TTL.
    """
    r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/1")
    all_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
    old_key = "TS:recommend_tractate_set:device_id:'{}':'{}'"\
        .format(device_id,(datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
    today_key = "TS:recommend_tractate_set:device_id:'{}':'{}'"\
        .format(device_id,datetime.date.today().strftime("%Y-%m-%d"))
    if r.exists(today_key):
        r.sadd(today_key, *cid_list)
    else:
        # first write today: also start the 15-day TTL
        r.sadd(today_key, *cid_list)
        r.expire(today_key,15*24*60*60)
    if r.exists(all_key) and r.exists(old_key):
        # remove 14-day-old members from the aggregate, drop the old daily set
        r.sdiffstore(all_key, all_key, old_key)
        r.delete(old_key)
        r.expire(all_key, time=13*24*60*60)
    r.sadd(all_key, *r.smembers(today_key))
def get_dairy():
device_id = "868080041007173"
r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/0")
dislike_key = str(device_id) + "_dislike_diary"
dislike_cids = [2,20,40,61,81,101,121]
r.sadd(dislike_key,*dislike_cids)
print("不喜欢")
print(r.smembers(dislike_key))
user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % \
(device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
user_cids = list(range(2,6))
user_cids = [str(i) for i in user_cids]
r.hset(user_portrait_diary_key,'diary_queue',json.dumps(user_cids))
r.hset(user_portrait_diary_key, 'cursor', "0")
r.hset(user_portrait_diary_key, 'len_cursor', "0")
print("画像")
print(r.hgetall(user_portrait_diary_key))
search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
serach_cids = list(range(20,26))
serach_cids = [str(i) for i in serach_cids]
r.hset(search_diary_recommend_key, 'diary_queue', json.dumps(serach_cids))
print("search")
print(r.hgetall(search_diary_recommend_key))
diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
ts_cids = list(range(40,46))
ts_cids = [str(i) for i in ts_cids]
r.hset(diary_recommend_key, 'diary_queue', json.dumps(ts_cids))
print("ts")
print(r.hgetall(diary_recommend_key))
use_city_id = "beijing"
personal_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
native_quue = ",".join([str(i) for i in range(60,80)])
nearby_quue = ",".join([str(i) for i in range(80,100)])
mea_queue = ",".join([str(i) for i in range(100,120)])
nation_queue = ",".join([str(i) for i in range(120,130)])
r.hset(personal_key,"native_queue",native_quue)
r.hset(personal_key, "nearby_queue", nearby_quue)
r.hset(personal_key, "nation_queue", nation_queue)
r.hset(personal_key, "megacity_queue", mea_queue)
print("personnal ")
print(r.hgetall(personal_key))
def get_qa():
    """Seed the test Redis instance (db 0) with the QA dislike set and the
    search-answer queue for one hard-coded test device."""
    dev_id = "868080041007173"
    rds = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/0")

    # Answer dislike set.
    dislike_key = "{}_dislike_qa".format(dev_id)
    rds.sadd(dislike_key, 529401, 529412, 529403)
    print("不喜欢")
    print(rds.smembers(dislike_key))

    # Search-driven answer recommend queue.
    search_key = "TS:search_recommend_answer_queue:device_id:" + str(dev_id)
    rds.hset(search_key, 'answer_queue', json.dumps(list(range(529401, 529408))))
    print(rds.hgetall(search_key))
def get_topic():
    """Seed the test Redis instance (db 0) with the tractate dislike set and
    the search-tractate queue for one hard-coded test device."""
    dev_id = "868080041007173"
    rds = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/0")

    # Tractate dislike set.
    dislike_key = "{}_dislike_tractate".format(dev_id)
    rds.sadd(dislike_key, 2, 37)
    print("不喜欢")
    print(rds.smembers(dislike_key))

    # Search-driven tractate recommend queue.
    search_key = "TS:search_recommend_tractate_queue:device_id:" + str(dev_id)
    rds.hset(search_key, 'tractate_queue', json.dumps(list(range(1, 4))))
    print(rds.hgetall(search_key))
def yanzheng():
    """Open a connection to the verification Redis (db 2).

    NOTE(review): this function currently only binds the device id and the
    connection locally and performs no operation — presumably a stub kept for
    manual verification; confirm before deleting.
    """
    dev_id = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
    rds = redis.Redis(host="172.16.40.135", port=5379, db=2, socket_timeout=2000)
if __name__ == "__main__":
    # Manual smoke-test driver: seeds the tractate test data in Redis.
    # cid = [16,18,20]
    # filter_history("hello",cid)
    get_topic()
import pymysql
import datetime
import json
import redis
import pandas as pd
from sqlalchemy import create_engine
def get_mysql_data(host, port, user, passwd, db, sql):
    """Run *sql* against the given MySQL/TiDB database and return all rows.

    Returns the raw tuple-of-tuples from Cursor.fetchall().  The connection is
    always closed, even when execute() raises (the previous version leaked the
    connection on error and shadowed the *db* parameter with the connection).
    """
    conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db)
    try:
        cursor = conn.cursor()
        cursor.execute(sql)
        result = cursor.fetchall()
    finally:
        conn.close()
    return result
def get_esmm_users():
    """Return [(device_id, city_id), ...] pairs with precise exposure yesterday.

    Reads distinct pairs from data_feed_exposure_precise for yesterday's
    stat_date; returns [] on any failure so the batch job keeps running.
    """
    try:
        stat_date = (datetime.date.today() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
        sql = "select distinct device_id,city_id from data_feed_exposure_precise " \
              "where stat_date = '{}'".format(stat_date)
        result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_prod', sql)
        return list(result)
    except Exception as e:
        # Narrowed from a bare except; fail soft but leave a trace of why.
        print("get_esmm_users failed: {}".format(e))
        return []
def get_user_profile(device_id, top_k=5):
    """Return up to *top_k* tag ids from the user's portrait, highest score first.

    Reads the portrait JSON list from Redis key
    "user:portrait_tags:cl_id:<device_id>".  Entries with type "tag" carry the
    tag id directly in "content"; other entries are mapped through the
    module-level ``name_tag`` dict (tag name -> tag id).  Returns [] when the
    key is absent or on any error.
    """
    try:
        r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
        key = "user:portrait_tags:cl_id:" + str(device_id)
        if not r.exists(key):
            return []
        tag_score = {}
        for item in json.loads(r.get(key).decode('utf-8')):
            if item["type"] == "tag":
                tag_score[item["content"]] = item["score"]
            elif item["content"] in name_tag:
                tag_score[name_tag[item["content"]]] = item["score"]
        # Sort by score descending and keep at most top_k tag ids.
        tag_sort = sorted(tag_score.items(), key=lambda x: x[1], reverse=True)
        return [tag for tag, _ in tag_sort[:top_k]]
    except Exception as e:
        # Narrowed from a bare except; fail soft but leave a trace of why.
        print("get_user_profile failed for {}: {}".format(device_id, e))
        return []
def get_searchworlds_to_tagid():
    """Return {tag name: tag id} for online tags with tag_type < 4, or {} on failure."""
    try:
        sql = 'select id, name from api_tag where is_online = 1 and tag_type < 4'
        rows = get_mysql_data('172.16.30.141', 3306, 'work', 'BJQaT9VzDcuPBqkd', 'zhengxing', sql)
        return {name: tag_id for tag_id, name in rows}
    except Exception as e:
        # Previously the caught exception was bound but silently discarded.
        print("get_searchworlds_to_tagid failed: {}".format(e))
        return {}
def get_queues(device_id, city_id):
    """Return the device/city diary queues as a 4-element list, or [].

    Order: [native_queue, nearby_queue, nation_queue, megacity_queue], each a
    comma-joined id string from esmm_device_diary_queue.  Returns [] when no
    row matches or on any error.
    """
    try:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        try:
            cursor = db.cursor()
            sql = "select native_queue, nearby_queue, nation_queue, megacity_queue from esmm_device_diary_queue " \
                  "where device_id = '{}' and city_id = '{}'".format(device_id, city_id)
            cursor.execute(sql)
            result = cursor.fetchone()
        finally:
            # Always release the connection, even if the query raises
            # (the previous version leaked it on error).
            db.close()
        return list(result) if result is not None else []
    except Exception as e:
        print("get_queues failed for {}: {}".format(device_id, e))
        return []
def tag_boost(cid_str, tag_list):
    """Re-order a comma-joined diary-id string by the user's portrait tags.

    Diaries are bucketed by the first portrait tag they match and then drawn
    round-robin across the buckets in *tag_list* (score) order; diaries that
    match no tag go last.  Queues of 6 ids or fewer, an empty *tag_list*, and
    any query/processing error all return *cid_str* unchanged.

    Bug fix vs. the previous version: the caller's *tag_list* is no longer
    mutated (it used to get "right" appended on every call, which polluted the
    SQL IN-clause of subsequent calls with the same list).
    """
    if cid_str is None or cid_str == "":
        return cid_str
    cids = cid_str.split(",")
    try:
        if len(cids) <= 6 or len(tag_list) == 0:
            return cid_str
        sql = "select id,group_concat(diary_id) from " \
              "(select a.diary_id,b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type < '4' and a.diary_id in {}) tmp " \
              "where id in {} group by id".format(tuple(cids), tuple(tag_list))
        result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'eagle', sql)
        if len(result) == 0:
            return cid_str
        # Bucket the queue ids by tag, preserving the queue's original order.
        tag_cids = {}
        matched = []
        for tag_id, grouped in result:
            members = grouped.split(",")
            bucket = [c for c in cids if c in members]
            tag_cids[tag_id] = bucket
            matched.extend(bucket)
        matched = set(matched)
        # Ids that matched no portrait tag go last, under the sentinel bucket.
        tag_cids["right"] = [c for c in cids if c not in matched]
        ordered_tags = list(tag_list) + ["right"]  # local copy: never mutate the caller's list
        # Round-robin over the buckets in score order until every non-sentinel
        # bucket is drained; n counts drained non-sentinel buckets.
        sort_cids = []
        n = 0
        while n != len(tag_cids) - 1:
            for t in ordered_tags:
                bucket = tag_cids.get(t)
                if bucket:
                    sort_cids.append(bucket.pop(0))
                    if len(bucket) == 0 and t != "right":
                        n = n + 1
        if len(tag_cids["right"]) > 0:
            sort_cids.extend(tag_cids["right"])
        # Deduplicate while keeping first-occurrence order.
        news_ids = []
        for cid in sort_cids:
            if cid not in news_ids:
                news_ids.append(cid)
        return ",".join(str(i) for i in news_ids)
    except Exception as e:
        # TODO: report to sentry and also log locally
        print("tag_boost failed: {}".format(e))
        return cid_str
def to_data_base(df):
    """Replace-insert the re-sorted diary queues into esmm_resort_diary_queue.

    Deletes rows for any device_id in *df* that already exists in the table,
    then bulk-appends *df* (columns: device_id, city_id, native/nearby/
    nation/megacity_queue, time).
    """
    sql = "select distinct device_id from esmm_resort_diary_queue"
    result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test', sql)
    old_uid = set(df["device_id"].values) & {row[0] for row in result}
    if old_uid:
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root',
                             passwd='3SYz54LS9#^9sBvC', db='jerry_test')
        try:
            # Build the IN list explicitly: formatting tuple(old_uid) emits a
            # trailing comma for a single element, which is invalid SQL.
            in_list = ",".join("'{}'".format(uid) for uid in old_uid)
            cursor = db.cursor()
            cursor.execute("delete from esmm_resort_diary_queue where device_id in ({})".format(in_list))
            db.commit()
            cursor.close()
        finally:
            db.close()
    yconnect = create_engine('mysql+pymysql://root:3SYz54LS9#^9sBvC@172.16.40.158:4000/jerry_test?charset=utf8')
    pd.io.sql.to_sql(df, "esmm_resort_diary_queue", yconnect, schema='jerry_test', if_exists='append', index=False,
                     chunksize=200)
    print("insert done")
def get_all_users():
    """Return all distinct (device_id, city_id) pairs from esmm_device_diary_queue, or []."""
    try:
        sql = "select distinct device_id,city_id from esmm_device_diary_queue"
        result = get_mysql_data('172.16.40.158', 4000, 'root', '3SYz54LS9#^9sBvC', 'jerry_test', sql)
        return list(result)
    except Exception as e:
        # Narrowed from a bare except; fail soft but leave a trace of why.
        print("get_all_users failed: {}".format(e))
        return []
if __name__ == "__main__":
    # users_list = get_esmm_users()
    # print("user number")
    # print(len(users_list))
    users_list = get_all_users()
    # Module-level lookup consumed by get_user_profile (tag name -> tag id).
    name_tag = get_searchworlds_to_tagid()

    # Process devices in batches of 500 so each DB write stays small.
    batch_size = 500
    batches = [users_list[i:i + batch_size] for i in range(0, len(users_list), batch_size)]
    for batch in batches:
        total_samples = []
        for device_id, city_id in batch:
            tag_list = get_user_profile(device_id)
            queues = get_queues(device_id, city_id)
            if queues and tag_list:
                # Only the native and nearby queues are re-sorted by portrait tags.
                insert_time = str(datetime.datetime.now().strftime('%Y%m%d%H%M'))
                total_samples.append([device_id, city_id,
                                      tag_boost(queues[0], tag_list),
                                      tag_boost(queues[1], tag_list),
                                      queues[2], queues[3], insert_time])
        if total_samples:
            df = pd.DataFrame(total_samples)
            df = df.rename(columns={0: "device_id", 1: "city_id", 2: "native_queue",
                                    3: "nearby_queue", 4: "nation_queue", 5: "megacity_queue", 6: "time"})
            to_data_base(df)
...@@ -6,15 +6,16 @@ ...@@ -6,15 +6,16 @@
import pandas as pd import pandas as pd
import pymysql import pymysql
from sqlalchemy import create_engine from sqlalchemy import create_engine
import redis
import json
def test(): def test():
conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf") conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
sc = SparkContext(conf = conf) sc = SparkContext(conf = conf)
spark = SparkSession.builder.enableHiveSupport().getOrCreate() spark = SparkSession.builder.enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark) ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test") ti.tidbMapDatabase("jerry_test")
spark = SparkSession.builder.appName("hello test").enableHiveSupport().getOrCreate() spark = SparkSession.builder.appName("hello test").enableHiveSupport().getOrCreate()
...@@ -24,11 +25,11 @@ def test(): ...@@ -24,11 +25,11 @@ def test():
spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'") spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'") spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
# hive_context.sql("SET mapreduce.job.queuename=data") sql = """select cl_id as device_id,params["business_id"] as cid_id,
# hive_context.sql("SET mapred.input.dir.recursive=true") (params["out"]-params["in"]) as dur_time from online.bl_hdfs_maidian_updates where action="page_view"
# hive_context.sql("SET hive.mapred.supports.subdirectories=true") and params["page_name"]="diary_detail" and partition_date = '20190801'
sql = "select user_id from online.tl_hdfs_maidian_view where partition_date = '20190412' limit 10" """
spark.sql(sql).show(6) df = spark.sql(sql)
...@@ -40,39 +41,37 @@ def con_sql(db,sql): ...@@ -40,39 +41,37 @@ def con_sql(db,sql):
db.close() db.close()
return df return df
def write_redis(device_id, cid_list):
    """Expand *cid_list* to all online diaries sharing their tags and merge the
    result into the device's diary dislike set in Redis.

    Looks up the tag_type '3' tags of the given diary ids, collects every
    online diary with content_level >= '3' under those tags, and stores the
    merged id list as a JSON array under "<device_id>_dislike_diary".
    """
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    try:
        cursor = db.cursor()
        # IN clause built by join: formatting tuple(cid_list) emits a trailing
        # comma (invalid SQL) when the list has a single element.
        cid_in = ",".join(str(c) for c in cid_list)
        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in ({})".format(cid_in)
        cursor.execute(sql)
        tags = list(set(row[0] for row in cursor.fetchall()))
        if tags:  # fetchall never returns None; test for emptiness instead
            tag_in = ",".join(str(t) for t in tags)
            sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                  "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                  "where a.is_online = 1 and a.content_level >= '3' " \
                  "and c.id in ({}) and c.tag_type = '3'".format(tag_in)
            cursor.execute(sql)
            cids = list(set(row[0] for row in cursor.fetchall()))
            if cids:
                r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
                key = str(device_id) + "_dislike_diary"
                if r.exists(key):
                    # json.loads instead of eval (safer on external data).  The
                    # old code also double-encoded here — json.dumps of an
                    # already-dumped string — corrupting the set on re-merge.
                    cids = list(set(json.loads(r.get(key)) + cids))
                r.set(key, json.dumps(cids))
    finally:
        # The previous version never closed the MySQL connection.
        db.close()
if __name__ == '__main__': if __name__ == '__main__':
# sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \ a = [15202811, 15825403, 16480766, 15432195, 15759876]
# .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \ d = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
# .set("spark.tispark.plan.allow_index_double_read", "false") \ write_redis(d, a)
# .set("spark.tispark.plan.allow_index_read", "true") \
# .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
# .set("spark.tispark.pd.addresses", "172.16.40.158:2379").set("spark.io.compression.codec", "lzf") \
# .set("spark.driver.maxResultSize", "8g")
#
# spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
# ti = pti.TiContext(spark)
# ti.tidbMapDatabase("jerry_test")
# spark.sparkContext.setLogLevel("WARN")
# sql = "select stat_date,cid_id,y,ccity_name from esmm_train_data limit 60"
# spark.sql(sql).show(6)
sql = "select level2_id,concat('t',treatment_method)," \
"concat('min',price_min),concat('max',price_max)," \
"concat('tr',treatment_time),concat('m',maintain_time)," \
"concat('r',recover_time) from jerry_test.train_Knowledge_network_data"
db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='jerry_test')
df = con_sql(db, sql)
df = df.rename(columns={0: "level2_id", 1: "treatment_method",2:"price_min",3:"price_max",4:"treatment_time",
5:"maintain_time",6:"recover_time"})
print(df.head(6))
host = '172.16.40.158'
port = 4000
user = 'root'
password = '3SYz54LS9#^9sBvC'
db = 'jerry_test'
charset = 'utf8'
engine = create_engine(str(r"mysql+pymysql://%s:" + '%s' + "@%s:%s/%s") % (user, password, host, port, db))
df.to_sql('knowledge', con=engine, if_exists='append', index=False, chunksize=8000)
print("insert done")
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment