1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
"""DB-API implementation backed by Presto
See http://www.python.org/dev/peps/pep-0249/
Many docstrings in this file are based on the PEP, which is in the public domain.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from builtins import object
from pyhive import common
from pyhive.common import DBAPITypeObject
# Make all exceptions visible in this module per DB-API
from pyhive.exc import * # noqa
import base64
import getpass
import logging
import requests
from requests.auth import HTTPBasicAuth
try: # Python 3
import urllib.parse as urlparse
except ImportError: # Python 2
import urlparse
# PEP 249 module globals
apilevel = '2.0'
threadsafety = 2 # Threads may share the module and connections.
paramstyle = 'pyformat' # Python extended format codes, e.g. ...WHERE name=%(name)s
_logger = logging.getLogger(__name__)
_escaper = common.ParamEscaper()
def connect(*args, **kwargs):
"""Constructor for creating a connection to the database. See class :py:class:`Connection` for
arguments.
:returns: a :py:class:`Connection` object.
"""
return Connection(*args, **kwargs)
class Connection(object):
"""Presto does not have a notion of a persistent connection.
Thus, these objects are small stateless factories for cursors, which do all the real work.
"""
def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs
def close(self):
"""Presto does not have anything to close"""
# TODO cancel outstanding queries?
pass
def commit(self):
"""Presto does not support transactions"""
pass
def cursor(self):
"""Return a new :py:class:`Cursor` object using the connection."""
return Cursor(*self._args, **self._kwargs)
def rollback(self):
raise NotSupportedError("Presto does not have transactions") # pragma: no cover
class Cursor(common.DBAPICursor):
"""These objects represent a database cursor, which is used to manage the context of a fetch
operation.
Cursors are not isolated, i.e., any changes done to the database by a cursor are immediately
visible by other cursors or connections.
"""
def __init__(self, host, port='8080', username=None, catalog='hive',
schema='default', poll_interval=1, source='pyhive', session_props=None,
protocol='http', password=None, requests_session=None, requests_kwargs=None):
"""
:param host: hostname to connect to, e.g. ``presto.example.com``
:param port: int -- port, defaults to 8080
:param username: string -- defaults to system user name
:param catalog: string -- defaults to ``hive``
:param schema: string -- defaults to ``default``
:param poll_interval: int -- how often to ask the Presto REST interface for a progress
update, defaults to a second
:param source: string -- arbitrary identifier (shows up in the Presto monitoring page)
:param protocol: string -- network protocol, valid options are ``http`` and ``https``.
defaults to ``http``
:param password: string -- Deprecated. Defaults to ``None``.
Using BasicAuth, requires ``https``.
Prefer ``requests_kwargs={'auth': HTTPBasicAuth(username, password)}``.
May not be specified with ``requests_kwargs['auth']``.
:param requests_session: a ``requests.Session`` object for advanced usage. If absent, this
class will use the default requests behavior of making a new session per HTTP request.
Caller is responsible for closing session.
:param requests_kwargs: Additional ``**kwargs`` to pass to requests
"""
super(Cursor, self).__init__(poll_interval)
# Config
self._host = host
self._port = port
self._username = username or getpass.getuser()
self._catalog = catalog
self._schema = schema
self._arraysize = 1
self._poll_interval = poll_interval
self._source = source
self._session_props = session_props if session_props is not None else {}
if protocol not in ('http', 'https'):
raise ValueError("Protocol must be http/https, was {!r}".format(protocol))
self._protocol = protocol
self._requests_session = requests_session or requests
requests_kwargs = dict(requests_kwargs) if requests_kwargs is not None else {}
if password is not None and 'auth' in requests_kwargs:
raise ValueError("Cannot use both password and requests_kwargs authentication")
for k in ('method', 'url', 'data', 'headers'):
if k in requests_kwargs:
raise ValueError("Cannot override requests argument {}".format(k))
if password is not None:
requests_kwargs['auth'] = HTTPBasicAuth(username, password)
if protocol != 'https':
raise ValueError("Protocol must be https when passing a password")
self._requests_kwargs = requests_kwargs
self._reset_state()
def _reset_state(self):
"""Reset state about the previous query in preparation for running another query"""
super(Cursor, self)._reset_state()
self._nextUri = None
self._columns = None
@property
def description(self):
"""This read-only attribute is a sequence of 7-item sequences.
Each of these sequences contains information describing one result column:
- name
- type_code
- display_size (None in current implementation)
- internal_size (None in current implementation)
- precision (None in current implementation)
- scale (None in current implementation)
- null_ok (always True in current implementation)
The ``type_code`` can be interpreted by comparing it to the Type Objects specified in the
section below.
"""
# Sleep until we're done or we got the columns
self._fetch_while(
lambda: self._columns is None and
self._state not in (self._STATE_NONE, self._STATE_FINISHED)
)
if self._columns is None:
return None
return [
# name, type_code, display_size, internal_size, precision, scale, null_ok
(col['name'], col['type'], None, None, None, None, True)
for col in self._columns
]
def execute(self, operation, parameters=None):
"""Prepare and execute a database operation (query or command).
Return values are not defined.
"""
headers = {
'X-Presto-Catalog': self._catalog,
'X-Presto-Schema': self._schema,
'X-Presto-Source': self._source,
'X-Presto-User': self._username,
}
if self._session_props:
headers['X-Presto-Session'] = ','.join(
'{}={}'.format(propname, propval)
for propname, propval in self._session_props.items()
)
# Prepare statement
if parameters is None:
sql = operation
else:
sql = operation % _escaper.escape_args(parameters)
self._reset_state()
self._state = self._STATE_RUNNING
url = urlparse.urlunparse((
self._protocol,
'{}:{}'.format(self._host, self._port), '/v1/statement', None, None, None))
_logger.info('%s', sql)
_logger.debug("Headers: %s", headers)
response = self._requests_session.post(
url, data=sql.encode('utf-8'), headers=headers, **self._requests_kwargs)
self._process_response(response)
def cancel(self):
if self._state == self._STATE_NONE:
raise ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
return
response = self._requests_session.delete(self._nextUri, **self._requests_kwargs)
if response.status_code != requests.codes.no_content:
fmt = "Unexpected status code after cancel {}\n{}"
raise OperationalError(fmt.format(response.status_code, response.content))
self._state = self._STATE_FINISHED
self._nextUri = None
def poll(self):
"""Poll for and return the raw status data provided by the Presto REST API.
:returns: dict -- JSON status information or ``None`` if the query is done
:raises: ``ProgrammingError`` when no query has been started
.. note::
This is not a part of DB-API.
"""
if self._state == self._STATE_NONE:
raise ProgrammingError("No query yet")
if self._nextUri is None:
assert self._state == self._STATE_FINISHED, "Should be finished if nextUri is None"
return None
response = self._requests_session.get(self._nextUri, **self._requests_kwargs)
self._process_response(response)
return response.json()
def _fetch_more(self):
"""Fetch the next URI and update state"""
self._process_response(self._requests_session.get(self._nextUri, **self._requests_kwargs))
def _decode_binary(self, rows):
# As of Presto 0.69, binary data is returned as the varbinary type in base64 format
# This function decodes base64 data in place
for i, col in enumerate(self.description):
if col[1] == 'varbinary':
for row in rows:
if row[i] is not None:
row[i] = base64.b64decode(row[i])
def _process_response(self, response):
"""Given the JSON response from Presto's REST API, update the internal state with the next
URI and any data from the response
"""
# TODO handle HTTP 503
if response.status_code != requests.codes.ok:
fmt = "Unexpected status code {}\n{}"
raise OperationalError(fmt.format(response.status_code, response.content))
response_json = response.json()
_logger.debug("Got response %s", response_json)
assert self._state == self._STATE_RUNNING, "Should be running if processing response"
self._nextUri = response_json.get('nextUri')
self._columns = response_json.get('columns')
if 'X-Presto-Clear-Session' in response.headers:
propname = response.headers['X-Presto-Clear-Session']
self._session_props.pop(propname, None)
if 'X-Presto-Set-Session' in response.headers:
propname, propval = response.headers['X-Presto-Set-Session'].split('=', 1)
self._session_props[propname] = propval
if 'data' in response_json:
assert self._columns
new_data = response_json['data']
self._decode_binary(new_data)
self._data += map(tuple, new_data)
if 'nextUri' not in response_json:
self._state = self._STATE_FINISHED
if 'error' in response_json:
raise DatabaseError(response_json['error'])
#
# Type Objects and Constructors
#
# See types in presto-main/src/main/java/com/facebook/presto/tuple/TupleInfo.java
FIXED_INT_64 = DBAPITypeObject(['bigint'])
VARIABLE_BINARY = DBAPITypeObject(['varchar'])
DOUBLE = DBAPITypeObject(['double'])
BOOLEAN = DBAPITypeObject(['boolean'])