# ctr.py
import pandas as pd
import pymysql
from datetime import datetime
from datetime import timedelta
import pickle
import time
from kafka import KafkaProducer
import json
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.streaming import StreamingContext
from pyspark import  SparkConf
import redis
import sys
import os
import json
import pymysql
import numpy as np
import time
import datetime
import tensorflow as tf
import msgpack
import smtplib
import requests

from email.mime.text import MIMEText
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
# sys.path.append('/srv/apps/ftrl/Bandist_Streaming')

def send_email(app,id,e,extra_information = ''):
    """Send a failure-notification e-mail with the error text attached.

    app               -- application name, used in the mail subject.
    id                -- application id, used in the body text "app_id:<id>:fail".
    e                 -- error/exception; converted with str() before use.
    extra_information -- optional extra text appended to the attachment.

    Best-effort: SMTP failures are swallowed (only 'error' is printed).
    """
    # Third-party SMTP service.
    # SECURITY NOTE(review): credentials are hard-coded in source control;
    # move them to configuration / environment variables.
    mail_host = 'smtp.exmail.qq.com'  # SMTP server
    mail_user = "huangkai@igengmei.com"  # login user
    mail_pass = "UyhVobmDHa4r4ecV"  # login password

    sender = 'huangkai@igengmei.com'
    receivers = ['huangkai@igengmei.com']  # recipient list
    e = str(e)
    msg = MIMEMultipart()
    part = MIMEText('app_id:'+id+':fail', 'plain', 'utf-8')
    msg.attach(part)
    msg['From'] = formataddr(["huangkai", sender])
    msg['To'] = ";".join(receivers)
    # message['Cc'] = ";".join(cc_reciver)

    msg['Subject'] = 'spark streaming:app_name:'+app

    # Persist the error text, then re-read it as an attachment.
    # NOTE(review): this overwrites ./error.txt in the working directory on every call.
    with open('error.txt', 'w') as f:
        f.write(e)
        f.write(extra_information)
        # fix: removed redundant f.close() -- the with-block closes the file
    with open('error.txt', 'r') as f:
        # fix: the original leaked this read handle (open(...).read() was never closed)
        part = MIMEApplication(f.read())
    part.add_header('Content-Disposition', 'attachment', filename="error.txt")
    msg.attach(part)

    try:
        smtpObj = smtplib.SMTP_SSL(mail_host, 465)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, msg.as_string())
    except smtplib.SMTPException:
        print('error')


def ts_cal():
    """Placeholder timestamp/score calculation; currently always returns 0."""
    result = 0
    return result
def cal_ctr(data):
    """Compute per-service CTR scores for one device event and cache them in Redis.

    data -- (key, payload) tuple from the Kafka stream; payload['device']['device_id']
            identifies the device.

    Side effects: reads candidate service ids from TiDB (eagle.online_api_service),
    reads/initialises per-device bandit parameters from Redis, writes the normalised
    CTR dict back to Redis, and mails the timing checkpoints via send_email.
    """
    a1 = datetime.datetime.now()
    device_data = data[1]
    device_id = device_data['device']['device_id']

    # Fetch the candidate service ids (the bandit's action space).
    db_eagle = pymysql.connect(host="172.16.40.158", port=4000, user="root", password="3SYz54LS9#^9sBvC",
                               db="eagle",
                               cursorclass=pymysql.cursors.DictCursor)
    try:
        cursor = db_eagle.cursor()
        sql = 'select id from online_api_service'
        cursor.execute(sql)
        results = cursor.fetchall()
    finally:
        db_eagle.close()  # fix: the connection was never closed

    device_meigou_ctr_key = 'device_meigou_ctr:device_id:'+str(device_id)
    device_meigou_params_key = 'device_meigou_params:device_id:'+str(device_id)

    redis_client = redis.StrictRedis.from_url('redis://:ReDis!GmTx*0aN6@172.16.40.133:6379')
    meigou_index_dict = dict()       # action index -> service id
    meigou_new_params_dict = dict()  # service id  -> {"a": ..., "b": ...} model params
    index_value = 0
    init_params_value = 1
    model_param_a = list()
    model_param_b = list()
    if redis_client.exists(device_meigou_params_key):
        # NOTE(review): hgetall returns bytes keys/values under Python 3; the lookups
        # below assume the hash keys match result['id'] and the values are nested
        # dicts with 'a'/'b' -- verify the serialisation format actually stored.
        meigou_params_dict = redis_client.hgetall(device_meigou_params_key)
        for result in results:
            if result['id'] in meigou_params_dict.keys():
                meigou_index_dict.update({index_value: result['id']})
                # fix: carry over the stored params for this service; the original
                # indexed meigou_index_dict with a service id (KeyError, since that
                # dict is keyed by the integer action index).
                meigou_new_params_dict.update({result['id']: meigou_params_dict[result['id']]})
                model_param_a.append(meigou_params_dict[result['id']]['a'])
                model_param_b.append(meigou_params_dict[result['id']]['b'])
                index_value += 1
            else:
                meigou_index_dict.update({index_value: result['id']})
                meigou_new_params_dict.update({result['id']: {"a": init_params_value, "b": init_params_value}})
                model_param_a.append(init_params_value)
                model_param_b.append(init_params_value)
                index_value += 1
    else:
        # No cached params for this device: initialise every action uniformly.
        for result in results:
            meigou_new_params_dict.update({result['id']: {"a": init_params_value, "b": init_params_value}})
            meigou_index_dict.update({index_value: result['id']})
            model_param_a.append(init_params_value)
            model_param_b.append(init_params_value)
            index_value += 1

    num_actions = len(results)
    user_feature = np.array([1])
    # Initialise the bandit model.
    # NOTE(review): NeuralLinearPosteriorSampling is not defined or imported in
    # this file -- confirm where it is expected to come from.
    model = NeuralLinearPosteriorSampling('NeuralLinear', num_actions, model_param_a, model_param_b)
    a2 = datetime.datetime.now()  # fix: the original assigned a2 twice; kept the later one
    vals = model.action(user_feature)
    # model.update(user_feature,0,np.array(1))

    # Min-max normalise the raw action values into [0, 1].
    # fix: renamed locals so the builtins max/min are no longer shadowed.
    max_val = vals.max()
    min_val = vals.min()
    ctr_0_1 = (vals - min_val) / (max_val - min_val)
    meigou_ctr_dict = dict()
    a3 = datetime.datetime.now()
    for i in range(len(ctr_0_1)):
        meigou_ctr_dict.update({meigou_index_dict[i]: ctr_0_1[i]})
    redis_client.set(device_meigou_ctr_key, json.dumps(meigou_ctr_dict))
    a4 = datetime.datetime.now()
    # Mail the timing checkpoints (a1..a4) for profiling.
    send_email(str(a1), str(a2), str(a3), str(a4))

def choose_action():
    """Placeholder action-selection hook; currently always returns 0."""
    chosen = 0
    return chosen
def Filter_Data(data):
    """Stream filter: keep only 'device_opened' events from one whitelisted device.

    data -- (key, payload) tuple from the Kafka stream; payload is the decoded message.

    Returns True for the single device/event combination of interest, False otherwise.
    (fix: the original implicitly returned None on fall-through paths; now every
    path returns an explicit bool -- equally falsy for the Spark filter.)
    """
    data_dict = data[1]
    # msgpack-decoded payloads carry bytes keys; skip raw-content messages.
    if b'content' in data_dict:
        return False
    if 'type' in data_dict:
        return (data_dict['type'] == 'device_opened'
                and data_dict['device']['device_id'] == '8E699605-DC2A-46B6-8B47-E9E809353055')
    return False

def write_to_kafka():
    """Send a single test message (key "hello" -> value "world") to test_topic.

    Keys and values are JSON-encoded to UTF-8. Prints the outcome; never raises
    on a send failure.
    """
    # fix: the original handler was `except kafka_errors`, an undefined name,
    # so any real send failure raised NameError instead of being caught.
    from kafka.errors import KafkaError

    producer = KafkaProducer(bootstrap_servers=["172.16.44.25:9092","172.16.44.31:9092","172.16.44.45:9092"],
                             key_serializer=lambda k: json.dumps(k).encode('utf-8'),
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    future = producer.send(topic="test_topic", key="hello", value="world")
    try:
        # Block (up to 10 s) until the broker acknowledges the send.
        record_metadata = future.get(timeout=10)
        print("send ok")
    except KafkaError as e:
        print(str(e))

def Ctr(rdd):
    """Transform hook for the DStream: fires a test Kafka write, passes the RDD through.

    rdd -- the batch RDD supplied by stream.transform.

    Returns the RDD unchanged, or None if write_to_kafka raised
    (preserves the original fall-through behaviour).
    """
    try:
        results = rdd
        write_to_kafka()
        return results
    except Exception:
        # fix: was a bare except, which also swallowed SystemExit/KeyboardInterrupt
        print("fail")

def m_decoder(s):
    """Decode a raw Kafka key/value: try JSON first, fall back to msgpack.

    s -- raw bytes/str from Kafka, or None (passed through unchanged).

    Returns the decoded Python object, or None for a None input.
    """
    if s is None:
        return None
    try:
        return json.loads(s)
    except (ValueError, TypeError):
        # fix: was a bare except, which hid real errors (incl. KeyboardInterrupt).
        # ValueError covers malformed JSON; TypeError covers non-decodable input types.
        # Not JSON -- assume a msgpack-serialised payload.
        return msgpack.loads(s, encoding='utf-8')

if __name__ == '__main__':
    # Spark-Streaming-Kafka driver: consume test_topic in 10-second micro-batches
    # and run Ctr on each batch.
    sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("kafka_test")
                      .set("spark.io.compression.codec", "lzf"))
    # fix: the SQLContext was assigned to 'ssc' and immediately overwritten by the
    # StreamingContext below; give it its own name (currently unused otherwise).
    sql_context = SQLContext(sc)
    ssc = StreamingContext(sc, 10)
    sc.setLogLevel("WARN")
    kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
                   "group.id": "kafka_test",
                   "socket.timeout.ms": "600000",
                   "auto.offset.reset": "largest"}

    # Direct (receiver-less) stream; keys and values decoded by m_decoder
    # (JSON with msgpack fallback).
    stream = KafkaUtils.createDirectStream(ssc, ["test_topic"], kafkaParams,
                                           keyDecoder=m_decoder, valueDecoder=m_decoder)
    transformstream = stream.transform(lambda x: Ctr(x))
    transformstream.pprint()
    ssc.start()
    ssc.awaitTermination()


