monitor.py 8.02 KB
Newer Older
张彦钊's avatar
张彦钊 committed
1 2 3
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
张彦钊's avatar
张彦钊 committed
4
import json
张彦钊's avatar
张彦钊 committed
5 6 7 8
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
张彦钊's avatar
张彦钊 committed
9 10 11 12
from pyspark import  SparkConf
import redis
import sys
import os
张彦钊's avatar
张彦钊 committed
13 14
import json
import pymysql
张彦钊's avatar
张彦钊 committed
15 16 17
import numpy as np
import pandas as pd
import time
张彦钊's avatar
张彦钊 committed
18
import datetime
张彦钊's avatar
张彦钊 committed
19

张彦钊's avatar
张彦钊 committed
20 21 22
def Json(x, target_device_id="E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"):
    """Return True when a Kafka record is a card click from the target device.

    Args:
        x: A ``(key, value)`` pair; ``x[1]`` is expected to hold a JSON string.
        target_device_id: Device id the event must come from. Defaults to the
            id that was previously hard-coded in the condition.

    Returns:
        True only if the payload is a JSON object with ``type`` equal to
        ``'on_click_card'``, a ``params`` object containing
        ``card_content_type``, and a ``device`` object whose ``device_id``
        equals ``target_device_id``. Any malformed or incomplete record
        yields False instead of raising, so one bad message cannot fail the
        whole filter.
    """
    try:
        data = json.loads(x[1])
    except (ValueError, TypeError, IndexError):
        # Invalid JSON, a non-string value, or a record without a value slot.
        return False

    if not isinstance(data, dict):
        return False

    params = data.get('params')
    device = data.get('device')
    if not isinstance(params, dict) or not isinstance(device, dict):
        return False
    if 'card_content_type' not in params:
        return False

    return (data.get('type') == 'on_click_card'
            and device.get('device_id') == target_device_id)
张彦钊's avatar
张彦钊 committed
29 30


张彦钊's avatar
张彦钊 committed
31 32
def model(rdd):
    """Build the per-batch pipeline: filter clicks, parse them, write to Redis.

    Args:
        rdd: RDD of ``(key, value)`` Kafka records.

    Returns:
        The transformed RDD, or None if building the pipeline failed.
    """
    try:
        return (
            rdd.filter(lambda x: Json(x))
            .repartition(10)
            .map(lambda x: get_data(x))
            # get_data returns None for records it cannot parse; drop those
            # so the indexing in the next stage cannot fail on None.
            .filter(lambda x: x is not None)
            .map(lambda x: write_redis(x[0], x[1], x[2]))
        )
    except Exception as e:
        # Only Exception is caught so SystemExit/KeyboardInterrupt propagate.
        # NOTE(review): RDD transformations are presumably lazy, so errors
        # raised inside the lambdas would surface on the executors, not here.
        print("fail")
        print(e)
        return None
张彦钊's avatar
张彦钊 committed
38 39


张彦钊's avatar
张彦钊 committed
40 41 42 43
def get_data(x):
    """Extract the fields needed for a dislike write from a Kafka record.

    Args:
        x: A ``(key, value)`` pair; ``x[1]`` is expected to hold a JSON string.

    Returns:
        A ``(device_id, card_id, card_content_type)`` tuple, or None when the
        record cannot be parsed or lacks one of the required fields.
    """
    try:
        data = json.loads(x[1])
        params = data['params']
        return data['device']['device_id'], params["card_id"], params['card_content_type']
    except (ValueError, TypeError, KeyError, IndexError) as e:
        # ValueError covers invalid JSON; the others cover missing or
        # wrongly-shaped fields. Report the cause like the sibling writers do.
        print("get_data fail")
        print(e)
        return None
张彦钊's avatar
张彦钊 committed
50

张彦钊's avatar
张彦钊 committed
51

张彦钊's avatar
张彦钊 committed
52 53 54
def write_redis(device_id,cid,card):
    """Route a clicked card to the writer that matches its content type.

    ``"diary"`` goes to ``diary_write``, ``"qa"`` to ``question_write`` and
    ``"user_post"`` to ``tractate_write``. Any other card type is ignored.
    The writer's return value is discarded; this function returns None.
    """
    writers = (
        ("diary", diary_write),
        ("qa", question_write),
        ("user_post", tractate_write),
    )
    for card_type, writer in writers:
        # Equality test (rather than a dict lookup) so unhashable card
        # values are simply treated as "no match".
        if card == card_type:
            writer(device_id, cid)
            break


张彦钊's avatar
张彦钊 committed
61
def tractate_write(device_id, cid):
    """Add tractates sharing the clicked tractate's tag to the device's dislike list.

    Looks up the first tag with ``tag_type = '3'`` attached to tractate ``cid``,
    collects the ids of all online tractates carrying that tag, and merges them
    (de-duplicated) into the JSON list stored in Redis under
    ``"<device_id>_dislike_tractate"``. A newly created key expires after
    seven days.

    Args:
        device_id: Device identifier used to build the Redis key.
        cid: Id of the clicked tractate.

    Returns:
        None. Any failure is printed and swallowed (best effort).
    """
    try:
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration.
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        try:
            cursor = db.cursor()
            # cid originates from event data, so it is bound as a query
            # parameter instead of being formatted into the SQL text.
            cursor.execute(
                "select b.id from src_mimas_prod_api_tractate_tag a left join src_zhengxing_api_tag b "
                "on a.tag_id = b.id where b.tag_type = '3' and a.tractate_id = %s",
                (cid,))
            result = cursor.fetchall()
            if not result:
                return
            tags = result[0][0]
            if tags is None:
                return
            cursor.execute(
                "select a.id from src_mimas_prod_api_tractate a left join src_mimas_prod_api_tractate_tag b "
                "on a.id=b.tractate_id left join src_zhengxing_api_tag c on b.tag_id=c.id "
                "where a.is_online = 1 and c.id = %s and c.tag_type = '3'",
                (tags,))
            result = cursor.fetchall()
        finally:
            # Close on every path, including early returns and query errors.
            db.close()

        if not result:
            return
        cids = [str(i[0]) for i in result]

        r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
        key = str(device_id) + "_dislike_tractate"
        # Single GET instead of EXISTS + GET: the key can no longer vanish
        # between the check and the read.
        stored = r.get(key)
        if stored is not None:
            value = json.loads(stored.decode('utf-8'))
            value.extend(cids)
            # A plain SET drops the key's expiry, so carry the remaining TTL
            # over. Keys without a positive TTL are written without expiry.
            ttl = r.ttl(key)
            r.set(key, json.dumps(list(set(value))), ex=ttl if ttl and ttl > 0 else None)
            print("cunza")
        else:
            # Create the key and its seven-day expiry in one command.
            r.set(key, json.dumps(cids), ex=7 * 24 * 60 * 60)

    except Exception as e:
        print("tractate insert redis fail")
        print(e)


def question_write(device_id,cid):
    """Add questions sharing the clicked question's tag to the device's dislike list.

    Looks up the first tag with ``tag_type = '3'`` attached to question ``cid``,
    collects the ids of all online questions carrying that tag, and merges them
    (de-duplicated) into the JSON list stored in Redis under
    ``"<device_id>_dislike_qa"``. A newly created key expires after seven days.

    Args:
        device_id: Device identifier used to build the Redis key.
        cid: Id of the clicked question.

    Returns:
        ``"question good"`` when the clicked question has at least one
        matching tag row and no error occurred; otherwise None. Failures are
        printed and swallowed (best effort).
    """
    try:
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration.
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        related = ()
        try:
            cursor = db.cursor()
            # cid originates from event data, so it is bound as a query
            # parameter instead of being formatted into the SQL text.
            cursor.execute(
                "select b.id from src_mimas_prod_api_questiontag a left join src_zhengxing_api_tag b "
                "on a.tag_id = b.id where b.tag_type = '3' and a.question_id = %s",
                (cid,))
            result = cursor.fetchall()
            if not result:
                return None
            tags = result[0][0]
            if tags is not None:
                cursor.execute(
                    "select a.id from src_mimas_prod_api_question a left join src_mimas_prod_api_questiontag b "
                    "on a.id=b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id "
                    "where a.is_online = 1 and c.tag_type = '3' and c.id = %s",
                    (tags,))
                related = cursor.fetchall()
        finally:
            # Close on every path, including early returns and query errors.
            db.close()

        if related:
            cids = [str(i[0]) for i in related]

            r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
            key = str(device_id) + "_dislike_qa"
            # Single GET instead of EXISTS + GET: the key can no longer vanish
            # between the check and the read.
            stored = r.get(key)
            if stored is not None:
                value = json.loads(stored.decode('utf-8'))
                value.extend(cids)
                # A plain SET drops the key's expiry, so carry the remaining
                # TTL over. Keys without a positive TTL are written without
                # expiry.
                ttl = r.ttl(key)
                r.set(key, json.dumps(list(set(value))), ex=ttl if ttl and ttl > 0 else None)
                print("cunza")
            else:
                # Create the key and its seven-day expiry in one command.
                r.set(key, json.dumps(cids), ex=7 * 24 * 60 * 60)
                print("bucunza")
        return "question good"

    except Exception as e:
        print("question insert redis fail")
        print(e)


