ML / ffm-baseline · Commits

Commit b69ad3c9
authored Oct 17, 2019 by 高雅喆

update

parent ac75cb2b

Showing 1 changed file with 22 additions and 22 deletions

eda/smart_rank/pyspark_argsparse_test.py  (+22, -22)
@@ -68,25 +68,25 @@ if __name__ == '__main__':
     all_3tag_2tag = get_all_3tag_2tag()
     # rdd
-    # sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
-    #     .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
-    #     .set("spark.tispark.plan.allow_index_double_read", "false") \
-    #     .set("spark.tispark.plan.allow_index_read", "true") \
-    #     .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
-    #     .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
-    #     .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
-    #
-    # spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
-    # spark.sparkContext.setLogLevel("WARN")
-    # spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
-    # device_ids_lst_rdd = spark.sparkContext.parallelize(action_type)
-    # print("="*100)
-    # print(action_type)
-    # print(type(device_ids_lst_rdd))
-    # print(device_ids_lst_rdd)
-    # print("=" * 100)
-    # result = device_ids_lst_rdd.repartition(100).map(
-    #     lambda x: args_test(x))
-    # print(result)
-    # print(result.collect())
-    # print(result.foreach(print))
+    sparkConf = SparkConf().set("spark.hive.mapred.supports.subdirectories", "true") \
+        .set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true") \
+        .set("spark.tispark.plan.allow_index_double_read", "false") \
+        .set("spark.tispark.plan.allow_index_read", "true") \
+        .set("spark.sql.extensions", "org.apache.spark.sql.TiExtensions") \
+        .set("spark.tispark.pd.addresses", "172.16.40.170:2379").set("spark.io.compression.codec", "lzf") \
+        .set("spark.driver.maxResultSize", "8g").set("spark.sql.avro.compression.codec", "snappy")
+
+    spark = SparkSession.builder.config(conf=sparkConf).enableHiveSupport().getOrCreate()
+    spark.sparkContext.setLogLevel("WARN")
+    spark.sparkContext.addPyFile("/srv/apps/ffm-baseline_git/eda/smart_rank/tool.py")
+    device_ids_lst_rdd = spark.sparkContext.parallelize(action_type)
+    print("="*100)
+    print(action_type)
+    print(type(device_ids_lst_rdd))
+    print(device_ids_lst_rdd)
+    print("=" * 100)
+    result = device_ids_lst_rdd.repartition(100).map(
+        lambda x: args_test(x))
+    print(result)
+    print(result.collect())
+    print(result.foreach(print))
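For context, the uncommented block follows the standard PySpark driver pattern: build a SparkConf, create a SparkSession from it, turn a driver-side list into an RDD with parallelize(), then repartition() and map() a function over it. The spark.sql.extensions = org.apache.spark.sql.TiExtensions and spark.tispark.pd.addresses settings enable TiSpark, letting the session read TiDB data through the Placement Driver at the given address. Note that foreach() is an action that returns None, so the final print(result.foreach(print)) prints None on the driver while the per-element print calls run on the executors.

A minimal local sketch of the same pattern, for illustration only: the local[*] master, the work() stub, and the sample list are assumptions, not part of this commit (the real script maps args_test() over action_type).

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Sketch of the parallelize -> repartition -> map pattern from the diff.
# local[*] and work() are illustrative stand-ins, not from the commit.
sparkConf = SparkConf().setMaster("local[*]").setAppName("argsparse-sketch")
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("WARN")

def work(x):
    # stand-in for args_test() in pyspark_argsparse_test.py
    return (x, "done")

rdd = spark.sparkContext.parallelize(["a", "b", "c"]).repartition(2).map(work)
print(rdd.collect())  # an action: runs the pipeline, returns results to the driver
spark.stop()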