Commit ab3af1d2 authored by 赵威's avatar 赵威

Merge branch 'offic' into 'master'

Offic

See merge request !62
parents c1824411 7730892b
from elasticsearch import Elasticsearch as Es from elasticsearch import Elasticsearch as Es
import json
def get_es(): def get_es():
...@@ -38,4 +39,14 @@ def es_mquery(doc, body, es=None): ...@@ -38,4 +39,14 @@ def es_mquery(doc, body, es=None):
es = get_es() es = get_es()
index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc, rw='read') index = es_index_adapt(index_prefix='gm-dbmw', doc_type=doc, rw='read')
res = es.msearch(body, index=index) res = es.msearch(body, index=index)
return res return res
\ No newline at end of file
def es_insert_device_info(device_id, body, es=None, rw=None):
    """Upsert one device document into the gm-dbmw device index via the bulk API.

    Args:
        device_id: document _id for the device record.
        body: dict payload to index; serialized with ensure_ascii=False so
            Chinese tag names are stored as-is.
        es: optional Elasticsearch client; a default client is created when None.
        rw: read/write index-alias selector forwarded to es_index_adapt
            (None keeps the previous behavior of no rw suffix).

    Returns:
        The raw es.bulk response dict.
    """
    if es is None:
        es = get_es()
    # Bug fix: forward the caller-supplied rw instead of hard-coding rw=None,
    # which made the parameter dead. Default rw=None preserves old behavior.
    index = es_index_adapt(index_prefix="gm-dbmw", doc_type="device", rw=rw)
    # Bulk format: one action line ("index" with the _id) + one source line,
    # each newline-terminated as required by the bulk API.
    bulk_head = '{"index": {"_id":"%s"}}' % device_id
    data_str = json.dumps(body, ensure_ascii=False)
    bulk_one_body = bulk_head + "\n" + data_str + "\n"
    return es.bulk(index=index, doc_type="device", body=bulk_one_body)
