Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
F
ffm-baseline
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
ML
ffm-baseline
Commits
149aa897
Commit
149aa897
authored
5 years ago
by
Your Name
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
dist predict test
parent
460e6336
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
37 additions
and
35 deletions
+37
-35
dist_predict.py
eda/esmm/Model_pipline/dist_predict.py
+37
-35
No files found.
eda/esmm/Model_pipline/dist_predict.py
View file @
149aa897
...
...
@@ -275,19 +275,21 @@ if __name__ == "__main__":
# --- Nearby (recommendation) queue -------------------------------------------
# `indices` is an RDD of ";"-separated records, each record being a
# ":"-separated quadruple "uid:city:cid_id:ctcvr".  Parse each record into a
# Row once (single split per record instead of four).
def _parse_record(record):
    # Parse one "uid:city:cid_id:ctcvr" record into a Row.
    parts = record.split(":")
    return Row(uid=parts[0], city=parts[1], cid_id=parts[2], ctcvr=parts[3])

te_result_dataframe = spark.createDataFrame(
    indices.flatMap(lambda chunk: chunk.split(";")).map(_parse_record))
te_result_dataframe.show()

# Pull the scored rows to the driver as a pandas frame for per-user ranking.
nearby_data = te_result_dataframe.toPandas()
print("nearby pd data")

# `trans` is defined elsewhere in this file; presumably it normalizes the
# string-encoded ids/scores — TODO(review): confirm its contract.
nearby_data["cid_id1"] = nearby_data["cid_id"].apply(trans)
nearby_data["city1"] = nearby_data["city"].apply(trans)
nearby_data["uid1"] = nearby_data["uid"].apply(trans)
print(nearby_data.head())

# Per (uid, city): sort candidates by ctcvr descending, then collapse the
# ordered cid list into a single queue string via `set_join` (defined elsewhere).
df3 = nearby_data.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="ctcvr", ascending=False)) \
    .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
# NOTE(review): this frame is built from the *nearby* data but its queue column
# is named "native_queue" (and df4's, built from native data, is named
# "nearby_queue").  The two names look swapped — confirm against the consumer
# before renaming, since downstream code may rely on the current labels.
df3.columns = ["device_id", "city_id", "native_queue"]
print("nearby_device_count", df3.shape)
...
...
@@ -297,31 +299,31 @@ if __name__ == "__main__":
#native data
# --- Native queue ------------------------------------------------------------
# Native-channel predictions were written out earlier as parquet; reload them.
native_data = spark.read.parquet(path + "native_result/")
native_data_pd = native_data.toPandas()
print("native pd data")

# Same normalization as the nearby branch (`trans` defined elsewhere in file).
native_data_pd["cid_id1"] = native_data_pd["cid_id"].apply(trans)
native_data_pd["city1"] = native_data_pd["city"].apply(trans)
native_data_pd["uid1"] = native_data_pd["uid"].apply(trans)
print(native_data_pd.head())

# Per (uid, city): rank by ctcvr descending and join the ordered cid list into
# one queue string — mirrors the df3 construction above.
df4 = native_data_pd.groupby(by=["uid1", "city1"]).apply(lambda x: x.sort_values(by="ctcvr", ascending=False)) \
    .reset_index(drop=True).groupby(by=["uid1", "city1"]).agg({'cid_id1': set_join}).reset_index(drop=False)
# NOTE(review): built from *native* data but labeled "nearby_queue" — looks
# swapped with df3's "native_queue".  Confirm with the consumer before fixing.
df4.columns = ["device_id", "city_id", "nearby_queue"]
print("native_device_count", df4.shape)

# --- Union -------------------------------------------------------------------
# Outer-merge so a device appearing in only one channel still gets a row; the
# missing queue is an empty string rather than NaN.
df_all = pd.merge(df3, df4, on=['device_id', 'city_id'], how='outer').fillna("")
df_all['device_id'] = df_all['device_id'].astype(str)
df_all['city_id'] = df_all['city_id'].astype(str)
# strftime already returns str, so no extra str() wrapper is needed.
# NOTE(review): datetime.now() is local time — confirm whether UTC is expected.
df_all["time"] = datetime.datetime.now().strftime('%Y%m%d%H%M')
print("union_device_count", df_all.shape)
print(df_all.head(10))
# host = '172.16.40.158'
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment