Commit 6c3b89ad authored by litaolemo's avatar litaolemo

update

parent 8405f1f9
# meta_base_code
metabase 数据开发相关代码
\ No newline at end of file
metabase 数据开发相关代码
服务器 airflow002
1. 切换权限 sudo su - gmuser
2. source /srv/envs/esmm/bin/activate
3. python crawler/crawler_sys/utils/get_query_result.py
/opt/spark/bin/spark-submit --master yarn --deploy-mode client --queue root.strategy --driver-memory 16g --executor-memory 1g --executor-cores 1 --num-executors 70 --conf spark.default.parallelism=100 --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3 --conf spark.executorEnv.LD_LIBRARY_PATH="/opt/java/jdk1.8.0_181/jre/lib/amd64/server:/opt/cloudera/parcels/CDH-5.16.1-1.cdh5.16.1.p0.3/lib64" --conf spark.locality.wait=0 --jars /srv/apps/tispark-core-2.1-SNAPSHOT-jar-with-dependencies.jar,/srv/apps/spark-connector_2.11-1.9.0-rc2.jar,/srv/apps/mysql-connector-java-5.1.38.jar /srv/apps/meta_base_code/task/conent_detail_page_grayscale_ctr.py
# -*- coding:UTF-8 -*-
# @Time : 2020/8/21 9:34
# @File : __init__.py.py
# @email : litao@igengmei.com
# @author : litao
\ No newline at end of file
http://www.opensource.org/licenses/mit-license.php
Copyright 2007-2011 David Alan Cridland
Copyright 2011 Lance Stout
Copyright 2012 Tyler L Hobbs
Permission is hereby granted, free of charge, to any person obtaining a copy of this
software and associated documentation files (the "Software"), to deal in the Software
without restriction, including without limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or
substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.
Metadata-Version: 2.1
Name: pure-sasl
Version: 0.6.1
Summary: Pure Python client SASL implementation
Home-page: http://github.com/thobbs/pure-sasl
Author: Tyler Hobbs
Author-email: tylerlhobbs@gmail.com
Maintainer: Alex Shafer
Maintainer-email: ashafer01@gmail.com
License: MIT
Keywords: sasl
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: MIT License
Classifier: Natural Language :: English
Classifier: Operating System :: OS Independent
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Provides-Extra: gssapi
Requires-Dist: kerberos (>=1.3.0) ; extra == 'gssapi'
This package provides a reasonably high-level SASL client written
in pure Python. New mechanisms may be integrated easily, but by default,
support for PLAIN, ANONYMOUS, EXTERNAL, CRAM-MD5, DIGEST-MD5, and GSSAPI are
provided.
pure_sasl-0.6.1.dist-info/INSTALLER,sha256=zuuue4knoyJ-UwPPXg8fezS7VCrXJQrAP7zeNuwvFQg,4
pure_sasl-0.6.1.dist-info/LICENSE,sha256=NpiBGKBhU7n98ItgAMlDXwQw_NH-rDj9hPscwmf0pT0,1172
pure_sasl-0.6.1.dist-info/METADATA,sha256=BVuBACWERcyUyMiB1TOkLRmwd2U6J9E43aDUaevVl8M,1215
pure_sasl-0.6.1.dist-info/RECORD,,
pure_sasl-0.6.1.dist-info/WHEEL,sha256=wy6I0RkXf1SHQI9WgMGMZtkrPqc-c49UWrTH1_nKFnQ,93
pure_sasl-0.6.1.dist-info/top_level.txt,sha256=2acq8MK3X4dZrUf_tIPo23sT7L9lDb9BnXcpeiazxZk,9
puresasl/__init__.py,sha256=PLhXLobzrQmCWjZvQvbx4yZicVvRuUg8Pxt-QKgpIq8,1026
puresasl/__init__.pyc,,
puresasl/client.py,sha256=uVZDRtlvKYDFa5qNWz0hgHtrpRRXUWqKWNiu73KZMn4,9929
puresasl/client.pyc,,
puresasl/mechanisms.py,sha256=h5l7cMkFwyh9hhzhqzE09jWXPAsBmI6bXe_jAz122vE,19319
puresasl/mechanisms.pyc,,
Wheel-Version: 1.0
Generator: bdist_wheel (0.33.1)
Root-Is-Purelib: true
Tag: cp27-none-any
from __future__ import absolute_import
from __future__ import unicode_literals
__version__ = '0.6.1'
"""Package private common utilities. Do not use directly.
Many docstrings in this file are based on PEP-249, which is in the public domain.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
from builtins import bytes
from builtins import int
from builtins import object
from builtins import str
from past.builtins import basestring
from pyhive import exc
import abc
import collections
import time
from future.utils import with_metaclass
from itertools import islice
class DBAPICursor(with_metaclass(abc.ABCMeta, object)):
"""Base class for some common DB-API logic"""
_STATE_NONE = 0
_STATE_RUNNING = 1
_STATE_FINISHED = 2
def __init__(self, poll_interval=1):
self._poll_interval = poll_interval
self._reset_state()
self.lastrowid = None
def _reset_state(self):
"""Reset state about the previous query in preparation for running another query"""
# State to return as part of DB-API
self._rownumber = 0
# Internal helper state
self._state = self._STATE_NONE
self._data = collections.deque()
self._columns = None
def _fetch_while(self, fn):
while fn():
self._fetch_more()
if fn():
time.sleep(self._poll_interval)
@abc.abstractproperty
def description(self):
raise NotImplementedError # pragma: no cover
def close(self):
"""By default, do nothing"""
pass
@abc.abstractmethod
def _fetch_more(self):
"""Get more results, append it to ``self._data``, and update ``self._state``."""
raise NotImplementedError # pragma: no cover
@property
def rowcount(self):
"""By default, return -1 to indicate that this is not supported."""
return -1
@abc.abstractmethod
def execute(self, operation, parameters=None):
"""Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to variables in the
operation. Variables are specified in a database-specific notation (see the module's
``paramstyle`` attribute for details).
Return values are not defined.
"""
raise NotImplementedError # pragma: no cover
def executemany(self, operation, seq_of_parameters):
"""Prepare a database operation (query or command) and then execute it against all parameter
sequences or mappings found in the sequence ``seq_of_parameters``.
Only the final result set is retained.
Return values are not defined.
"""
for parameters in seq_of_parameters[:-1]:
self.execute(operation, parameters)
while self._state != self._STATE_FINISHED:
self._fetch_more()
if seq_of_parameters:
self.execute(operation, seq_of_parameters[-1])
def fetchone(self):
"""Fetch the next row of a query result set, returning a single sequence, or ``None`` when
no more data is available.
An :py:class:`~pyhive.exc.Error` (or subclass) exception is raised if the previous call to
:py:meth:`execute` did not produce any result set or no call was issued yet.
"""
if self._state == self._STATE_NONE:
raise exc.ProgrammingError("No query yet")
# Sleep until we're done or we have some data to return
self._fetch_while(lambda: not self._data and self._state != self._STATE_FINISHED)
if not self._data:
return None
else:
self._rownumber += 1
return self._data.popleft()
def fetchmany(self, size=None):
"""Fetch the next set of rows of a query result, returning a sequence of sequences (e.g. a
list of tuples). An empty sequence is returned when no more rows are available.
The number of rows to fetch per call is specified by the parameter. If it is not given, the
cursor's arraysize determines the number of rows to be fetched. The method should try to
fetch as many rows as indicated by the size parameter. If this is not possible due to the
specified number of rows not being available, fewer rows may be returned.
An :py:class:`~pyhive.exc.Error` (or subclass) exception is raised if the previous call to
:py:meth:`execute` did not produce any result set or no call was issued yet.
"""
if size is None:
size = self.arraysize
return list(islice(iter(self.fetchone, None), size))
def fetchall(self):
"""Fetch all (remaining) rows of a query result, returning them as a sequence of sequences
(e.g. a list of tuples).
An :py:class:`~pyhive.exc.Error` (or subclass) exception is raised if the previous call to
:py:meth:`execute` did not produce any result set or no call was issued yet.
"""
return list(iter(self.fetchone, None))
@property
def arraysize(self):
"""This read/write attribute specifies the number of rows to fetch at a time with
:py:meth:`fetchmany`. It defaults to 1 meaning to fetch a single row at a time.
"""
return self._arraysize
@arraysize.setter
def arraysize(self, value):
self._arraysize = value
def setinputsizes(self, sizes):
"""Does nothing by default"""
pass
def setoutputsize(self, size, column=None):
"""Does nothing by default"""
pass
#
# Optional DB API Extensions
#
@property
def rownumber(self):
"""This read-only attribute should provide the current 0-based index of the cursor in the
result set.
The index can be seen as index of the cursor in a sequence (the result set). The next fetch
operation will fetch the row indexed by ``rownumber`` in that sequence.
"""
return self._rownumber
def __next__(self):
"""Return the next row from the currently executing SQL statement using the same semantics
as :py:meth:`fetchone`. A ``StopIteration`` exception is raised when the result set is
exhausted.
"""
one = self.fetchone()
if one is None:
raise StopIteration
else:
return one
next = __next__
def __iter__(self):
"""Return self to make cursors compatible to the iteration protocol."""
return self
class DBAPITypeObject(object):
# Taken from http://www.python.org/dev/peps/pep-0249/#implementation-hints
def __init__(self, *values):
self.values = values
def __cmp__(self, other):
if other in self.values:
return 0
if other < self.values:
return 1
else:
return -1
class ParamEscaper(object):
def escape_args(self, parameters):
if isinstance(parameters, dict):
return {k: self.escape_item(v) for k, v in parameters.items()}
elif isinstance(parameters, (list, tuple)):
return tuple(self.escape_item(x) for x in parameters)
else:
raise exc.ProgrammingError("Unsupported param format: {}".format(parameters))
def escape_number(self, item):
return item
def escape_string(self, item):
# Need to decode UTF-8 because of old sqlalchemy.
# Newer SQLAlchemy checks dialect.supports_unicode_binds before encoding Unicode strings
# as byte strings. The old version always encodes Unicode as byte strings, which breaks
# string formatting here.
if isinstance(item, bytes):
item = item.decode('utf-8')
# This is good enough when backslashes are literal, newlines are just followed, and the way
# to escape a single quote is to put two single quotes.
# (i.e. only special character is single quote)
return "'{}'".format(item.replace("'", "''"))
def escape_sequence(self, item):
l = map(str, map(self.escape_item, item))
return '(' + ','.join(l) + ')'
def escape_item(self, item):
if item is None:
return 'NULL'
elif isinstance(item, (int, float)):
return self.escape_number(item)
elif isinstance(item, basestring):
return self.escape_string(item)
elif isinstance(item, collections.Iterable):
return self.escape_sequence(item)
else:
raise exc.ProgrammingError("Unsupported object {}".format(item))
class UniversalSet(object):
"""set containing everything"""
def __contains__(self, item):
return True
"""
Package private common utilities. Do not use directly.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
__all__ = [
'Error', 'Warning', 'InterfaceError', 'DatabaseError', 'InternalError', 'OperationalError',
'ProgrammingError', 'DataError', 'NotSupportedError',
]
class Error(Exception):
"""Exception that is the base class of all other error exceptions.
You can use this to catch all errors with one single except statement.
"""
pass
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting, etc."""
pass
class InterfaceError(Error):
"""Exception raised for errors that are related to the database interface rather than the
database itself.
"""
pass
class DatabaseError(Error):
"""Exception raised for errors that are related to the database."""
pass
class InternalError(DatabaseError):
"""Exception raised when the database encounters an internal error, e.g. the cursor is not valid
anymore, the transaction is out of sync, etc."""
pass
class OperationalError(DatabaseError):
"""Exception raised for errors that are related to the database's operation and not necessarily
under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name
is not found, a transaction could not be processed, a memory allocation error occurred during
processing, etc.
"""
pass
class ProgrammingError(DatabaseError):
"""Exception raised for programming errors, e.g. table not found or already exists, syntax error
in the SQL statement, wrong number of parameters specified, etc.
"""
pass
class DataError(DatabaseError):
"""Exception raised for errors that are due to problems with the processed data like division by
zero, numeric value out of range, etc.
"""
pass
class NotSupportedError(DatabaseError):
"""Exception raised in case a method or database API was used which is not supported by the
database, e.g. requesting a ``.rollback()`` on a connection that does not support transaction or
has transactions turned off.
"""
pass
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
"""Integration between SQLAlchemy and Presto.
Some code based on
https://github.com/zzzeek/sqlalchemy/blob/rel_0_5/lib/sqlalchemy/databases/sqlite.py
which is released under the MIT license.
"""
from __future__ import absolute_import
from __future__ import unicode_literals
import re
from sqlalchemy import exc
from sqlalchemy import types
from sqlalchemy import util
# TODO shouldn't use mysql type
from sqlalchemy.databases import mysql
from sqlalchemy.engine import default
from sqlalchemy.sql import compiler
from sqlalchemy.sql.compiler import SQLCompiler
from pyhive import presto
from pyhive.common import UniversalSet
class PrestoIdentifierPreparer(compiler.IdentifierPreparer):
# Just quote everything to make things simpler / easier to upgrade
reserved_words = UniversalSet()
_type_map = {
'boolean': types.Boolean,
'tinyint': mysql.MSTinyInteger,
'smallint': types.SmallInteger,
'integer': types.Integer,
'bigint': types.BigInteger,
'real': types.Float,
'double': types.Float,
'varchar': types.String,
'timestamp': types.TIMESTAMP,
'date': types.DATE,
'varbinary': types.VARBINARY,
}
class PrestoCompiler(SQLCompiler):
def visit_char_length_func(self, fn, **kw):
return 'length{}'.format(self.function_argspec(fn, **kw))
class PrestoTypeCompiler(compiler.GenericTypeCompiler):
def visit_CLOB(self, type_, **kw):
raise ValueError("Presto does not support the CLOB column type.")
def visit_NCLOB(self, type_, **kw):
raise ValueError("Presto does not support the NCLOB column type.")
def visit_DATETIME(self, type_, **kw):
raise ValueError("Presto does not support the DATETIME column type.")
def visit_FLOAT(self, type_, **kw):
return 'DOUBLE'
def visit_TEXT(self, type_, **kw):
if type_.length:
return 'VARCHAR({:d})'.format(type_.length)
else:
return 'VARCHAR'
class PrestoDialect(default.DefaultDialect):
name = 'presto'
driver = 'rest'
paramstyle = 'pyformat'
preparer = PrestoIdentifierPreparer
statement_compiler = PrestoCompiler
supports_alter = False
supports_pk_autoincrement = False
supports_default_values = False
supports_empty_insert = False
supports_unicode_statements = True
supports_unicode_binds = True
returns_unicode_strings = True
description_encoding = None
supports_native_boolean = True
type_compiler = PrestoTypeCompiler
@classmethod
def dbapi(cls):
return presto
def create_connect_args(self, url):
db_parts = (url.database or 'hive').split('/')
kwargs = {
'host': url.host,
'port': url.port or 8080,
'username': url.username,
'password': url.password
}
kwargs.update(url.query)
if len(db_parts) == 1:
kwargs['catalog'] = db_parts[0]
elif len(db_parts) == 2:
kwargs['catalog'] = db_parts[0]
kwargs['schema'] = db_parts[1]
else:
raise ValueError("Unexpected database format {}".format(url.database))
return [], kwargs
def get_schema_names(self, connection, **kw):
return [row.Schema for row in connection.execute('SHOW SCHEMAS')]
def _get_table_columns(self, connection, table_name, schema):
full_table = self.identifier_preparer.quote_identifier(table_name)
if schema:
full_table = self.identifier_preparer.quote_identifier(schema) + '.' + full_table
try:
return connection.execute('SHOW COLUMNS FROM {}'.format(full_table))
except (presto.DatabaseError, exc.DatabaseError) as e:
# Normally SQLAlchemy should wrap this exception in sqlalchemy.exc.DatabaseError, which
# it successfully does in the Hive version. The difference with Presto is that this
# error is raised when fetching the cursor's description rather than the initial execute
# call. SQLAlchemy doesn't handle this. Thus, we catch the unwrapped
# presto.DatabaseError here.
# Does the table exist?
msg = (
e.args[0].get('message') if e.args and isinstance(e.args[0], dict)
else e.args[0] if e.args and isinstance(e.args[0], str)
else None
)
regex = r"Table\ \'.*{}\'\ does\ not\ exist".format(re.escape(table_name))
if msg and re.search(regex, msg):
raise exc.NoSuchTableError(table_name)
else:
raise
def has_table(self, connection, table_name, schema=None):
try:
self._get_table_columns(connection, table_name, schema)
return True
except exc.NoSuchTableError:
return False
def get_columns(self, connection, table_name, schema=None, **kw):
rows = self._get_table_columns(connection, table_name, schema)
result = []
for row in rows:
try:
coltype = _type_map[row.Type]
except KeyError:
util.warn("Did not recognize type '%s' of column '%s'" % (row.Type, row.Column))
coltype = types.NullType
result.append({
'name': row.Column,
'type': coltype,
# newer Presto no longer includes this column
'nullable': getattr(row, 'Null', True),
'default': None,
})
return result
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
# Hive has no support for foreign keys.
return []
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
# Hive has no support for primary keys.
return []
def get_indexes(self, connection, table_name, schema=None, **kw):
rows = self._get_table_columns(connection, table_name, schema)
col_names = []
for row in rows:
part_key = 'Partition Key'
# Presto puts this information in one of 3 places depending on version
# - a boolean column named "Partition Key"
# - a string in the "Comment" column
# - a string in the "Extra" column
is_partition_key = (
(part_key in row and row[part_key])
or row['Comment'].startswith(part_key)
or ('Extra' in row and 'partition key' in row['Extra'])
)
if is_partition_key:
col_names.append(row['Column'])
if col_names:
return [{'name': 'partition', 'column_names': col_names, 'unique': False}]
else:
return []
def get_table_names(self, connection, schema=None, **kw):
query = 'SHOW TABLES'
if schema:
query += ' FROM ' + self.identifier_preparer.quote_identifier(schema)
return [row.Table for row in connection.execute(query)]
def do_rollback(self, dbapi_connection):
# No transactions for Presto
pass
def _check_unicode_returns(self, connection, additional_tests=None):
# requests gives back Unicode strings
return True
def _check_unicode_description(self, connection):
# requests gives back Unicode strings
return True
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from thrift.Thrift import TProcessor, TMessageType
from thrift.protocol import TProtocolDecorator, TMultiplexedProtocol
from thrift.protocol.TProtocol import TProtocolException
class TMultiplexedProcessor(TProcessor):
def __init__(self):
self.defaultProcessor = None
self.services = {}
def registerDefault(self, processor):
"""
If a non-multiplexed processor connects to the server and wants to
communicate, use the given processor to handle it. This mechanism
allows servers to upgrade from non-multiplexed to multiplexed in a
backwards-compatible way and still handle old clients.
"""
self.defaultProcessor = processor
def registerProcessor(self, serviceName, processor):
self.services[serviceName] = processor
def on_message_begin(self, func):
for key in self.services.keys():
self.services[key].on_message_begin(func)
def process(self, iprot, oprot):
(name, type, seqid) = iprot.readMessageBegin()
if type != TMessageType.CALL and type != TMessageType.ONEWAY:
raise TProtocolException(
TProtocolException.NOT_IMPLEMENTED,
"TMultiplexedProtocol only supports CALL & ONEWAY")
index = name.find(TMultiplexedProtocol.SEPARATOR)
if index < 0:
if self.defaultProcessor:
return self.defaultProcessor.process(
StoredMessageProtocol(iprot, (name, type, seqid)), oprot)
else:
raise TProtocolException(
TProtocolException.NOT_IMPLEMENTED,
"Service name not found in message name: " + name + ". " +
"Did you forget to use TMultiplexedProtocol in your client?")
serviceName = name[0:index]
call = name[index + len(TMultiplexedProtocol.SEPARATOR):]
if serviceName not in self.services:
raise TProtocolException(
TProtocolException.NOT_IMPLEMENTED,
"Service name not found: " + serviceName + ". " +
"Did you forget to call registerProcessor()?")
standardMessage = (call, type, seqid)
return self.services[serviceName].process(
StoredMessageProtocol(iprot, standardMessage), oprot)
class StoredMessageProtocol(TProtocolDecorator.TProtocolDecorator):
def __init__(self, protocol, messageBegin):
self.messageBegin = messageBegin
def readMessageBegin(self):
return self.messageBegin
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from thrift.Thrift import TType
TYPE_IDX = 1
SPEC_ARGS_IDX = 3
SPEC_ARGS_CLASS_REF_IDX = 0
SPEC_ARGS_THRIFT_SPEC_IDX = 1
def fix_spec(all_structs):
"""Wire up recursive references for all TStruct definitions inside of each thrift_spec."""
for struc in all_structs:
spec = struc.thrift_spec
for thrift_spec in spec:
if thrift_spec is None:
continue
elif thrift_spec[TYPE_IDX] == TType.STRUCT:
other = thrift_spec[SPEC_ARGS_IDX][SPEC_ARGS_CLASS_REF_IDX].thrift_spec
thrift_spec[SPEC_ARGS_IDX][SPEC_ARGS_THRIFT_SPEC_IDX] = other
elif thrift_spec[TYPE_IDX] in (TType.LIST, TType.SET):
_fix_list_or_set(thrift_spec[SPEC_ARGS_IDX])
elif thrift_spec[TYPE_IDX] == TType.MAP:
_fix_map(thrift_spec[SPEC_ARGS_IDX])
def _fix_list_or_set(element_type):
# For a list or set, the thrift_spec entry looks like,
# (1, TType.LIST, 'lister', (TType.STRUCT, [RecList, None], False), None, ), # 1
# so ``element_type`` will be,
# (TType.STRUCT, [RecList, None], False)
if element_type[0] == TType.STRUCT:
element_type[1][1] = element_type[1][0].thrift_spec
elif element_type[0] in (TType.LIST, TType.SET):
_fix_list_or_set(element_type[1])
elif element_type[0] == TType.MAP:
_fix_map(element_type[1])
def _fix_map(element_type):
# For a map of key -> value type, ``element_type`` will be,
# (TType.I16, None, TType.STRUCT, [RecMapBasic, None], False), None, )
# which is just a normal struct definition.
#
# For a map of key -> list / set, ``element_type`` will be,
# (TType.I16, None, TType.LIST, (TType.STRUCT, [RecMapList, None], False), False)
# and we need to process the 3rd element as a list.
#
# For a map of key -> map, ``element_type`` will be,
# (TType.I16, None, TType.MAP, (TType.I16, None, TType.STRUCT,
# [RecMapMap, None], False), False)
# and need to process 3rd element as a map.
# Is the map key a struct?
if element_type[0] == TType.STRUCT:
element_type[1][1] = element_type[1][0].thrift_spec
elif element_type[0] in (TType.LIST, TType.SET):
_fix_list_or_set(element_type[1])
elif element_type[0] == TType.MAP:
_fix_map(element_type[1])
# Is the map value a struct?
if element_type[2] == TType.STRUCT:
element_type[3][1] = element_type[3][0].thrift_spec
elif element_type[2] in (TType.LIST, TType.SET):
_fix_list_or_set(element_type[3])
elif element_type[2] == TType.MAP:
_fix_map(element_type[3])
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from os import path
from SCons.Builder import Builder
from six.moves import map
def scons_env(env, add=''):
opath = path.dirname(path.abspath('$TARGET'))
lstr = 'thrift --gen cpp -o ' + opath + ' ' + add + ' $SOURCE'
cppbuild = Builder(action=lstr)
env.Append(BUILDERS={'ThriftCpp': cppbuild})
def gen_cpp(env, dir, file):
scons_env(env)
suffixes = ['_types.h', '_types.cpp']
targets = map(lambda s: 'gen-cpp/' + file + s, suffixes)
return env.ThriftCpp(targets, dir + file + '.thrift')
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from .protocol import TBinaryProtocol
from .transport import TTransport
def serialize(thrift_object,
protocol_factory=TBinaryProtocol.TBinaryProtocolFactory()):
transport = TTransport.TMemoryBuffer()
protocol = protocol_factory.getProtocol(transport)
thrift_object.write(protocol)
return transport.getvalue()
def deserialize(base,
buf,
protocol_factory=TBinaryProtocol.TBinaryProtocolFactory()):
transport = TTransport.TMemoryBuffer(buf)
protocol = protocol_factory.getProtocol(transport)
base.read(protocol)
return base
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
import logging
import socket
import struct
from .transport.TTransport import TTransportException, TTransportBase, TMemoryBuffer
from io import BytesIO
from collections import deque
from contextlib import contextmanager
from tornado import gen, iostream, ioloop, tcpserver, concurrent
__all__ = ['TTornadoServer', 'TTornadoStreamTransport']
logger = logging.getLogger(__name__)
class _Lock(object):
def __init__(self):
self._waiters = deque()
def acquired(self):
return len(self._waiters) > 0
@gen.coroutine
def acquire(self):
blocker = self._waiters[-1] if self.acquired() else None
future = concurrent.Future()
self._waiters.append(future)
if blocker:
yield blocker
raise gen.Return(self._lock_context())
def release(self):
assert self.acquired(), 'Lock not aquired'
future = self._waiters.popleft()
future.set_result(None)
@contextmanager
def _lock_context(self):
try:
yield
finally:
self.release()
class TTornadoStreamTransport(TTransportBase):
"""a framed, buffered transport over a Tornado stream"""
def __init__(self, host, port, stream=None, io_loop=None):
self.host = host
self.port = port
self.io_loop = io_loop or ioloop.IOLoop.current()
self.__wbuf = BytesIO()
self._read_lock = _Lock()
# servers provide a ready-to-go stream
self.stream = stream
def with_timeout(self, timeout, future):
return gen.with_timeout(timeout, future, self.io_loop)
@gen.coroutine
def open(self, timeout=None):
logger.debug('socket connecting')
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
self.stream = iostream.IOStream(sock)
try:
connect = self.stream.connect((self.host, self.port))
if timeout is not None:
yield self.with_timeout(timeout, connect)
else:
yield connect
except (socket.error, IOError, ioloop.TimeoutError) as e:
message = 'could not connect to {}:{} ({})'.format(self.host, self.port, e)
raise TTransportException(
type=TTransportException.NOT_OPEN,
message=message)
raise gen.Return(self)
def set_close_callback(self, callback):
"""
Should be called only after open() returns
"""
self.stream.set_close_callback(callback)
def close(self):
# don't raise if we intend to close
self.stream.set_close_callback(None)
self.stream.close()
def read(self, _):
# The generated code for Tornado shouldn't do individual reads -- only
# frames at a time
assert False, "you're doing it wrong"
@contextmanager
def io_exception_context(self):
try:
yield
except (socket.error, IOError) as e:
raise TTransportException(
type=TTransportException.END_OF_FILE,
message=str(e))
except iostream.StreamBufferFullError as e:
raise TTransportException(
type=TTransportException.UNKNOWN,
message=str(e))
@gen.coroutine
def readFrame(self):
# IOStream processes reads one at a time
with (yield self._read_lock.acquire()):
with self.io_exception_context():
frame_header = yield self.stream.read_bytes(4)
if len(frame_header) == 0:
raise iostream.StreamClosedError('Read zero bytes from stream')
frame_length, = struct.unpack('!i', frame_header)
frame = yield self.stream.read_bytes(frame_length)
raise gen.Return(frame)
def write(self, buf):
self.__wbuf.write(buf)
def flush(self):
frame = self.__wbuf.getvalue()
# reset wbuf before write/flush to preserve state on underlying failure
frame_length = struct.pack('!i', len(frame))
self.__wbuf = BytesIO()
with self.io_exception_context():
return self.stream.write(frame_length + frame)
class TTornadoServer(tcpserver.TCPServer):
def __init__(self, processor, iprot_factory, oprot_factory=None,
*args, **kwargs):
super(TTornadoServer, self).__init__(*args, **kwargs)
self._processor = processor
self._iprot_factory = iprot_factory
self._oprot_factory = (oprot_factory if oprot_factory is not None
else iprot_factory)
@gen.coroutine
def handle_stream(self, stream, address):
host, port = address[:2]
trans = TTornadoStreamTransport(host=host, port=port, stream=stream,
io_loop=self.io_loop)
oprot = self._oprot_factory.getProtocol(trans)
try:
while not trans.stream.closed():
try:
frame = yield trans.readFrame()
except TTransportException as e:
if e.type == TTransportException.END_OF_FILE:
break
else:
raise
tr = TMemoryBuffer(frame)
iprot = self._iprot_factory.getProtocol(tr)
yield self._processor.process(iprot, oprot)
except Exception:
logger.exception('thrift exception in handle_stream')
trans.close()
logger.info('client disconnected %s:%d', host, port)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
class TType(object):
STOP = 0
VOID = 1
BOOL = 2
BYTE = 3
I08 = 3
DOUBLE = 4
I16 = 6
I32 = 8
I64 = 10
STRING = 11
UTF7 = 11
STRUCT = 12
MAP = 13
SET = 14
LIST = 15
UTF8 = 16
UTF16 = 17
_VALUES_TO_NAMES = (
'STOP',
'VOID',
'BOOL',
'BYTE',
'DOUBLE',
None,
'I16',
None,
'I32',
None,
'I64',
'STRING',
'STRUCT',
'MAP',
'SET',
'LIST',
'UTF8',
'UTF16',
)
class TMessageType(object):
CALL = 1
REPLY = 2
EXCEPTION = 3
ONEWAY = 4
class TProcessor(object):
"""Base class for processor, which works on two streams."""
def process(self, iprot, oprot):
"""
Process a request. The normal behvaior is to have the
processor invoke the correct handler and then it is the
server's responsibility to write the response to oprot.
"""
pass
def on_message_begin(self, func):
"""
Install a callback that receives (name, type, seqid)
after the message header is read.
"""
pass
class TException(Exception):
"""Base class for all thrift exceptions."""
# BaseException.message is deprecated in Python v[2.6,3.0)
if (2, 6, 0) <= sys.version_info < (3, 0):
def _get_message(self):
return self._message
def _set_message(self, message):
self._message = message
message = property(_get_message, _set_message)
def __init__(self, message=None):
Exception.__init__(self, message)
self.message = message
class TApplicationException(TException):
"""Application level thrift exceptions."""
UNKNOWN = 0
UNKNOWN_METHOD = 1
INVALID_MESSAGE_TYPE = 2
WRONG_METHOD_NAME = 3
BAD_SEQUENCE_ID = 4
MISSING_RESULT = 5
INTERNAL_ERROR = 6
PROTOCOL_ERROR = 7
INVALID_TRANSFORM = 8
INVALID_PROTOCOL = 9
UNSUPPORTED_CLIENT_TYPE = 10
def __init__(self, type=UNKNOWN, message=None):
TException.__init__(self, message)
self.type = type
def __str__(self):
if self.message:
return self.message
elif self.type == self.UNKNOWN_METHOD:
return 'Unknown method'
elif self.type == self.INVALID_MESSAGE_TYPE:
return 'Invalid message type'
elif self.type == self.WRONG_METHOD_NAME:
return 'Wrong method name'
elif self.type == self.BAD_SEQUENCE_ID:
return 'Bad sequence ID'
elif self.type == self.MISSING_RESULT:
return 'Missing result'
elif self.type == self.INTERNAL_ERROR:
return 'Internal error'
elif self.type == self.PROTOCOL_ERROR:
return 'Protocol error'
elif self.type == self.INVALID_TRANSFORM:
return 'Invalid transform'
elif self.type == self.INVALID_PROTOCOL:
return 'Invalid protocol'
elif self.type == self.UNSUPPORTED_CLIENT_TYPE:
return 'Unsupported client type'
else:
return 'Default (unknown) TApplicationException'
def read(self, iprot):
iprot.readStructBegin()
while True:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
if fid == 1:
if ftype == TType.STRING:
self.message = iprot.readString()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.I32:
self.type = iprot.readI32()
else:
iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
iprot.readStructEnd()
def write(self, oprot):
oprot.writeStructBegin('TApplicationException')
if self.message is not None:
oprot.writeFieldBegin('message', TType.STRING, 1)
oprot.writeString(self.message)
oprot.writeFieldEnd()
if self.type is not None:
oprot.writeFieldBegin('type', TType.I32, 2)
oprot.writeI32(self.type)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
class TFrozenDict(dict):
"""A dictionary that is "frozen" like a frozenset"""
def __init__(self, *args, **kwargs):
super(TFrozenDict, self).__init__(*args, **kwargs)
# Sort the items so they will be in a consistent order.
# XOR in the hash of the class so we don't collide with
# the hash of a list of tuples.
self.__hashval = hash(TFrozenDict) ^ hash(tuple(sorted(self.items())))
def __setitem__(self, *args):
raise TypeError("Can't modify frozen TFreezableDict")
def __delitem__(self, *args):
raise TypeError("Can't modify frozen TFreezableDict")
def __hash__(self):
return self.__hashval
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
__all__ = ['Thrift', 'TSCons']
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import sys
if sys.version_info[0] == 2:
from cStringIO import StringIO as BufferIO
def binary_to_str(bin_val):
return bin_val
def str_to_binary(str_val):
return str_val
def byte_index(bytes_val, i):
return ord(bytes_val[i])
else:
from io import BytesIO as BufferIO # noqa
def binary_to_str(bin_val):
return bin_val.decode('utf8')
def str_to_binary(str_val):
return bytes(str_val, 'utf8')
def byte_index(bytes_val, i):
return bytes_val[i]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from thrift.transport import TTransport
class TBase(object):
__slots__ = ()
def __repr__(self):
L = ['%s=%r' % (key, getattr(self, key)) for key in self.__slots__]
return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
for attr in self.__slots__:
my_val = getattr(self, attr)
other_val = getattr(other, attr)
if my_val != other_val:
return False
return True
def __ne__(self, other):
return not (self == other)
def read(self, iprot):
if (iprot._fast_decode is not None and
isinstance(iprot.trans, TTransport.CReadableTransport) and
self.thrift_spec is not None):
iprot._fast_decode(self, iprot, [self.__class__, self.thrift_spec])
else:
iprot.readStruct(self, self.thrift_spec)
def write(self, oprot):
if (oprot._fast_encode is not None and self.thrift_spec is not None):
oprot.trans.write(
oprot._fast_encode(self, [self.__class__, self.thrift_spec]))
else:
oprot.writeStruct(self, self.thrift_spec)
class TExceptionBase(TBase, Exception):
pass
class TFrozenBase(TBase):
def __setitem__(self, *args):
raise TypeError("Can't modify frozen struct")
def __delitem__(self, *args):
raise TypeError("Can't modify frozen struct")
def __hash__(self, *args):
return hash(self.__class__) ^ hash(self.__slots__)
@classmethod
def read(cls, iprot):
if (iprot._fast_decode is not None and
isinstance(iprot.trans, TTransport.CReadableTransport) and
cls.thrift_spec is not None):
self = cls()
return iprot._fast_decode(None, iprot,
[self.__class__, self.thrift_spec])
else:
return iprot.readStruct(cls, cls.thrift_spec, True)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from .TProtocol import TType, TProtocolBase, TProtocolException, TProtocolFactory
from struct import pack, unpack
class TBinaryProtocol(TProtocolBase):
"""Binary implementation of the Thrift protocol driver."""
# NastyHaxx. Python 2.4+ on 32-bit machines forces hex constants to be
# positive, converting this into a long. If we hardcode the int value
# instead it'll stay in 32 bit-land.
# VERSION_MASK = 0xffff0000
VERSION_MASK = -65536
# VERSION_1 = 0x80010000
VERSION_1 = -2147418112
TYPE_MASK = 0x000000ff
def __init__(self, trans, strictRead=False, strictWrite=True, **kwargs):
TProtocolBase.__init__(self, trans)
self.strictRead = strictRead
self.strictWrite = strictWrite
self.string_length_limit = kwargs.get('string_length_limit', None)
self.container_length_limit = kwargs.get('container_length_limit', None)
def _check_string_length(self, length):
self._check_length(self.string_length_limit, length)
def _check_container_length(self, length):
self._check_length(self.container_length_limit, length)
def writeMessageBegin(self, name, type, seqid):
if self.strictWrite:
self.writeI32(TBinaryProtocol.VERSION_1 | type)
self.writeString(name)
self.writeI32(seqid)
else:
self.writeString(name)
self.writeByte(type)
self.writeI32(seqid)
def writeMessageEnd(self):
pass
def writeStructBegin(self, name):
pass
def writeStructEnd(self):
pass
def writeFieldBegin(self, name, type, id):
self.writeByte(type)
self.writeI16(id)
def writeFieldEnd(self):
pass
def writeFieldStop(self):
self.writeByte(TType.STOP)
def writeMapBegin(self, ktype, vtype, size):
self.writeByte(ktype)
self.writeByte(vtype)
self.writeI32(size)
def writeMapEnd(self):
pass
def writeListBegin(self, etype, size):
self.writeByte(etype)
self.writeI32(size)
def writeListEnd(self):
pass
def writeSetBegin(self, etype, size):
self.writeByte(etype)
self.writeI32(size)
def writeSetEnd(self):
pass
def writeBool(self, bool):
if bool:
self.writeByte(1)
else:
self.writeByte(0)
def writeByte(self, byte):
buff = pack("!b", byte)
self.trans.write(buff)
def writeI16(self, i16):
buff = pack("!h", i16)
self.trans.write(buff)
def writeI32(self, i32):
buff = pack("!i", i32)
self.trans.write(buff)
def writeI64(self, i64):
buff = pack("!q", i64)
self.trans.write(buff)
def writeDouble(self, dub):
buff = pack("!d", dub)
self.trans.write(buff)
def writeBinary(self, str):
self.writeI32(len(str))
self.trans.write(str)
def readMessageBegin(self):
sz = self.readI32()
if sz < 0:
version = sz & TBinaryProtocol.VERSION_MASK
if version != TBinaryProtocol.VERSION_1:
raise TProtocolException(
type=TProtocolException.BAD_VERSION,
message='Bad version in readMessageBegin: %d' % (sz))
type = sz & TBinaryProtocol.TYPE_MASK
name = self.readString()
seqid = self.readI32()
else:
if self.strictRead:
raise TProtocolException(type=TProtocolException.BAD_VERSION,
message='No protocol version header')
name = self.trans.readAll(sz)
type = self.readByte()
seqid = self.readI32()
return (name, type, seqid)
def readMessageEnd(self):
pass
def readStructBegin(self):
pass
def readStructEnd(self):
pass
def readFieldBegin(self):
type = self.readByte()
if type == TType.STOP:
return (None, type, 0)
id = self.readI16()
return (None, type, id)
def readFieldEnd(self):
pass
def readMapBegin(self):
ktype = self.readByte()
vtype = self.readByte()
size = self.readI32()
self._check_container_length(size)
return (ktype, vtype, size)
def readMapEnd(self):
pass
def readListBegin(self):
etype = self.readByte()
size = self.readI32()
self._check_container_length(size)
return (etype, size)
def readListEnd(self):
pass
def readSetBegin(self):
etype = self.readByte()
size = self.readI32()
self._check_container_length(size)
return (etype, size)
def readSetEnd(self):
pass
def readBool(self):
byte = self.readByte()
if byte == 0:
return False
return True
def readByte(self):
buff = self.trans.readAll(1)
val, = unpack('!b', buff)
return val
def readI16(self):
buff = self.trans.readAll(2)
val, = unpack('!h', buff)
return val
def readI32(self):
buff = self.trans.readAll(4)
val, = unpack('!i', buff)
return val
def readI64(self):
buff = self.trans.readAll(8)
val, = unpack('!q', buff)
return val
def readDouble(self):
buff = self.trans.readAll(8)
val, = unpack('!d', buff)
return val
def readBinary(self):
size = self.readI32()
self._check_string_length(size)
s = self.trans.readAll(size)
return s
class TBinaryProtocolFactory(TProtocolFactory):
def __init__(self, strictRead=False, strictWrite=True, **kwargs):
self.strictRead = strictRead
self.strictWrite = strictWrite
self.string_length_limit = kwargs.get('string_length_limit', None)
self.container_length_limit = kwargs.get('container_length_limit', None)
def getProtocol(self, trans):
prot = TBinaryProtocol(trans, self.strictRead, self.strictWrite,
string_length_limit=self.string_length_limit,
container_length_limit=self.container_length_limit)
return prot
class TBinaryProtocolAccelerated(TBinaryProtocol):
"""C-Accelerated version of TBinaryProtocol.
This class does not override any of TBinaryProtocol's methods,
but the generated code recognizes it directly and will call into
our C module to do the encoding, bypassing this object entirely.
We inherit from TBinaryProtocol so that the normal TBinaryProtocol
encoding can happen if the fastbinary module doesn't work for some
reason. (TODO(dreiss): Make this happen sanely in more cases.)
To disable this behavior, pass fallback=False constructor argument.
In order to take advantage of the C module, just use
TBinaryProtocolAccelerated instead of TBinaryProtocol.
NOTE: This code was contributed by an external developer.
The internal Thrift team has reviewed and tested it,
but we cannot guarantee that it is production-ready.
Please feel free to report bugs and/or success stories
to the public mailing list.
"""
pass
def __init__(self, *args, **kwargs):
fallback = kwargs.pop('fallback', True)
super(TBinaryProtocolAccelerated, self).__init__(*args, **kwargs)
try:
from thrift.protocol import fastbinary
except ImportError:
if not fallback:
raise
else:
self._fast_decode = fastbinary.decode_binary
self._fast_encode = fastbinary.encode_binary
class TBinaryProtocolAcceleratedFactory(TProtocolFactory):
def __init__(self,
string_length_limit=None,
container_length_limit=None,
fallback=True):
self.string_length_limit = string_length_limit
self.container_length_limit = container_length_limit
self._fallback = fallback
def getProtocol(self, trans):
return TBinaryProtocolAccelerated(
trans,
string_length_limit=self.string_length_limit,
container_length_limit=self.container_length_limit,
fallback=self._fallback)
This diff is collapsed.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
from thrift.protocol.TCompactProtocol import TCompactProtocolAccelerated
from thrift.protocol.TProtocol import TProtocolBase, TProtocolException, TProtocolFactory
from thrift.Thrift import TApplicationException, TMessageType
from thrift.transport.THeaderTransport import THeaderTransport, THeaderSubprotocolID, THeaderClientType
PROTOCOLS_BY_ID = {
THeaderSubprotocolID.BINARY: TBinaryProtocolAccelerated,
THeaderSubprotocolID.COMPACT: TCompactProtocolAccelerated,
}
class THeaderProtocol(TProtocolBase):
"""A framed protocol with headers and payload transforms.
THeaderProtocol frames other Thrift protocols and adds support for optional
out-of-band headers. The currently supported subprotocols are
TBinaryProtocol and TCompactProtocol.
It's also possible to apply transforms to the encoded message payload. The
only transform currently supported is to gzip.
When used in a server, THeaderProtocol can accept messages from
non-THeaderProtocol clients if allowed (see `allowed_client_types`). This
includes framed and unframed transports and both TBinaryProtocol and
TCompactProtocol. The server will respond in the appropriate dialect for
the connected client. HTTP clients are not currently supported.
THeaderProtocol does not currently support THTTPServer, TNonblockingServer,
or TProcessPoolServer.
See doc/specs/HeaderFormat.md for details of the wire format.
"""
def __init__(self, transport, allowed_client_types):
# much of the actual work for THeaderProtocol happens down in
# THeaderTransport since we need to do low-level shenanigans to detect
# if the client is sending us headers or one of the headerless formats
# we support. this wraps the real transport with the one that does all
# the magic.
if not isinstance(transport, THeaderTransport):
transport = THeaderTransport(transport, allowed_client_types)
super(THeaderProtocol, self).__init__(transport)
self._set_protocol()
def get_headers(self):
return self.trans.get_headers()
def set_header(self, key, value):
self.trans.set_header(key, value)
def clear_headers(self):
self.trans.clear_headers()
def add_transform(self, transform_id):
self.trans.add_transform(transform_id)
def writeMessageBegin(self, name, ttype, seqid):
self.trans.sequence_id = seqid
return self._protocol.writeMessageBegin(name, ttype, seqid)
def writeMessageEnd(self):
return self._protocol.writeMessageEnd()
def writeStructBegin(self, name):
return self._protocol.writeStructBegin(name)
def writeStructEnd(self):
return self._protocol.writeStructEnd()
def writeFieldBegin(self, name, ttype, fid):
return self._protocol.writeFieldBegin(name, ttype, fid)
def writeFieldEnd(self):
return self._protocol.writeFieldEnd()
def writeFieldStop(self):
return self._protocol.writeFieldStop()
def writeMapBegin(self, ktype, vtype, size):
return self._protocol.writeMapBegin(ktype, vtype, size)
def writeMapEnd(self):
return self._protocol.writeMapEnd()
def writeListBegin(self, etype, size):
return self._protocol.writeListBegin(etype, size)
def writeListEnd(self):
return self._protocol.writeListEnd()
def writeSetBegin(self, etype, size):
return self._protocol.writeSetBegin(etype, size)
def writeSetEnd(self):
return self._protocol.writeSetEnd()
def writeBool(self, bool_val):
return self._protocol.writeBool(bool_val)
def writeByte(self, byte):
return self._protocol.writeByte(byte)
def writeI16(self, i16):
return self._protocol.writeI16(i16)
def writeI32(self, i32):
return self._protocol.writeI32(i32)
def writeI64(self, i64):
return self._protocol.writeI64(i64)
def writeDouble(self, dub):
return self._protocol.writeDouble(dub)
def writeBinary(self, str_val):
return self._protocol.writeBinary(str_val)
def _set_protocol(self):
try:
protocol_cls = PROTOCOLS_BY_ID[self.trans.protocol_id]
except KeyError:
raise TApplicationException(
TProtocolException.INVALID_PROTOCOL,
"Unknown protocol requested.",
)
self._protocol = protocol_cls(self.trans)
self._fast_encode = self._protocol._fast_encode
self._fast_decode = self._protocol._fast_decode
def readMessageBegin(self):
try:
self.trans.readFrame(0)
self._set_protocol()
except TApplicationException as exc:
self._protocol.writeMessageBegin(b"", TMessageType.EXCEPTION, 0)
exc.write(self._protocol)
self._protocol.writeMessageEnd()
self.trans.flush()
return self._protocol.readMessageBegin()
def readMessageEnd(self):
return self._protocol.readMessageEnd()
def readStructBegin(self):
return self._protocol.readStructBegin()
def readStructEnd(self):
return self._protocol.readStructEnd()
def readFieldBegin(self):
return self._protocol.readFieldBegin()
def readFieldEnd(self):
return self._protocol.readFieldEnd()
def readMapBegin(self):
return self._protocol.readMapBegin()
def readMapEnd(self):
return self._protocol.readMapEnd()
def readListBegin(self):
return self._protocol.readListBegin()
def readListEnd(self):
return self._protocol.readListEnd()
def readSetBegin(self):
return self._protocol.readSetBegin()
def readSetEnd(self):
return self._protocol.readSetEnd()
def readBool(self):
return self._protocol.readBool()
def readByte(self):
return self._protocol.readByte()
def readI16(self):
return self._protocol.readI16()
def readI32(self):
return self._protocol.readI32()
def readI64(self):
return self._protocol.readI64()
def readDouble(self):
return self._protocol.readDouble()
def readBinary(self):
return self._protocol.readBinary()
class THeaderProtocolFactory(TProtocolFactory):
def __init__(self, allowed_client_types=(THeaderClientType.HEADERS,)):
self.allowed_client_types = allowed_client_types
def getProtocol(self, trans):
return THeaderProtocol(trans, self.allowed_client_types)
This diff is collapsed.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from thrift.Thrift import TMessageType
from thrift.protocol import TProtocolDecorator
SEPARATOR = ":"
class TMultiplexedProtocol(TProtocolDecorator.TProtocolDecorator):
def __init__(self, protocol, serviceName):
self.serviceName = serviceName
def writeMessageBegin(self, name, type, seqid):
if (type == TMessageType.CALL or
type == TMessageType.ONEWAY):
super(TMultiplexedProtocol, self).writeMessageBegin(
self.serviceName + SEPARATOR + name,
type,
seqid
)
else:
super(TMultiplexedProtocol, self).writeMessageBegin(name, type, seqid)
This diff is collapsed.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
class TProtocolDecorator(object):
def __new__(cls, protocol, *args, **kwargs):
decorated_cls = type(''.join(['Decorated', protocol.__class__.__name__]),
(cls, protocol.__class__),
protocol.__dict__)
return object.__new__(decorated_cls)
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
__all__ = ['fastbinary', 'TBase', 'TBinaryProtocol', 'TCompactProtocol',
'TJSONProtocol', 'TProtocol', 'TProtocolDecorator']
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import ssl
from six.moves import BaseHTTPServer
from thrift.Thrift import TMessageType
from thrift.server import TServer
from thrift.transport import TTransport
class ResponseException(Exception):
"""Allows handlers to override the HTTP response
Normally, THttpServer always sends a 200 response. If a handler wants
to override this behavior (e.g., to simulate a misconfigured or
overloaded web server during testing), it can raise a ResponseException.
The function passed to the constructor will be called with the
RequestHandler as its only argument. Note that this is irrelevant
for ONEWAY requests, as the HTTP response must be sent before the
RPC is processed.
"""
def __init__(self, handler):
self.handler = handler
class THttpServer(TServer.TServer):
"""A simple HTTP-based Thrift server
This class is not very performant, but it is useful (for example) for
acting as a mock version of an Apache-based PHP Thrift endpoint.
Also important to note the HTTP implementation pretty much violates the
transport/protocol/processor/server layering, by performing the transport
functions here. This means things like oneway handling are oddly exposed.
"""
def __init__(self,
processor,
server_address,
inputProtocolFactory,
outputProtocolFactory=None,
server_class=BaseHTTPServer.HTTPServer,
**kwargs):
"""Set up protocol factories and HTTP (or HTTPS) server.
See BaseHTTPServer for server_address.
See TServer for protocol factories.
To make a secure server, provide the named arguments:
* cafile - to validate clients [optional]
* cert_file - the server cert
* key_file - the server's key
"""
if outputProtocolFactory is None:
outputProtocolFactory = inputProtocolFactory
TServer.TServer.__init__(self, processor, None, None, None,
inputProtocolFactory, outputProtocolFactory)
thttpserver = self
self._replied = None
class RequestHander(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
# Don't care about the request path.
thttpserver._replied = False
iftrans = TTransport.TFileObjectTransport(self.rfile)
itrans = TTransport.TBufferedTransport(
iftrans, int(self.headers['Content-Length']))
otrans = TTransport.TMemoryBuffer()
iprot = thttpserver.inputProtocolFactory.getProtocol(itrans)
oprot = thttpserver.outputProtocolFactory.getProtocol(otrans)
try:
thttpserver.processor.on_message_begin(self.on_begin)
thttpserver.processor.process(iprot, oprot)
except ResponseException as exn:
exn.handler(self)
else:
if not thttpserver._replied:
# If the request was ONEWAY we would have replied already
data = otrans.getvalue()
self.send_response(200)
self.send_header("Content-Length", len(data))
self.send_header("Content-Type", "application/x-thrift")
self.end_headers()
self.wfile.write(data)
def on_begin(self, name, type, seqid):
"""
Inspect the message header.
This allows us to post an immediate transport response
if the request is a ONEWAY message type.
"""
if type == TMessageType.ONEWAY:
self.send_response(200)
self.send_header("Content-Type", "application/x-thrift")
self.end_headers()
thttpserver._replied = True
self.httpd = server_class(server_address, RequestHander)
if (kwargs.get('cafile') or kwargs.get('cert_file') or kwargs.get('key_file')):
context = ssl.create_default_context(cafile=kwargs.get('cafile'))
context.check_hostname = False
context.load_cert_chain(kwargs.get('cert_file'), kwargs.get('key_file'))
context.verify_mode = ssl.CERT_REQUIRED if kwargs.get('cafile') else ssl.CERT_NONE
self.httpd.socket = context.wrap_socket(self.httpd.socket, server_side=True)
def serve(self):
self.httpd.serve_forever()
def shutdown(self):
self.httpd.socket.close()
# self.httpd.shutdown() # hangs forever, python doesn't handle POLLNVAL properly!
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
__all__ = ['TServer', 'TNonblockingServer']
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
__all__ = ['TTransport', 'TSocket', 'THttpClient', 'TZlibTransport']
This diff is collapsed.
Metadata-Version: 1.2
Name: thrift-sasl
Version: 0.3.0
Summary: Thrift SASL Python module that implements SASL transports for Thrift (`TSaslClientTransport`).
Home-page: https://github.com/cloudera/thrift_sasl
Author: Uri Laserson
Author-email: laserson@cloudera.com
Maintainer: Wes McKinney
Maintainer-email: wes@cloudera.com
License: Apache License, Version 2.0
Description: Thrift SASL Python module that implements SASL transports for Thrift (`TSaslClientTransport`).
Keywords: thrift sasl transport
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.6
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
setup.cfg
setup.py
thrift_sasl/__init__.py
thrift_sasl.egg-info/PKG-INFO
thrift_sasl.egg-info/SOURCES.txt
thrift_sasl.egg-info/dependency_links.txt
thrift_sasl.egg-info/requires.txt
thrift_sasl.egg-info/top_level.txt
\ No newline at end of file
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment