Source code for ows.wms.v13.encoders

# -------------------------------------------------------------------------------
#
# Project: pyows <http://eoxserver.org>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# -------------------------------------------------------------------------------
# Copyright (C) 2019 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# -------------------------------------------------------------------------------

from datetime import date, datetime, timedelta

from ows.util import Result, isoformat, duration
from ..types import (
    ServiceCapabilities, Operation, Layer, Style,
    Dimension, Range, GetMapRequest,
    DimensionValueType, DimensionResolutionType
)
from .namespaces import WMS, ns_xlink


[docs]def reference_attrs(href=None, type=None, role=None, arcrole=None, title=None, about=None): attrs = [ (ns_xlink('href'), href), (ns_xlink('type'), type), (ns_xlink('role'), role), (ns_xlink('arcrole'), arcrole), (ns_xlink('title'), title), (ns_xlink('about'), about), ] return { name: value for name, value in attrs if value is not None }
[docs]def encode_operation(operation: Operation): return WMS(operation.name, *[ WMS('Format', frmt) for frmt in operation.formats ] + [ WMS('DCPType', WMS('HTTP', *[ WMS(operation_method.method.value.capitalize(), WMS('OnlineResource', **reference_attrs( href=operation_method.service_url ) ) ) for operation_method in operation.operation_methods ]) ) ] )
[docs]def encode_boolean(value: bool): if value is None: return None return 'true' if value else 'false'
[docs]def encode_dimension_value(value: DimensionValueType): if isinstance(value, str): return value elif isinstance(value, datetime): return isoformat(value) return str(value)
[docs]def encode_dimension_resolution(value: DimensionResolutionType): if isinstance(value, timedelta): return duration(value) return str(value)
[docs]def encode_dimension(dimension: Dimension): value = None if isinstance(dimension.values, list): value = ','.join( encode_dimension_value(v) for v in dimension.values ) elif isinstance(dimension.values, Range): value = '/'.join([ encode_dimension_value(dimension.values.start), encode_dimension_value(dimension.values.stop), encode_dimension_resolution(dimension.values.resolution), ]) return WMS('Dimension', value, name=dimension.name, units=dimension.units, unitSymbol=dimension.unit_symbol, default=dimension.default, multipleValues=encode_boolean(dimension.multiple_values), nearestValue=encode_boolean(dimension.nearest_value), current=encode_boolean(dimension.current), )
[docs]def encode_style(style: Style): return WMS('Style', WMS('Name', style.name), WMS('Title', style.title), WMS('Abstract', style.abstract) if style.abstract else None, *[ WMS('LegendURL', WMS('Format', legend_url.format), WMS('OnlineResource', **reference_attrs( href=legend_url.href )), width=str(legend_url.width), height=str(legend_url.height), ) for legend_url in style.legend_urls ] + [ WMS('StyleSheetURL', WMS('Format', style.style_sheet_url.format), WMS('OnlineResource', **reference_attrs( href=style.style_sheet_url.href )), ) if style.style_sheet_url else None, WMS('StyleURL', WMS('Format', style.style_url.format), WMS('OnlineResource', **reference_attrs( href=style.style_url.href )), ) if style.style_url else None, ] )
[docs]def encode_layer(layer: Layer): return WMS('Layer', WMS('Name', layer.name) if layer.name else None, WMS('Title', layer.title), WMS('Abstract', layer.abstract) if layer.abstract else None, WMS('KeywordList', *[ WMS('Keyword', keyword) for keyword in layer.keywords ]) if layer.keywords else None, *[ WMS('CRS', crs) for crs in layer.crss ] + [ WMS('EX_GeographicBoundingBox', WMS('westBoundLongitude', str(layer.wgs84_bounding_box.bbox[0]) ), WMS('eastBoundLongitude', str(layer.wgs84_bounding_box.bbox[2]) ), WMS('southBoundLatitude', str(layer.wgs84_bounding_box.bbox[1]) ), WMS('northBoundLatitude', str(layer.wgs84_bounding_box.bbox[3]) ), ) if layer.wgs84_bounding_box else None ] + [ WMS('BoundingBox', CRS=bounding_box.crs, minx=str(bounding_box.bbox[0]), miny=str(bounding_box.bbox[1]), maxx=str(bounding_box.bbox[2]), maxy=str(bounding_box.bbox[3]), ) for bounding_box in layer.bounding_boxes ] + [ encode_dimension(dimension) for dimension in layer.dimensions ] + [ WMS('Attribution', layer.attribution) if layer.attribution else None ] + [ WMS('AuthorityURL', WMS('OnlineResource', **reference_attrs(href=href)), name=name, ) for name, href in layer.authority_urls.items() ] + [ WMS('Identifier', identifier, authority=identifier, ) for authority, identifier in layer.identifiers.items() ] + [ WMS('MetadataURL', WMS('Format', metadata_url.format), WMS('OnlineResource', **reference_attrs( href=metadata_url.href )), # type=metadata_url.type, ) for metadata_url in layer.metadata_urls ] + [ WMS('DataURL', WMS('Format', data_url.format), WMS('OnlineResource', **reference_attrs( href=data_url.href )), ) for data_url in layer.data_urls ] + [ WMS('FeatureListURL', WMS('Format', feature_list_url.format), WMS('OnlineResource', **reference_attrs( href=feature_list_url.href )), ) for feature_list_url in layer.feature_list_urls ] + [ encode_style(style) for style in layer.styles ] + [ WMS('MinScaleDenominator', str(layer.min_scale_denominator) ) if layer.min_scale_denominator else None, WMS('MaxScaleDenominator', str(layer.max_scale_denominator) ) if layer.max_scale_denominator else None, ] + [ encode_layer(sub_layer) for sub_layer in layer.layers ] )
[docs]def xml_encode_capabilities(capabilities: ServiceCapabilities, **kwargs): root = WMS('WMS_Capabilities', WMS('Service', WMS('Name', 'WMS'), WMS('Title', capabilities.title or ''), WMS('Abstract', capabilities.abstract), WMS('KeywordList', *[ WMS('Keyword', keyword) for keyword in capabilities.keywords ]) if capabilities.keywords else None, WMS('OnlineResource', **{ **reference_attrs(href=capabilities.online_resource) }), WMS('ContactInformation', WMS('ContactPerson', capabilities.individual_name ) if capabilities.individual_name else None, WMS('ContactOrganization', capabilities.organisation_name ) if capabilities.organisation_name else None, WMS('ContactPosition', capabilities.position_name ) if capabilities.position_name else None, WMS('ContactAddress', WMS('AddressType', ''), WMS('Address', capabilities.delivery_point or ''), WMS('City', capabilities.city or ''), WMS('StateOrProvince', capabilities.administrative_area or '' ), WMS('PostCode', capabilities.postal_code or ''), WMS('Country', capabilities.country or ''), ) if capabilities.delivery_point else None, WMS('ContactVoiceTelephone', capabilities.phone_voice ) if capabilities.phone_voice else None, WMS('ContactFacsimileTelephone', capabilities.phone_facsimile ) if capabilities.phone_facsimile else None, WMS('ContactElectronicMailAddress', capabilities.electronic_mail_address ) if capabilities.electronic_mail_address else None, ), WMS('Fees', capabilities.fees ) if capabilities.fees else None, WMS('AccessConstraints', capabilities.access_constraints[0] ) if capabilities.access_constraints else None, WMS('LayerLimit', str(capabilities.layer_limit) ) if capabilities.layer_limit is not None else None, WMS('MaxWidth', str(capabilities.max_width) ) if capabilities.max_width is not None else None, WMS('MaxHeight', str(capabilities.max_height) ) if capabilities.max_height is not None else None, ), WMS('Capability', WMS('Request', *[ encode_operation(operation) for operation in capabilities.operations ]), WMS('Exception', *[ WMS('Format', exception_format) for exception_format in capabilities.exception_formats ]), encode_layer(capabilities.layer) if capabilities.layer else None, ), version=( str(capabilities.service_type_versions[0]) if capabilities.service_type_versions else '1.3.0' ), updateSequence=capabilities.update_sequence ) return Result.from_etree(root, **kwargs)
def _encode_dimension(values, value_encoder=str, resolution_encoder=str): values = [values] if not isinstance(values, list) else values def encode_value(value): if isinstance(value, Range): if value.resolution is not None: return f'{value_encoder(value.start)}/{value_encoder(value.stop)}/{resolution_encoder(value.resolution)}' # noqa else: return f'{value_encoder(value.start)}/{value_encoder(value.stop)}' return value_encoder(value) return ','.join(encode_value(v) for v in values)
[docs]def kvp_encode_get_map_request(request: GetMapRequest, swap_coordinates=False): bbox = request.bounding_box.bbox if swap_coordinates: bbox = [bbox[1], bbox[0], bbox[3], bbox[2]] params = [ ('service', 'WMS'), ('version', str(request.version)), ('request', 'GetMap'), ('layers', ','.join(request.layers)), ('styles', ','.join(s or '' for s in request.styles)), ('crs', request.bounding_box.crs), ('bbox', ','.join(str(v) for v in bbox)), ('width', str(request.width)), ('height', str(request.height)), ('format', request.format), ] if request.transparent is not None: params.append(('transparent', str(request.transparent).upper())) if request.background_color is not None: params.append(('bgcolor', request.background_color)) if request.exceptions is not None: params.append(('exceptions', request.exceptions)) if request.time is not None: params.append(('time', _encode_dimension(request.time, isoformat, duration))) if request.elevation is not None: params.append(('elevation', _encode_dimension(request.elevation))) for name, value in request.dimensions.items(): lower = name.lower() params.append((f'dim_{lower}', _encode_dimension(value))) return Result.from_kvp(params)