Commit 80a52b52 authored by 张彦钊

change

parent 7389bd28
from utils import *
from config import *
if __name__ == "__main__":
test = pd.read_csv(DIRECTORY_PATH + "test_ffm_data.csv", header=None)
    test_label = test[0].apply(lambda x: x[0]).values  # first character of the first ffm column is the 0/1 label
predict = pd.read_csv(DIRECTORY_PATH + "test_set_predict_output.txt", header=None)[0].values
get_roc_curve(test_label, predict, "1")
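# get_roc_curve is imported from utils, which is not part of this commit. A minimal
# sketch of what such a helper plausibly does, assuming it wraps sklearn.metrics;
# the real signature and output path may differ:
def _sketch_get_roc_curve(y_true, y_score, name):
    import matplotlib.pyplot as plt
    from sklearn.metrics import auc, roc_curve
    fpr, tpr, _ = roc_curve(y_true, y_score, pos_label="1")
    plt.plot(fpr, tpr, label="model %s, AUC=%.4f" % (name, auc(fpr, tpr)))
    plt.xlabel("false positive rate")
    plt.ylabel("true positive rate")
    plt.legend()
    plt.savefig(DIRECTORY_PATH + "roc_%s.png" % name)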
from itertools import chain, islice, cycle
import datetime
from collections import Counter
from gm_types.gaia import DIARY_ORDER_TYPE
from gm_types.doris import ANSWER_SORT_TYPE
from gm_types.doris import ARTICLE_SORT_TYPE
from gm_types.mimas import CONTENT_CLASS
from gm_types.doris import CARD_TYPE
from gm_types.gaia import CITY_LEVEL
from gm_rpcd.all import bind
import traceback
from search.utils.diary import recall_diary
from search.utils.answer import recall_answers
from search.utils.article import recall_articles
from gm_rpcd.all import context
from libs.algorithms import drop_dup
from libs.cache import redis_client
from libs.error import logging_exception
from extend.models.gaia import City, CityScale
from extend.models.gold import (
QAQueue,
WikiQueue,
IconQueue,
UserTopicQueue,
DoctorTopicQueue,
DiaryQueue,
ArticleQueue,
AnswerQueue,
DeviceQAQueue,
DeviceIconQueue,
DeviceUserTopicQueue,
DeviceDoctorTopicQueue,
DeviceAnswerQueue,
DeviceArticleQueue,
DeviceDiaryQueue,
QuestionQueue,
DeviceQuestionQueue
)
import logging
import redis
import json
from django.conf import settings
MAX_LOAD = 200
logger = logging.getLogger(__name__)
@bind("doris/recommend/get_diaries")
def get_diaries(tags, city, offset=0, size=10, city_tag_id=None):
    # NOTE: `city` is passed as a city id
sort_params = {}
if city_tag_id:
sort_params["user_city_tag_id"] = city_tag_id
elif city:
try:
x = City.objects.get(id=city)
sort_params["user_city_tag_id"] = x.tag_id
except City.DoesNotExist:
pass
filters = {
"is_sink": False,
"has_before_cover": True,
"has_after_cover": True,
"content_level_is_good": True
}
if tags:
filters["closure_tag_ids"] = tags
tail = offset + size
diaries_ids = []
if tail < MAX_LOAD:
        diaries = recall_diary(None, 0, MAX_LOAD, filters, DIARY_ORDER_TYPE.RECOMMEND, sort_params, fields=["id", "user.id"])
diaries_items = [(diary['id'], diary['user']['id']) for diary in diaries]
drop_dup_diaries = drop_dup(diaries_items)
drop_dup_size = len(drop_dup_diaries)
if tail <= drop_dup_size:
diaries_ids = [item[0] for item in drop_dup_diaries[offset:tail]]
    if len(diaries_ids) == 0:  # the deduplicated head-200 results are exhausted; fall back to the ranked list without dedup
diaries = recall_diary(None, offset, size, filters, DIARY_ORDER_TYPE.RECOMMEND, sort_params, fields=["id"])
diaries_ids = [diary['id'] for diary in diaries]
return {"diaries_ids": diaries_ids}
@bind("doris/recommend/get_articles")
def get_articles(tags, offset=0, size=10):
filters = {
"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]
}
if tags:
filters["tag_ids"] = tags
articles = recall_articles(None, offset, size, filters, ARTICLE_SORT_TYPE.RECOMMEND, {})
article_ids = [article['id'] for article in articles]
return {"article_ids": article_ids}
@bind("doris/recommend/get_answers")
def get_answers(tags, offset=0, size=10):
filters = {
"content_level": [CONTENT_CLASS.EXCELLENT, CONTENT_CLASS.FINE]
}
if tags:
filters["tag_ids"] = tags
tail = offset + size
answer_ids = []
if tail < MAX_LOAD:
answers = recall_answers(None, 0, MAX_LOAD, filters, ANSWER_SORT_TYPE.RECOMMEND, {}, fields=["id", "user_id"])
answers = filter(lambda answer: "id" in answer and "user_id" in answer, answers)
answer_items = [(answer["id"], answer["user_id"]) for answer in answers]
drop_dup_answers = drop_dup(answer_items)
if tail <= len(drop_dup_answers):
answer_ids = [item[0] for item in drop_dup_answers[offset:tail]]
if len(answer_ids) == 0:
answers = recall_answers(None, offset, size, filters, ANSWER_SORT_TYPE.RECOMMEND, {})
answer_ids = [answer['id'] for answer in answers]
return {"answer_ids": answer_ids}
@bind('doris/recommend/icon')
def fetch_icon(device_id, size):
try:
card_type = "icon"
try:
que = DeviceIconQueue.objects.get(device_id=device_id)
except DeviceIconQueue.DoesNotExist:
que = IconQueue.objects.last()
if not que:
return {"icon": []}
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
data = list(islice(cycle(que), cursor, cursor + size))
return {card_type: list(map(int, data))}
except:
logging_exception()
return {"icon": []}
@bind('doris/recommend/homepage_polymer')
def fetch_polymer_ids(device_id, size):
try:
card_type = "polymer_ids"
try:
que = DeviceIconQueue.objects.get(device_id=device_id)
except DeviceIconQueue.DoesNotExist:
que = IconQueue.objects.last()
if not que:
return {"polymer_ids": []}
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
data = list(islice(cycle(que), cursor, cursor + size))
return {card_type: list(map(int, data))}
except:
logging_exception()
return {"polymer_ids": []}
@bind('doris/recommend/feed')
def recommend_feed(device_id, card_type, city_id, size):
try:
return RecommendFeed.dispatch(device_id, card_type,
city_id, size)
except:
logging_exception()
return {card_type: []}
class RecommendFeed:
@classmethod
def dispatch(cls, device_id, card_type, city_id, size):
data = []
if card_type == CARD_TYPE.QA:
data = cls.fetch_qa(device_id, card_type, size)
elif card_type == CARD_TYPE.ANSWER:
data = cls.fetch_answer(device_id, card_type, size)
data = list(map(int, data))
elif card_type == CARD_TYPE.ARTICLE:
data = cls.fetch_article(device_id, card_type, size)
data = list(map(int, data))
elif card_type == CARD_TYPE.QUESTION:
data = cls.fetch_question(device_id, card_type, size)
data = list(map(int, data))
elif card_type == CARD_TYPE.DIARY:
data = cls.fetch_diary(device_id, card_type, city_id, size)
elif card_type == CARD_TYPE.USERTOPIC:
data = cls.fetch_user_topic(device_id,card_type,size)
elif card_type == CARD_TYPE.DOCTORTOPIC:
data = cls.fetch_doctor_topic(device_id,card_type,size)
data = list(map(int, data))
elif card_type == CARD_TYPE.ENCYCLOPEDIA:
data = cls.fetch_wiki(device_id,card_type,size)
return {card_type: data}
@staticmethod
def current_date():
return datetime.datetime.now().strftime('%Y-%m-%d')
@staticmethod
def fetch_question(device_id, card_type, size):
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceQuestionQueue.objects.get(device_id=device_id)
except DeviceQuestionQueue.DoesNotExist:
que = QuestionQueue.objects.last()
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
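    # Paging state is kept per device, card type and day: the date is baked into the
    # redis key, so cursors implicitly reset every day, and the 24h expiry
    # garbage-collects the previous day's keys. Illustrative entry (device id made up):
    #   "866ab1-question-2019-07-01" -> "10"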
@staticmethod
def fetch_icon(device_id, card_type, size):
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceIconQueue.objects.get(device_id=device_id)
except DeviceIconQueue.DoesNotExist:
que = IconQueue.objects.last()
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
@staticmethod
def fetch_wiki(device_id, card_type, size):
try:
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
que = WikiQueue.objects.last()
if not que:
return []
# que = list(filter(None, que.queue.split(',')))
que = json.loads(que.queue)
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
except:
logging_exception()
return []
@staticmethod
def fetch_answer(device_id, card_type, size):
try:
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceAnswerQueue.objects.get(device_id=device_id)
except DeviceAnswerQueue.DoesNotExist:
que = AnswerQueue.objects.last()
if not que:
return []
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
except:
logging_exception()
return []
@staticmethod
def fetch_qa(device_id, card_type, size):
try:
def get_after_filter_qa():
try:
return json.loads(gmkv.get(after_filter_key))
except:
return []
        def write_after_filter_qa(cid_list):
            try:
                if gmkv.exists(after_filter_key):
                    # a plain SET would drop the remaining TTL, so re-apply it explicitly
                    ttl = gmkv.ttl(after_filter_key)
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=ttl if ttl > 0 else 6 * 60 * 60)
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())
def filter_qa(device_id,cid_list):
try:
key = str(device_id) + "_dislike_qa"
if gmkv.exists(key):
dislike = gmkv.smembers(key)
cid_list = [i for i in cid_list if str(i) not in dislike]
return cid_list
else:
return cid_list
except:
return cid_list
def read_history(cid_list):
if redis_client.exists(today_qa_key):
redis_client.sadd(today_qa_key, *cid_list)
else:
redis_client.sadd(today_qa_key, *cid_list)
redis_client.expire(today_qa_key, 15 * 24 * 60 * 60)
if redis_client.exists(read_qa_key) and redis_client.exists(old_qa_key):
redis_client.sdiffstore(read_qa_key, read_qa_key, old_qa_key)
redis_client.delete(old_qa_key)
redis_client.expire(read_qa_key, time=13 * 24 * 60 * 60)
redis_client.sadd(read_qa_key, *cid_list)
search_qa_recommend_list = list()
read_qa_key = "TS:recommend_answer_set:device_id:" + str(device_id)
old_qa_key = "TS:recommend_answer_set:device_id:{}:{}"\
.format(device_id,(datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
today_qa_key = "TS:recommend_answer_set:device_id:{}:{}"\
.format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
answer_queue_key = "qa_is_tail:" + str(device_id)
after_filter_key = "device_qa_after_filter:device_id:" + str(device_id)
gmkv = redis.Redis(host="172.16.40.135", port=5379, db=2)
if device_id != '0':
search_qa_recommend_key = "TS:search_recommend_answer_queue:device_id:" + str(device_id)
if redis_client.exists(search_qa_recommend_key):
search_qa_recommend_dict = redis_client.hgetall(search_qa_recommend_key)
queue_list = json.loads(search_qa_recommend_dict[b'answer_queue'])
queue_list = filter_qa(device_id, queue_list)
if len(queue_list) == 0:
redis_client.delete(search_qa_recommend_key)
elif len(queue_list) == 1:
size = size - 1
search_qa_recommend_list = queue_list
redis_client.delete(search_qa_recommend_key)
else:
size = size - 1
search_qa_recommend_list.append(queue_list[0])
redis_client.hset(search_qa_recommend_key,"answer_queue",json.dumps(queue_list[1:]))
if gmkv.exists(answer_queue_key):
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif gmkv.exists(after_filter_key):
que = get_after_filter_qa()
que = filter_qa(device_id,que)
if len(que) == 0:
gmkv.set(answer_queue_key,"tail",ex = 6*60*60)
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif len(que) <= size:
search_qa_recommend_list.extend(que)
gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
else:
search_qa_recommend_list.extend(que[:size])
write_after_filter_qa(que[size:])
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
try:
que = DeviceQAQueue.objects.get(device_id=device_id)
except DeviceQAQueue.DoesNotExist:
que = AnswerQueue.objects.last()
if not que:
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
qa = list(filter(None, que.queue.split(',')))
if device_id != "0":
qa = filter_qa(device_id,qa)
if len(qa) == 0:
if device_id != "0":
gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
if len(search_qa_recommend_list) > 0:
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
read_history(search_qa_recommend_list)
return search_qa_recommend_list
elif len(qa) <= size:
search_qa_recommend_list.extend(qa)
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
if device_id != "0":
gmkv.set(answer_queue_key, "tail", ex=6 * 60 * 60)
read_history(search_qa_recommend_list)
return search_qa_recommend_list
else:
search_qa_recommend_list.extend(qa[:size])
search_qa_recommend_list = list(map(int, search_qa_recommend_list))
if device_id != "0":
write_after_filter_qa(qa[size:])
read_history(search_qa_recommend_list)
return search_qa_recommend_list
except:
logging_exception()
return []
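    # Read-history bookkeeping used by fetch_qa above (key names are from this code;
    # the device id is made up for illustration):
    #   TS:recommend_answer_set:device_id:866ab1              ids served over ~2 weeks
    #   TS:recommend_answer_set:device_id:866ab1:2019-07-01   ids served that day (15-day TTL)
    # Once the 14-day-old daily set falls out of the window, SDIFFSTORE subtracts it
    # from the cumulative set and deletes it, keeping a rolling two-week de-read window.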
@staticmethod
def fetch_article(device_id, card_type, size):
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceArticleQueue.objects.get(device_id=device_id)
except DeviceArticleQueue.DoesNotExist:
que = ArticleQueue.objects.last()
if not que:
return []
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
@staticmethod
def fetch_user_topic(device_id, card_type, size):
try:
def filter_topic(cid_list):
try:
if gmkv.exists(dislike_key):
dislike = gmkv.smembers(dislike_key)
cid_list = [i for i in cid_list if str(i) not in dislike]
return cid_list
else:
return cid_list
except:
return cid_list
        def write_after_filter_tractate(cid_list):
            try:
                if gmkv.exists(after_filter_key):
                    # a plain SET would drop the remaining TTL, so re-apply it explicitly
                    ttl = gmkv.ttl(after_filter_key)
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=ttl if ttl > 0 else 6 * 60 * 60)
                else:
                    gmkv.set(after_filter_key, json.dumps(cid_list), ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())
def get_filter_tractate():
try:
return json.loads(gmkv.get(after_filter_key))
except:
return []
def read_history(cid_list):
if redis_client.exists(today_key):
redis_client.sadd(today_key, *cid_list)
else:
redis_client.sadd(today_key, *cid_list)
redis_client.expire(today_key, 15 * 24 * 60 * 60)
if redis_client.exists(read_key) and redis_client.exists(old_key):
redis_client.sdiffstore(read_key, read_key, old_key)
redis_client.delete(old_key)
redis_client.expire(read_key, time=13 * 24 * 60 * 60)
redis_client.sadd(read_key, *cid_list)
dislike_key = str(device_id) + "_dislike_tractate"
search_topic_recommend_key = "TS:search_recommend_tractate_queue:device_id:" + str(device_id)
after_filter_key = "device_tractate_after_filter:device_id:" + str(device_id)
tractate_key = "tractate_is_tail" + str(device_id)
read_key = "TS:recommend_tractate_set:device_id:" + str(device_id)
old_key = "TS:recommend_tractate_set:device_id:{}:{}"\
.format(device_id,(datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
today_key = "TS:recommend_tractate_set:device_id:{}:{}"\
.format(device_id,datetime.date.today().strftime("%Y-%m-%d"))
search_list = list()
gmkv = redis.Redis(host="172.16.40.135", port=5379, db=2)
if (device_id != '0') and size >= 2:
if redis_client.exists(search_topic_recommend_key):
search_topic_recommend_dict = redis_client.hgetall(search_topic_recommend_key)
search_topic_recommend_list = json.loads(search_topic_recommend_dict[b'tractate_queue'])
search_topic_recommend_list = filter_topic(search_topic_recommend_list)
if len(search_topic_recommend_list) == 0:
redis_client.delete(search_topic_recommend_key)
elif len(search_topic_recommend_list) <= 2:
search_list = search_topic_recommend_list
size = size - len(search_list)
redis_client.delete(search_topic_recommend_key)
else:
search_list = search_topic_recommend_list[:2]
size = size - 2
redis_client.hset(search_topic_recommend_key, 'tractate_queue',
json.dumps(search_topic_recommend_list[2:]))
if gmkv.exists(tractate_key):
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
elif gmkv.exists(after_filter_key):
que = get_filter_tractate()
que = filter_topic(que)
if len(que) == 0:
gmkv.set(tractate_key,"tail",ex = 2*60*60)
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
elif len(que) <= size:
search_list.extend(que)
gmkv.set(tractate_key, "tail",ex = 2*60*60)
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
else:
search_list.extend(que[:size])
write_after_filter_tractate(que[size:])
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
try:
que = DeviceUserTopicQueue.objects.get(device_id=device_id)
except DeviceUserTopicQueue.DoesNotExist:
que = UserTopicQueue.objects.last()
if not que:
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
qa = list(filter(None, que.queue.split(',')))
if device_id != "0":
qa = filter_topic(qa)
if len(qa) == 0:
if device_id != "0":
gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
if len(search_list) > 0:
search_list = list(map(int, search_list))
read_history(search_list)
return search_list
elif len(qa) <= size:
search_list.extend(qa)
search_list = list(map(int, search_list))
if device_id != "0":
gmkv.set(tractate_key, "tail", ex=2 * 60 * 60)
read_history(search_list)
return search_list
else:
search_list.extend(qa[:size])
search_list = list(map(int, search_list))
if device_id != "0":
write_after_filter_tractate(qa[size:])
read_history(search_list)
return search_list
except:
logging_exception()
return []
@staticmethod
def fetch_doctor_topic(device_id, card_type, size):
try:
key = '{device_id}-{card_type}-{date}'.format(device_id=device_id,
card_type=card_type, date=RecommendFeed.current_date())
try:
que = DeviceDoctorTopicQueue.objects.get(device_id=device_id)
except DeviceDoctorTopicQueue.DoesNotExist:
que = DoctorTopicQueue.objects.last()
if not que:
return []
que = list(filter(None, que.queue.split(',')))
# adjust args.
cursor = redis_client.get(key) or 0
cursor = int(cursor) % len(que)
size = min(size, len(que))
redis_client.set(key, cursor + size, ex=24 * 60 * 60)
return list(islice(cycle(que), cursor, cursor + size))
except:
logging_exception()
return []
@classmethod
def get_gm_kv_ins(cls,redis_ip, redis_port, redis_db, redis_password=""):
try:
if len(redis_password) == 0:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, socket_timeout=2)
else:
cli_ins = redis.Redis(host=redis_ip, port=redis_port, db=redis_db, password=redis_password,
socket_timeout=2)
cli_ins.ping()
return cli_ins
except:
return None
@classmethod
def fetch_diary_queue_data(cls, city_id, device_id=None):
local = list()
nearby = list()
nation = list()
megacity = list()
use_city_id = city_id
try:
gm_kv_ins = None
for gm_kv_host_item in settings.GM_KV_HOSTS:
gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
if gm_kv_ins:
break
specify_city_id_key = "diary_queue:city_id:" + use_city_id
world_city_id_key = "diary_queue:city_id:world"
if device_id is not None:
specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
if len(city_val_dict) == 0:
city_val_dict = gm_kv_ins.hgetall(world_city_id_key)
use_city_id = "world"
if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
return (local, nearby, nation, megacity, use_city_id)
except:
logging_exception()
logger.error("catch exception,err_log:%s" % traceback.format_exc())
qs = DiaryQueue.objects.filter(city_id__in=[city_id, 'world'])
# Assume that world queue must exist.
if len(qs) == 1:
obj = qs[0]
else:
obj = qs[0] if qs[0].city_id == city_id else qs[1]
if obj.native_queue:
local = list(filter(None, obj.native_queue.split(',')))
if obj.nearby_queue:
nearby = list(filter(None, obj.nearby_queue.split(',')))
if obj.nation_queue:
nation = list(filter(None, obj.nation_queue.split(',')))
if obj.megacity_queue:
megacity = list(filter(None, obj.megacity_queue.split(',')))
use_city_id = obj.city_id if obj else use_city_id
return (local, nearby, nation, megacity, use_city_id)
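    # Illustrative layout of the gm-kv hashes read above (field names are from this
    # code; the city id and diary ids are made up):
    #   HGETALL diary_queue:city_id:beijing
    #     native_queue   -> "101,102,103"
    #     nearby_queue   -> "201,202"
    #     nation_queue   -> "301,302"
    #     megacity_queue -> ""
    # A per-device override lives at
    #   device_diary_queue:device_id:<device_id>:city_id:<city_id>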
@classmethod
def fetch_device_diary_queue_data(cls, city_id, device_id):
local = list()
nearby = list()
nation = list()
megacity = list()
use_city_id = city_id
try:
gm_kv_ins = None
for gm_kv_host_item in settings.GM_KV_HOSTS:
gm_kv_ins = cls.get_gm_kv_ins(redis_ip=gm_kv_host_item["host"], redis_port=gm_kv_host_item["port"], redis_db=gm_kv_host_item["db"],redis_password=gm_kv_host_item["password"])
if gm_kv_ins:
break
specify_city_id_key = "device_diary_queue:device_id:" + device_id + ":city_id:" + use_city_id
city_val_dict = gm_kv_ins.hgetall(specify_city_id_key)
if b"native_queue" in city_val_dict and city_val_dict[b"native_queue"]:
local = list(filter(None, city_val_dict[b"native_queue"].split(b",")))
if b"nearby_queue" in city_val_dict and city_val_dict[b"nearby_queue"]:
nearby = list(filter(None, city_val_dict[b"nearby_queue"].split(b",")))
if b"nation_queue" in city_val_dict and city_val_dict[b"nation_queue"]:
nation = list(filter(None, city_val_dict[b"nation_queue"].split(b",")))
if b"megacity_queue" in city_val_dict and city_val_dict[b"megacity_queue"]:
megacity = list(filter(None, city_val_dict[b"megacity_queue"].split(b",")))
return (local, nearby, nation, megacity, use_city_id)
except:
logging_exception()
logger.error("catch exception,err_log:%s" % traceback.format_exc())
obj = DeviceDiaryQueue.objects.filter(device_id=device_id, city_id=city_id).first()
if obj and obj.native_queue:
local = list(filter(None, obj.native_queue.split(',')))
if obj and obj.nearby_queue:
nearby = list(filter(None, obj.nearby_queue.split(',')))
if obj and obj.nation_queue:
nation = list(filter(None, obj.nation_queue.split(',')))
if obj and obj.megacity_queue:
megacity = list(filter(None, obj.megacity_queue.split(',')))
use_city_id = obj.city_id if obj else use_city_id
return (local, nearby, nation, megacity, use_city_id)
@classmethod
def fetch_diary(cls, device_id, card_type, city_id, size):
def read_history(cid_list):
if redis_client.exists(today_key):
redis_client.sadd(today_key, *cid_list)
else:
redis_client.sadd(today_key, *cid_list)
redis_client.expire(today_key, 15 * 24 * 60 * 60)
if redis_client.exists(read_key) and redis_client.exists(old_key):
redis_client.sdiffstore(read_key, read_key, old_key)
redis_client.delete(old_key)
redis_client.expire(read_key, time=13 * 24 * 60 * 60)
redis_client.sadd(read_key, *cid_list)
def dislike_cid_filter(device_id, cid_list):
try:
key = str(device_id) + "_dislike_diary"
if gmkv.exists(key):
value = gmkv.smembers(key)
cid_list = [i for i in cid_list if str(i) not in value]
return cid_list
except:
return cid_list
def fetch_after_filter_queue(device_id, city_id):
local = list()
nearby = list()
nation = list()
megacity = list()
use_city_id = city_id
try:
specify_city_id_key = "device_diary_queue_after_filter:device_id:" + device_id + ":city_id:" + use_city_id
if gmkv.exists(specify_city_id_key):
queue = gmkv.get(specify_city_id_key).split(b";")
local = list(filter(None, queue[0].split(b",")))
nearby = list(filter(None, queue[1].split(b",")))
nation = list(filter(None, queue[2].split(b",")))
megacity = list(filter(None, queue[3].split(b",")))
                    # the caller unpacks (cx, cy, cm, cz), i.e. (local, nearby, megacity, nation)
                    return (local, nearby, megacity, nation)
                else:
                    return local, nearby, megacity, nation
            except:
                return local, nearby, megacity, nation
        def write_after_filter_queue(device_id, city_id, local, nearby, megacity, nation):
            try:
                specify_city_id_key = "device_diary_queue_after_filter:device_id:" + device_id + ":city_id:" + city_id
                queue = local + ";" + nearby + ";" + nation + ";" + megacity
                if gmkv.exists(specify_city_id_key):
                    # a plain SET would drop the remaining TTL, so re-apply it explicitly
                    ttl = gmkv.ttl(specify_city_id_key)
                    gmkv.set(specify_city_id_key, queue, ex=ttl if ttl > 0 else 6 * 60 * 60)
                else:
                    gmkv.set(specify_city_id_key, queue, ex=6 * 60 * 60)
            except:
                logging_exception()
                logger.error("catch exception,err_log:%s" % traceback.format_exc())
def get_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
nx = int(round(x * 1.0 / (x + y + z + m) * size))
ny = int(round(y * 1.0 / (x + y + z + m) * size))
nz = int(round(z * 1.0 / (x + y + z + m) * size))
nm = int(round(m * 1.0 / (x + y + z + m) * size))
nxyz = [nx, ny, nm, nz]
xyz = [x, y, m, z]
counter = Counter([nx, ny, nm, nz])
if counter[0] == 2:
nxyz[nxyz.index(0)] += size - sum(nxyz)
else:
nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
nx, ny, nm, nz = nxyz
local_filter = dislike_cid_filter(device_id, cx)
if len(local_filter) == 0:
local_filter = dislike_cid_filter(device_id, local)
slocal = local_filter[:nx]
have_x = local_filter[nx:]
x_str = ",".join([str(i) for i in have_x])
ny += (nx - len(slocal))
nearby_filter = dislike_cid_filter(device_id, cy)
if len(nearby_filter) == 0:
nearby_filter = dislike_cid_filter(device_id, nearby)
snearby = nearby_filter[:ny]
have_y = nearby_filter[ny:]
y_str = ",".join([str(i) for i in have_y])
nm += (ny - len(snearby))
megacity_filter = dislike_cid_filter(device_id, cm)
if len(megacity_filter) == 0:
megacity_filter = dislike_cid_filter(device_id, megacity)
smegacity = megacity_filter[:nm]
have_m = megacity_filter[nm:]
m_str = ",".join([str(i) for i in have_m])
nz += (nm - len(smegacity))
nation_filter = dislike_cid_filter(device_id, cz)
if len(nation_filter) == 0:
nation_filter = dislike_cid_filter(device_id, nation)
snation = nation_filter[:nz]
            have_z = nation_filter[nz:]
z_str = ",".join([str(i) for i in have_z])
return chain(slocal, snearby, smegacity, snation), x_str, y_str, m_str, z_str
if device_id != '0':
portrait_list = list()
click_diary_size = 1
search_diary_size = 4
read_key = "TS:recommend_diary_set:device_id:" + str(device_id)
old_key = "TS:recommend_diary_set:device_id:{}:{}"\
.format(device_id, (datetime.date.today() - datetime.timedelta(days=14)).strftime("%Y-%m-%d"))
today_key = "TS:recommend_diary_set:device_id:{}:{}"\
.format(device_id, datetime.date.today().strftime("%Y-%m-%d"))
user_portrait_diary_key = 'user_portrait_recommend_diary_queue:device_id:%s:%s' % \
(device_id, datetime.datetime.now().strftime('%Y-%m-%d'))
gmkv = redis.Redis(host="172.16.40.135", port=5379, db=2)
if redis_client.exists(user_portrait_diary_key):
user_portrait_diary_dict = redis_client.hgetall(user_portrait_diary_key)
user_portrait_cursor = str(user_portrait_diary_dict[b'cursor'], encoding='utf-8')
if user_portrait_cursor == '0':
if b'len_cursor' in user_portrait_diary_dict.keys():
user_portrait_diary_list = json.loads(user_portrait_diary_dict[b'diary_queue'])
filter_user_portrait_diary_list = dislike_cid_filter(device_id, user_portrait_diary_list)
if len(filter_user_portrait_diary_list) > size:
portrait_list = filter_user_portrait_diary_list[:size]
redis_client.hset(user_portrait_diary_key, 'diary_queue',
json.dumps(filter_user_portrait_diary_list[size:]))
portrait_list = list(map(int, portrait_list))
read_history(portrait_list)
return portrait_list
else:
size = size - len(filter_user_portrait_diary_list)
portrait_list = filter_user_portrait_diary_list
redis_client.delete(user_portrait_diary_key)
search_diary_recommend_key = "TS:search_recommend_diary_queue:device_id:" + str(device_id)
search_list = list()
if redis_client.exists(search_diary_recommend_key) and size > 3:
search_diary_recommend_dict = redis_client.hgetall(search_diary_recommend_key)
search_diary_recommend_list = json.loads(search_diary_recommend_dict[b'diary_queue'])
search_diary_recommend_list = dislike_cid_filter(device_id, search_diary_recommend_list)
if len(search_diary_recommend_list) == 0:
redis_client.delete(search_diary_recommend_key)
elif len(search_diary_recommend_list) <= search_diary_size:
search_list = search_diary_recommend_list
size = size - len(search_diary_recommend_list)
redis_client.delete(search_diary_recommend_key)
else:
search_list = search_diary_recommend_list[:search_diary_size]
size = size - search_diary_size
redis_client.hset(search_diary_recommend_key, 'diary_queue',
json.dumps(search_diary_recommend_list[search_diary_size:]))
if size <= 0:
portrait_list.extend(search_list)
portrait_list = list(map(int, portrait_list))
read_history(portrait_list)
return portrait_list
diary_recommend_key = "TS:recommend_diary_queue:device_id:" + str(device_id)
ts_recommend_list = list()
if redis_client.exists(diary_recommend_key) and size > 0:
diary_recommend_dict = redis_client.hgetall(diary_recommend_key)
diary_recommend_list = json.loads(diary_recommend_dict[b'diary_queue'])
diary_recommend_list = dislike_cid_filter(device_id, diary_recommend_list)
if len(diary_recommend_list) == 0:
redis_client.delete(diary_recommend_key)
elif len(diary_recommend_list) <= click_diary_size:
ts_recommend_list = diary_recommend_list
redis_client.delete(diary_recommend_key)
size = size - len(ts_recommend_list)
else:
size = size - click_diary_size
ts_recommend_list = diary_recommend_list[:click_diary_size]
diary_recommend_list_json = json.dumps(diary_recommend_list[click_diary_size:])
redis_client.hset(diary_recommend_key, 'diary_queue', diary_recommend_list_json)
if size <= 0:
portrait_list.extend(search_list)
portrait_list.extend(ts_recommend_list)
portrait_list = list(map(int, portrait_list))
read_history(portrait_list)
return portrait_list
if size > 0:
try:
(local, nearby, nation, megacity, city_id) = cls.fetch_device_diary_queue_data(city_id,
device_id)
if len(local) == 0 and len(nearby) == 0 and len(nation) == 0 and len(megacity) == 0:
(local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
except:
logging_exception()
(local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
x, y, m, z = cls.get_city_scale(city_id)
cx, cy, cm, cz = fetch_after_filter_queue(device_id, city_id)
data, x_str, y_str, m_str, z_str = get_data(
local, nearby, nation, megacity,
cx, cy, cm, cz,
x, y, z, m, size)
write_after_filter_queue(device_id, city_id, x_str, y_str, m_str, z_str)
portrait_list.extend(search_list)
portrait_list.extend(ts_recommend_list)
portrait_list.extend(data)
if len(portrait_list) == 0:
(local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
portrait_list = cls.get_queue(local, nearby, nation, megacity,
device_id, city_id, size, x, y, z, m)
portrait_list = list(map(int, portrait_list))
if len(portrait_list) != 0:
read_history(portrait_list)
return portrait_list
else:
try:
(local, nearby, nation, megacity, city_id) = cls.fetch_device_diary_queue_data(city_id, device_id)
if len(local) == 0 and len(nearby) == 0 and len(nation) == 0 and len(megacity) == 0:
(local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
except:
logging_exception()
(local, nearby, nation, megacity, city_id) = cls.fetch_diary_queue_data(city_id)
x, y, m, z = cls.get_city_scale(city_id)
data = cls.get_queue(local, nearby, nation, megacity, device_id, city_id, size, x, y, z, m)
return data
@classmethod
def get_queue(cls,local, nearby, nation, megacity, device_id,city_id,size,x,y,z,m):
key = '{device_id}-{city_id}-{date}'.format(device_id=device_id,
city_id=city_id, date=RecommendFeed.current_date())
counter_key = key + '-counter_v1'
counter = redis_client.incr(counter_key)
if counter == 1:
redis_client.expire(counter_key, 24 * 60 * 60)
cursor_key = key + '-cursor_v1'
cursor = redis_client.get(cursor_key) or b'0-0-0-0'
cx, cy, cm, cz = map(int, cursor.split(b'-'))
def get_scale(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
nx = int(round(x * 1.0 / (x + y + z + m) * size))
ny = int(round(y * 1.0 / (x + y + z + m) * size))
nz = int(round(z * 1.0 / (x + y + z + m) * size))
nm = int(round(m * 1.0 / (x + y + z + m) * size))
nxyz = [nx, ny, nm, nz]
xyz = [x, y, m, z]
counter = Counter([nx, ny, nm, nz])
if counter[0] == 2:
nxyz[nxyz.index(0)] += size - sum(nxyz)
else:
nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
nx, ny, nm, nz = nxyz
slocal = local[cx:cx + nx]
cx = min(cx + nx, len(local))
ny += (nx - len(slocal))
snearby = nearby[cy:cy + ny]
cy = min(cy + ny, len(nearby))
nm += (ny - len(snearby))
smegacity = megacity[cm: cm + nm]
cm = min(cm + nm, len(megacity))
nz += (nm - len(smegacity))
snation = nation[cz:cz + nz]
cz = min(cz + nz, len(nation))
return chain(slocal, snearby, smegacity, snation), cx, cy, cm, cz
data, ncx, ncy, ncm, ncz = get_scale(
local, nearby, nation, megacity,
cx, cy, cm, cz,
x, y, z, m, size)
if ncx == cx and ncy == cy: # native queue and nearby queue
logger.info("diary queue reach end,cx:%d,cy:%d,cm:%d,cz:%d", cx, cy, cm, cz)
ncx = ncy = ncm = ncz = 0
val = '-'.join(map(str, [ncx, ncy, ncm, ncz]))
redis_client.set(cursor_key, val, ex=24 * 60 * 60)
return list(map(int, data))
@staticmethod
def get_city_scale(city_id):
try:
c = CityScale.objects.get(city_id=city_id)
x, y, z, m = c.native, c.nearby, c.nation, c.megacity
except CityScale.DoesNotExist:
try:
c = City.objects.get(id=city_id)
if c.level in (CITY_LEVEL.SUPER, CITY_LEVEL.ONE):
x, y, m, z = 4, 3, 0, 3
elif c.level == CITY_LEVEL.TWO:
x, y, m, z = 3, 3, 0, 3
elif c.level == CITY_LEVEL.THREE:
x, y, m, z = 1, 4, 0, 5
else:
x, y, m, z = 0, 0, 0, 10
except City.DoesNotExist:
x, y, m, z = 0, 0, 0, 10
return x, y, m, z
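    # Example: a CITY_LEVEL.TWO city with no CityScale row yields (x, y, m, z) = (3, 3, 0, 3),
    # i.e. local, nearby and nation diaries are mixed 1:1:1 and the megacity queue is skipped.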
@staticmethod
def get_scale_data(local, nearby, nation, megacity, cx, cy, cm, cz, x, y, z, m, size):
"""
:param local: local diary queue
:param nearby: nearby diary queue
:param nation: nation diary queue
:param megacity: megacity diary queue
:param cx: seen local diary offset
:param cy: seen nearby diary offset
:param cz: seen nation diary offset
:param cm: seen megacity diary offset
:param x: local diary scale factor
:param y: nearby diary scale factor
:param z: nation diary scale factor
:param m: megacity diary scale factor
        :param size: number of diaries to return
:return:
"""
        # All four tiers (local, nearby, megacity, nation) get their share of slots by rounding.
        # To address problems seen in production, this revision adds the following rules
        # (a worked example follows the class definition below):
        # 1. If two tiers round to zero and slots remain, give a slot to the zero tier with the
        #    highest priority, in the order local > nearby > nation.
        # 2. If no tier rounds to zero and slots remain, give the extra slot to the tier with the
        #    largest weight.
        # 3. If exactly one tier rounds to zero and slots remain, also give the extra slot to the
        #    tier with the largest weight.
nx = int(round(x * 1.0 / (x + y + z + m) * size))
ny = int(round(y * 1.0 / (x + y + z + m) * size))
nz = int(round(z * 1.0 / (x + y + z + m) * size))
nm = int(round(m * 1.0 / (x + y + z + m) * size))
nxyz = [nx, ny, nm, nz]
xyz = [x, y, m, z]
counter = Counter([nx, ny, nm, nz])
if counter[0] == 2:
nxyz[nxyz.index(0)] += size - sum(nxyz)
else:
nxyz[xyz.index(max(xyz))] += size - sum(nxyz)
nx, ny, nm, nz = nxyz
slocal = local[cx:cx + nx]
cx = min(cx + nx, len(local))
ny += (nx - len(slocal))
snearby = nearby[cy:cy + ny]
cy = min(cy + ny, len(nearby))
nm += (ny - len(snearby))
smegacity = megacity[cm: cm + nm]
cm = min(cm + nm, len(megacity))
nz += (nm - len(smegacity))
snation = nation[cz:cz + nz]
cz = min(cz + nz, len(nation))
return chain(slocal, snearby, smegacity, snation), cx, cy, cm, cz
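# A worked example of the tiered allocation above; illustrative only, the queue ids
# are made up. With weights x=4, y=3, z=3, m=0 and size=5 the rounded shares add up
# to 6, one slot too many, so the extra slot is taken back from the largest tier
# (local), giving 1 local + 2 nearby + 0 megacity + 2 nation diaries:
def _demo_get_scale_data():
    local = list(range(100, 110))
    nearby = list(range(200, 210))
    nation = list(range(300, 310))
    megacity = []
    data, cx, cy, cm, cz = RecommendFeed.get_scale_data(
        local, nearby, nation, megacity, 0, 0, 0, 0, 4, 3, 3, 0, 5)
    # list(data) -> [100, 200, 201, 300, 301]; cursors -> (1, 2, 0, 2)
    return list(data), (cx, cy, cm, cz)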
import os
import time
from config import *
# periodically delete generated feature files inside the given directory
def remove_files(fileDir):
for eachFile in os.listdir(fileDir):
condition_a = os.path.isfile(fileDir + "/" + eachFile)
condition_b = ("DiaryTop3000.csv" in eachFile) or ("output.txt" in eachFile) or ("feed" in eachFile)
if condition_a and condition_b:
ft = os.stat(fileDir + "/" + eachFile)
ltime = int(ft.st_mtime)
            # delete files last modified more than 5 minutes ago
ntime = int(time.time()) - 5*60
if ltime <= ntime:
os.remove(fileDir + "/" + eachFile)
def delete_log():
for eachFile in os.listdir("/tmp"):
if "xlearn" in eachFile:
os.remove("/tmp" + "/" + eachFile)
if __name__ == "__main__":
while True:
delete_log()
remove_files(DIRECTORY_PATH + "result")
print("运行一次")
time.sleep(5*60)
@@ -8,6 +8,7 @@ import time
from pyspark import StorageLevel
def all_click(x):
total = []
sum = 0
@@ -136,7 +137,137 @@ def cpc_click(x):
return total
def os_all_click(x,os):
total = []
sum = 0
date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")
print("美购搜索点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'search_result_welfare_click_item' "
"and app['version'] >='7.14.0' and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
total.append(tmp)
sum = sum + tmp
print("美购首页相关推荐")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'goto_welfare_detail' "
"and app['version'] >='7.14.0' and params['from'] = 'welfare_home_list_item' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
total.append(tmp)
sum = sum + tmp
home_page_sum = 0
print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '6' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
home_page_sum = home_page_sum + tmp
print("首页点击icon进入的列表-美购卡片点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'goto_welfare_detail' "
"and app['version'] >='7.14.0' "
"and params['from'] = 'category' and params['cpc_referer'] = '19' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
home_page_sum = home_page_sum + tmp
total.append(home_page_sum)
sum = sum + home_page_sum
meigou_homepage_sum = 0
print("美购首页'全部'点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '21' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
meigou_homepage_sum = meigou_homepage_sum + tmp
print("美购首页icon美购点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '18' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
meigou_homepage_sum = meigou_homepage_sum + tmp
total.append(meigou_homepage_sum)
sum = sum + meigou_homepage_sum
total.append(sum)
return total
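# The list returned above lines up with the CSV columns assigned in __main__ below:
# [search, xiangguan (related recommendations), home (homepage icon lists),
#  service_home (meigou homepage), all clicks total]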
def os_cpc_click(x,os):
total = []
sum = 0
date = (datetime.date.today() - datetime.timedelta(days=x)).strftime("%Y%m%d")
print("美购搜索点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'search_result_welfare_click_item' "
"and app['version'] >='7.14.0' and params['is_cpc'] = '1' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
total.append(tmp)
sum = sum + tmp
print("美购首页相关推荐")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'goto_welfare_detail' "
"and app['version'] >='7.14.0' and params['from'] = 'welfare_home_list_item' "
"and params['is_cpc'] = '1' "
"and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
total.append(tmp)
sum = sum + tmp
home_page_sum = 0
print("首页点击'全部'icon按钮进入的列表-美购卡片点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '6' "
"and params['is_cpc'] = '1' and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
home_page_sum = home_page_sum + tmp
print("首页点击icon进入的列表-美购卡片点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates "
"where partition_date='{}'and action = 'goto_welfare_detail' "
"and app['version'] >='7.14.0' "
"and params['from'] = 'category' and params['cpc_referer'] = '19' "
"and params['is_cpc'] = '1' and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
home_page_sum = home_page_sum + tmp
total.append(home_page_sum)
sum = sum + home_page_sum
meigou_home_sum = 0
print("美购首页'全部'点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '21' "
"and params['is_cpc'] = '1' and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
meigou_home_sum = meigou_home_sum + tmp
print("美购首页icon美购点击")
tmp = spark.sql("select count(*) from online.bl_hdfs_maidian_updates where partition_date='{}' "
"and action = 'goto_welfare_detail' and app['version'] >='7.14.0' "
"and params['from'] = 'welfare_list' and params['cpc_referer'] = '18' "
"and params['is_cpc'] = '1' and device['device_type'] = '{}'"
.format(date,os)).rdd.map(lambda x: x[0]).collect()[0]
meigou_home_sum = meigou_home_sum + tmp
total.append(meigou_home_sum)
sum = sum + meigou_home_sum
total.append(sum)
return total
if __name__ == '__main__':
@@ -149,21 +280,26 @@ if __name__ == '__main__':
.set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
all_list = []
for i in range(1,27):
date_str = (datetime.date.today() - datetime.timedelta(days=i)).strftime("%Y%m%d")
tmp_list = [date_str]
tmp_list.extend(all_click(i))
tmp_list.extend(cpc_click(i))
all_list.append(tmp_list)
df = pd.DataFrame(all_list)
df = df.rename(columns={0: "date",1: "search", 2: "xiangguan",3:"home",4:"service_home",
5: "all_clcik",
6: "cpc_search", 7: "cpc_xiangguan",8:"cpc_home",9:"cpc_service_home",
10:"cpc_all"})
df.to_csv('/home/gmuser/cpc.csv', index=False)
for os in ["ios","android"]:
all_list = []
for i in range(1,27):
date_str = (datetime.date.today() - datetime.timedelta(days=i)).strftime("%Y%m%d")
tmp_list = [date_str]
tmp_list.extend(os_all_click(i,os))
tmp_list.extend(os_cpc_click(i,os))
all_list.append(tmp_list)
df = pd.DataFrame(all_list)
df = df.rename(columns={0: "date",1: "search", 2: "xiangguan",3:"home",4:"service_home",
5: "all_clcik",
6: "cpc_search", 7: "cpc_xiangguan",8:"cpc_home",9:"cpc_service_home",
10:"cpc_all"})
df.to_csv('/home/gmuser/cpc_{}.csv'.format(os), index=False)
# df = df.rename(columns={0: "date",1: "search", 2: "xiangguan",3:"home",4:"service_home",
# 5: "all_clcik",
# 6: "cpc_search", 7: "cpc_xiangguan",8:"cpc_home",9:"cpc_service_home",
# 10:"cpc_all"})
# df.to_csv('/home/gmuser/cpc.csv', index=False)
spark.stop()
import numpy as np, pandas as pd
from sklearn.cluster import DBSCAN
from shapely.geometry import MultiPoint
import geopandas
import shapefile
from matplotlib import pyplot as plt
data = pd.read_csv("/Users/mac/Downloads/location.csv")
data.drop(["device_id", "partition_date"], axis=1, inplace=True)
data = data[["lat", "lng"]]
data = data.values.astype("float32", copy=False)  # convert to a NumPy array (DataFrame.as_matrix was removed in pandas 1.0)
plt.title("beijing location")
plt.scatter(data[:, 0], data[:, 1], s=1, c="black", marker='.')
# NOTE: the shapefile paths are not defined in this fragment; the paths below are
# assumed placeholders for the Beijing border and ring-road shapefiles
shape_path = "/Users/mac/Downloads/beijing_border.shp"
shape_path_2huan = "/Users/mac/Downloads/beijing_2huan.shp"
shape_path_5huan = "/Users/mac/Downloads/beijing_5huan.shp"
border_shape = shapefile.Reader(shape_path)
border_shape_2 = shapefile.Reader(shape_path_2huan)
border_shape_5 = shapefile.Reader(shape_path_5huan)
border = border_shape.shapes()
border_2 = border_shape_2.shapes()
border_5 = border_shape_5.shapes()
# center region of a cluster
def get_centermost_point(cluster):
centroid = (MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y)
print(centroid)
return tuple(centroid)
# render the clustering results
# NOTE: the clustering step itself is missing from this fragment; a minimal assumed
# version -- group the points by DBSCAN label, dropping noise (-1):
db = DBSCAN(eps=0.01, min_samples=15).fit(data)
clusters = [data[db.labels_ == k] for k in set(db.labels_) if k != -1]
for border_detail in clusters:
x, y = [], []
for cell in border_detail:
x.append(cell[0])
y.append(cell[1])
plt.scatter(x, y, marker='o')
plt.show()
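# Usage sketch for get_centermost_point: one representative point per cluster found above.
centers = [get_centermost_point(c) for c in clusters]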
# coding=utf-8
import numpy as np
from scipy.spatial.distance import cdist
import matplotlib.pyplot as plt
import seaborn as sns
sns.set()
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import pandas as pd
data = pd.read_csv("/Users/mac/Downloads/location.csv")
data.drop(["device_id", "partition_date"], axis=1, inplace=True)
data = data[["lat", "lng"]]
data = data.values.astype("float32", copy=False)  # convert to a NumPy array (DataFrame.as_matrix was removed in pandas 1.0)
# preprocessing: standardize features so each dimension has zero mean and unit variance
stscaler = StandardScaler().fit(data)
data = stscaler.transform(data)
# scatter plot of x and y
plt.scatter(data[:, 0], data[:, 1])
plt.xlabel("lat")
plt.ylabel("lng")
plt.title("beijng_users")
# plt.savefig("results/wholesale.png", format="PNG")
dbsc = DBSCAN(eps=0.5, min_samples=15).fit(data)
labels = dbsc.labels_  # cluster label assigned to each point; -1 marks noise
#print(labels)
core_samples = np.zeros_like(labels, dtype=bool)  # boolean mask shaped like labels, initialized to False
core_samples[dbsc.core_sample_indices_] = True
#print(core_samples)
unique_labels = np.unique(labels)
colors = plt.cm.Spectral(np.linspace(0, 1, len(unique_labels)))  # linspace yields len(unique_labels) evenly spaced values in [0, 1]; the Spectral colormap turns each into a color
#print(zip(unique_labels,colors))
for (label, color) in zip(unique_labels, colors):
class_member_mask = (labels == label)
print(class_member_mask&core_samples)
xy = data[class_member_mask & core_samples]
plt.plot(xy[:, 0], xy[:, 1], 'o', markerfacecolor=color, markersize=10)
xy2 = data[class_member_mask & ~core_samples]
plt.plot(xy2[:, 0], xy2[:, 1], 'o', markerfacecolor=color, markersize=5)
plt.title("DBSCAN on beijing_users")
plt.xlabel("lat (scaled)")
plt.ylabel("lng (scaled)")
# plt.savefig("results/(0.9,15)dbscan_wholesale.png", format="PNG")
import redis
if __name__ == "__main__":
topic_key()