# This file is part of RestAuthCommon.
#
# RestAuthCommon is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# RestAuthCommon is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with RestAuthCommon. If not, see <http://www.gnu.org/licenses/>.
"""
Classes and methods related to content handling.
.. moduleauthor:: Mathias Ertl <mati@restauth.net>
"""
import json as libjson
import pickle
import sys
try:
from urllib.parse import parse_qs # python3
from urllib.parse import urlencode # python3
except ImportError:
from urlparse import parse_qs # python2
from urllib import urlencode # python2
from RestAuthCommon import error
# Interpreter major-version flags used throughout this module to pick the
# right bytes/text handling. Exactly one of the two is True.
IS_PYTHON3 = sys.version_info >= (3, 0)
IS_PYTHON2 = not IS_PYTHON3
class ContentHandler(object):
    """
    This class is a common base class for all content handlers. If you
    want to implement your own content handler, you must subclass this
    class and implement all marshal_* and unmarshal_* methods.

    **Never use this class directly.** It does not marshal or unmarshal any
    content itself: the type-specific methods defined here are empty stubs
    that return ``None``.
    """

    mime = None
    """Override this with the MIME type handled by your handler."""

    librarypath = None
    """Override ``librarypath`` to lazily load named library upon first use.

    This may be a toplevel module (i.e. ``"json"``) or a submodule (i.e.
    ``"lxml.etree"``). The named library is accessible via ``self.library``.

    Example::

        class XMLContentHandler(ContentHandler):
            librarypath = 'lxml.etree'

            def unmarshal_str(self, data):
                tree = self.library.Element(data)
                # ...
    """

    SUPPORT_NESTED_DICTS = True
    """Set to False if your content handler does not support nested
    dictionaries as used i.e. during user-creation."""

    # Cache for the lazily imported library; filled on first access of
    # ``library`` and stored on the instance from then on.
    _library = None

    @property
    def library(self):
        """Library configured with the ``librarypath`` class variable.

        The import happens on first access and the result is cached.
        ``librarypath`` must have been overridden with a string before this
        property is used.
        """
        if self._library is None:
            if '.' in self.librarypath:
                # Dotted path: import the parent and fetch the last component
                # from it, so the submodule itself is returned.
                mod, lib = self.librarypath.rsplit('.', 1)
                _temp = __import__(mod, fromlist=[lib])
                self._library = getattr(_temp, lib)
            else:
                self._library = __import__(self.librarypath)
        return self._library

    def marshal(self, obj):
        """
        Shortcut for marshalling just any object.

        **Note:** If you know the type of **obj** in advance, you should
        use the marshal_* methods directly for improved speed.

        :param obj: The object to marshall.
        :return: The marshalled representation of the object.
        :rtype: str
        :raise error.MarshalError: If marshalling goes wrong in any way.
        """
        # All string-like types share ``marshal_str``. The branches must be
        # mutually exclusive (``elif``): otherwise the fallback below would
        # overwrite the choice and send ``bytes`` to a non-existent
        # ``marshal_bytes`` on Python 3.
        if isinstance(obj, (bytes, str)):
            func_name = 'marshal_str'
        elif IS_PYTHON2 and isinstance(obj, unicode):
            func_name = 'marshal_str'
        else:
            func_name = 'marshal_%s' % (obj.__class__.__name__)

        try:
            func = getattr(self, func_name)
            return func(obj)
        except error.MarshalError:
            # Already the right type: re-raise untouched.
            raise
        except Exception as e:
            # Includes the AttributeError raised for unsupported types.
            raise error.MarshalError(e)

    def unmarshal(self, raw_data, typ):
        """
        Shortcut for unmarshalling a string to an object of type *typ*.

        **Note:** You may want to use the unmarshal_* methods directly
        for improved speed.

        :param raw_data: The string to unmarshall.
        :type raw_data: str
        :param typ: The typ of the unmarshalled object.
        :type typ: type
        :return: The unmarshalled object.
        :rtype: typ
        :raise error.UnmarshalError: If unmarshalling goes wrong in any way,
            or if the unmarshalled value is not exactly of type *typ*.
        """
        try:
            func = getattr(self, 'unmarshal_%s' % (typ.__name__))
            val = func(raw_data)
        except error.UnmarshalError:
            raise
        except Exception as e:
            raise error.UnmarshalError(e)

        # Exact class comparison: subclasses of *typ* are rejected as well.
        if val.__class__ != typ:
            raise error.UnmarshalError(
                "Request body contained %s instead of %s" %
                (val.__class__, typ)
            )
        return val

    def unmarshal_str(self, data):
        """Unmarshal a string.

        :param data: Data to unmarshal.
        :type data: bytes in python3, str in python2
        :rtype: str in python3, unicode in python2
        """
        pass

    def unmarshal_dict(self, body):
        """Unmarshal a dictionary.

        :param body: Data to unmarshal.
        :type body: bytes in python3, str in python2
        :rtype: dict
        """
        pass

    def unmarshal_list(self, body):
        """Unmarshal a list.

        :param body: Data to unmarshal.
        :type body: bytes in python3, str in python2
        :rtype: list
        """
        pass

    def unmarshal_bool(self, body):
        """Unmarshal a boolean.

        :param body: Data to unmarshal.
        :type body: bytes in python3, str in python2
        :rtype: bool
        """
        pass

    def marshal_str(self, obj):
        """Marshal a string.

        :param obj: Data to marshal.
        :type obj: str, bytes, unicode
        :rtype: bytes in python3, str in python2
        """
        pass

    def marshal_bool(self, obj):
        """Marshal a boolean.

        :param obj: Data to marshal.
        :type obj: bool
        """
        pass

    def marshal_list(self, obj):
        """Marshal a list.

        :param obj: Data to marshal.
        :type obj: list
        :rtype: bytes in python3, str in python2
        """
        pass

    def marshal_dict(self, obj):
        """Marshal a dictionary.

        :param obj: Data to marshal.
        :type obj: dict
        :rtype: bytes in python3, str in python2
        """
        pass
class JSONContentHandler(ContentHandler):
    """Handler for JSON encoded content.

    Strings are transported wrapped in a one-element JSON list; booleans,
    lists and dictionaries are encoded as-is.

    .. seealso:: `Specification <http://www.json.org>`_, `WikiPedia
       <http://en.wikipedia.org/wiki/JSON>`_
    """

    mime = 'application/json'
    """The mime-type used by this content handler is 'application/json'."""

    # Compact separators: no whitespace after ',' and ':' in the output.
    SEPARATORS = (',', ':')

    class ByteEncoder(libjson.JSONEncoder):
        """JSON encoder that serializes Python 3 ``bytes`` as UTF-8 text."""

        def default(self, obj):
            if IS_PYTHON3 and isinstance(obj, bytes):
                return obj.decode('utf-8')
            return libjson.JSONEncoder.default(self, obj)

    class ByteDecoder(libjson.JSONDecoder):
        """JSON decoder that accepts UTF-8 encoded ``bytes`` on Python 3."""

        def decode(self, obj):
            if IS_PYTHON3 and isinstance(obj, bytes):
                obj = obj.decode('utf-8')
            return libjson.JSONDecoder.decode(self, obj)

    def _loads(self, body):
        """Parse *body* as JSON, converting parse errors to UnmarshalError."""
        try:
            return libjson.loads(body, cls=self.ByteDecoder)
        except ValueError as e:
            raise error.UnmarshalError(e)

    def _dumps(self, obj):
        """Serialize *obj* to compact JSON.

        :rtype: bytes in python3, str in python2
        :raise error.MarshalError: If the JSON library raises ValueError.
        """
        try:
            dumped = libjson.dumps(obj, separators=self.SEPARATORS,
                                   cls=self.ByteEncoder)
            if IS_PYTHON3:
                return dumped.encode('utf-8')
            return dumped
        except ValueError as e:
            raise error.MarshalError(e)

    def unmarshal_str(self, body):
        """Unmarshal a string from a one-element JSON list.

        :raise error.UnmarshalError: If *body* is not valid JSON or is not a
            list with exactly one element.
        """
        try:
            pure = libjson.loads(body, cls=self.ByteDecoder)
            if not isinstance(pure, list) or len(pure) != 1:
                raise error.UnmarshalError("Could not parse body as string")

            string = pure[0]

            # In python 2.7.1 (not 2.7.2) json.loads("") returns a str and
            # not unicode.
            if IS_PYTHON2 and isinstance(string, str):
                return unicode(string)
            return string
        except ValueError as e:
            raise error.UnmarshalError(e)

    def unmarshal_dict(self, body):
        """Unmarshal a dictionary; the decoded type is not verified here."""
        return self._loads(body)

    def unmarshal_list(self, body):
        """Unmarshal a list; the decoded type is not verified here."""
        return self._loads(body)

    def unmarshal_bool(self, body):
        """Unmarshal a boolean; the decoded type is not verified here.

        Decodes through ``ByteDecoder`` like the other unmarshal_* methods so
        that ``bytes`` bodies are treated the same way everywhere.
        """
        return self._loads(body)

    def marshal_str(self, obj):
        """Marshal a string, wrapped in a one-element list."""
        return self._dumps([obj])

    def marshal_bool(self, obj):
        """Marshal a boolean."""
        return self._dumps(obj)

    def marshal_list(self, obj):
        """Marshal a list."""
        return self._dumps(obj)

    def marshal_dict(self, obj):
        """Marshal a dictionary."""
        return self._dumps(obj)
class FormContentHandler(ContentHandler):
    """Handler for HTML Form urlencoded content.

    Strings are transported in a ``str`` field and lists in a repeated
    ``list`` field.

    .. WARNING:: Because of the limitations of urlencoded forms, this handler
       does not support nested dictionaries.
    """

    mime = 'application/x-www-form-urlencoded'
    """The mime-type used by this content handler is
    'application/x-www-form-urlencoded'."""

    SUPPORT_NESTED_DICTS = False

    def _decode_dict(self, d):
        """Return a copy of *d* with keys and values decoded from UTF-8.

        Only called on Python 2. NOTE: values that are neither strings, lists
        nor dicts are omitted from the result.
        """
        decoded = {}
        for key, value in d.items():
            key = key.decode('utf-8')
            if isinstance(value, (str, unicode)):
                decoded[key] = value.decode('utf-8')
            elif isinstance(value, list):
                decoded[key] = [e.decode('utf-8') for e in value]
            elif isinstance(value, dict):
                decoded[key] = self._decode_dict(value)
        return decoded

    def unmarshal_dict(self, body):
        """Unmarshal a dictionary.

        Fields occurring exactly once are returned as plain strings, repeated
        fields as lists of strings.
        """
        if IS_PYTHON3:
            body = body.decode('utf-8')

        # Second positional argument is keep_blank_values.
        parsed_dict = parse_qs(body, True)
        ret_dict = {}
        for key, value in parsed_dict.items():
            if isinstance(value, list) and len(value) == 1:
                ret_dict[key] = value[0]
            else:
                ret_dict[key] = value

        if IS_PYTHON2:
            ret_dict = self._decode_dict(ret_dict)
        return ret_dict

    def unmarshal_list(self, body):
        """Unmarshal a list from the repeated ``list`` field.

        An empty body yields an empty list.

        :raise error.UnmarshalError: If a non-empty body has no ``list``
            field.
        """
        if IS_PYTHON3:
            body = body.decode('utf-8')

        if body == '':
            return []

        try:
            parsed = parse_qs(body, True)['list']
        except KeyError:
            # Report malformed bodies like the other handlers do instead of
            # leaking a bare KeyError.
            raise error.UnmarshalError("Could not parse body as list")

        if IS_PYTHON2:
            parsed = [e.decode('utf-8') for e in parsed]
        return parsed

    def unmarshal_str(self, body):
        """Unmarshal a string from the ``str`` field.

        :raise error.UnmarshalError: If the body has no ``str`` field.
        """
        if IS_PYTHON3:
            body = body.decode('utf-8')

        try:
            parsed = parse_qs(body, True)['str'][0]
        except KeyError:
            raise error.UnmarshalError("Could not parse body as string")

        if IS_PYTHON2:
            parsed = parsed.decode('utf-8')
        return parsed

    def marshal_str(self, obj):
        """Marshal a string as the single form field ``str``.

        :rtype: bytes in python3, str in python2
        """
        if IS_PYTHON2:
            obj = obj.encode('utf-8')

        if IS_PYTHON3:
            return urlencode({'str': obj}).encode('utf-8')
        else:
            return urlencode({'str': obj})

    def marshal_bool(self, obj):
        """Marshal a boolean as ``"1"`` (true) or ``"0"`` (false).

        NOTE(review): unlike the other marshal_* methods this returns a
        native ``str`` on Python 3 as well, not ``bytes``.
        """
        if obj:
            return "1"
        else:
            return "0"

    def _encode_dict(self, d):
        """Return a copy of *d* with keys and values encoded to UTF-8.

        Only called on Python 2. NOTE: values that are neither strings, lists
        nor dicts are omitted from the result.
        """
        encoded = {}
        for key, value in d.items():
            key = key.encode('utf-8')
            if isinstance(value, (str, unicode)):
                encoded[key] = value.encode('utf-8')
            elif isinstance(value, list):
                encoded[key] = [e.encode('utf-8') for e in value]
            elif isinstance(value, dict):
                encoded[key] = self._encode_dict(value)
        return encoded

    def marshal_dict(self, obj):
        """Marshal a flat dictionary; list values become repeated fields.

        :rtype: bytes in python3, str in python2
        :raise error.MarshalError: If any value is itself a dictionary.
        """
        if IS_PYTHON2:
            obj = self._encode_dict(obj)

        # verify that no value is a dictionary, because the unmarshalling for
        # that doesn't work:
        for v in obj.values():
            if isinstance(v, dict):
                raise error.MarshalError(
                    "FormContentHandler doesn't support nested dictionaries.")

        if IS_PYTHON3:
            return urlencode(obj, doseq=True).encode('utf-8')
        else:
            return urlencode(obj, doseq=True)

    def marshal_list(self, obj):
        """Marshal a list as the repeated form field ``list``.

        :rtype: bytes in python3, str in python2
        """
        if IS_PYTHON2:
            obj = [e.encode('utf-8') for e in obj]

        if IS_PYTHON3:
            return urlencode({'list': obj}, doseq=True).encode('utf-8')
        else:
            return urlencode({'list': obj}, doseq=True)
class PickleContentHandler(ContentHandler):
    """Handler for pickle-encoded content.

    .. WARNING:: Unpickling can execute arbitrary code. Only use this handler
       for data from peers you fully trust.

    .. seealso:: `module documentation
       <http://docs.python.org/2/library/pickle.html>`_,
       `WikiPedia <http://en.wikipedia.org/wiki/Pickle_(Python)>`_
    """

    mime = 'application/pickle'
    """The mime-type used by this content handler is 'application/pickle'."""

    # Pickle protocol version passed to pickle.dumps(); subclasses override.
    PROTOCOL = 2

    # Besides PickleError, the pickle documentation names these exceptions as
    # possible results of unpickling (e.g. EOFError for truncated data).
    _UNPICKLE_ERRORS = (pickle.PickleError, AttributeError, EOFError,
                        ImportError, IndexError)

    def _dumps(self, obj):
        """Pickle *obj* with ``PROTOCOL``.

        :raise error.MarshalError: If pickling raises ``pickle.PickleError``.
        """
        try:
            return pickle.dumps(obj, protocol=self.PROTOCOL)
        except pickle.PickleError as e:
            raise error.MarshalError(str(e))

    def _loads(self, data):
        """Unpickle *data*.

        :raise error.UnmarshalError: If unpickling fails with one of the
            exceptions in ``_UNPICKLE_ERRORS``.
        """
        # SECURITY: pickle.loads() on externally supplied data is unsafe by
        # design; this is inherent to the content type and left unchanged.
        try:
            return pickle.loads(data)
        except self._UNPICKLE_ERRORS as e:
            raise error.UnmarshalError(str(e))

    def marshal_str(self, obj):
        """Marshal a string."""
        return self._dumps(obj)

    def marshal_dict(self, obj):
        """Marshal a dictionary."""
        return self._dumps(obj)

    def marshal_list(self, obj):
        """Marshal a list."""
        return self._dumps(obj)

    def unmarshal_str(self, data):
        """Unmarshal a string; pickled ``bytes`` are decoded on Python 3."""
        unpickled = self._loads(data)
        if IS_PYTHON3 and isinstance(unpickled, bytes):
            # if bytes were pickled, we have to decode them
            unpickled = unpickled.decode('utf-8')
        return unpickled

    def unmarshal_list(self, data):
        """Unmarshal a list; the unpickled type is not verified here."""
        return self._loads(data)

    def unmarshal_dict(self, data):
        """Unmarshal a dictionary; the unpickled type is not verified here."""
        return self._loads(data)
class Pickle3ContentHandler(PickleContentHandler):
    """Pickle handler using protocol level 3.

    Protocol 3 is only supported by the Python3 version of the pickle module,
    so this ContentHandler is only usable in Python3. Everything except the
    MIME type and the protocol level is inherited unchanged.

    .. seealso:: `module documentation
       <http://docs.python.org/3/library/pickle.html>`_,
       `WikiPedia <http://en.wikipedia.org/wiki/Pickle_(Python)>`_
    """

    PROTOCOL = 3

    mime = 'application/pickle3'
    """The mime-type used by this content handler is 'application/pickle3'."""
class YAMLContentHandler(ContentHandler):
    """Handler for YAML encoded content.

    .. NOTE:: This ContentHandler requires `PyYAML library
       <http://pyyaml.org/>`_.

    .. seealso:: `Specification <http://www.yaml.org/>`_,
       `WikiPedia <http://en.wikipedia.org/wiki/YAML>`_
    """

    mime = 'application/yaml'
    """The mime-type used by this content handler is 'application/yaml'."""

    # Imported lazily through ContentHandler.library on first use.
    librarypath = 'yaml'

    def _load(self, data):
        """Parse *data* with PyYAML's ``Loader`` class.

        ``Loader`` is passed explicitly because recent PyYAML releases no
        longer accept ``yaml.load()`` without it.
        """
        # SECURITY: yaml.Loader can construct arbitrary Python objects and is
        # unsafe for untrusted input. It is kept so that documents accepted
        # so far still load; consider yaml.safe_load for untrusted peers.
        return self.library.load(data, Loader=self.library.Loader)

    def _marshal_str3(self, obj):
        return self.library.dump(obj).encode('utf-8')

    def _marshal_str2(self, obj):
        return self.library.dump(obj)

    def marshal_str(self, obj):
        """Marshal a string.

        :rtype: bytes in python3, str in python2
        :raise error.MarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            return self._marshal_str(obj)
        except self.library.YAMLError as e:
            raise error.MarshalError(str(e))

    def _marshal_dict3(self, obj):
        return self.library.dump(obj).encode('utf-8')

    def _marshal_dict2(self, obj):
        return self.library.dump(obj)

    def marshal_dict(self, obj):
        """Marshal a dictionary.

        :rtype: bytes in python3, str in python2
        :raise error.MarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            return self._marshal_dict(obj)
        except self.library.YAMLError as e:
            raise error.MarshalError(str(e))

    def _marshal_list3(self, obj):
        return self.library.dump(obj).encode('utf-8')

    def _marshal_list2(self, obj):
        return self.library.dump(obj)

    def marshal_list(self, obj):
        """Marshal a list.

        :rtype: bytes in python3, str in python2
        :raise error.MarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            return self._marshal_list(obj)
        except self.library.YAMLError as e:
            raise error.MarshalError(str(e))

    def _unmarshal_str3(self, unmarshalled):
        # A document that loads as None is returned as the empty string.
        if unmarshalled is None:
            return ''
        if isinstance(unmarshalled, bytes):
            return unmarshalled.decode('utf-8')
        else:
            return unmarshalled

    def _unmarshal_str2(self, unmarshalled):
        if unmarshalled is None:
            return unicode('')
        return unmarshalled

    def unmarshal_str(self, data):
        """Unmarshal a string.

        :raise error.UnmarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            unmarshalled = self._load(data)
            return self._unmarshal_str(unmarshalled)
        except self.library.YAMLError as e:
            raise error.UnmarshalError(str(e))

    def unmarshal_list(self, data):
        """Unmarshal a list; the loaded type is not verified here.

        :raise error.UnmarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            return self._load(data)
        except self.library.YAMLError as e:
            raise error.UnmarshalError(str(e))

    def unmarshal_dict(self, data):
        """Unmarshal a dictionary; the loaded type is not verified here.

        :raise error.UnmarshalError: If PyYAML raises ``YAMLError``.
        """
        try:
            return self._load(data)
        except self.library.YAMLError as e:
            raise error.UnmarshalError(str(e))

    # Bind the interpreter-specific helpers once, at class creation time.
    if IS_PYTHON3:
        _marshal_str = _marshal_str3
        _marshal_dict = _marshal_dict3
        _marshal_list = _marshal_list3
        _unmarshal_str = _unmarshal_str3
    else:
        _marshal_str = _marshal_str2
        _marshal_dict = _marshal_dict2
        _marshal_list = _marshal_list2
        _unmarshal_str = _unmarshal_str2
class XMLContentHandler(ContentHandler):
    """Future location of the XML content handler.

    Strings are ``<str>`` elements, lists are ``<list>`` elements containing
    ``<str>`` children, and dictionaries are ``<dict>`` elements whose
    ``<str>``/``<dict>`` children carry their key in a ``key`` attribute.

    .. NOTE:: This ContentHandler requires the `lxml library
       <http://lxml.de/>`_.
    """

    mime = 'application/xml'
    """The mime-type used by this content handler is 'application/xml'."""

    librarypath = 'lxml.etree'

    def unmarshal_str(self, data):
        """Return the text of the root element ('' if it has none)."""
        text = self.library.fromstring(data).text
        if text is None:
            text = ''
        if not IS_PYTHON3:
            text = unicode(text)
        return text

    def _unmarshal_dict(self, tree):
        """Build a dict from the ``str`` and ``dict`` children of *tree*."""
        result = {}

        # string entries first; elements without text become ''
        for node in tree.iterfind('str'):
            result[node.attrib['key']] = '' if node.text is None else node.text

        # then nested dictionaries, resolved recursively
        for node in tree.iterfind('dict'):
            result[node.attrib['key']] = self._unmarshal_dict(node)

        return result

    def unmarshal_dict(self, body):
        """Unmarshal a (possibly nested) dictionary."""
        root = self.library.fromstring(body)
        return self._unmarshal_dict(root)

    def unmarshal_list(self, body):
        """Unmarshal a list from the ``str`` children of the root element."""
        root = self.library.fromstring(body)
        return ['' if node.text is None else node.text
                for node in root.iterfind('str')]

    def marshal_str(self, obj):
        """Marshal a string as a single ``<str>`` element."""
        if IS_PYTHON3 and isinstance(obj, bytes):
            obj = obj.decode('utf-8')
        node = self.library.Element('str')
        node.text = obj
        return self.library.tostring(node)

    def marshal_list(self, obj):
        """Marshal a list as ``<list>`` with one ``<str>`` per item."""
        container = self.library.Element('list')
        for item in obj:
            child = self.library.Element('str')
            child.text = item
            container.append(child)
        return self.library.tostring(container)

    def _marshal_dict(self, obj, key=None):
        """Build the ``<dict>`` element for *obj*.

        :param key: Key of this dictionary inside its parent, if nested.
        :raise error.MarshalError: If a value is neither a string nor a dict.
        """
        root = self.library.Element('dict')
        if key is not None:
            root.attrib['key'] = key

        for name, value in obj.items():
            is_text = isinstance(value, str) or (
                not IS_PYTHON3 and isinstance(value, unicode))
            if is_text:
                child = self.library.Element('str', attrib={'key': name})
                child.text = value
                root.append(child)
            elif isinstance(value, dict):
                root.append(self._marshal_dict(value, key=name))
            else:
                raise error.MarshalError('MarshalError (type %s): %s'
                                         % (type(value), value))
        return root

    def marshal_dict(self, obj):
        """Marshal a (possibly nested) dictionary."""
        tree = self._marshal_dict(obj)
        return self.library.tostring(tree)
CONTENT_HANDLERS = dict(
    (handler.mime, handler) for handler in (
        JSONContentHandler,
        PickleContentHandler,
        Pickle3ContentHandler,
        FormContentHandler,
        XMLContentHandler,
        YAMLContentHandler,
    )
)
"""
Mapping of MIME types to their respective handler implementation. You can use
this dictionary to dynamically look up a content handler if you do not know the
requested content type in advance.

================================= ===========================================
MIME type                         Handler
================================= ===========================================
application/json                  :py:class:`.handlers.JSONContentHandler`
application/x-www-form-urlencoded :py:class:`.handlers.FormContentHandler`
application/pickle                :py:class:`.handlers.PickleContentHandler`
application/pickle3               :py:class:`.handlers.Pickle3ContentHandler`
application/xml                   :py:class:`.handlers.XMLContentHandler`
application/yaml                  :py:class:`.handlers.YAMLContentHandler`
================================= ===========================================

If you want to provide your own implementation of a
:py:class:`.ContentHandler`, you can add it to this dictionary with the
appropriate MIME type as the key.
"""
# Old names, kept as module-level aliases for backwards compatibility.
# Note that ``json`` here is the handler class; the standard-library module is
# imported as ``libjson`` at the top of this file.
content_handler = ContentHandler
json = JSONContentHandler
xml = XMLContentHandler
form = FormContentHandler

# 'YamlContentHandler' was introduced in 0.6.1 and renamed for consistency to
# 'YAMLContentHandler' in 0.6.2; the old spelling remains as an alias.
YamlContentHandler = YAMLContentHandler