...@@ -4,13 +4,12 @@ import json ...@@ -4,13 +4,12 @@ import json
import operator import operator
from collections import Counter from collections import Counter
import redis
import pymysql import pymysql
from pyspark import SparkConf from pyspark import SparkConf
from pyspark.sql import SparkSession from pyspark.sql import SparkSession
from tool import (get_doris_prod, get_jerry_test, get_tag3_user_log, send_email, write_user_portrait,
write_user_portrait_by_event, write_user_portrait_doris) from tool import (get_doris_prod, get_redis_client, get_tag3_user_log, get_user_portrait_tag3_with_score, send_email,
write_user_portrait, write_user_portrait_by_event, write_user_portrait_doris)
# [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}] # [{'激光': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873}, {'手术': 1.949194898204873, '植发际线': 7.1}]
...@@ -255,7 +254,10 @@ def update_tag3_user_portrait(cl_id): ...@@ -255,7 +254,10 @@ def update_tag3_user_portrait(cl_id):
res.update(tmp_res) res.update(tmp_res)
key = "doris:user_portrait:tag3:device_id:" + str(cl_id) key = "doris:user_portrait:tag3:device_id:" + str(cl_id)
redis_client = redis.StrictRedis.from_url("redis://:ReDis!GmTx*0aN9@172.16.40.173:6379") redis_client = get_redis_client()
anecdote_tags_scores = get_user_portrait_tag3_with_score(cl_id).get("anecdote_tags", {})
res["anecdote_tags"] = anecdote_tags_scores
if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \ if (len(first_demands_score.keys()) > 0) or (len(second_demands_score.keys()) > 0) or \
(len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \ (len(first_solutions_score.keys()) > 0) or (len(second_solutions_score.keys()) > 0) or \
...@@ -269,6 +271,15 @@ def update_tag3_user_portrait(cl_id): ...@@ -269,6 +271,15 @@ def update_tag3_user_portrait(cl_id):
",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
",".join(projects_score.keys())) ",".join(projects_score.keys()))
# body = {}
# for (k, v) in res.items():
# body[k] = list(v.keys())
# body["device_id"] = cl_id
# body["last_modified"] = datetime.datetime.strftime(datetime.datetime.now(pytz.timezone("Asia/Shanghai")),
# "%Y-%m-%dT%H:%M:%S.%f")[:-7] + "Z"
# es_insert_device_info(cl_id, body)
# write_user_portrait_doris(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()), # write_user_portrait_doris(cl_id, ",".join(first_solutions_score.keys()), ",".join(second_solutions_score.keys()),
# ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()), # ",".join(first_demands_score.keys()), ",".join(second_demands_score.keys()),
# ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()), # ",".join(first_positions_score.keys()), ",".join(second_positions_score.keys()),
...@@ -299,6 +310,7 @@ def consume_kafka(): ...@@ -299,6 +310,7 @@ def consume_kafka():
spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate() spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN") spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py") spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
# spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/es_tool.py")
device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000) device_ids_lst_rdd = spark.sparkContext.parallelize(device_ids_lst, numSlices=1000)
...@@ -318,9 +330,8 @@ def consume_kafka(): ...@@ -318,9 +330,8 @@ def consume_kafka():
if __name__ == "__main__": if __name__ == "__main__":
start = datetime.datetime.now() start = datetime.datetime.now()
# update_tag3_user_portrait("862460044588666")
# update_tag3_user_portrait("androidid_a25a1129c0b38f7b") # update_tag3_user_portrait("androidid_a25a1129c0b38f7b")
# device_id = "862460044588666"
# update_tag3_user_portrait(device_id)
consume_kafka() consume_kafka()
end = datetime.datetime.now() end = datetime.datetime.now()
......
...@@ -503,3 +503,45 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first ...@@ -503,3 +503,45 @@ def write_user_portrait_by_event(cl_id, first_solutions, second_solutions, first
except Exception as e: except Exception as e:
print("write db error") print("write db error")
print(e) print(e)
def get_redis_client():
    """Build a StrictRedis client for the shared user-portrait Redis instance.

    NOTE(review): credentials are embedded in the URL; consider moving them
    to configuration/environment instead of source.
    """
    redis_url = 'redis://:ReDis!GmTx*0aN9@172.16.40.173:6379'
    return redis.StrictRedis.from_url(redis_url)
def get_user_portrait_tag3_from_redis(device_id, limit_score=0):
    """Read a device's tag3 user portrait from Redis and keep tag names only.

    Args:
        device_id: device identifier used to build the portrait key.
        limit_score: minimum score a tag must reach to be kept (inclusive).

    Returns:
        Dict mapping each portrait field ("first_demands", "second_demands",
        "first_solutions", "second_solutions", "first_positions",
        "second_positions", "projects", "anecdote_tags") to a list of tag
        names sorted by descending score; {} when the key is absent.
    """

    def items_gt_score(d):
        # Sort tag->score pairs by score descending, then keep names whose
        # score passes the threshold (scores may arrive as strings from JSON).
        ordered = sorted(d.items(), key=lambda x: x[1], reverse=True)
        return [tag for tag, score in ordered if float(score) >= limit_score]

    portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id)
    redis_client = get_redis_client()
    # Single GET instead of EXISTS + GET: one round-trip, and no race where
    # the key expires between the two calls (which would json.loads(None)).
    raw = redis_client.get(portrait_key)
    if raw is None:
        return {}
    user_portrait = json.loads(raw)
    fields = ("first_demands", "second_demands", "first_solutions",
              "second_solutions", "first_positions", "second_positions",
              "projects", "anecdote_tags")
    return {field: items_gt_score(user_portrait.get(field, {})) for field in fields}
def get_user_portrait_tag3_with_score(device_id):
    """Return the raw tag3 portrait (tags with scores) for a device.

    Args:
        device_id: device identifier used to build the portrait key.

    Returns:
        The decoded portrait dict stored under
        "doris:user_portrait:tag3:device_id:<device_id>", or {} when absent.
    """
    portrait_key = "doris:user_portrait:tag3:device_id:" + str(device_id)
    # Single GET avoids the EXISTS/GET race (key expiring between the calls
    # would make .decode() blow up on None) and saves a round-trip.
    raw = get_redis_client().get(portrait_key)
    if raw is None:
        return {}
    return json.loads(raw.decode("utf-8"))
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment