# -*- coding: utf-8 -*-
"""Look up diaries related to a device's disliked diaries and cache them in Redis.

``write_redis`` is the entry point used by the ``__main__`` driver.  ``test``
is a Spark/Hive scratch function and ``con_sql`` is a small query helper.
"""
# NOTE(review): the Spark / TiSpark imports below are commented out, so the
# names used inside ``test()`` (SparkConf, SparkContext, SparkSession, pti) are
# undefined unless these lines are restored.
# from pyspark.context import SparkContext
# from pyspark.conf import SparkConf
# import pytispark.pytispark as pti
# from pyspark.sql import SparkSession
import json

import pandas as pd
import pymysql
import redis
from sqlalchemy import create_engine

# NOTE(review): credentials are hard-coded in source; they should be moved to
# configuration or environment variables. Values are kept unchanged here.
_DB_SETTINGS = dict(host='172.16.40.158', port=4000, user='root',
                    passwd='3SYz54LS9#^9sBvC', db='eagle')
_REDIS_URL = 'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'

# ``{}`` is replaced by a comma-separated run of ``%s`` placeholders so the
# driver performs the quoting of the IN-list values.
_TAGS_FOR_DIARIES_SQL = (
    "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b "
    "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id in ({})"
)
_DIARIES_FOR_TAGS_SQL = (
    "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b "
    "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id "
    "where a.is_online = 1 and a.content_level >= '3' "
    "and c.id in ({}) and c.tag_type = '3'"
)


def test():
    """Open a Spark session with Hive support and issue a page-view query.

    Registers two UDF jars and two temporary functions, then builds a query
    over ``online.bl_hdfs_maidian_updates`` for ``diary_detail`` page views on
    partition ``20190801``.  Nothing is returned.

    NOTE(review): as the file stands this raises ``NameError`` on its first
    line, because the imports providing ``SparkConf`` and friends are
    commented out at the top of the module.
    """
    conf = SparkConf().setAppName("My App").set("spark.io.compression.codec", "lzf")
    sc = SparkContext(conf=conf)
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    ti = pti.TiContext(spark)
    ti.tidbMapDatabase("jerry_test")
    spark = SparkSession.builder.appName("hello test").enableHiveSupport().getOrCreate()
    spark.sql("use online")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/brickhouse-0.7.1-SNAPSHOT.jar")
    spark.sql("ADD JAR hdfs:///user/hive/share/lib/udf/hive-udf-1.0-SNAPSHOT.jar")
    spark.sql("CREATE TEMPORARY FUNCTION json_map AS 'brickhouse.udf.json.JsonMapUDF'")
    spark.sql("CREATE TEMPORARY FUNCTION is_json AS 'com.gmei.hive.common.udf.UDFJsonFormatCheck'")
    sql = """select cl_id as device_id,params["business_id"] as cid_id,
    (params["out"]-params["in"]) as dur_time
    from online.bl_hdfs_maidian_updates where action="page_view"
    and params["page_name"]="diary_detail" and partition_date = '20190801'
    """
    # The resulting DataFrame is built but not used or returned.
    df = spark.sql(sql)


def con_sql(db, sql):
    """Execute *sql* on *db* and return every row as a ``pandas.DataFrame``.

    Args:
        db: An open DB-API connection. It is always closed before returning,
            including when the query raises.
        sql: The SQL statement to execute.

    Returns:
        A DataFrame built from the fetched rows (empty when there are none).
    """
    try:
        cursor = db.cursor()
        try:
            cursor.execute(sql)
            rows = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        db.close()
    return pd.DataFrame(list(rows))


def _fetch_distinct_ids(cursor, sql_template, values):
    """Run *sql_template* with *values* as its IN-list; return distinct first-column values."""
    placeholders = ", ".join(["%s"] * len(values))
    cursor.execute(sql_template.format(placeholders), list(values))
    return list({row[0] for row in cursor.fetchall()})


def _decode_cached_ids(raw):
    """Decode a cached id list read from Redis into a Python list.

    Earlier versions of ``write_redis`` JSON-encoded the list twice when
    updating an existing key, so a stored value may be a JSON string that
    itself contains a JSON list; keep decoding until a non-string appears.

    Raises:
        ValueError: If the stored value is not valid JSON or not a list.
    """
    value = raw
    while isinstance(value, (bytes, str)):
        value = json.loads(value)
    if not isinstance(value, list):
        raise ValueError("cached value is not a JSON list: {!r}".format(raw))
    return value


def write_redis(device_id, cid_list):
    """Cache, per device, the online diaries sharing a type-3 tag with *cid_list*.

    Steps: find the ids of ``tag_type = '3'`` tags attached to the given
    diaries, find online diaries with ``content_level >= '3'`` carrying any of
    those tags, then merge those diary ids into the JSON list stored at the
    Redis key ``"<device_id>_dislike_diary"``.

    Args:
        device_id: Device identifier; its ``str()`` form prefixes the key.
        cid_list: Iterable of diary ids.

    Returns:
        None. Nothing is written when *cid_list* is empty, when no tags
        match, or when no diaries match.
    """
    diary_ids = list(cid_list)
    if not diary_ids:
        # An empty IN-list is not valid SQL and there is nothing to look up.
        return

    db = pymysql.connect(**_DB_SETTINGS)
    try:
        cursor = db.cursor()
        tags = _fetch_distinct_ids(cursor, _TAGS_FOR_DIARIES_SQL, diary_ids)
        if not tags:
            return
        cids = _fetch_distinct_ids(cursor, _DIARIES_FOR_TAGS_SQL, tags)
    finally:
        # Previously the connection was never closed.
        db.close()

    if not cids:
        return

    r = redis.StrictRedis.from_url(_REDIS_URL)
    key = str(device_id) + "_dislike_diary"
    # A single GET replaces the EXISTS-then-GET pair (one round trip, no race).
    cached = r.get(key)
    if cached is not None:
        # NOTE(review): this used to be eval() on the Redis payload, which
        # executes arbitrary expressions; the payload is JSON, so it is now
        # parsed with json.loads instead.
        cids = list(set(_decode_cached_ids(cached)) | set(cids))
    # Encode exactly once; the old code dumped an already-dumped string here.
    r.set(key, json.dumps(cids))


if __name__ == '__main__':
    a = [15202811, 15825403, 16480766, 15432195, 15759876]
    d = "E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
    write_redis(d, a)