Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
f9a504d4
Commit
f9a504d4
authored
Aug 23, 2018
by
张彦钊
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update diaryQueueUpdate
parent
7223912e
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
31 additions
and
13 deletions
+31
-13
diaryQueueUpdate.py
diaryQueueUpdate.py
+31
-13
No files found.
diaryQueueUpdate.py
View file @
f9a504d4
...
@@ -11,6 +11,8 @@ from config import *
...
@@ -11,6 +11,8 @@ from config import *
import
json
import
json
from
sklearn.preprocessing
import
MinMaxScaler
from
sklearn.preprocessing
import
MinMaxScaler
import
time
import
time
from
userProfile
import
get_active_users
import
os
def
test_con_sql
(
device_id
):
def
test_con_sql
(
device_id
):
db
=
pymysql
.
connect
(
host
=
'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
,
port
=
3306
,
user
=
'doris'
,
db
=
pymysql
.
connect
(
host
=
'rm-m5e842126ng59jrv6.mysql.rds.aliyuncs.com'
,
port
=
3306
,
user
=
'doris'
,
...
@@ -124,7 +126,7 @@ def save_result(queue_name, x_list):
...
@@ -124,7 +126,7 @@ def save_result(queue_name, x_list):
mm_scaler
.
fit
(
score_df
)
mm_scaler
.
fit
(
score_df
)
score_df
=
pd
.
DataFrame
(
mm_scaler
.
transform
(
score_df
))
score_df
=
pd
.
DataFrame
(
mm_scaler
.
transform
(
score_df
))
print
(
"概率前十行:"
)
print
(
"概率前十行:"
)
print
(
score_df
)
#
print(score_df)
score_df
=
score_df
.
rename
(
columns
=
{
0
:
"score"
})
score_df
=
score_df
.
rename
(
columns
=
{
0
:
"score"
})
score_df
[
"cid"
]
=
x_list
score_df
[
"cid"
]
=
x_list
...
@@ -147,14 +149,14 @@ def merge_score(x_list, score_df):
...
@@ -147,14 +149,14 @@ def merge_score(x_list, score_df):
result
=
cursor
.
fetchall
()
result
=
cursor
.
fetchall
()
score
=
pd
.
DataFrame
(
list
(
result
))
score
=
pd
.
DataFrame
(
list
(
result
))
print
(
"数据库日记表前十行"
)
print
(
"数据库日记表前十行"
)
print
(
score
)
#
print(score)
score_list
=
score
[
0
]
.
values
.
tolist
()
score_list
=
score
[
0
]
.
values
.
tolist
()
db
.
close
()
db
.
close
()
score_df
[
"score"
]
=
score_df
[
"score"
]
+
score_list
score_df
[
"score"
]
=
score_df
[
"score"
]
+
score_list
print
(
"sum"
)
print
(
"sum"
)
print
(
score_df
)
#
print(score_df)
return
score_df
return
score_df
...
@@ -201,7 +203,7 @@ def update_sql_dairy_queue(queue_name, diary_id):
...
@@ -201,7 +203,7 @@ def update_sql_dairy_queue(queue_name, diary_id):
# print("成功写入")
# print("成功写入")
def
multi_update
(
key
,
name_dict
):
def
multi_update
(
key
,
name_dict
,
native_queue_list
):
predict
(
key
,
name_dict
[
key
])
predict
(
key
,
name_dict
[
key
])
score_df
=
save_result
(
key
,
name_dict
[
key
])
score_df
=
save_result
(
key
,
name_dict
[
key
])
score_df
=
merge_score
(
name_dict
[
key
],
score_df
)
score_df
=
merge_score
(
name_dict
[
key
],
score_df
)
...
@@ -214,21 +216,38 @@ def multi_update(key, name_dict):
...
@@ -214,21 +216,38 @@ def multi_update(key, name_dict):
print
(
"不需要更新日记队列"
)
print
(
"不需要更新日记队列"
)
if
__name__
==
"__main__"
:
def
user_update
(
device_id
):
start
=
time
.
time
()
warnings
.
filterwarnings
(
"ignore"
)
data_set_cid
=
pd
.
read_csv
(
DIRECTORY_PATH
+
"data_set_cid.csv"
)[
"cid"
]
.
values
.
tolist
()
device_id
=
"358035085192742"
native_queue_list
,
nearby_queue_list
,
nation_queue_list
,
megacity_queue_list
=
test_con_sql
(
device_id
)
native_queue_list
,
nearby_queue_list
,
nation_queue_list
,
megacity_queue_list
=
test_con_sql
(
device_id
)
name_dict
=
{
"native_queue"
:
native_queue_list
,
"nearby_queue"
:
nearby_queue_list
,
name_dict
=
{
"native_queue"
:
native_queue_list
,
"nearby_queue"
:
nearby_queue_list
,
"nation_queue"
:
nation_queue_list
,
"megacity_queue"
:
megacity_queue_list
}
"nation_queue"
:
nation_queue_list
,
"megacity_queue"
:
megacity_queue_list
}
pool
=
Pool
(
4
)
pool
=
Pool
(
4
)
for
key
in
name_dict
.
keys
():
for
key
in
name_dict
.
keys
():
pool
.
apply_async
(
multi_update
,
(
key
,
name_dic
t
,))
pool
.
apply_async
(
multi_update
,
(
key
,
name_dict
,
native_queue_lis
t
,))
pool
.
close
()
pool
.
close
()
pool
.
join
()
pool
.
join
()
if
__name__
==
"__main__"
:
while
True
:
empty
,
device_id_list
=
get_active_users
()
if
empty
:
for
eachFile
in
os
.
listdir
(
"/tmp"
):
if
"xlearn"
in
eachFile
:
os
.
remove
(
"/tmp"
+
"/"
+
eachFile
)
time
.
sleep
(
58
)
else
:
old_device_id_list
=
pd
.
read_csv
(
DIRECTORY_PATH
+
"data_set_device_id.csv"
)[
"device_id"
]
.
values
.
tolist
()
# 求活跃用户和老用户的交集,也就是只预测老用户
predict_list
=
list
(
set
(
device_id_list
)
&
set
(
old_device_id_list
))
start
=
time
.
time
()
warnings
.
filterwarnings
(
"ignore"
)
data_set_cid
=
pd
.
read_csv
(
DIRECTORY_PATH
+
"data_set_cid.csv"
)[
"cid"
]
.
values
.
tolist
()
for
device_id
in
predict_list
:
user_update
(
device_id
)
end
=
time
.
time
()
print
(
end
-
start
)
# # TODO 上线后把预测用户改成多进程预测
# # TODO 上线后把预测用户改成多进程预测
# data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
# data_set_cid = pd.read_csv(DIRECTORY_PATH + "data_set_cid.csv")["cid"].values.tolist()
#
#
...
@@ -239,8 +258,7 @@ if __name__ == "__main__":
...
@@ -239,8 +258,7 @@ if __name__ == "__main__":
#
#
# for key in name_dict.keys():
# for key in name_dict.keys():
# multi_update(key, name_dict)
# multi_update(key, name_dict)
end
=
time
.
time
()
print
(
end
-
start
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment