# ------------------------------------------------------------------------------
#
# Project: pyows <http://eoxserver.org>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2019 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
""" This module provides base functionality for any other decoder class.
"""
ZERO_OR_ONE = "?"
ONE_OR_MORE = "+"
ANY = "*"
SINGLE_VALUES = (ZERO_OR_ONE, 1)
[docs]class DecodingException(Exception):
""" Base Exception class to be thrown whenever a decoding failed.
"""
def __init__(self, message, locator=None):
super().__init__(message)
self.locator = locator
[docs]class WrongMultiplicityException(DecodingException):
""" Decoding Exception to be thrown when the multiplicity of a parameter did
not match the expected count.
"""
code = "InvalidParameterValue"
def __init__(self, locator, expected, result):
super().__init__(
"Parameter '%s': expected %s got %d" % (locator, expected, result),
locator
)
[docs]class MissingParameterException(DecodingException):
""" Exception to be thrown, when a decoder could not read one parameter,
where exactly one was required.
"""
code = "MissingParameterValue"
def __init__(self, locator):
super().__init__(
"Missing required parameter '%s'" % locator, locator
)
[docs]class MissingParameterMultipleException(DecodingException):
""" Exception to be thrown, when a decoder could not read at least one
parameter, where one ore more were required.
"""
code = "MissingParameterValue"
def __init__(self, locator):
super().__init__(
"Missing at least one required parameter '%s'" % locator, locator
)
[docs]class NoChoiceResultException(DecodingException):
pass
[docs]class ExclusiveException(DecodingException):
pass
# NOTE: The following exceptions may get propagated as OWS exceptions
# therefore it is necessary to set the proper OWS exception code.
[docs]class InvalidParameterException(DecodingException):
code = "InvalidParameterValue"
# Compound fields
[docs]class Choice:
""" Tries all given choices until one does return something.
"""
def __init__(self, *choices):
self.choices = choices
def __get__(self, decoder, decoder_class=None):
for choice in self.choices:
try:
return choice.__get__(decoder, decoder_class)
except Exception:
continue
raise NoChoiceResultException
[docs]class Exclusive:
""" For mutual exclusive Parameters.
"""
def __init__(self, *choices):
self.choices = choices
def __get__(self, decoder, decoder_class=None):
result = None
num = 0
for choice in self.choices:
try:
result = choice.__get__(decoder, decoder_class)
num += 1
except Exception:
continue
if num != 1:
raise ExclusiveException
return result
[docs]class Concatenate:
""" Helper to concatenate the results of all sub-parameters to one.
"""
def __init__(self, *choices, **kwargs):
self.choices = choices
self.allow_errors = kwargs.get("allow_errors", True)
def __get__(self, decoder, decoder_class=None):
result = []
for choice in self.choices:
try:
value = choice.__get__(decoder, decoder_class)
if isinstance(value, (list, tuple)):
result.extend(value)
else:
result.append(value)
except Exception as exc:
if self.allow_errors:
# swallow exception
continue
raise exc
return result
# Type conversion helpers
[docs]class typelist:
""" Helper for XMLDecoder schemas that expect a string that represents a
list of a type separated by some separator.
"""
def __init__(self, typ=None, separator=" "):
self.typ = typ
self.separator = separator
def __call__(self, value):
split = value.split(self.separator)
if self.typ:
return [self.typ(v) for v in split]
return split
[docs]class fixed:
""" Helper for parameters that are expected to be have a fixed value and
raises a ValueError if not.
"""
def __init__(self, value, case_sensitive=True):
self.value = value if case_sensitive else value.lower()
self.case_sensitive = case_sensitive
def __call__(self, value):
compare = value if self.case_sensitive else value.lower()
if self.value != compare:
raise ValueError(
"Value mismatch, expected %s, got %s." %
(self.value, value)
)
return value
[docs]class enum:
""" Helper for parameters that are expected to be in a certain enumeration.
A ValueError is raised if not.
"""
def __init__(self, values, case_sensitive=True):
self.values = values
self.compare_values = values if case_sensitive else [
lower(v) for v in values
]
self.case_sensitive = case_sensitive
def __call__(self, value):
compare = value if self.case_sensitive else value.lower()
if compare not in self.compare_values:
raise ValueError(
"Unexpected value '%s'. Expected one of: %s." %
(value, ", ".join(map(lambda s: "'%s'" % s, self.values)))
)
return value
[docs]def lower(value):
""" Functor to return a lower-case string.
"""
return value.lower()
[docs]def upper(value):
""" Functor to return a upper-case string.
"""
return value.upper()
[docs]def strip(value):
""" Functor to return a whitespace stripped string.
"""
return value.strip()
[docs]class value_range:
""" Helper to assert that a given parameter is within a specified range.
"""
def __init__(self, min, max, type=float):
self._min = min
self._max = max
self._type = type
def __call__(self, raw):
value = self._type(raw)
if value < self._min or value > self._max:
raise ValueError(
"Given value '%s' exceeds expected bounds (%s, %s)"
% (value, self._min, self._max)
)
return value
[docs]def boolean(raw):
""" Functor to convert "true"/"false" to a boolean.
"""
raw = raw.lower()
if raw not in ("true", "false"):
raise ValueError("Could not parse a boolean value from '%s'." % raw)
return raw == "true"
[docs]def to_dict(decoder, dict_class=dict):
""" Utility function to get a dictionary representation of the given decoder.
This function invokes all decoder parameters and sets the dictionary
fields accordingly
"""
return dict_class(
(name, getattr(decoder, name))
for name in dir(decoder)
if not name.startswith("_") and name != "namespaces"
)
[docs]class BaseParameter(property):
""" Abstract base class for XML, KVP or any other kind of parameter.
"""
def __init__(self, type=None, num=1, default=NO_DEFAULT,
default_factory=None):
super().__init__(self.fget)
self.type = type or str
self.num = num
self.default = default
self.default_factory = default_factory
[docs] def select(self, decoder):
""" Interface method.
"""
raise NotImplementedError
@property
def locator(self):
return ""
[docs] def fget(self, decoder):
""" Property getter function.
"""
results = self.select(decoder)
count = len(results)
locator = self.locator
multiple = self.num not in SINGLE_VALUES
# check the correct count of the result
if not multiple and count > 1:
raise WrongMultiplicityException(locator, "at most one", count)
elif self.num == 1 and count == 0:
raise MissingParameterException(locator)
elif self.num == ONE_OR_MORE and count == 0:
raise MissingParameterMultipleException(locator)
elif isinstance(self.num, int) and count != self.num:
raise WrongMultiplicityException(locator, self.num, count)
# parse the value/values, or return the defaults
if multiple:
if count == 0 and self.num == ANY:
if self.default_factory:
return self.default_factory()
elif self.default is not NO_DEFAULT:
return self.default
try:
return [self.type(v) for v in results]
except Exception as e:
# let some more sophisticated exceptions pass
if hasattr(e, "locator") or hasattr(e, "code"):
raise
raise InvalidParameterException(str(e), locator)
elif self.num == ZERO_OR_ONE and count == 0:
if self.default_factory:
return self.default_factory()
elif self.default is not NO_DEFAULT:
return self.default
else:
return None
elif self.type:
try:
return self.type(results[0])
except Exception as e:
# let some more sophisticated exceptions pass
if hasattr(e, "locator") or hasattr(e, "code"):
raise
raise InvalidParameterException(str(e), locator)
return results[0]
[docs]class BaseDecoder:
object_class = None
[docs] def create_object(self, params: dict):
""" Create the associated object for that decoder using the
passed parameters.
"""
if self.object_class is not None:
return self.object_class(**params)
raise NotImplementedError
[docs] def map_params(self, params):
""" Map parameters, if necessary. Default implementation is
a no-op.
"""
return params
[docs] def collect_params(self):
""" Collect all parameters. This will collect all values
which are computed using properties.
"""
cls = type(self)
return {
name: getattr(self, name)
for name in dir(self)
if isinstance(getattr(cls, name, None), property)
}
[docs] def decode(self):
""" Collect all decoder parameters and construct the object.
"""
return self.create_object(
self.map_params(
self.collect_params()
)
)