Commit 74150978 authored by 赵威's avatar 赵威

try es

parent c1824411
from elasticsearch import Elasticsearch as Es from elasticsearch import Elasticsearch as Es
import json
def get_es(): def get_es():
...@@ -38,4 +39,14 @@ def es_mquery(doc, body, es=None): ...@@ -38,4 +39,14 @@ def es_mquery(doc, body, es=None):
es = get_es() es = get_es()
index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc, rw='read') index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc, rw='read')
res = es.msearch(body, index=index) res = es.msearch(body, index=index)
return res return res
\ No newline at end of file
def es_insert_device_info(device_id, body, es=None, rw=None):
    """Upsert one device document into the gm-dbmw device index via the bulk API.

    Args:
        device_id: value used as the Elasticsearch document ``_id``.
        body: JSON-serializable dict holding the device payload.
        es: optional Elasticsearch client; a default client from ``get_es()``
            is created when ``None``.
        rw: read/write hint forwarded to ``es_index_adapt``. Defaults to
            ``None``, preserving the previous behavior for existing callers.

    Returns:
        The raw response dict from the Elasticsearch ``bulk`` endpoint.
    """
    if es is None:
        es = get_es()
    # Fix: forward the caller-supplied rw instead of hard-coding rw=None,
    # which silently ignored the parameter.
    index = es_index_adapt(index_prefix="gm-dbmw", doc_type="device", rw=rw)
    # Bulk format: an action line naming the _id, then the document source,
    # each newline-terminated.
    bulk_head = '{"index": {"_id":"%s"}}' % device_id
    data_str = json.dumps(body, ensure_ascii=False)
    bulk_one_body = bulk_head + "\n" + data_str + "\n"
    return es.bulk(index=index, doc_type="device", body=bulk_one_body)
...@@ -4,13 +4,15 @@ import json ...@@ -4,13 +4,15 @@ import json
import operator import operator
from collections import Counter from collections import Counter
import redis
import pymysql import pymysql
import redis
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import (get_doris_prod, get_jerry_test, get_tag3_user_log, send_email, write_user_portrait,
write_user_portrait_by_event, write_user_portrait_doris) from es_tool import es_insert_device_info
from tool import (get_doris_prod, get_jerry_test, get_redis_client, get_tag3_user_log, get_user_portrait_tag3_from_redis,
get_user_portrait_tag3_with_score, send_email, write_user_portrait, write_user_portrait_by_event,
write_user_portrait_doris)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
...@@ -255,24 +257,28 @@ def update_tag3_user_portrait(cl_id): ...@@ -255,24 +257,28 @@ def update_tag3_user_portrait(cl_id):
res.update(tmp_res) res.update(tmp_res)
key = "doris:user_portrait:tag3:device_id:" + str(cl_id) key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379") redis_client = get_redis_client()
anecdote_tags_scores = get_user_portrait_tag3_with_score(cl_id).get("anecdote_tags", {})
print(anecdote_tags_scores)
if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \ if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
(len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \ (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
(len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \ (len(first_positions_score.keys()) > 0) or (len(second_positions_score.keys()) > 0) or \
(len(projects_score.keys()) > 0): (len(projects_score.keys()) > 0):
redis_client.set(key, json.dumps(res)) # TODO
redis_client.expire(key, 60 * 60 * 24 * 180) # redis_client.set(key, json.dumps(res))
# redis_client.expire(key, 60 * 60 * 24 * 180)
write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()), # write_user_portrait(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()), # ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), # ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
",".join(projects_score.keys())) # ",".join(projects_score.keys()))
# write_user_portrait_doris(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()), # # write_user_portrait_doris(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
# ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()), # # ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
# ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), # # ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
# ",".join(projects_score.keys())) # # ",".join(projects_score.keys()))
return cl_id return cl_id
...@@ -299,6 +305,7 @@ def consume_kafka(): ...@@ -299,6 +305,7 @@ def consume_kafka():
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py") spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/es_tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
...@@ -319,10 +326,11 @@ if __name__ == "__main__": ...@@ -319,10 +326,11 @@ if __name__ == "__main__":
start = datetime.datetime.now() start = datetime.datetime.now()
# update_tag3_user_portrait("androidid_a25a1129c0b38f7b") # update_tag3_user_portrait("androidid_a25a1129c0b38f7b")
# device_id = "862460044588666" device_id = "862460044588666"
# update_tag3_user_portrait(device_id) update_tag3_user_portrait(device_id)
consume_kafka() # TODO
# consume_kafka()
end = datetime.datetime.now() end = datetime.datetime.now()
print(end - start) print(end - start)
print("done") print("done")
...@@ -503,3 +503,45 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first ...@@ -503,3 +503,45 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first
except Exception as e: except Exception as e:
print("write db error") print("write db error")
print(e) print(e)
def get_redis_client():
    """Build a StrictRedis client connected to the portrait Redis instance.

    NOTE(review): the connection URL embeds a password in source; consider
    moving it to configuration or a secrets store.
    """
    redis_url = 'redis://:ReDis!GmTx*0aN9@172.16.40.173:6379'
    return redis.StrictRedis.from_url(redis_url)
def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
    """Load a device's tag3 portrait from Redis as lists of tag names.

    For each portrait category, tags are ordered by descending score and
    filtered to those whose score is at least ``limit_score``.

    Args:
        device_id: device identifier appended to the Redis key prefix.
        limit_score: minimum score a tag must reach to be kept (default 0).

    Returns:
        Dict mapping each category name to a list of qualifying tag names,
        or an empty dict when no portrait exists for the device.
    """

    def _ranked_tags(score_map):
        # Sort tag->score pairs high-to-low, then keep tags above threshold.
        ordered = sorted(score_map.items(), key=lambda kv: kv[1], reverse=True)
        return [tag for tag, score in ordered if float(score) >= limit_score]

    portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id)
    client = get_redis_client()
    if not client.exists(portrait_key):
        return {}

    portrait = json.loads(client.get(portrait_key))
    categories = (
        "first_demands",
        "second_demands",
        "first_solutions",
        "second_solutions",
        "first_positions",
        "second_positions",
        "projects",
        "anecdote_tags",
    )
    return {name: _ranked_tags(portrait.get(name, {})) for name in categories}
def get_user_portrait_tag3_with_score(device_id):
    """Fetch a device's raw tag3 portrait (category -> {tag: score}) from Redis.

    Returns the decoded JSON payload stored under the portrait key, or an
    empty dict when no portrait is present for the device.
    """
    client = get_redis_client()
    portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id)
    if not client.exists(portrait_key):
        return {}
    return json.loads(client.get(portrait_key))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment