hello.py 4.51 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1 2 3 4 5 6 7 8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
张彦钊's avatar
张彦钊 committed
9
import json
张彦钊's avatar
张彦钊 committed
10
import msgpack
张彦钊's avatar
张彦钊 committed
11
import pymysql
张彦钊's avatar
张彦钊 committed
12 13 14 15 16
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
张彦钊's avatar
张彦钊 committed
17
import redis
张彦钊's avatar
张彦钊 committed
18 19 20 21 22 23 24
import datetime


# filter logging
def gbk_decoder(s):
    """Decode a raw Kafka key/value into a Python object.

    The payload is first tried as msgpack; if that fails for any reason it
    is parsed as JSON instead.

    Args:
        s: Raw message key or value, or None.

    Returns:
        The decoded object, or None when ``s`` is None.

    Raises:
        Whatever ``json.loads`` raises when the payload is neither valid
        msgpack nor valid JSON.
    """
    if s is None:
        return None
    try:
        # NOTE(review): the `encoding` keyword is only accepted by older
        # msgpack releases; confirm the pinned version before upgrading.
        return msgpack.loads(s, encoding='utf-8')
    except Exception:
        # `except Exception` (not a bare except) so KeyboardInterrupt and
        # SystemExit are not swallowed by the fallback.
        return json.loads(s)
张彦钊's avatar
张彦钊 committed
31 32 33 34


def maidian(x):
    """Return True when a tracking event is a diary "dislike" feedback click.

    An event matches when it has both 'type' and 'device' keys, its type is
    'on_click_button', it was raised on page 'home' / tab '精选' by the
    'user_feedback_type' button, the first 'extra_param' entry is a 'diary'
    card, and that entry's 'feedback_type' contains "1" or "2".

    Args:
        x: A (key, value) pair. The value may be an already-decoded dict or
            JSON text; both are accepted so this filter agrees with
            ``get_data``, which indexes the value as a dict.

    Returns:
        bool: True for a matching event, False otherwise. Malformed events
        are logged and also yield False.
    """
    try:
        payload = x[1]
        data = payload if isinstance(payload, dict) else json.loads(payload)
        if 'type' not in data or 'device' not in data:
            return False
        if data['type'] != 'on_click_button':
            return False

        params = data['params']
        if params['page_name'] != 'home' or params['tab_name'] != '精选':
            return False
        if params['button_name'] != 'user_feedback_type':
            return False

        card = params['extra_param'][0]
        if card["card_content_type"] != "diary":
            return False
        feedback = card["feedback_type"]
        return "1" in feedback or "2" in feedback
    except Exception as e:
        print("filter fail")
        print(e)
        # Explicit False so the predicate always returns a boolean.
        return False


def get_data(x):
    """Extract the (device_id, diary_id) pair from a feedback event.

    Args:
        x: A (key, value) pair. The value may be an already-decoded dict or
            JSON text.

    Returns:
        tuple: ``(device_id, diary_id)`` where device_id comes from
        ``device.device_id`` and diary_id from the ``card_id`` of the first
        ``params.extra_param`` entry. Returns None when the event cannot be
        parsed or lacks those fields.
    """
    try:
        payload = x[1]
        data = payload if isinstance(payload, dict) else json.loads(payload)
        device_id = data['device']['device_id']
        diary_id = data['params']['extra_param'][0]["card_id"]
        return device_id, diary_id
    except Exception as e:
        # Log like the sibling handlers; the previous handler called a
        # `send_email` helper that is not defined in this file and therefore
        # raised NameError from inside the except block.
        print("get_data fail")
        print(e)
        return None


def _load_dislike_ids(raw):
    """Decode a stored dislike list (str or bytes JSON) into a Python list."""
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8')
    value = json.loads(raw)
    if not isinstance(value, list):
        # Values written by the earlier update path were JSON-encoded twice;
        # the first decode then yields JSON text rather than a list.
        value = json.loads(value)
    return list(value)


def write_redis(device_id, cid_list):
    """Record diaries related to a device's disliked diaries in Redis.

    Looks up the type-'3' tags of the disliked diaries, then every online
    diary with content_level >= '3' carrying one of those tags, and merges
    those diary ids into the JSON list stored under
    ``"<device_id>_dislike_diary"``.

    Args:
        device_id: Device identifier used to build the Redis key.
        cid_list: Iterable of disliked diary ids.

    Returns:
        None. All failures are logged and swallowed (best effort).
    """
    try:
        diary_ids = list(cid_list)
        if not diary_ids:
            # Nothing to look up; an empty IN () clause is invalid SQL.
            return

        # SECURITY(review): database and Redis credentials are hard-coded
        # below; they should be moved to configuration.
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        try:
            cursor = db.cursor()
            # One %s placeholder per id: avoids the trailing comma that
            # formatting a 1-tuple produced and lets the driver do the quoting.
            sql = "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b " \
                  "on a.tag_id = b.id where b.tag_type = '3' " \
                  "and a.diary_id in ({})".format(", ".join(["%s"] * len(diary_ids)))
            cursor.execute(sql, diary_ids)
            tags = list(set(row[0] for row in cursor.fetchall()))
            if not tags:
                return

            sql = "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b " \
                  "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id " \
                  "where a.is_online = 1 and a.content_level >= '3' " \
                  "and c.id in ({}) and c.tag_type = '3'".format(", ".join(["%s"] * len(tags)))
            cursor.execute(sql, tags)
            cids = list(set(row[0] for row in cursor.fetchall()))
        finally:
            # Always release the connection, including on query failure.
            db.close()

        r = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
        key = str(device_id) + "_dislike_diary"
        existing = r.get(key)
        if existing is not None:
            merged = set(_load_dislike_ids(existing))
            merged.update(cids)
            # Encode exactly once so the value can be read back as a list.
            # NOTE(review): a plain SET clears the key's TTL, as it did
            # before; confirm whether the 7-day expiry should be re-applied.
            r.set(key, json.dumps(list(merged)))
        else:
            r.set(key, json.dumps(cids))
            r.expire(key, 7 * 24 * 60 * 60)
    except Exception as e:
        print("insert redis fail")
        print(e)


def model(rdd):
    """Process one batch: push each device's disliked diaries to Redis.

    Keeps only the events accepted by ``maidian``, maps them to
    (device_id, diary_id) pairs with ``get_data``, drops pairs that failed
    to parse, groups the diary ids per device and calls ``write_redis`` for
    every device.

    Args:
        rdd: RDD of (key, value) Kafka records for the current batch.

    Returns:
        None. Errors raised while building or running the job are logged.
    """
    try:
        pairs = rdd.filter(maidian).map(get_data).filter(lambda pair: pair is not None)
        # `foreach` is an action, so the writes actually execute; the former
        # chain ended in a lazy `map` whose result was discarded, and it
        # called DataFrame-only `.na.drop()` on the tuple from `get_data`.
        pairs.groupByKey().foreach(lambda kv: write_redis(kv[0], list(kv[1])))
    except Exception as e:
        print("fail")
        print(e)
张彦钊's avatar
张彦钊 committed
104 105


张彦钊's avatar
张彦钊 committed
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
if __name__ == '__main__':
    # Entry point: consume the "gm-maidian-data" Kafka topic in 10-second
    # batches and hand every batch to `model`.
    sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike_filter").set(
        "spark.io.compression.codec", "lzf"))
    ssc = StreamingContext(sc, 10)
    sc.setLogLevel("WARN")
    kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
                   "group.id": "dislike",
                   "socket.timeout.ms": "600000",
                   "auto.offset.reset": "largest"}
    try:
        stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams, keyDecoder=gbk_decoder,
                                               valueDecoder=gbk_decoder)
        # `model` is run for its side effects and returns nothing, so it is
        # registered as an output operation rather than a transform that is
        # then pretty-printed.
        stream.foreachRDD(model)
        ssc.start()
        ssc.awaitTermination()
    except Exception as e:
        print(e)
张彦钊's avatar
张彦钊 committed
125

张彦钊's avatar
张彦钊 committed
126