1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess as sp
import sys
import time
from typing import Any, List, Optional, Union
import urllib3
DEFAULT_HOST = '127.0.0.1'
DEFAULT_PORT = 4723
STARTUP_TIMEOUT_MS = 60000
MAIN_SCRIPT_PATH = 'appium/build/lib/main.js'
STATUS_URL = '/wd/hub/status'
def find_executable(executable: str) -> Optional[str]:
path = os.environ['PATH']
paths = path.split(os.pathsep)
base, ext = os.path.splitext(executable)
if sys.platform == 'win32' and not ext:
executable = executable + '.exe'
if os.path.isfile(executable):
return executable
for p in paths:
full_path = os.path.join(p, executable)
if os.path.isfile(full_path):
return full_path
return None
def poll_url(host: str, port: int, path: str, timeout_ms: int) -> bool:
time_started_sec = time.time()
while time.time() < time_started_sec + timeout_ms / 1000.0:
try:
conn = urllib3.PoolManager(timeout=1.0)
resp = conn.request('HEAD', f'http://{host}:{port}{path}')
if resp.status < 400:
return True
except Exception:
pass
time.sleep(1.0)
return False
class AppiumServiceError(RuntimeError):
pass
class AppiumService:
def __init__(self) -> None:
self._process: Optional[sp.Popen] = None
self._cmd: Optional[List] = None
def _get_node(self) -> str:
if not hasattr(self, '_node_executable'):
self._node_executable = find_executable('node')
if self._node_executable is None:
raise AppiumServiceError('NodeJS main executable cannot be found. ' +
'Make sure it is installed and present in PATH')
return self._node_executable
def _get_npm(self) -> str:
if not hasattr(self, '_npm_executable'):
self._npm_executable = find_executable('npm.cmd' if sys.platform == 'win32' else 'npm')
if self._npm_executable is None:
raise AppiumServiceError('Node Package Manager executable cannot be found. ' +
'Make sure it is installed and present in PATH')
return self._npm_executable
def _get_main_script(self) -> Union[str, bytes]:
if not hasattr(self, '_main_script'):
for args in [['root', '-g'], ['root']]:
try:
modules_root = sp.check_output([self._get_npm()] + args).strip().decode('utf-8')
if os.path.exists(os.path.join(modules_root, MAIN_SCRIPT_PATH)):
self._main_script: Union[str, bytes] = os.path.join(modules_root, MAIN_SCRIPT_PATH)
break
except sp.CalledProcessError:
continue
if not hasattr(self, '_main_script'):
try:
self._main_script = sp.check_output(
[self._get_node(),
'-e',
'console.log(require.resolve("{}"))'.format(MAIN_SCRIPT_PATH)]).strip()
except sp.CalledProcessError as e:
raise AppiumServiceError(e.output) from e
return self._main_script
@staticmethod
def _parse_port(args: List[str]) -> int:
for idx, arg in enumerate(args or []):
if arg in ('--port', '-p') and idx < len(args) - 1:
return int(args[idx + 1])
return DEFAULT_PORT
@staticmethod
def _parse_host(args: List[str]) -> str:
for idx, arg in enumerate(args or []):
if arg in ('--address', '-a') and idx < len(args) - 1:
return args[idx + 1]
return DEFAULT_HOST
def start(self, **kwargs: Any) -> sp.Popen:
"""Starts Appium service with given arguments.
The service will be forcefully restarted if it is already running.
Keyword Args:
env (dict): Environment variables mapping. The default system environment,
which is inherited from the parent process, is assigned by default.
node (str): The full path to the main NodeJS executable. The service will try
to retrieve it automatically by default.
stdout (int): Check the documentation for subprocess.Popen for more details.
The default value is subprocess.DEVNULL on Windows and subprocess.PIPE on other platforms.
stderr (int): Check the documentation for subprocess.Popen for more details.
The default value is subprocess.DEVNULL on Windows and subprocess.PIPE on other platforms.
timeout_ms (int): The maximum time to wait until Appium process starts listening
for HTTP connections. If set to zero or a negative number then no wait will be applied.
60000 ms by default.
main_script (str): The full path to the main Appium executable
(usually located at build/lib/main.js). If this is not set
then the service tries to detect the path automatically.
args (str): List of Appium arguments (all must be strings). Check
https://appium.io/docs/en/writing-running-appium/server-args/ for more details
about possible arguments and their values.
Returns:
You can use Popen.communicate interface or stderr/stdout properties
of the instance (stdout/stderr must not be set to None in such case) in order to retrieve the actual process
output.
"""
self.stop()
env = kwargs['env'] if 'env' in kwargs else None
node = kwargs['node'] if 'node' in kwargs else self._get_node()
# A workaround for https://github.com/appium/python-client/issues/534
default_std = sp.DEVNULL if sys.platform == 'win32' else sp.PIPE
stdout = kwargs['stdout'] if 'stdout' in kwargs else default_std
stderr = kwargs['stderr'] if 'stderr' in kwargs else default_std
timeout_ms = int(kwargs['timeout_ms']) if 'timeout_ms' in kwargs else STARTUP_TIMEOUT_MS
main_script = kwargs['main_script'] if 'main_script' in kwargs else self._get_main_script()
args = [node, main_script]
if 'args' in kwargs:
args.extend(kwargs['args'])
self._cmd = args
self._process = sp.Popen(args=args, stdout=stdout, stderr=stderr, env=env)
host = self._parse_host(args)
port = self._parse_port(args)
error_msg: Optional[str] = None
if not self.is_running or (timeout_ms > 0 and not poll_url(host, port, STATUS_URL, timeout_ms)):
error_msg = f'Appium has failed to start on {host}:{port} within {timeout_ms}ms timeout'
if error_msg is not None:
if stderr == sp.PIPE and self._process.stderr is not None:
err_output = self._process.stderr.read()
if err_output:
error_msg += f'\nOriginal error: {str(err_output)}'
self.stop()
raise AppiumServiceError(error_msg)
return self._process
def stop(self) -> bool:
"""Stops Appium service if it is running.
The call will be ignored if the service is not running
or has been already stopped.
Returns:
`True` if the service was running before being stopped
"""
is_terminated = False
if self.is_running:
self._process.terminate() # type: ignore
is_terminated = True
self._process = None
self._cmd = None
return is_terminated
@property
def is_running(self) -> bool:
"""Check if the service is running.
Returns:
bool: `True` if the service is running
"""
return self._process is not None and self._process.poll() is None
@property
def is_listening(self) -> bool:
"""Check if the service is listening on the given/default host/port.
The fact, that the service is running, does not always mean it is listening.
the default host/port values can be customized by providing --address/--port
command line arguments while starting the service.
Returns:
bool: `True` if the service is running and listening on the given/default host/port
"""
if not self.is_running or self._cmd is None:
return False
host = self._parse_host(self._cmd)
port = self._parse_port(self._cmd)
return self.is_running and poll_url(host, port, STATUS_URL, 1000)
if __name__ == '__main__':
assert(find_executable('node') is not None)
assert(find_executable('npm') is not None)
service = AppiumService()
service.start(args=['--address', '127.0.0.1', '-p', str(DEFAULT_PORT)])
# service.start(args=['--address', '127.0.0.1', '-p', '80'], timeout_ms=2000)
assert service.is_running
assert service.is_listening
service.stop()
assert(not service.is_running)
assert(not service.is_listening)