def diary_write(device_id,cid):
    """Add diaries sharing the clicked diary's tag to the device's dislike list.

    Looks up the first tag with ``tag_type = '3'`` attached to diary ``cid``,
    collects the ids of all online diaries with ``content_level >= '3'`` that
    carry that tag, and merges them (de-duplicated) into the JSON list stored
    in Redis under ``"<device_id>_dislike_diary"``. A newly created key
    expires after seven days.

    Args:
        device_id: Device identifier used to build the Redis key.
        cid: Id of the clicked diary.

    Returns:
        None. Any failure is printed and swallowed (best effort).
    """
    try:
        # NOTE(review): credentials are hard-coded in source; consider moving
        # them to configuration.
        db = pymysql.connect(host='172.16.40.158', port=4000, user='root', passwd='3SYz54LS9#^9sBvC', db='eagle')
        try:
            cursor = db.cursor()
            # cid originates from event data, so it is bound as a query
            # parameter instead of being formatted into the SQL text.
            cursor.execute(
                "select b.id from src_mimas_prod_api_diary_tags a left join src_zhengxing_api_tag b "
                "on a.tag_id = b.id where b.tag_type = '3' and a.diary_id = %s",
                (cid,))
            result = cursor.fetchall()
            if not result:
                return
            tags = result[0][0]
            if tags is None:
                return
            cursor.execute(
                "select a.id from src_mimas_prod_api_diary a left join src_mimas_prod_api_diary_tags b "
                "on a.id=b.diary_id left join src_zhengxing_api_tag c on b.tag_id=c.id "
                "where a.is_online = 1 and a.content_level >= '3' "
                "and c.id = %s and c.tag_type = '3'",
                (tags,))
            result = cursor.fetchall()
        finally:
            # Close on every path, including early returns and query errors.
            db.close()

        if not result:
            return
        cids = [str(i[0]) for i in result]

        r = redis.Redis(host="172.16.40.135", port=5379, password="", db=2)
        key = str(device_id) + "_dislike_diary"
        # Single GET instead of EXISTS + GET: the key can no longer vanish
        # between the check and the read.
        stored = r.get(key)
        if stored is not None:
            value = json.loads(stored.decode('utf-8'))
            value.extend(cids)
            # A plain SET drops the key's expiry, so carry the remaining TTL
            # over. Keys without a positive TTL are written without expiry.
            ttl = r.ttl(key)
            r.set(key, json.dumps(list(set(value))), ex=ttl if ttl and ttl > 0 else None)
        else:
            # Create the key and its seven-day expiry in one command.
            r.set(key, json.dumps(cids), ex=7 * 24 * 60 * 60)

    except Exception as e:
        print("diary insert redis fail")
        print(e)
张彦钊's avatar
张彦钊 committed
177

张彦钊's avatar
张彦钊 committed
178

张彦钊's avatar
张彦钊 committed
179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217
# sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
# ssc = StreamingContext(sc,4)
# sc.setLogLevel("WARN")
# kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
#                "group.id": "dislike",
#                "socket.timeout.ms": "600000",
#                "auto.offset.reset": "largest"}
#
#
# stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
# transformstream = stream.transform(lambda x:model(x))
# transformstream.pprint()
#
# ssc.start()
# ssc.awaitTermination()

def make_data(device_id, city_id, native=None, nearby=None, nation=None, megacity=None):
    """Write the four rerank queues for a device/city into a Redis hash.

    The hash key is
    ``"device_diary_queue_rerank:device_id:<device_id>:city_id:<city_id>"`` and
    the fields are ``native_queue``, ``nearby_queue``, ``nation_queue`` and
    ``megacity_queue``. The resulting hash is printed after writing.

    Args:
        device_id: Device identifier (string) used in the hash key.
        city_id: City identifier (string) used in the hash key.
        native, nearby, nation, megacity: Queue values to store. When one is
            left as None, the module-level global of the same name is used,
            which keeps the original calling convention working.

    Raises:
        NameError: If a queue is neither passed in nor defined as a module
            global.
    """
    module_globals = globals()
    fields = (
        ("native_queue", "native", native),
        ("nearby_queue", "nearby", nearby),
        ("nation_queue", "nation", nation),
        ("megacity_queue", "megacity", megacity),
    )

    # Resolve every value before touching Redis so a missing queue cannot
    # leave the hash half-written.
    resolved = []
    for field, legacy_name, value in fields:
        if value is None:
            if legacy_name not in module_globals:
                raise NameError("name '%s' is not defined" % legacy_name)
            value = module_globals[legacy_name]
        resolved.append((field, value))

    r = redis.StrictRedis.from_url("redis://redis.paas-test.env:6379/2")
    key = "device_diary_queue_rerank:device_id:" + device_id + ":city_id:" + city_id
    for field, value in resolved:
        r.hset(name=key, key=field, value=value)
    print(r.hgetall(key))

if __name__ == "__main__":
    # Sample comma-separated id queues; make_data reads these module globals.
    native = ",".join(map(str, range(2, 6)))
    nearby = ",".join(map(str, range(6, 10)))
    nation = ",".join(map(str, range(10, 13)))
    megacity = ",".join(map(str, range(13, 16)))
    make_data("hello", "beijing")








张彦钊's avatar
张彦钊 committed
218 219 220



张彦钊's avatar
张彦钊 committed
221 222


张彦钊's avatar
张彦钊 committed
223

张彦钊's avatar
张彦钊 committed
224

张彦钊's avatar
张彦钊 committed
225 226