Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
fcbbd898
Commit
fcbbd898
authored
Aug 08, 2019
by
张彦钊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
change
parent
cf086404
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
194 additions
and
8 deletions
+194
-8
ctr.py
ctr.py
+194
-8
No files found.
ctr.py
View file @
fcbbd898
...
...
@@ -5,22 +5,171 @@ from datetime import timedelta
import
pickle
import
time
from
kafka
import
KafkaProducer
import
json
from
pyspark.streaming.kafka
import
KafkaUtils
from
pyspark
import
SparkContext
from
pyspark.sql
import
SQLContext
from
pyspark.streaming
import
StreamingContext
from
pyspark
import
SparkConf
import
redis
import
sys
import
os
import
json
import
pymysql
import
numpy
as
np
import
time
import
datetime
import
tensorflow
as
tf
import
msgpack
import
smtplib
import
requests
from
email.mime.text
import
MIMEText
from
email.utils
import
formataddr
from
email.mime.multipart
import
MIMEMultipart
from
email.mime.application
import
MIMEApplication
# sys.path.append('/srv/apps/ftrl/Bandist_Streaming')
from
Bandits_Streaming.deep_contextual_bandits.bandits.algorithms.neural_linear_sampling
import
NeuralLinearPosteriorSampling
def
on_send_success
():
print
(
"succeed"
)
return
1
def
send_email
(
app
,
id
,
e
,
extra_information
=
''
):
# 第三方 SMTP 服务
mail_host
=
'smtp.exmail.qq.com'
# 设置服务器
mail_user
=
"huangkai@igengmei.com"
# 用户名
mail_pass
=
"UyhVobmDHa4r4ecV"
# 口令
sender
=
'huangkai@igengmei.com'
receivers
=
[
'huangkai@igengmei.com'
]
# 接收邮件,可设置为你的QQ邮箱或者其他邮箱
e
=
str
(
e
)
msg
=
MIMEMultipart
()
part
=
MIMEText
(
'app_id:'
+
id
+
':fail'
,
'plain'
,
'utf-8'
)
msg
.
attach
(
part
)
msg
[
'From'
]
=
formataddr
([
"huangkai"
,
sender
])
# 括号里的对应收件人邮箱昵称、收件人邮箱账号
msg
[
'To'
]
=
";"
.
join
(
receivers
)
# message['Cc'] = ";".join(cc_reciver)
def
on_send_error
():
print
(
"fail"
)
msg
[
'Subject'
]
=
'spark streaming:app_name:'
+
app
with
open
(
'error.txt'
,
'w'
)
as
f
:
f
.
write
(
e
)
f
.
write
(
extra_information
)
f
.
close
()
part
=
MIMEApplication
(
open
(
'error.txt'
,
'r'
)
.
read
())
part
.
add_header
(
'Content-Disposition'
,
'attachment'
,
filename
=
"error.txt"
)
msg
.
attach
(
part
)
try
:
smtpObj
=
smtplib
.
SMTP_SSL
(
mail_host
,
465
)
smtpObj
.
login
(
mail_user
,
mail_pass
)
smtpObj
.
sendmail
(
sender
,
receivers
,
msg
.
as_string
())
except
smtplib
.
SMTPException
:
print
(
'error'
)
def
ts_cal
():
return
0
if
__name__
==
"__main__"
:
producer
=
KafkaProducer
(
bootstrap_servers
=
[
'173.16.44.25:9092'
],
key_serializer
=
lambda
k
:
pickle
.
dumps
(
k
),
value_serializer
=
lambda
v
:
pickle
.
dumps
(
v
))
def
cal_ctr
(
data
):
a1
=
datetime
.
datetime
.
now
()
device_data
=
data
[
1
]
device_id
=
device_data
[
'device'
][
'device_id'
]
db_eagle
=
pymysql
.
connect
(
host
=
"172.16.40.158"
,
port
=
4000
,
user
=
"root"
,
password
=
"3SYz54LS9#^9sBvC"
,
db
=
"eagle"
,
cursorclass
=
pymysql
.
cursors
.
DictCursor
)
cursor
=
db_eagle
.
cursor
()
sql
=
'select id from online_api_service'
cursor
.
execute
(
sql
)
results
=
cursor
.
fetchall
()
device_meigou_ctr_key
=
'device_meigou_ctr:device_id:'
+
str
(
device_id
)
device_meigou_params_key
=
'device_meigou_params:device_id:'
+
str
(
device_id
)
redis_client
=
redis
.
StrictRedis
.
from_url
(
'redis://:ReDis!GmTx*0aN6@172.16.40.133:6379'
)
meigou_index_dict
=
dict
()
meigou_new_params_dict
=
dict
()
index_value
=
0
init_params_value
=
1
model_param_a
=
list
()
model_param_b
=
list
()
if
redis_client
.
exists
(
device_meigou_params_key
):
meigou_params_dict
=
redis_client
.
hgetall
(
device_meigou_params_key
)
for
result
in
results
:
if
result
[
'id'
]
in
meigou_params_dict
.
keys
():
meigou_index_dict
.
update
({
index_value
:
result
[
'id'
]})
meigou_new_params_dict
.
update
({
result
[
'id'
]:
meigou_index_dict
[
result
[
'id'
]]})
model_param_a
.
append
(
meigou_params_dict
[
result
[
'id'
]][
'a'
])
model_param_b
.
append
(
meigou_params_dict
[
result
[
'id'
]][
'b'
])
index_value
+=
1
else
:
meigou_index_dict
.
update
({
index_value
:
result
[
'id'
]})
meigou_new_params_dict
.
update
({
result
[
'id'
]:{
"a"
:
init_params_value
,
"b"
:
init_params_value
}})
model_param_a
.
append
(
init_params_value
)
model_param_b
.
append
(
init_params_value
)
index_value
+=
1
else
:
for
result
in
results
:
meigou_new_params_dict
.
update
({
result
[
'id'
]:{
"a"
:
init_params_value
,
"b"
:
init_params_value
}})
meigou_index_dict
.
update
({
index_value
:
result
[
'id'
]})
model_param_a
.
append
(
init_params_value
)
model_param_b
.
append
(
init_params_value
)
index_value
+=
1
a2
=
datetime
.
datetime
.
now
()
num_actions
=
len
(
results
)
user_feature
=
np
.
array
([
1
])
# hparams_nlinear = tf.contrib.training.HParams(num_actions=num_actions,
# context_dim=1,
# init_scale=0.3,
# activation=tf.nn.relu,
# layer_sizes=[1],
# batch_size=1,
# activate_decay=True,
# initial_lr=0.1,
# max_grad_norm=5.0,
# show_training=False,
# freq_summary=1000,
# buffer_s=-1,
# initial_pulls=0,
# reset_lr=True,
# lr_decay_rate=0.5,
# training_freq=1,
# training_freq_network=10000,
# training_epochs=100,
# a0=model_param_a,
# b0=model_param_b,
# lambda_prior=0.25)
# inital model
model
=
NeuralLinearPosteriorSampling
(
'NeuralLinear'
,
num_actions
,
model_param_a
,
model_param_b
)
a2
=
datetime
.
datetime
.
now
()
vals
=
model
.
action
(
user_feature
)
# model.update(user_feature,0,np.array(1))
max
=
vals
.
max
()
min
=
vals
.
min
()
ctr_0_1
=
(
vals
-
min
)
/
(
max
-
min
)
meigou_ctr_dict
=
dict
()
a3
=
datetime
.
datetime
.
now
()
for
i
in
range
(
len
(
ctr_0_1
)):
meigou_ctr_dict
.
update
({
meigou_index_dict
[
i
]:
ctr_0_1
[
i
]})
redis_client
.
set
(
device_meigou_ctr_key
,
json
.
dumps
(
meigou_ctr_dict
))
a4
=
datetime
.
datetime
.
now
()
send_email
(
str
(
a1
),
str
(
a2
),
str
(
a3
),
str
(
a4
))
def
choose_action
():
return
0
def
Filter_Data
(
data
):
data_dict
=
data
[
1
]
if
b
'content'
in
data_dict
:
return
False
elif
'type'
in
data_dict
:
if
data_dict
[
'type'
]
==
'device_opened'
and
data_dict
[
'device'
][
'device_id'
]
==
'8E699605-DC2A-46B6-8B47-E9E809353055'
:
return
True
def
write_to_kafka
():
producer
=
KafkaProducer
(
bootstrap_servers
=
[
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
],
key_serializer
=
lambda
k
:
pickle
.
dumps
(
k
),
value_serializer
=
lambda
v
:
pickle
.
dumps
(
v
))
print
(
"hajs"
)
future
=
producer
.
send
(
topic
=
"test_topic"
,
key
=
"hello"
,
value
=
"world"
)
future
=
producer
.
send
(
topic
=
"test_topic"
,
key
=
"hello"
,
value
=
"world"
)
try
:
record_metadata
=
future
.
get
(
timeout
=
10
)
print
(
12
)
...
...
@@ -28,6 +177,43 @@ if __name__ == "__main__":
print
(
str
(
e
))
def
Ctr
(
rdd
):
try
:
results
=
rdd
return
results
except
:
print
(
"fail"
)
def
m_decoder
(
s
):
if
s
is
None
:
return
None
try
:
data
=
msgpack
.
loads
(
s
,
encoding
=
'utf-8'
)
return
data
except
:
data
=
pickle
.
loads
(
s
)
return
data
if
__name__
==
'__main__'
:
# Spark-Streaming-Kafka
sc
=
SparkContext
(
conf
=
SparkConf
()
.
setMaster
(
"spark://nvwa01:7077"
)
.
setAppName
(
"kafka_test"
)
.
set
(
"spark.io.compression.codec"
,
"lzf"
))
ssc
=
SQLContext
(
sc
)
ssc
=
StreamingContext
(
sc
,
10
)
sc
.
setLogLevel
(
"WARN"
)
kafkaParams
=
{
"metadata.broker.list"
:
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
,
"group.id"
:
"kafka_test"
,
"socket.timeout.ms"
:
"600000"
,
"auto.offset.reset"
:
"largest"
}
stream
=
KafkaUtils
.
createDirectStream
(
ssc
,
[
"test_topic"
],
kafkaParams
,
keyDecoder
=
m_decoder
,
valueDecoder
=
m_decoder
)
transformstream
=
stream
.
transform
(
lambda
x
:
Ctr
(
x
))
transformstream
.
pprint
()
ssc
.
start
()
ssc
.
awaitTermination
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment