Skip to content
Projects
Groups
Snippets
Help
Loading...
Sign in
Toggle navigation
A
Appium-crawl
Project
Project
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
李康
Appium-crawl
Commits
04067762
Commit
04067762
authored
Oct 18, 2019
by
李康
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
product scraper support failure detection, auto-restart
parent
f2bd2fee
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
214 additions
and
43 deletions
+214
-43
product.py
product.py
+214
-43
No files found.
product.py
View file @
04067762
...
...
@@ -4,6 +4,81 @@ import numpy as np
import
sys
,
os
,
time
# Separator placed between the fields of one product record (see saveProduct).
delim = "; "
def clickBrand(d, brand_name):
    """Search the brand list for ``brand_name`` and tap the matching entry.

    On every pass the text views inside the RecyclerView are inspected,
    first the ``tv_name`` entries and then the ``tv_name_en`` entries.  The
    first entry whose text equals ``brand_name`` is clicked and the function
    returns.  If nothing matches, the screen is swiped up and the search is
    repeated after a one second pause.

    NOTE(review): the loop has no upper bound, so a brand that never shows
    up in the list keeps this function swiping forever.
    """
    name_ids = (
        "org.c2h4.afei.beauty:id/tv_name",
        "org.c2h4.afei.beauty:id/tv_name_en",
    )
    print("Try finding brand %s......" % brand_name)
    while True:
        for res_id in name_ids:
            recycler = d(className="android.support.v7.widget.RecyclerView")
            for candidate in recycler.child(resourceId=res_id):
                if candidate.get_text() != brand_name:
                    continue
                candidate.click()
                print("Brand %s is clicked!" % brand_name)
                return
        # Nothing matched on this screen: move the list and look again.
        d.swipe_ext("up", scale=0.5)
        time.sleep(1)
def getProductFilterUiObj(d):
    """Return the UI selector for the product filter (``tv_filter``) view."""
    filter_id = "org.c2h4.afei.beauty:id/tv_filter"
    return d(resourceId=filter_id)
def adjustProductFilterPosition(d):
    """Swipe up until the product filter is at or above the threshold line.

    Returns the filter UI object once its top edge is no longer below the
    threshold, or ``None`` (after printing a message) when no filter element
    is present on screen.
    """
    # NOTE(review): element [0] of window_size() is presumably the screen
    # width, yet it is compared with a vertical coordinate below -- confirm
    # that the height was not intended.
    threshold = d.window_size()[0] / 2
    filter_obj = getProductFilterUiObj(d)
    if filter_obj.count == 0:
        print("No product filter is found!!!!!!")
        return None
    # ``info`` is read again on every pass so the position reflects the
    # screen after the previous swipe.
    while filter_obj.info['bounds']['top'] > threshold:
        d.swipe_ext("up", scale=0.2)
    return filter_obj
def gotoProductFilter(d):
    """Swipe up until the product filter exists, then position it.

    The screen is swiped up (with a one second pause after each swipe) for
    as long as no ``tv_filter`` element is found.  The result of
    adjustProductFilterPosition is returned.
    """
    filter_id = "org.c2h4.afei.beauty:id/tv_filter"
    while True:
        if d(resourceId=filter_id).count != 0:
            break
        d.swipe_ext("up", scale=0.5)
        time.sleep(1)
    return adjustProductFilterPosition(d)
def getAllFilterTypes(d):
    """Open the filter pane and collect the names of all product types.

    The filter is positioned and clicked, then the ``tv_type`` entries of
    the scrollable pane are read.  The pane is scrolled forward for as long
    as a pass yields at least one name that was not seen before, and it is
    scrolled back once collection is finished.  The "全部品类" entry is
    never included.

    Returns:
        The type names in the order they were first seen.  An empty list is
        returned when the filter cannot be located or when the screen does
        not contain exactly one scrollable pane, so callers can iterate the
        result unconditionally.
    """
    filter_obj = adjustProductFilterPosition(d)
    if filter_obj is None:
        # adjustProductFilterPosition found no filter element; there is
        # nothing to click.
        return []
    filter_obj.click()

    filter_pane = d(scrollable=True)
    if filter_pane.count != 1:
        print("filter pane is invalid!")
        return []

    ftype_names = []
    while True:
        found_new = False
        for ftype in filter_pane.child(
                resourceId="org.c2h4.afei.beauty:id/tv_type"):
            # Read the text once per entry instead of once per comparison.
            text = ftype.get_text()
            if text == "全部品类" or text in ftype_names:
                continue
            found_new = True
            ftype_names.append(text)
        if not found_new:
            # A pass without new names means the end of the pane was reached.
            break
        filter_pane.scroll.vert(steps=10)
    filter_pane.scroll.vert.backward()
    return ftype_names
def filterProduct(d, ftype_name):
    """Scroll the filter pane until ``ftype_name`` is found and click it.

    NOTE(review): the search repeats until a ``tv_type`` entry with the
    given text exists; a name that never appears is retried without limit.
    """
    target = {
        "resourceId": "org.c2h4.afei.beauty:id/tv_type",
        "text": ftype_name,
    }
    while True:
        entry = d(**target)
        if entry.count != 0:
            break
        d(scrollable=True).scroll(steps=10)
    entry.click()
def
cropImg
(
img
,
out
,
bounds
):
delta
=
20
+
28
cropped
=
img
[
...
...
@@ -12,7 +87,13 @@ def cropImg(img, out, bounds):
]
# 裁剪坐标为[y0:y1, x0:x1]
cv2
.
imwrite
(
out
,
cropped
)
def
getProductDetail
(
d
,
brand_name
,
product
,
done
):
def getProductImg(d, path, pos):
    """Take a screenshot in OpenCV format and hand it to cropImg.

    ``path`` and ``pos`` are passed to cropImg unchanged as its output path
    and bounds arguments.
    """
    cropImg(d.screenshot(format='opencv'), path, pos)
def
getProductKey
(
product
,
names
=
None
):
keyobj
=
None
name
=
product
.
child
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_name"
)
if
name
.
count
==
0
:
...
...
@@ -34,8 +115,18 @@ def getProductDetail(d, brand_name, product, done):
key
=
name
elif
name_en
!=
""
:
key
=
name_en
if
names
is
not
None
:
names
.
append
(
name
)
names
.
append
(
name_en
)
return
key
,
keyobj
def
getProductDetailBasic
(
product
,
done
):
names
=
[]
key
,
keyobj
=
getProductKey
(
product
,
names
)
if
key
in
done
or
key
==
""
:
return
None
return
key
,
keyobj
,
None
rate_score
=
keyobj
.
sibling
(
resourceId
=
"org.c2h4.afei.beauty:id/rate_score"
)
if
rate_score
.
count
==
0
:
...
...
@@ -54,13 +145,38 @@ def getProductDetail(d, brand_name, product, done):
address
=
""
else
:
address
=
address
.
get_text
()
return
key
,
keyobj
,
names
+
[
rate_score
,
asess_num
,
address
]
def
getProductDetail
(
d
,
brand_name
,
product
,
done
,
debug_trigger
=
False
# used for debug
):
if
debug_trigger
:
d
.
press
(
"back"
)
raise
RuntimeError
(
"No response from APP"
)
try
:
key
,
keyobj
,
basicinfo
=
getProductDetailBasic
(
product
,
done
)
if
basicinfo
is
None
:
return
None
except
Exception
as
e
:
print
(
"ERROR:
%
s"
,
str
(
e
))
raise
RuntimeError
(
"No response from APP"
)
keyobj
.
click
()
img
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/iv_image"
)
wait_cnt
=
0
while
img
.
count
==
0
:
time
.
sleep
(
0.1
)
time
.
sleep
(
0.2
)
wait_cnt
+=
1
if
(
wait_cnt
==
300
):
raise
RuntimeError
(
"No response from APP"
)
img
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/iv_image"
)
# wait half second for the image to be stable to take screenshot,
# otherwise possibly the image will be different somehow
time
.
sleep
(
0.5
)
path
=
brand_name
+
'/'
+
key
+
".jpg"
getProductImg
(
d
,
path
,
img
.
info
[
'bounds'
])
effects
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/rl_effect"
)
.
child
(
className
=
"android.widget.TextView"
)
...
...
@@ -71,47 +187,103 @@ def getProductDetail(d, brand_name, product, done):
else
:
temp
.
append
(
effect
.
get_text
())
effects
=
" "
.
join
(
temp
)
d
(
resourceId
=
"org.c2h4.afei.beauty:id/iv_back"
)
.
click
(
)
d
.
press
(
"back"
)
time
.
sleep
(
1
)
done
.
append
(
key
)
return
[
name
,
name_en
,
rate_score
,
asess_num
,
address
,
effects
]
def
getProductImg
(
d
,
path
,
pos
):
image
=
d
.
screenshot
()
image
=
cv2
.
cvtColor
(
np
.
array
(
image
),
cv2
.
COLOR_RGB2BGR
)
cropImg
(
image
,
path
,
pos
)
basicinfo
.
append
(
effects
)
return
basicinfo
def saveProduct(f, res, product_type):
    """Print one product record and write it to ``f`` as a single line.

    Args:
        f: Writable text file object that receives the record.
        res: Sequence of string fields describing the product.  It is not
            modified.
        product_type: Type name written as the last field of the record.
    """
    # Build the row from a new list so the caller's ``res`` is left intact,
    # and join only once for both the console output and the file.
    line = delim.join([*res, product_type])
    print(line)
    f.write(line + "\n")
def resetRuntime(d, brand_name, ftype_name, done):
    """Navigate back to the product list that was open before a failure.

    The steps are: press home, press recent and click the centre of the
    screen, click the brand, open the product filter, select ``ftype_name``
    and finally, when ``done`` is not empty, swipe up until the product
    whose key equals ``done[-1]`` is among the listed products.

    NOTE(review): the final search has no upper bound on swipes.
    """
    print("!!!!!!!! start reset runtime environment !!!!!!!!!!!!!")

    # re-select the target app
    for key_name in ("home", "recent"):
        d.press(key_name)
        time.sleep(2)
    half = [side / 2 for side in d.window_size()]
    d.click(half[0], half[1])
    time.sleep(2)

    # go to the page where crashed
    clickBrand(d, brand_name)
    time.sleep(2)
    gotoProductFilter(d).click()
    time.sleep(2)
    filterProduct(d, ftype_name)
    time.sleep(2)

    # goto the last product
    if done:
        while not any(
                getProductKey(item)[0] == done[-1]
                for item in getAllProductUiObj(d)):
            d.swipe_ext("up", scale=0.5)
            time.sleep(0.5)

    print("!!!!!!!! reset runtime environment successfully !!!!!!!!")
def getAllProductUiObj(d):
    """Return the RelativeLayout children of the ``rv_container`` view."""
    container = d(resourceId="org.c2h4.afei.beauty:id/rv_container")
    return container.child(className="android.widget.RelativeLayout")
def getAllProducts(d, f, brand_name, product_type):
    """Scrape the products of the current list and write them to ``f``.

    On every pass the first product whose ``bounds`` top equals its
    ``visibleBounds`` top is handed to getProductDetail, the result (if
    any) is saved, and the list is swiped up by ``scroll_scale``.  A
    RuntimeError from getProductDetail triggers resetRuntime and the pass
    is repeated.  The loop ends after ``int(1 / scroll_scale) + 1``
    consecutive passes without a new product; the products still listed at
    that point (from index 1 on) are then processed once more.

    Args:
        d: Device object used for all UI access.
        f: Writable text file object; flushed after every 10 saved products.
        brand_name: Passed through to getProductDetail and resetRuntime.
        product_type: Appended to every saved record and used by
            resetRuntime to re-select the filter.

    Returns:
        The number of products saved.
    """
    done = []  # filled by getProductDetail, read by resetRuntime
    nocnt = 0  # consecutive passes that produced no new product
    scraped_cnt = 0
    scroll_scale = 0.1
    max_idle = int(1 / scroll_scale) + 1
    debug = False
    while True:
        products = getAllProductUiObj(d)
        product = None
        for temp in products:
            info = temp.info
            if info['bounds']['top'] == info['visibleBounds']['top']:
                product = temp
                break
        try:
            # switch to open reset runtime debug
            # if scraped_cnt == 5 and debug is not None:
            #     debug = True
            res = getProductDetail(d, brand_name, product, done, debug)
        except RuntimeError:
            resetRuntime(d, brand_name, product_type, done)
            debug = None
            nocnt = 0
            continue
        if res is not None:
            saveProduct(f, res, product_type)
            scraped_cnt += 1
            if scraped_cnt % 10 == 0:
                f.flush()
            nocnt = 0
        else:
            nocnt += 1
            if nocnt == max_idle:
                break
        d.swipe_ext("up", scale=scroll_scale)

    # handle the last few products in the list
    # NOTE(review): these calls are not wrapped in the RuntimeError
    # handling used above, so a failure here propagates to the caller.
    products = getAllProductUiObj(d)
    for idx in range(1, products.count):
        res = getProductDetail(d, brand_name, products[idx], done)
        if res is not None:
            saveProduct(f, res, product_type)
            scraped_cnt += 1
    return scraped_cnt
def
getProductByType
(
argv
):
brand_name
=
argv
[
1
]
...
...
@@ -119,44 +291,42 @@ def getProductByType(argv):
os
.
makedirs
(
brand_name
)
d
=
u2
.
connect_usb
(
'd52196830204'
)
f
=
open
(
brand_name
+
"/products.csv"
,
'a+'
)
filter
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_filter"
)
filter
.
click
()
ftypes
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_type"
)
ftype_names
=
[]
for
ftype
in
ftypes
:
if
ftype
.
get_text
()
==
"全部品类"
:
continue
else
:
ftype_names
.
append
(
ftype
.
get_text
())
d
(
scrollable
=
True
)
.
scroll
(
steps
=
15
)
ftypes
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_type"
)
for
ftype
in
ftypes
:
if
ftype
.
get_text
()
==
"全部品类"
:
continue
elif
ftype
.
get_text
()
in
ftype_names
:
continue
print
(
d
.
app_current
())
if
getProductFilterUiObj
(
d
)
.
count
==
0
:
clickBrand
(
d
,
brand_name
)
time
.
sleep
(
2
)
gotoProductFilter
(
d
)
else
:
ftype_names
.
append
(
ftype
.
get_text
()
)
adjustProductFilterPosition
(
d
)
d
(
scrollable
=
True
)
.
scroll
.
vert
.
backward
()
scraped_cnt
=
0
with
open
(
brand_name
+
"/products.csv"
,
'a+'
)
as
f
:
ftype_names
=
getAllFilterTypes
(
d
)
for
ftype_name
in
ftype_names
:
print
(
ftype_name
)
for
ftype_name
in
ftype_names
:
# if ftype_name != "护肤水":
# continue
ftype
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_type"
,
text
=
ftype_name
)
if
ftype
.
count
==
0
:
d
(
scrollable
=
True
)
.
scroll
(
steps
=
15
)
ftype
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_type"
,
text
=
ftype_name
)
ftype
.
click
()
filterProduct
(
d
,
ftype_name
)
time
.
sleep
(
1
)
getAllProducts
(
d
,
f
,
brand_name
,
ftype_name
)
filter
=
d
(
resourceId
=
"org.c2h4.afei.beauty:id/tv_filter"
)
temp_cnt
=
getAllProducts
(
d
,
f
,
brand_name
,
ftype_name
)
f
.
flush
()
print
(
"--------------------- Brand Sub-summury ---------------------"
)
print
(
"
%
s
%
s products are scraped from
%
s"
%
(
temp_cnt
,
ftype_name
,
brand_name
))
print
(
"-------------------------------------------------------------"
)
scraped_cnt
+=
temp_cnt
# start select another type
filter
=
getProductFilterUiObj
(
d
)
filter
.
click
()
print
(
"--------------------- Brand Summury ---------------------"
)
print
(
"
%
s products are scraped from
%
s"
%
(
scraped_cnt
,
brand_name
))
print
(
"---------------------------------------------------------"
)
# return to the brand list page in case
# you want to continue scraping another brand
d
.
press
(
"back"
)
# Script entry point: hand the command-line arguments to getProductByType.
if __name__ == '__main__':
    getProductByType(sys.argv)
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment