test.py 2.96 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1
# -*- coding: utf-8 -*-
张彦钊's avatar
张彦钊 committed
2 3 4 5 6 7 8
# from pyspark.context import SparkContext
# from pyspark.conf import SparkConf
# import pytispark.pytispark as pti
# from pyspark.sql import SparkSession
import pandas as pd
import pymysql
from sqlalchemy import create_engine
张彦钊's avatar
张彦钊 committed
9 10
import redis
import json
张彦钊's avatar
张彦钊 committed
11

张彦钊's avatar
张彦钊 committed
12

张彦钊's avatar
张彦钊 committed
13
def test():
    """Build a Hive-enabled Spark session and query diary-detail page views.

    Sets up a Spark context, maps the TiDB database ``jerry_test`` through
    TiSpark, registers the ``json_map`` and ``is_json`` Hive UDFs, and runs a
    query against ``online.bl_hdfs_maidian_updates`` for the partition
    ``20190801``.

    Returns:
        The Spark DataFrame produced by the query, with columns
        ``device_id``, ``cid_id`` and ``dur_time``.

    Raises:
        ImportError: If ``pyspark`` or ``pytispark`` is not installed.
    """
    # Imported here because the module-level pyspark/pytispark imports are
    # commented out; without these the names below raise NameError, and a
    # local import keeps the module importable on hosts without Spark.
    from pyspark.conf import SparkConf
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession
    import pytispark.pytispark as pti

    conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
    # Created for its side effect only: it becomes the active context that
    # the session builder below picks up.
    SparkContext(conf=conf)

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")

    spark = SparkSession.builder.appName("hello test").enableHiveSupport().getOrCreate()

    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")

    # The query text is kept byte-for-byte, including its original whitespace.
    sql = """select cl_id as device_id,params["business_id"] as cid_id,
        (params["out"]-params["in"]) as dur_time from online.bl_hdfs_maidian_updates where action="page_view" 
        and params["page_name"]="diary_detail" and partition_date = '20190801'
       """
    return spark.sql(sql)
张彦钊's avatar
张彦钊 committed
33 34


张彦钊's avatar
张彦钊 committed
35

张彦钊's avatar
张彦钊 committed
36 37 38 39 40 41 42
def con_sql(db, sql):
    """Run *sql* on *db*, return every row as a DataFrame, and close *db*.

    Args:
        db: An open DB-API connection. It is always closed before this
            function returns or raises, so the caller must not reuse it.
        sql: The SQL statement to execute.

    Returns:
        A ``pandas.DataFrame`` with one row per fetched record and default
        integer column labels; empty when the query returns no rows.

    Raises:
        Whatever the driver raises from ``cursor()``, ``execute()`` or
        ``fetchall()``; the connection is still closed in that case.
    """
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Close even on failure so a bad query does not leak the connection.
        db.close()
    return pd.DataFrame(list(rows))
张彦钊's avatar
张彦钊 committed
43

张彦钊's avatar
张彦钊 committed
44
def _decode_cid_list(raw):
    """Decode a diary-id list previously stored in Redis into a Python list."""
    import ast

    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    try:
        value = json.loads(raw)
    except ValueError:
        # Older values may be Python literals rather than JSON; literal_eval
        # accepts those without executing arbitrary code the way eval() does.
        value = ast.literal_eval(raw)
    if isinstance(value, str):
        # Earlier versions of write_redis JSON-encoded the merged list twice,
        # so the first decode yields a string that still has to be decoded.
        value = json.loads(value)
    return list(value)


def write_redis(device_id, cid_list):
    """Merge diaries sharing tags with *cid_list* into a device's Redis key.

    Looks up the type-'3' tags attached to the given diaries, collects the
    ids of online diaries with ``content_level >= '3'`` that carry any of
    those tags, and stores the union of those ids and any previously stored
    ids as a JSON list under ``"<device_id>_dislike_diary"``.

    Nothing is written when *cid_list* is empty, when the diaries have no
    matching tags, or when no diary matches those tags.

    Args:
        device_id: Device identifier; ``str(device_id)`` prefixes the key.
        cid_list: Iterable of diary ids.

    Returns:
        None.
    """
    diary_ids = list(cid_list)
    if not diary_ids:
        # "in ()" is not valid SQL, so there is nothing to look up.
        return

    # NOTE(review): the database and Redis credentials below are hard-coded
    # in source; they should be moved to configuration.
    db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
    try:
        cursor = db.cursor()

        # One placeholder per id: formatting a Python tuple into the statement
        # breaks for a single element ("(1,)") and bypasses driver escaping.
        placeholders = ", ".join(["%s"] * len(diary_ids))
        sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
              "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in ({})".format(placeholders)
        cursor.execute(sql, diary_ids)
        tags = list({row[0] for row in cursor.fetchall()})
        if not tags:
            return

        placeholders = ", ".join(["%s"] * len(tags))
        sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
              "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
              "where a.is_online = 1 and a.content_level >= '3' " \
              "and c.id in ({}) and c.tag_type = '3'".format(placeholders)
        cursor.execute(sql, tags)
        cids = {row[0] for row in cursor.fetchall()}
    finally:
        db.close()

    if not cids:
        return

    r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
    key = str(device_id) + "_dislike_diary"
    stored = r.get(key)
    if stored is not None:
        cids.update(_decode_cid_list(stored))
    # Encode exactly once so the stored value is always a plain JSON list.
    r.set(key, json.dumps(list(cids)))

张彦钊's avatar
张彦钊 committed
70
if __name__ == '__main__':
    # Manual run: push the diaries related to a fixed set of diary ids into
    # the dislike list of one hard-coded device.
    d = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
    a = [
        15202811,
        15825403,
        16480766,
        15432195,
        15759876,
    ]
    write_redis(device_id=d, cid_list=a)

张彦钊's avatar
张彦钊 committed
75 76


张彦钊's avatar
张彦钊 committed
77