Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
c4d08167
Commit
c4d08167
authored
5 years ago
by
张彦钊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
change test file
parent
e94dbac1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
20 additions
and
65 deletions
+20
-65
monitor.py
monitor.py
+20
-65
No files found.
monitor.py
View file @
c4d08167
...
@@ -20,8 +20,7 @@ import datetime
...
@@ -20,8 +20,7 @@ import datetime
def
Json
(
x
):
def
Json
(
x
):
data
=
json
.
loads
(
x
[
1
])
data
=
json
.
loads
(
x
[
1
])
if
'type'
in
data
and
'device'
in
data
and
'params'
in
data
and
'card_content_type'
in
data
[
'params'
]:
if
'type'
in
data
and
'device'
in
data
and
'params'
in
data
and
'card_content_type'
in
data
[
'params'
]:
if
data
[
'type'
]
==
'on_click_card'
and
data
[
"device"
][
"device_id"
]
==
"E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
\
if
data
[
'type'
]
==
'on_click_card'
and
data
[
"device"
][
"device_id"
]
==
"E417C286-40A4-42F6-BDA9-AEEBD8FEC3B6"
:
and
data
[
'params'
][
'card_content_type'
]
in
(
"answer"
,
"question"
,
"qa"
):
return
True
return
True
else
:
else
:
return
False
return
False
...
@@ -53,10 +52,8 @@ def get_data(x):
...
@@ -53,10 +52,8 @@ def get_data(x):
def
write_redis
(
device_id
,
cid
,
card
):
def
write_redis
(
device_id
,
cid
,
card
):
if
card
==
"diary"
:
if
card
==
"diary"
:
diary_write
(
device_id
,
cid
)
diary_write
(
device_id
,
cid
)
elif
card
==
"q
uestion
"
:
elif
card
==
"q
a
"
:
question_write
(
device_id
,
cid
)
question_write
(
device_id
,
cid
)
elif
card
==
"answer"
:
answer_write
(
device_id
,
cid
)
elif
card
==
"user_post"
:
elif
card
==
"user_post"
:
tractate_write
(
device_id
,
cid
)
tractate_write
(
device_id
,
cid
)
...
@@ -101,48 +98,6 @@ def tractate_write(device_id, cid):
...
@@ -101,48 +98,6 @@ def tractate_write(device_id, cid):
print
(
e
)
print
(
e
)
def
answer_write
(
device_id
,
cid
):
try
:
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'eagle'
)
sql
=
"select c.id from src_mimas_prod_api_answer a left join src_mimas_prod_api_questiontag b "
\
"on a.question_id = b.question_id left join src_zhengxing_api_tag c "
\
"on b.tag_id = c.id where c.tag_type = '3' and a.id = {}"
.
format
(
cid
)
cursor
=
db
.
cursor
()
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
if
len
(
result
)
>
0
:
tags
=
result
[
0
][
0
]
if
tags
is
not
None
:
sql
=
"select a.id from src_mimas_prod_api_answer a left join src_mimas_prod_api_questiontag b "
\
"on a.question_id = b.question_id left join src_zhengxing_api_tag c on b.tag_id=c.id "
\
"where a.is_online = 1 and c.id = {} and c.tag_type = '3'"
.
format
(
tags
)
cursor
.
execute
(
sql
)
result
=
cursor
.
fetchall
()
db
.
close
()
if
len
(
result
)
>
0
:
cids
=
[
i
[
0
]
for
i
in
result
]
r
=
redis
.
Redis
(
host
=
"172.16.40.135"
,
port
=
5379
,
password
=
""
,
db
=
2
)
key
=
str
(
device_id
)
+
"_dislike_answer"
if
r
.
exists
(
key
):
value
=
json
.
loads
(
r
.
get
(
key
)
.
decode
(
'utf-8'
))
value
.
extend
(
cids
)
cids
=
json
.
dumps
(
list
(
set
(
value
)))
r
.
set
(
key
,
cids
)
print
(
"cunza"
)
else
:
r
.
set
(
key
,
json
.
dumps
(
cids
))
r
.
expire
(
key
,
7
*
24
*
60
*
60
)
print
(
"bucunza"
)
except
Exception
as
e
:
print
(
"answer insert redis fail"
)
print
(
e
)
def
question_write
(
device_id
,
cid
):
def
question_write
(
device_id
,
cid
):
try
:
try
:
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'eagle'
)
db
=
pymysql
.
connect
(
host
=
'172.16.40.158'
,
port
=
4000
,
user
=
'root'
,
passwd
=
'3SYz54LS9#^9sBvC'
,
db
=
'eagle'
)
...
@@ -165,7 +120,7 @@ def question_write(device_id,cid):
...
@@ -165,7 +120,7 @@ def question_write(device_id,cid):
cids
=
[
i
[
0
]
for
i
in
result
]
cids
=
[
i
[
0
]
for
i
in
result
]
r
=
redis
.
Redis
(
host
=
"172.16.40.135"
,
port
=
5379
,
password
=
""
,
db
=
2
)
r
=
redis
.
Redis
(
host
=
"172.16.40.135"
,
port
=
5379
,
password
=
""
,
db
=
2
)
key
=
str
(
device_id
)
+
"_dislike_q
uestion
"
key
=
str
(
device_id
)
+
"_dislike_q
a
"
if
r
.
exists
(
key
):
if
r
.
exists
(
key
):
value
=
json
.
loads
(
r
.
get
(
key
)
.
decode
(
'utf-8'
))
value
=
json
.
loads
(
r
.
get
(
key
)
.
decode
(
'utf-8'
))
value
.
extend
(
cids
)
value
.
extend
(
cids
)
...
@@ -221,23 +176,23 @@ def diary_write(device_id,cid):
...
@@ -221,23 +176,23 @@ def diary_write(device_id,cid):
print
(
e
)
print
(
e
)
sc
=
SparkContext
(
conf
=
SparkConf
()
.
setMaster
(
"spark://nvwa01:7077"
)
.
setAppName
(
"dislike"
)
.
set
(
"spark.io.compression.codec"
,
"lzf"
))
#
sc = SparkContext(conf=SparkConf().setMaster("spark://nvwa01:7077").setAppName("dislike").set("spark.io.compression.codec", "lzf"))
ssc
=
StreamingContext
(
sc
,
4
)
#
ssc = StreamingContext(sc,4)
sc
.
setLogLevel
(
"WARN"
)
#
sc.setLogLevel("WARN")
kafkaParams
=
{
"metadata.broker.list"
:
"172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092"
,
#
kafkaParams = {"metadata.broker.list": "172.16.44.25:9092,172.16.44.31:9092,172.16.44.45:9092",
"group.id"
:
"dislike"
,
#
"group.id": "dislike",
"socket.timeout.ms"
:
"600000"
,
#
"socket.timeout.ms": "600000",
"auto.offset.reset"
:
"largest"
}
#
"auto.offset.reset": "largest"}
#
#
stream
=
KafkaUtils
.
createDirectStream
(
ssc
,
[
"gm-maidian-data"
],
kafkaParams
)
#
stream = KafkaUtils.createDirectStream(ssc, ["gm-maidian-data"], kafkaParams)
transformstream
=
stream
.
transform
(
lambda
x
:
model
(
x
))
#
transformstream = stream.transform(lambda x:model(x))
transformstream
.
pprint
()
#
transformstream.pprint()
#
ssc
.
start
()
#
ssc.start()
ssc
.
awaitTermination
()
#
ssc.awaitTermination()
# diary_write("9C5E7C73-380C-4623-8F48-A64C8034E315",16952841
)
question_write
(
"hello"
,
224586
)
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment