Commit 3b6fa0b2 authored by 赵威's avatar 赵威

use pandas

parent 7ff8f681
......@@ -166,12 +166,12 @@ def get_click_data(spark, card_type, start, end):
WHERE (spam_pv.device_id IS NULL or spam_pv.device_id ='')
and (dev.device_id is null or dev.device_id ='')
""".format(start, end, card_type, reg, start, end, end, start, end, end, end, end, end)
return spark.sql(sql)
return spark.sql(sql).toPandas()
def get_exposure_data(spark, card_type, start, end):
reg = r"""^\\d+$"""
return spark.sql("""
sql = """
SELECT DISTINCT
t1.cl_id device_id,
cast(card_id AS int) card_id,
......@@ -260,7 +260,8 @@ def get_exposure_data(spark, card_type, start, end):
,'promotion_qianka','promotion_xiaoyu','promotion_dianru','promotion_malioaso','promotion_malioaso-shequ'
,'promotion_shike','promotion_julang_jl03','promotion_zuimei')
AND t2.first_channel_source_type not LIKE 'promotion\\_jf\\_%'
""".format(reg, card_type, end, start, end, end, end, end, end, start, end))
""".format(reg, card_type, end, start, end, end, end, end, end, start, end)
return spark.sql(sql).toPandas()
def get_card_feature_df(spark, card_type, yesterday):
......@@ -373,7 +374,7 @@ def get_card_feature_df(spark, card_type, yesterday):
and card_content_type = '{}'
and card_id rlike '{}'
""".format(yesterday, card_type, reg)
return spark.sql(sql)
return spark.sql(sql).toPandas()
def get_device_tags(spark):
......@@ -382,7 +383,7 @@ def get_device_tags(spark):
FROM user_tag3_portrait
WHERE date = '{}'
""".format(get_ndays_before(1))
return spark.sql(sql)
return spark.sql(sql).toPandas()
if __name__ == "__main__":
......@@ -391,15 +392,15 @@ if __name__ == "__main__":
# TODO days 30
start, end = get_ndays_before_no_minus(5), get_ndays_before_no_minus(1)
click_df = get_click_data(spark, card_type, start, end)
click_df.show(5, False)
print(click_df.head(3))
exposure_df = get_exposure_data(spark, card_type, start, end)
exposure_df.show(5, False)
print(exposure_df.head(3))
tractate_feature_df = get_card_feature_df(spark, card_type, end)
tractate_feature_df.show(5, False)
print(tractate_feature_df.head(3))
device_feature_df = get_device_tags(spark)
device_feature_df.show(5, False)
print(device_feature_df.head(3))
# spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/strategy_embedding/dssm/get_tractate_data.py
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment