Commit 50fdc4ad authored by 赵威's avatar 赵威

get tractate info from es

parent 781e81ef
...@@ -4,7 +4,6 @@ import sys ...@@ -4,7 +4,6 @@ import sys
sys.path.append(os.path.realpath(".")) sys.path.append(os.path.realpath("."))
from utils.date import get_ndays_before, get_ndays_before_no_minus from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan
from utils.files import save_df_to_csv from utils.files import save_df_to_csv
from utils.spark import (get_card_feature_df, get_click_data, get_device_tags, get_exposure_data, get_spark) from utils.spark import (get_card_feature_df, get_click_data, get_device_tags, get_exposure_data, get_spark)
......
import os import os
import random
import sys import sys
sys.path.append(os.path.realpath(".")) sys.path.append(os.path.realpath("."))
from utils.date import get_ndays_before, get_ndays_before_no_minus from utils.date import get_ndays_before, get_ndays_before_no_minus
from utils.es import es_scan from utils.es import es_scan, get_tractate_info_from_es
from utils.files import save_df_to_csv from utils.files import save_df_to_csv
from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark) from utils.spark import (get_click_data, get_device_tags, get_exposure_data, get_spark)
...@@ -14,16 +15,24 @@ if __name__ == "__main__": ...@@ -14,16 +15,24 @@ if __name__ == "__main__":
days = 5 # TODO days 30 days = 5 # TODO days 30
start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1) start, end = get_ndays_before_no_minus(days), get_ndays_before_no_minus(1)
click_df = get_click_data(spark, card_type, start, end) # click_df = get_click_data(spark, card_type, start, end)
# save_df_to_csv(click_df, "tractate_click.csv") # # save_df_to_csv(click_df, "personas_tractate_click.csv")
print(click_df.shape) # print(click_df.shape)
exposure_df = get_exposure_data(spark, card_type, start, end) # exposure_df = get_exposure_data(spark, card_type, start, end)
# save_df_to_csv(exposure_df, "tractate_exposure.csv") # # save_df_to_csv(exposure_df, "personas_tractate_exposure.csv")
print(exposure_df.shape) # print(exposure_df.shape)
device_feature_df = get_device_tags(spark) # device_feature_df = get_device_tags(spark)
# save_df_to_csv(device_feature_df, "device_feature.csv") # # save_df_to_csv(device_feature_df, "personas_device_feature.csv")
print(device_feature_df.shape) # print(device_feature_df.shape)
# Build a mapping of tractate id -> business tags ("portrait_tag_name")
# fetched from Elasticsearch.
tractate_dict = {}
for item in get_tractate_info_from_es(["id", "portrait_tag_name"]):
    tractate_id = item["_id"]  # renamed: `id` shadows the builtin
    # BUG FIX: elasticsearch scan hits nest the requested fields under
    # "_source" (see the diary code in utils/es.py, which reads
    # item["_source"][...]); item["portrait_tag_name"] raises KeyError.
    business_tags = item["_source"]["portrait_tag_name"]
    tractate_dict[tractate_id] = business_tags
print(len(tractate_dict))
# Sanity check: print one random (id, tags) pair from the result.
print(random.choice(list(tractate_dict.items())))
# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/personas_vector/get_data.py # spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/personas_vector/get_data.py
...@@ -44,3 +44,100 @@ def es_scan(doc, body, es=None, rw="read"): ...@@ -44,3 +44,100 @@ def es_scan(doc, body, es=None, rw="read"):
es = get_es() es = get_es()
index = es_index_adapt(index_prefix="gm-dbmw", doc_type=doc, rw=rw) index = es_index_adapt(index_prefix="gm-dbmw", doc_type=doc, rw=rw)
return helpers.scan(es, index=index, query=body, request_timeout=100, scroll="300m", raise_on_error=False) return helpers.scan(es, index=index, query=body, request_timeout=100, scroll="300m", raise_on_error=False)
# curl "http://172.16.31.17:9000/gm-dbmw-tractate-read/_search?pretty&size=1" -d '
# '
def get_tractate_info_from_es(fields=None):
    """Scan Elasticsearch for online tractates and return the matching hits.

    Filters: is_online == "true" and content_level in {6, 5, 4, 3.5, 3},
    excluding documents with status == 4 or show_by_index == 2.

    Args:
        fields: list of "_source" field names to include in each hit
            (defaults to ["id"]).

    Returns:
        Generator of ES hit dicts; each hit carries "_id" and a "_source"
        dict restricted to *fields*.
    """
    # Avoid a mutable default argument (shared across calls).
    if fields is None:
        fields = ["id"]
    q = {
        "query": {
            "bool": {
                "must": [{
                    "term": {
                        "is_online": "true"
                    }
                }, {
                    "terms": {
                        "content_level": [6, 5, 4, 3.5, 3]
                    }
                }],
                "must_not": [{
                    "term": {
                        "status": 4
                    }
                }, {
                    "term": {
                        "show_by_index": 2
                    }
                }]
            }
        },
        "_source": {
            "include": fields
        }
    }
    # BUG FIX: was es_scan("diary", q) — copy-pasted from the diary helper.
    # This function targets the gm-dbmw-tractate index (see the curl
    # example above), so the doc type must be "tractate".
    results = es_scan("tractate", q)
    return results
# def save_diary_info_from_es():
# q = {
# "query": {
# "bool": {
# "filter": [{
# "term": {
# "is_online": True
# }
# }, {
# "term": {
# "has_cover": True
# }
# }, {
# "term": {
# "is_sink": False
# }
# }, {
# "term": {
# "has_before_cover": True
# }
# }, {
# "term": {
# "has_after_cover": True
# }
# }, {
# "terms": {
# "content_level": [6, 5, 4, 3.5, 3]
# }
# }, {
# "term": {
# "content_simi_bol_show": 0
# }
# }, {
# "exists": {
# "field": "before_cover_url"
# }
# }]
# }
# },
# "_source": {
# "include": ["id"]
# }
# }
# count = 0
# # before_res_dict = {}
# after_res_dict = {}
# results = es_scan("diary", q)
# for item in results:
# diary_id = item["_id"]
# # before_cover_url = item["_source"]["before_cover_url"] + "-w"
# # before_img = url_to_ndarray(before_cover_url)
# after_cover_url = item["_source"]["after_cover_url"] + "-w"
# img = url_to_ndarray(after_cover_url)
# if img.any():
# count += 1
# print("count: " + str(count) + " " + str(diary_id))
# faces = FACE_TO_VEC_FUN(img)
# for face in faces:
# after_res_dict[diary_id] = face["feature"]
# redis_client_db.hmset(DIARY_AFTER_COVER_FEATURE_KEY, after_res_dict)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment