# Source code for eodag.api.product.metadata_mapping
# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import ast
import json
import logging
import re
from datetime import datetime, timedelta
from string import Formatter
from typing import TYPE_CHECKING, Any, AnyStr, Callable, Iterator, Optional, Union, cast
import geojson
import orjson
import pyproj
import shapely
from dateutil.parser import isoparse
from dateutil.relativedelta import relativedelta
from dateutil.tz import UTC, tzutc
from jsonpath_ng.jsonpath import Child, JSONPath
from lxml import etree
from lxml.etree import XPathEvalError
from shapely import wkt
from shapely.geometry import MultiPolygon, Polygon
from shapely.ops import transform
from eodag.types.queryables import Queryables
from eodag.utils import (
    DEFAULT_PROJ,
    deepcopy,
    dict_items_recursive_apply,
    format_string,
    get_geometry_from_various,
    items_recursive_apply,
    nested_pairs2dict,
    remove_str_array_quotes,
    sanitize,
    string_to_jsonpath,
    update_nested_dict,
)
from eodag.utils.dates import get_timestamp
from eodag.utils.exceptions import ValidationError
if TYPE_CHECKING:
    from shapely.geometry.base import BaseGeometry
    from eodag.config import PluginConfig
logger = logging.getLogger("eodag.product.metadata_mapping")
SEP = r"#"
INGEST_CONVERSION_REGEX = re.compile(
    r"^{(?P<path>[^#]*)" + SEP + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*}$"
)
NOT_AVAILABLE = "Not Available"
NOT_MAPPED = "Not Mapped"
ONLINE_STATUS = "ONLINE"
STAGING_STATUS = "STAGING"
OFFLINE_STATUS = "OFFLINE"
COORDS_ROUNDING_PRECISION = 4
WKT_MAX_LEN = 1600
COMPLEX_QS_REGEX = re.compile(r"^(.+=)?([^=]*)({.+})+([^=&]*)$")
DEFAULT_GEOMETRY = "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))"
def get_metadata_path(
    map_value: Union[str, list[str]],
) -> tuple[Union[list[str], None], str]:
    """Split a metadata mapping entry into its optional converter and its path.

    The path is the jsonpath or xpath to the value of a EO product metadata in a
    provider search result. Where it sits in ``map_value`` depends on whether the
    metadata is queryable (the value associated to it in the provider search
    config metadata mapping is a list) or not (the value is directly the string
    corresponding to the path).

    Assume we have the following provider config::

        provider:
            ...
            search:
                ...
                metadata_mapping:
                    productType:
                        - productType
                        - $.properties.productType
                    id: $.properties.id
                    ...
                ...
            ...

    Then the metadata `id` is not queryable for this provider meanwhile
    `productType` is queryable. The first value of the
    `metadata_mapping.productType` is how the eodag search parameter
    `productType` is interpreted in the
    :class:`~eodag.plugins.search.base.Search` plugin implemented by `provider`,
    and is used when eodag delegates search process to the corresponding plugin.

    :param map_value: The value originating from the definition of
                      `metadata_mapping` in the provider search config. For
                      example, it is the list
                      `['productType', '$.properties.productType']` with the
                      sample above. Or the string `$.properties.id`.
    :returns: Either ``None`` and the path to the metadata value, or the list
              ``[converter, args]`` and the path to the metadata value when the
              path has the form ``{<path>#<converter>(<args>)}``.
    :raises TypeError: If the extracted path cannot be matched by the
                       conversion regex (the error is logged, then re-raised)
    """
    raw_path = get_metadata_path_value(map_value)
    try:
        conversion = INGEST_CONVERSION_REGEX.match(raw_path)
    except TypeError as e:
        logger.error("Could not match regex on metadata path '%s'" % str(raw_path))
        raise e
    if not conversion:
        # plain path, no converter attached
        return None, raw_path
    groups = conversion.groupdict()
    converter_spec = [groups["converter"], groups["args"]]
    return converter_spec, groups["path"]
def get_metadata_path_value(map_value: Union[str, list[str]]) -> str:
    """Return the raw metadata path of a mapping entry, converter included.

    :param map_value: Either the path string itself, or a list whose second
                      item is the path
    :returns: The second item when ``map_value`` is a list, otherwise
              ``map_value`` unchanged
    """
    if isinstance(map_value, list):
        return map_value[1]
    return map_value
def get_search_param(map_value: list[str]) -> str:
    """Return the search parameter part of a queryable metadata mapping entry.

    See :func:`~eodag.api.product.metadata_mapping.get_metadata_path`

    :param map_value: The value originating from the definition of
                      `metadata_mapping` in the provider search config; the
                      caller is expected to pass it as a list
    :returns: The value of the search parameter as defined in the provider
              config, i.e. the first item of ``map_value``
    """
    search_param = map_value[0]
    return search_param
# [docs]
def format_metadata(search_param: str, *args: Any, **kwargs: Any) -> str:
    """Format a string of form ``{<field_name>#<conversion_function>}``
    The currently understood converters are:
        - ``ceda_collection_name``: generate a CEDA collection name from a string
        - ``csv_list``: convert to a comma separated list
        - ``datetime_to_timestamp_milliseconds``: converts a utc date string to a timestamp in milliseconds
        - ``dict_filter_and_sub``: filter dict items using jsonpath and then apply recursive_sub_str
        - ``fake_l2a_title_from_l1c``: used to generate SAFE format metadata for data from AWS
        - ``from_alternate``: update assets using given alternate
        - ``from_ewkt``: convert EWKT to shapely geometry / WKT in DEFAULT_PROJ
        - ``from_georss``: convert GeoRSS to shapely geometry / WKT in DEFAULT_PROJ
        - ``get_ecmwf_time``: get the time of a datetime string in the ECMWF format
        - ``get_group_name``: get the matching regex group name
        - ``recursive_sub_str``: recursively substitute values matching a regex in the structure (e.g. dict)
        - ``remove_extension``: on a string that contains dots, only take the first part of the list obtained by
          splitting the string on dots
        - ``replace_str``: execute "string".replace(old, new)
        - ``s2msil2a_title_to_aws_productinfo``: used to generate SAFE format metadata for data from AWS
        - ``sanitize``: sanitize string
        - ``slice_str``: slice a string (equivalent to s[start:end:step])
        - ``split_cop_dem_id``: get the bbox by splitting the product id
        - ``split_corine_id``: get the product type by splitting the product id
        - ``to_bounds_lists``: convert to list(s) of bounds
        - ``to_datetime_dict``: convert a datetime string to a dictionary where values are either a string or a list
        - ``to_ewkt``: convert to EWKT (Extended Well-Known text)
        - ``to_geojson``: convert to a GeoJSON (via __geo_interface__ if exists)
        - ``to_iso_date``: remove the time part of a iso datetime string
        - ``to_iso_utc_datetime_from_milliseconds``: convert a utc timestamp in given milliseconds to a utc iso datetime
        - ``to_iso_utc_datetime``: convert a UTC datetime string to ISO UTC datetime string
        - ``to_lower``: Convert a string to lowercase
        - ``to_nwse_bounds_str``: convert to North,West,South,East bounds string with given separator
        - ``to_nwse_bounds``: convert to North,West,South,East bounds
        - ``to_rounded_wkt``: simplify the WKT of a geometry
        - ``to_title``: Convert a string to title case
        - ``to_upper``: Convert a string to uppercase
    :param search_param: The string to be formatted
    :param args: (optional) Additional arguments to use in the formatting process
    :param kwargs: (optional) Additional named-arguments to use when formatting
    :returns: The formatted string
    """
    class MetadataFormatter(Formatter):
        CONVERSION_REGEX = re.compile(
            r"^(?P<field_name>.+)"
            + SEP
            + r"(?P<converter>[^\d\W]\w*)(\((?P<args>.*)\))*$"
        )
        def __init__(self) -> None:
            """Start with no pending custom conversion.

            Both attributes are filled in by ``get_field`` and consumed, then
            cleared, by ``convert_field``.
            """
            # raw argument string of the pending converter, if any
            self.custom_args: Optional[str] = None
            # pending ``convert_*`` callable, if any
            self.custom_converter: Optional[Callable] = None
        def get_field(self, field_name: str, args: Any, kwargs: Any) -> Any:
            """Resolve a replacement field, remembering its custom converter.

            When ``field_name`` has the form ``<field_name>#<converter>(<args>)``,
            the matching ``convert_<converter>`` attribute and the raw argument
            string are stored on the formatter and the field is resolved using
            the bare ``<field_name>`` part.

            :param field_name: The replacement field as written in the format string
            :param args: Positional arguments given to the formatting call
            :param kwargs: Keyword arguments given to the formatting call
            :returns: Whatever the base ``Formatter.get_field`` returns for the
                      (possibly stripped) field name
            :raises AttributeError: If no ``convert_<converter>`` attribute exists
            """
            spec_match = self.CONVERSION_REGEX.match(field_name)
            if spec_match is not None:
                # The converter is only registered here: the value bound to
                # field_name is not known yet, it is applied in convert_field.
                spec = spec_match.groupdict()
                field_name = spec["field_name"]
                self.custom_args = spec["args"]
                self.custom_converter = getattr(
                    self, "convert_{}".format(spec["converter"])
                )
            return super(MetadataFormatter, self).get_field(field_name, args, kwargs)
        def convert_field(self, value: Any, conversion: Any) -> Any:
            """Apply the converter registered by ``get_field``, if there is one.

            :param value: The value resolved for the current replacement field
            :param conversion: The standard conversion flag, only used when no
                               custom converter is pending
            :returns: ``None`` when ``value`` is ``None``, otherwise the result
                      of the custom converter (called with the raw argument
                      string when one was given); without a pending converter,
                      the result of the base ``Formatter.convert_field``
            """
            converter = self.custom_converter
            if converter is None:
                return super(MetadataFormatter, self).convert_field(value, conversion)

            converter_args = self.custom_args
            if value is None:
                result = None
            elif converter_args is None:
                result = converter(value)
            else:
                result = converter(value, converter_args)

            # Reset the pending state so that the next replacement field does
            # not inherit this converter.
            self.custom_converter = None
            self.custom_args = None
            return result
        @staticmethod
        def convert_datetime_to_timestamp_milliseconds(date_time: str) -> int:
            """Convert a date_time (str) to a Unix timestamp in milliseconds.

            Documented examples (the value is returned as an ``int``)::

                "2021-04-21T18:27:19.123Z" => 1619029639123
                "2021-04-21" => 1618963200000
                "2021-04-21T00:00:00+02:00" => 1618956000000

            :param date_time: The datetime string, passed as is to ``get_timestamp``
            :returns: The timestamp scaled to milliseconds and truncated by ``int``
            """
            timestamp_seconds = get_timestamp(date_time)
            return int(1e3 * timestamp_seconds)
        @staticmethod
        def convert_to_iso_utc_datetime_from_milliseconds(
            timestamp: int,
        ) -> Union[str, int]:
            """Convert a timestamp in milliseconds (int) to its ISO8601 UTC format.

            1619029639123 => "2021-04-21T18:27:19.123Z"

            :param timestamp: Milliseconds since the Unix epoch
            :returns: The ISO 8601 UTC string with millisecond precision and a
                      ``Z`` suffix, or ``timestamp`` itself, unchanged, when the
                      conversion raises ``TypeError`` (e.g. non-numeric input)
            """
            try:
                seconds = timestamp / 1e3
                utc_moment = datetime.fromtimestamp(seconds, tzutc())
                iso_text = utc_moment.isoformat(timespec="milliseconds")
                return iso_text.replace("+00:00", "Z")
            except TypeError:
                return timestamp
        @staticmethod
        def convert_to_iso_utc_datetime(
            date_time: str, timespec: str = "milliseconds"
        ) -> str:
            """Convert a date_time (str) to its ISO 8601 representation in UTC.

            "2021-04-21" => "2021-04-21T00:00:00.000Z"
            "2021-04-21T00:00:00.000+02:00" => "2021-04-20T22:00:00.000Z"

            :param date_time: The datetime string to convert; a value without
                              timezone is taken as already being UTC
            :param timespec: The number of additional terms of the time to
                             include. Valid options are 'auto', 'hours',
                             'minutes', 'seconds', 'milliseconds' and
                             'microseconds'.
            :returns: The UTC ISO 8601 string with a ``Z`` suffix, or
                      ``date_time`` unchanged when it cannot be parsed
            """
            try:
                parsed = isoparse(date_time)
            except ValueError:
                # not an ISO datetime: hand the input back untouched
                return date_time

            zone = parsed.tzinfo
            if not zone:
                parsed = parsed.replace(tzinfo=UTC)
            elif zone is not UTC:
                parsed = parsed.astimezone(UTC)

            iso_text = parsed.isoformat(timespec=timespec)
            return iso_text.replace("+00:00", "Z")
        @staticmethod
        def convert_to_iso_date(
            datetime_string: str, time_delta_args_str: str = "0,0,0,0,0,0,0"
        ) -> str:
            """Convert an ISO8601 datetime (str) to its ISO8601 date format.

            The datetime is expressed in UTC before the date is extracted, so
            the day may change::

                "2021-04-21T18:27:19.123Z" => "2021-04-21"
                "2021-04-21" => "2021-04-21"
                "2021-04-21T00:00:00+06:00" => "2021-04-20" !

            :param datetime_string: The ISO 8601 datetime string; a value without
                                    timezone is taken as already being UTC
            :param time_delta_args_str: Python literal that evaluates to the
                                        positional arguments of
                                        :class:`datetime.timedelta`, added to
                                        the datetime before extracting the date
            :returns: The date as ``YYYY-MM-DD``
            """
            moment = isoparse(datetime_string)
            zone = moment.tzinfo
            if not zone:
                moment = moment.replace(tzinfo=UTC)
            elif zone is not UTC:
                moment = moment.astimezone(UTC)

            shift = timedelta(*ast.literal_eval(time_delta_args_str))
            return (moment + shift).date().isoformat()
        @staticmethod
        def convert_to_non_separated_date(datetime_string):
            """Convert an ISO8601 datetime (str) to a date without separators.

            The date is first computed by ``convert_to_iso_date``, then its
            dashes are dropped (``YYYY-MM-DD`` becomes ``YYYYMMDD``).

            :param datetime_string: The ISO 8601 datetime string
            :returns: The date string without ``-``
            """
            dashed_date = MetadataFormatter.convert_to_iso_date(datetime_string)
            return "".join(dashed_date.split("-"))
        @staticmethod
        def convert_to_rounded_wkt(value: BaseGeometry) -> str:
            """Dump a geometry as WKT with rounded coordinates, simplifying if too long.

            While the WKT is longer than ``WKT_MAX_LEN``, the original geometry is
            simplified again with a tolerance growing by 0.1 at each attempt, as
            long as that tolerance does not exceed 1.

            :param value: The geometry to dump
            :returns: The WKT string, which may still be longer than
                      ``WKT_MAX_LEN`` (a warning is then logged)
            """

            def _dump(geom: BaseGeometry) -> str:
                # WKT with coordinates rounded to COORDS_ROUNDING_PRECISION
                return cast(
                    str, wkt.dumps(geom, rounding_precision=COORDS_ROUNDING_PRECISION)
                )

            rounded = _dump(value)
            # If needed, simplify WKT to prevent too long request failure
            tolerance = 0.1
            while len(rounded) > WKT_MAX_LEN and tolerance <= 1:
                logger.debug(
                    "Geometry WKT is too long (%s), trying to simplify it with tolerance %s",
                    len(rounded),
                    tolerance,
                )
                rounded = _dump(value.simplify(tolerance))
                tolerance += 0.1

            if len(rounded) > WKT_MAX_LEN and tolerance > 1:
                logger.warning("Failed to reduce WKT length lower than %s", WKT_MAX_LEN)
            return rounded
        @staticmethod
        def convert_to_bounds_lists(input_geom: BaseGeometry) -> list[list[float]]:
            """Convert a geometry to a list of bounds lists.

            :param input_geom: The geometry to get the bounds from
            :returns: One list of the first four ``bounds`` values per polygon of
                      a ``MultiPolygon``, largest area first; a single such list
                      for any other geometry
            """
            if not isinstance(input_geom, MultiPolygon):
                return [list(input_geom.bounds[0:4])]
            # sort with larger one at first (stac-browser only plots first one)
            by_area_desc = sorted(
                input_geom.geoms, key=lambda part: part.area, reverse=True
            )
            return [list(part.bounds[0:4]) for part in by_area_desc]
        @staticmethod
        def convert_to_bounds(input_geom_unformatted: Any) -> list[float]:
            """Convert a geometry, given in any supported form, to its bounds.

            :param input_geom_unformatted: The geometry, in any form accepted by
                                           ``get_geometry_from_various``
            :returns: ``[min_lon, min_lat, max_lon, max_lat]``. For a
                      ``MultiPolygon`` this is the envelope of all its
                      polygons, starting from ``[180, 90, -180, -90]``; for any
                      other geometry, the first four ``bounds`` values
            """
            input_geom = get_geometry_from_various(geometry=input_geom_unformatted)
            if not isinstance(input_geom, MultiPolygon):
                return list(input_geom.bounds[0:4])

            # Start from the opposite world extremes so that any polygon narrows them.
            # No ordering of the polygons is needed: min/max do not depend on it.
            min_lon = 180
            min_lat = 90
            max_lon = -180
            max_lat = -90
            for geom in input_geom.geoms:
                min_lon = min(min_lon, geom.bounds[0])
                min_lat = min(min_lat, geom.bounds[1])
                max_lon = max(max_lon, geom.bounds[2])
                max_lat = max(max_lat, geom.bounds[3])
            return [min_lon, min_lat, max_lon, max_lat]
        @staticmethod
        def convert_to_nwse_bounds(input_geom: BaseGeometry) -> list[float]:
            """Convert a geometry to its bounds in North, West, South, East order.

            :param input_geom: The geometry, or its WKT string
            :returns: The ``bounds`` values rotated so that the last one comes first
            """
            geom = (
                shapely.wkt.loads(input_geom)
                if isinstance(input_geom, str)
                else input_geom
            )
            bounds = geom.bounds
            # move the last bound in front of the others
            rotated = bounds[-1:] + bounds[:-1]
            return list(rotated)
        @staticmethod
        def convert_to_nwse_bounds_str(
            input_geom: BaseGeometry, separator: str = ","
        ) -> str:
            """Convert a geometry to a North, West, South, East bounds string.

            :param input_geom: The geometry, or its WKT string
            :param separator: The string put between the bounds
            :returns: The bounds given by ``convert_to_nwse_bounds``, each
                      converted with ``str`` and joined with ``separator``
            """
            nwse_bounds = MetadataFormatter.convert_to_nwse_bounds(input_geom)
            return separator.join(map(str, nwse_bounds))
        @staticmethod
        def convert_to_geojson(value: Any) -> str:
            """Serialize ``value`` to a GeoJSON string using ``geojson.dumps``.

            :param value: The object to serialize
            :returns: The serialized string
            """
            serialized = geojson.dumps(value)
            return serialized
        @staticmethod
        def convert_to_geojson_polytope(
            value: BaseGeometry,
        ) -> Union[dict[Any, Any], str]:
            """Convert a shapely Polygon to the feature structure used by ECMWF Polytope.

            :param value: The polygon to convert
            :returns: A dict with ``type`` set to ``"polygon"`` and ``shape`` set
                      to the exterior coordinates, each ``(x, y)`` swapped to
                      ``[y, x]``
            :raises ValidationError: If ``value`` is not a shapely ``Polygon``
            """
            if not isinstance(value, Polygon):
                raise ValidationError("to_geojson_polytope only accepts shapely Polygon")
            # ECMWF Polytope uses non-geojson structure for features
            swapped_coords = [[y, x] for x, y in value.exterior.coords]
            return {"type": "polygon", "shape": swapped_coords}
        @staticmethod
        def convert_from_ewkt(ewkt_string: str) -> Union[BaseGeometry, str]:
            """Convert EWKT (Extended Well-Known text) to shapely geometry.

            :param ewkt_string: A string holding ``SRID=<code>;<wkt>``
            :returns: The geometry, reprojected to ``DEFAULT_PROJ`` when its SRID
                      differs from it; ``ewkt_string`` unchanged (with a
                      warning) when it cannot be read as EWKT
            """
            ewkt_match = re.match(
                r"^.*(?P<proj>SRID=[0-9]+);(?P<wkt>[A-Z0-9 \(\),\.-]+).*$",
                ewkt_string,
            )
            if ewkt_match is None:
                logger.warning(f"Could not read {ewkt_string} as EWKT")
                return ewkt_string

            srid, wkt_text = ewkt_match.group("proj", "wkt")
            # "SRID=4326" -> "EPSG:4326"
            epsg_code = srid.replace("SRID", "EPSG").replace("=", ":")
            input_geom = wkt.loads(wkt_text)
            from_proj = pyproj.CRS(epsg_code)
            to_proj = pyproj.CRS(DEFAULT_PROJ)
            if from_proj != to_proj:
                # reproject
                project = pyproj.Transformer.from_crs(
                    from_proj, to_proj, always_xy=True
                ).transform
                return transform(project, input_geom)
            return input_geom
        @staticmethod
        def convert_to_ewkt(input_geom: BaseGeometry) -> str:
            """Convert shapely geometry to EWKT (Extended Well-Known text).

            :param input_geom: The geometry to convert
            :returns: ``<srid>;<wkt>`` where ``<srid>`` is derived from
                      ``DEFAULT_PROJ`` (``EPSG:<code>`` becomes ``SRID=<code>``)
                      and ``<wkt>`` comes from ``convert_to_rounded_wkt``
            """
            srid = DEFAULT_PROJ.upper().replace("EPSG", "SRID").replace(":", "=")
            rounded_wkt = MetadataFormatter.convert_to_rounded_wkt(input_geom)
            return srid + ";" + rounded_wkt
        @staticmethod
        def convert_from_georss(georss: Any) -> Union[BaseGeometry, Any]:
            """Convert GeoRSS to shapely geometry.

            Two shapes of input are handled: an element whose tag contains
            ``polygon``, and an element with a single child whose tag contains
            ``multisurface`` (case-insensitive).

            :param georss: The GeoRSS element (presumably an lxml element --
                           confirm against callers)
            :returns: A ``Polygon``, or a ``MultiPolygon`` reprojected to
                      ``DEFAULT_PROJ`` when the child declares another
                      ``srsName``; ``georss`` unchanged (with a warning) for any
                      other input
            """

            def _to_polygon(text: str) -> Polygon:
                # Consecutive tokens are paired into coordinates; an unpaired
                # trailing token is ignored.
                tokens = text.split()
                return Polygon(
                    [(float(a), float(b)) for a, b in zip(tokens[0::2], tokens[1::2])]
                )

            def _leaves(nested: Any) -> Iterator[Any]:
                # yield the deepest elements, depth first
                for child in nested:
                    if len(child) > 0:
                        yield from _leaves(child)
                    else:
                        yield child

            if "polygon" in georss.tag:
                # Polygon
                return _to_polygon(georss.text)

            if not (len(georss) == 1 and "multisurface" in georss[0].tag.lower()):
                logger.warning(
                    f"Incoming GeoRSS format not supported yet: {str(georss)}"
                )
                return georss

            # Multipolygon
            surface = georss[0]
            from_proj = getattr(surface, "attrib", {}).get("srsName")
            if from_proj:
                from_proj = pyproj.CRS(from_proj)
                to_proj = pyproj.CRS(DEFAULT_PROJ)
                project = pyproj.Transformer.from_crs(
                    from_proj, to_proj, always_xy=True
                ).transform

            polygons_list: list[Polygon] = []
            for elem in _leaves(surface):
                polygon = _to_polygon(elem.text)
                # reproject if needed
                if from_proj and from_proj != to_proj:
                    polygon = transform(project, polygon)
                polygons_list.append(polygon)
            return MultiPolygon(polygons_list)
        @staticmethod
        def convert_to_longitude_latitude(
            input_geom_unformatted: Any,
        ) -> dict[str, float]:
            """Convert a geometry to the center of its bounds.

            :param input_geom_unformatted: The geometry, in any form accepted by
                                           ``convert_to_bounds``
            :returns: A dict with ``lon`` and ``lat``, each the mean of the two
                      matching bounds
            """
            bbox = MetadataFormatter.convert_to_bounds(input_geom_unformatted)
            center = {
                "lon": (bbox[0] + bbox[2]) / 2,
                "lat": (bbox[1] + bbox[3]) / 2,
            }
            return center
        @staticmethod
        def convert_csv_list(values_list: Any, separator=",") -> Any:
            """Convert a list to a separated string.

            :param values_list: The values to join
            :param separator: The string put between the values
            :returns: The items converted with ``str`` and joined with
                      ``separator`` when ``values_list`` is a list; otherwise
                      ``values_list`` unchanged
            """
            if not isinstance(values_list, list):
                return values_list
            return separator.join(map(str, values_list))
        @staticmethod
        def convert_remove_extension(string: str) -> str:
            """Keep only what comes before the first dot of a string.

            "file.tar.gz" => "file"
            "no_dot" => "no_dot"

            :param string: The string to truncate
            :returns: The part of ``string`` before its first ``.``, or the whole
                      string when it has no dot
            """
            # partition always yields a head, so no empty-result fallback is needed
            return string.partition(".")[0]
        @staticmethod
        def convert_get_group_name(string: str, pattern: str) -> str:
            """Get the name of the regex group that matched.

            Spaces in ``pattern`` are replaced with ``_SPACE_`` before searching,
            and restored in the returned name, which lets group names hold
            spaces. Note that this replacement applies to the whole pattern.

            :param string: The value to search in (converted with ``str``)
            :param pattern: A regex with named groups
            :returns: The name of the last matched group, or ``NOT_AVAILABLE``
                      when nothing matches (with a warning) or when the match
                      has no named group
            """
            sanitized_pattern = "_SPACE_".join(pattern.split(" "))
            try:
                found = re.search(sanitized_pattern, str(string))
                if found:
                    group_name = found.lastgroup
                    if not group_name:
                        return NOT_AVAILABLE
                    return group_name.replace("_SPACE_", " ")
            except AttributeError:
                pass
            logger.warning(
                "Could not extract property from %s using %s", string, pattern
            )
            return NOT_AVAILABLE
        @staticmethod
        def convert_replace_str(value: Any, args: str) -> str:
            """Substitute, in a string, what matches a regex.

            :param value: The string to process; a dict is first serialized with
                          ``convert_to_geojson``
            :param args: Python literal of an ``(old, new)`` pair, ``old`` being
                         used as a regular expression by ``re.sub``
            :returns: The string after substitution
            :raises TypeError: If ``value`` is neither a string nor a dict
            """
            if not isinstance(value, (dict, str)):
                raise TypeError(
                    f"convert_replace_str expects a string or a dict (apply to_geojson). Got {type(value)}: {value}"
                )
            text = (
                MetadataFormatter.convert_to_geojson(value)
                if isinstance(value, dict)
                else value
            )
            pattern, replacement = ast.literal_eval(args)
            return re.sub(pattern, replacement, text)
        @staticmethod
        def convert_replace_str_tuple(value: Any, args: str) -> str:
            """Apply multiple replacements on a string, one after the other.

            Example of ``args``: '(("old1", "new1"), ("old2", "new2"))'

            :param value: The string to process; a dict is first serialized with
                          ``convert_to_geojson``
            :param args: Python literal of a list/tuple of ``(old, new)`` pairs,
                         each ``old`` being used as a regular expression by
                         ``re.sub``
            :returns: The string after all substitutions
            :raises TypeError: If ``value`` is neither a string nor a dict, or if
                               ``args`` does not evaluate to a list or a tuple
            """
            if not isinstance(value, (dict, str)):
                raise TypeError(
                    f"convert_replace_str_tuple expects a string or a dict (apply to_geojson). "
                    f"Got {type(value)}: {value}"
                )
            text = (
                MetadataFormatter.convert_to_geojson(value)
                if isinstance(value, dict)
                else value
            )

            # args is a string representing a list/tuple of tuples
            replacements = ast.literal_eval(args)
            if not isinstance(replacements, (list, tuple)):
                raise TypeError(
                    f"convert_replace_str_tuple expects a list/tuple of (old,new) pairs. "
                    f"Got {type(replacements)}: {replacements}"
                )

            for pattern, replacement in replacements:
                text = re.sub(pattern, replacement, text)
            return text
        @staticmethod
        def convert_ceda_collection_name(value: str) -> str:
            """Generate a CEDA collection name from a string.

            What follows ``/data/`` up to the end of the string (an ending ``/``
            excluded) is upper-cased, with its ``/`` turned into ``_``.

            :param value: The string holding a ``/data/<name>`` part
            :returns: The collection name, or ``"NOT_AVAILABLE"`` when ``value``
                      has no such part
            """
            found = re.search(r"/data/(?P<name>.+?)/?$", value)
            if found is None:
                # NOTE(review): this literal differs from the module-level
                # NOT_AVAILABLE constant ("Not Available") -- confirm it is intended.
                return "NOT_AVAILABLE"
            return found.group("name").replace("/", "_").upper()
        @staticmethod
        def convert_recursive_sub_str(
            input_obj: Union[dict[Any, Any], list[Any]], args: str
        ) -> Union[dict[Any, Any], list[Any]]:
            """Recursively substitute, in a structure, the values matching a regex.

            :param input_obj: The structure handed to ``items_recursive_apply``
            :param args: Python literal of an ``(old, new)`` pair, ``old`` being
                         used as a regular expression by ``re.sub``
            :returns: The result of ``items_recursive_apply``, where string
                      values are substituted and other values are kept as is
            """
            old, new = ast.literal_eval(args)

            def _substitute(k: Any, v: Any, x: Any, y: Any) -> Any:
                # only strings can be substituted
                return re.sub(x, y, v) if isinstance(v, str) else v

            return items_recursive_apply(input_obj, _substitute, x=old, y=new)
        @staticmethod
        def convert_dict_update(
            input_dict: dict[Any, Any], args: str
        ) -> dict[Any, Any]:
            """Return a copy of a dict updated with new items.

            :param input_dict: The dict to start from; it is not modified
            :param args: Python literal handed, once evaluated, to
                         ``nested_pairs2dict`` to build the new items
            :returns: A new dict holding the items of ``input_dict`` overridden
                      by the new items
            """
            extra_items = nested_pairs2dict(ast.literal_eval(args))
            return dict(input_dict, **extra_items)
        @staticmethod
        def convert_dict_filter(
            input_dict: dict[Any, Any], jsonpath_filter_str: str
        ) -> dict[Any, Any]:
            """Filter dict items using jsonpath.

            :param input_dict: The dict to filter
            :param jsonpath_filter_str: The jsonpath filter, parsed with
                                        ``string_to_jsonpath``
            :returns: The items of ``input_dict`` selected by the filter; an
                      empty dict when the filter could not be parsed (it is
                      still a string) or when ``input_dict`` is not a dict
            """
            jsonpath_filter = string_to_jsonpath(jsonpath_filter_str, force=True)
            if isinstance(jsonpath_filter, str) or not isinstance(input_dict, dict):
                return {}

            ordered_keys = list(input_dict)
            filtered = {}
            for found in jsonpath_filter.find(input_dict):
                # The last segment of the matched path is expected to look like
                # "[<index>]": the index gives the position of the matched key.
                last_segment = str(found.full_path).rsplit(".", 1)[-1]
                key = ordered_keys[int(last_segment[1:-1])]
                filtered[key] = found.value
            return filtered
        @staticmethod
        def convert_dict_filter_and_sub(
            input_dict: dict[Any, Any], args: str
        ) -> Union[dict[Any, Any], list[Any]]:
            """Filter dict items using jsonpath and then apply recursive_sub_str.

            :param input_dict: The dict to filter
            :param args: Python literal of a ``(jsonpath_filter, old, new)``
                         triple: the filter is given to ``convert_dict_filter``,
                         ``old`` and ``new`` to ``convert_recursive_sub_str``
            :returns: The filtered items, with ``old`` substituted by ``new`` in
                      their string values
            """
            jsonpath_filter_str, old, new = ast.literal_eval(args)
            filtered = MetadataFormatter.convert_dict_filter(
                input_dict, jsonpath_filter_str
            )
            # convert_recursive_sub_str parses its arguments with ast.literal_eval:
            # repr() gives a literal that evaluates back to exactly (old, new),
            # whatever quotes or backslashes these values hold.
            args_str = repr((old, new))
            return MetadataFormatter.convert_recursive_sub_str(filtered, args_str)
        @staticmethod
        def convert_from_alternate(
            input_obj: dict[str, Any], value: str
        ) -> dict[str, Any]:
            """Update assets using given alternate.

            Only the assets that are dicts holding an ``alternate`` dict with a
            dict entry named ``value`` are kept. For these, the chosen entry
            overrides the items of the asset and is taken out of ``alternate``,
            which is dropped once empty.

            :param input_obj: The assets, by name
            :param value: The name of the alternate to promote
            :returns: A new dict with the updated assets; ``input_obj`` is not
                      modified
            """
            result: dict[str, Any] = {}
            for name, asset in input_obj.items():
                if not isinstance(asset, dict):
                    continue
                # work on a copy so that popping does not alter the input
                alternates = deepcopy(asset).get("alternate")
                if not isinstance(alternates, dict):
                    continue
                chosen = alternates.pop(value, None)
                if not isinstance(chosen, dict):
                    continue
                merged = asset | chosen
                if alternates:
                    merged["alternate"] = alternates
                else:
                    merged.pop("alternate", None)
                result[name] = merged
            return result
        @staticmethod
        def convert_slice_str(string: str, args: str) -> str:
            """Slice a string (equivalent to ``string[start:end:step]``).

            :param string: The string to slice
            :param args: Exactly three comma-separated values, for start, end and
                         step; each value that is not an integer (e.g. empty or
                         ``None``) leaves the matching bound unset
            :returns: The sliced string
            :raises ValueError: If ``args`` does not hold exactly three values
            """

            def _bound(token: str) -> Optional[int]:
                # an optional leading "-" followed by digits is an integer bound
                stripped = token.strip()
                return int(stripped) if stripped.lstrip("-").isdigit() else None

            cmin, cmax, cstep = [_bound(token) for token in args.split(",")]
            return string[slice(cmin, cmax, cstep)]
        @staticmethod
        def convert_to_lower(string: str) -> str:
            """Convert a string to lowercase.

            :param string: The string to convert
            :returns: The lowercase string, or ``string`` unchanged when it equals
                      ``NOT_AVAILABLE``
            """
            return string if string == NOT_AVAILABLE else string.lower()
        @staticmethod
        def convert_to_upper(string: str) -> str:
            """Convert a string to uppercase.

            :param string: The string to convert
            :returns: The uppercase string, or ``string`` unchanged when it equals
                      ``NOT_AVAILABLE``
            """
            # Keep the sentinel intact, as convert_to_lower and convert_to_title
            # do, so that it can still be recognized after conversion.
            if string == NOT_AVAILABLE:
                return string
            return string.upper()
        @staticmethod
        def convert_to_title(string: str) -> str:
            """Convert a string to title case.

            :param string: The string to convert
            :returns: The title-cased string, or ``string`` unchanged when it
                      equals ``NOT_AVAILABLE``
            """
            return string if string == NOT_AVAILABLE else string.title()
        @staticmethod
        def convert_fake_l2a_title_from_l1c(string: str) -> str:
            """Build a placeholder L2A title from a L1C title.

            The title must be made of seven ``_``-separated parts. The first,
            third and sixth ones are kept, the second one becomes ``MSIL2A`` and
            the others are filled with underscores.

            :param string: The L1C title
            :returns: The placeholder title, or ``NOT_AVAILABLE`` (with an error
                      logged) when ``string`` does not have the expected form
            """
            id_match = re.match(
                r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<id3>\w+)_(?P<id4>\w+)_(?P<id5>\w+)_(?P<id6>\w+)_(?P<id7>\w+)$",
                string,
            )
            if id_match is None:
                logger.error("Could not extract fake title from %s" % string)
                return NOT_AVAILABLE
            kept_parts = id_match.group("id1", "id3", "id6")
            return "%s_MSIL2A_%s____________%s________________" % kept_parts
        @staticmethod
        def convert_s2msil2a_title_to_aws_productinfo(string: str) -> str:
            """Build the AWS product info URL template of a S2 MSI L2A title.

            The tile identifier and the sensing date are read from the title;
            month and day are written without leading zero. The
            ``{collection}`` placeholder is left as is in the result.

            :param string: The product title
            :returns: The URL template, or ``NOT_AVAILABLE`` (with an error
                      logged) when ``string`` does not have the expected form
            """
            id_regex = re.compile(
                r"^(?P<id1>\w+)_(?P<id2>\w+)_(?P<year>[0-9]{4})(?P<month>[0-9]{2})(?P<day>[0-9]{2})T[0-9]+_"
                + r"(?P<id4>[A-Z0-9_]+)_(?P<id5>[A-Z0-9_]+)_T(?P<tile1>[0-9]{2})(?P<tile2>[A-Z])(?P<tile3>[A-Z]{2})_"
                + r"(?P<id7>[A-Z0-9_]+)$"
            )
            id_match = id_regex.match(string)
            if id_match is None:
                logger.error("Could not extract title infos from %s" % string)
                return NOT_AVAILABLE

            tile1, tile2, tile3 = id_match.group("tile1", "tile2", "tile3")
            year, month, day = id_match.group("year", "month", "day")
            url_parts = (tile1, tile2, tile3, year, int(month), int(day))
            return (
                "https://roda.sentinel-hub.com/sentinel-s2-l2a/tiles/%s/%s/%s/%s/%s/%s/0/{collection}.json"
                % url_parts
            )
        @staticmethod
        def convert_split_id_into_s3_params(product_id: str) -> dict[str, str]:
            """Split a product id into search parameters.

            :param product_id: The product id, holding at least two
                               ``YYYYMMDDTHHMMSS`` dates
            :returns: A dict with ``productType`` (characters 4 to 14 of the id),
                      ``startDate`` (first date minus one second), ``endDate``
                      (second date plus one second), ``timeliness`` (the one but
                      last part of the id) and ``sat`` (``Sentinel-`` followed by
                      the first part of the id without its first character)
            :raises IndexError: If the id holds less than two dates
            """
            date_in_id = "%Y%m%dT%H%M%S"
            date_in_params = "%Y-%m-%dT%H:%M:%SZ"
            one_second = timedelta(seconds=1)

            # split on each "_" that is not followed by another "_"
            parts: list[str] = re.split(r"_(?!_)", product_id)
            found_dates = re.findall("[0-9]{8}T[0-9]{6}", product_id)
            start_date = datetime.strptime(found_dates[0], date_in_id) - one_second
            end_date = datetime.strptime(found_dates[1], date_in_id) + one_second
            return {
                "productType": product_id[4:15],
                "startDate": start_date.strftime(date_in_params),
                "endDate": end_date.strftime(date_in_params),
                "timeliness": parts[-2],
                "sat": "Sentinel-" + parts[0][1:],
            }
        @staticmethod
        def convert_dates_from_cmems_id(product_id: str) -> dict[str, str]:
            """Derive a one-day datetime interval from the date held in a product id.

            The first run of 10 digits (``YYYYMMDDHH``) found in the id is used;
            when there is none, the first run of 8 digits (``YYYYMMDD``) is used.

            :param product_id: The product id holding the date
            :returns: A dict with ``min_date`` (the date found) and ``max_date``
                      (one day later), both formatted as ``%Y-%m-%dT%H:%M:%SZ``
            :raises IndexError: If no date is found in ``product_id``
            :raises ValueError: If the digits found do not form a valid date
            """
            found = re.search("[0-9]{10}", product_id) or re.search(
                "[0-9]{8}", product_id
            )
            if found is None:
                # same exception type as before, with an explicit message
                raise IndexError(
                    f"Could not find a date (YYYYMMDDHH or YYYYMMDD) in product id {product_id!r}"
                )
            date = found.group()
            date_format = "%Y%m%d%H" if len(date) == 10 else "%Y%m%d"
            date_time = datetime.strptime(date, date_format)
            return {
                "min_date": date_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "max_date": (date_time + timedelta(days=1)).strftime(
                    "%Y-%m-%dT%H:%M:%SZ"
                ),
            }
        @staticmethod
        def convert_to_datetime_dict(
            date: str, format: str
        ) -> dict[str, Union[list[str], str]]:
            """Convert a date (str) to a dictionary of its UTC components.

            The values are single-item lists when ``format`` is ``"list"``, and
            plain strings for any other ``format``::

                date == "2021-04-21T18:27:19.123Z" and format == "list" => {
                    "year": ["2021"],
                    "month": ["04"],
                    "day": ["21"],
                    "hour": ["18"],
                    "minute": ["27"],
                    "second": ["19"],
                }
                date == "2021-04-21T18:27:19.123Z" and format == "string" => {
                    "year": "2021",
                    "month": "04",
                    "day": "21",
                    "hour": "18",
                    "minute": "27",
                    "second": "19",
                }
                date == "2021-04-21" and format == "list" => {
                    "year": ["2021"],
                    "month": ["04"],
                    "day": ["21"],
                    "hour": ["00"],
                    "minute": ["00"],
                    "second": ["00"],
                }

            :param date: The datetime string, first converted with
                         ``convert_to_iso_utc_datetime``
            :param format: ``"list"`` to get lists, anything else to get strings
            :returns: The dict of date components
            """
            utc_date = MetadataFormatter.convert_to_iso_utc_datetime(date)
            date_object = datetime.strptime(utc_date, "%Y-%m-%dT%H:%M:%S.%fZ")
            directives = {
                "year": "%Y",
                "month": "%m",
                "day": "%d",
                "hour": "%H",
                "minute": "%M",
                "second": "%S",
            }
            components = {
                name: date_object.strftime(directive)
                for name, directive in directives.items()
            }
            if format == "list":
                return {name: [text] for name, text in components.items()}
            return components
        @staticmethod
        def convert_interval_to_datetime_dict(
            date: str, separator: str = "/"
        ) -> dict[str, list[str]]:
            """Convert a date interval ('/' separated str) to a dictionary where values are
            sorted lists of the distinct years, months and days covered by the interval

            date == "2021-04-21/2021-04-22" => {
                "year": ["2021"],
                "month": ["04"],
                "day": ["21", "22"],
            }

            :param date: the interval, as a start date and an end date joined by ``separator``
            :param separator: the text separating the start date from the end date
            :returns: dictionary with ``year``, ``month`` and ``day`` keys, each mapped to a
                      list of zero-padded strings in ascending order
            :raises ValueError: if ``separator`` is not found in ``date``
            """
            if separator not in date:
                raise ValueError(
                    f"Could not format {date} using convert_interval_to_datetime_dict: {separator} separator missing"
                )
            start, end = date.split(separator)
            start_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(start)
            end_utc_date = MetadataFormatter.convert_to_iso_utc_datetime(end)
            start_date_object = datetime.strptime(
                start_utc_date, "%Y-%m-%dT%H:%M:%S.%fZ"
            )
            end_date_object = datetime.strptime(end_utc_date, "%Y-%m-%dT%H:%M:%S.%fZ")
            delta_utc_date = end_date_object - start_date_object

            years: set[str] = set()
            months: set[str] = set()
            days: set[str] = set()
            # walk the interval one whole day at a time, starting from the start date
            for i in range(delta_utc_date.days + 1):
                date_object = start_date_object + timedelta(days=i)
                years.add(date_object.strftime("%Y"))
                months.add(date_object.strftime("%m"))
                days.add(date_object.strftime("%d"))
            # sets of str have no stable iteration order between interpreter runs:
            # sort them so that the same interval always gives the same lists
            return {
                "year": sorted(years),
                "month": sorted(months),
                "day": sorted(days),
            }
        @staticmethod
        def convert_get_ecmwf_time(date: str) -> list[str]:
            """Return the hour of a date (str) as a single-item list in the ECMWF format (["HH:00"])

            "2021-04-21T18:27:19.123Z" => ["18:00"]
            "2021-04-21" => ["00:00"]
            """
            # any format other than "list" makes convert_to_datetime_dict return plain strings
            hour = MetadataFormatter.convert_to_datetime_dict(date, "str")["hour"]
            return [f"{hour!s}:00"]
        @staticmethod
        def convert_sanitize(text: str) -> str:
            """Sanitize a string by passing it to the ``sanitize`` helper imported from ``eodag.utils``

            :param text: the string to sanitize
            :returns: the value returned by ``sanitize(text)``
            """
            return sanitize(text)
        @staticmethod
        def convert_get_dates_from_string(text: str, split_param="-"):
            """Extract a date range written as two ``YYYYMMDD`` dates joined by ``split_param``

            text == "foo_20210421-20210422_bar" and split_param == "-" => {
                "startDate": "2021-04-21T00:00:00Z",
                "endDate": "2021-04-22T00:00:00Z",
            }

            :param text: the string in which the date range is searched
            :param split_param: the literal text separating the two dates
            :returns: dictionary with ``startDate`` and ``endDate``, or ``NOT_AVAILABLE`` if
                      no date range is found in ``text``
            """
            # split_param is a literal separator (it is also given to str.split below):
            # escape it so that characters like ".", "+" or "|" are not read as regex syntax
            reg = "[0-9]{8}" + re.escape(split_param) + "[0-9]{8}"
            match = re.search(reg, text)
            if not match:
                return NOT_AVAILABLE
            dates = match.group().split(split_param)
            start_date = datetime.strptime(dates[0], "%Y%m%d")
            end_date = datetime.strptime(dates[1], "%Y%m%d")
            return {
                "startDate": start_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
                "endDate": end_date.strftime("%Y-%m-%dT%H:%M:%SZ"),
            }
        @staticmethod
        def convert_get_hydrological_year(date: str):
            """Build the hydrological year label of a date, as a single-item list ``["YYYY_yy"]``

            ``YYYY`` is the year of the date (once normalised with
            ``convert_to_iso_utc_datetime``) and ``yy`` the two-digit form of the year after.

            "2021-04-21" => ["2021_22"]
            """
            iso_utc = MetadataFormatter.convert_to_iso_utc_datetime(date)
            first_year = datetime.strptime(iso_utc, "%Y-%m-%dT%H:%M:%S.%fZ")
            second_year = first_year + relativedelta(years=1)
            label = "_".join((first_year.strftime("%Y"), second_year.strftime("%y")))
            return [label]
        @staticmethod
        def convert_get_variables_from_path(path: str):
            """List the comma-separated variables found after the first ``?`` of a path

            "path/to/file.nc?var1,var2" => ["var1", "var2"]
            "path/to/file.nc" => []

            Only the text between the first and the second ``?`` (if any) is considered.
            """
            segments = path.split("?")
            return segments[1].split(",") if len(segments) > 1 else []
        @staticmethod
        def convert_assets_list_to_dict(
            assets_list: list[dict[str, str]], asset_name_key: str = "title"
        ) -> dict[str, dict[str, str]]:
            """Convert a list of assets to a dictionary where keys represent
            name of assets and are found among values of asset dictionaries.

            A name with a path pattern is shortened to its basename only when that
            basename is found once among all asset names; otherwise every asset
            sharing the basename keeps its full name.

            assets_list == [
                {"href": "foo", "title": "asset1", "name": "foo-name"},
                {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
                {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
                {"href": "qux", "title": "asset3", "name": "qux-name"},
            ] and asset_name_key == "title" => {
                "asset1": {"href": "foo", "title": "asset1", "name": "foo-name"},
                "path/to/asset1": {"href": "bar", "title": "path/to/asset1", "name": "bar-name"},
                "asset2": {"href": "baz", "title": "path/to/asset2", "name": "baz-name"},
                "asset3": {"href": "qux", "title": "asset3", "name": "qux-name"},
            }
            assets_list == [
                {"href": "foo", "title": "foo-title", "name": "asset1"},
                {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
                {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
                {"href": "qux", "title": "qux-title", "name": "asset3"},
            ] and asset_name_key == "name" => {
                "asset1": {"href": "foo", "title": "foo-title", "name": "asset1"},
                "path/to/asset1": {"href": "bar", "title": "bar-title", "name": "path/to/asset1"},
                "asset2": {"href": "baz", "title": "baz-title", "name": "path/to/asset2"},
                "asset3": {"href": "qux", "title": "qux-title", "name": "asset3"},
            }

            :param assets_list: the assets, each one being a dictionary
            :param asset_name_key: the key of each asset dictionary holding the asset name
            :returns: the assets indexed by name
            :raises KeyError: if an asset has no ``asset_name_key`` key
            """
            asset_names: list[str] = []
            assets_dict: dict[str, dict[str, str]] = {}
            # number of asset names sharing each path basename
            basename_counts: dict[str, int] = {}
            for asset in assets_list:
                asset_name = asset[asset_name_key]
                asset_names.append(asset_name)
                assets_dict[asset_name] = asset
                asset_basename = asset_name.split("/")[-1]
                basename_counts[asset_basename] = (
                    basename_counts.get(asset_basename, 0) + 1
                )
            # we only keep the equivalent of the path basename in the case where the
            # asset name has a path pattern and this basename is only found once.
            # Counting every occurrence (instead of comparing names pair by pair) keeps
            # the full name of all the assets sharing a basename, however many they are
            for asset_name in asset_names:
                asset_basename = asset_name.split("/")[-1]
                if basename_counts[asset_basename] == 1:
                    assets_dict[asset_basename] = assets_dict.pop(asset_name)
            return assets_dict
    # if stac extension colon separator `:` is in search params, parse it to prevent issues with vformat
    if re.search(r"{[\w-]*:[\w#-]*}", search_param):
        search_param = re.sub(r"{([\w-]*):([\w#-]*)}", r"{\1_COLON_\2}", search_param)
        kwargs = {k.replace(":", "_COLON_"): v for k, v in kwargs.items()}
    return MetadataFormatter().vformat(search_param, args, kwargs)
def properties_from_json(
    json: dict[str, Any],
    mapping: dict[str, Any],
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider json result.

    :param json: The representation of a provider result as a json object
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the json
                    representation, expressed as a
                    `jsonpath <http://goessner.net/articles/JsonPath/>`_.
                    Each value is a ``(conversion, path_or_text)`` pair, or a list whose
                    second item is such a pair
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
                             `metadata_pattern` (Regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$"),
                             `metadata_path` (String representation of jsonpath),
                             `metadata_path_id` and `metadata_path_value` (String representations of
                             jsonpaths applied to each discovered item to get its key and its value)
    :returns: The metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    properties: dict[str, Any] = {}
    # metadata whose mapped text contains `{...}` placeholders, resolved once the loop is over
    templates = {}
    # full paths of the values already extracted through the mapping, skipped by the discovery
    used_jsonpaths = []
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        if isinstance(path_or_text, str):
            # plain text: either a template to resolve later, or a value to be passed as is
            if re.search(r"({[^{}:]+})+", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text
        else:
            # anything that is not a str is used as a path object exposing `find`
            try:
                match = path_or_text.find(json)
            except KeyError:
                match = []
            # a value is extracted only when the path matches exactly once
            if len(match) == 1:
                extracted_value = match[0].value
                used_jsonpaths.append(match[0].full_path)
            else:
                extracted_value = NOT_AVAILABLE
            if extracted_value is None:
                properties[metadata] = None
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        # only the properties extracted so far are available here
                        conversion_or_none = conversion_or_none.format(**properties)
                    if extracted_value == NOT_AVAILABLE:
                        # try if value can be formatted even if it is not available
                        try:
                            properties[metadata] = format_metadata(
                                "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                                **{metadata: extracted_value},
                            )
                        except ValueError:
                            logger.debug(
                                f"{metadata}: {extracted_value} could not be formatted with {conversion_or_none}"
                            )
                            # no value is stored for this metadata
                            continue
                    else:
                        # in this case formatting should work, otherwise something is wrong in the mapping
                        properties[metadata] = format_metadata(
                            "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                            **{metadata: extracted_value},
                        )
        # properties as python objects when possible (format_metadata returns only strings)
        # any failure (value that is not a python literal, or no value stored yet for this
        # metadata, as for templates) leaves the properties untouched
        try:
            properties[metadata] = ast.literal_eval(properties[metadata])
        except Exception:
            pass
    # Resolve templates
    for metadata, template in templates.items():
        try:
            properties[metadata] = format_string(metadata, template, **properties)
        except ValueError:
            logger.warning(
                f"Could not parse {metadata} ({template}) using product properties"
            )
            logger.debug(f"available properties: {properties}")
            properties[metadata] = NOT_AVAILABLE
    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}
    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    # discovery only runs when both the pattern and the path are configured
    if discovery_pattern and discovery_path:
        discovery_jsonpath = string_to_jsonpath(discovery_path)
        discovered_properties = (
            discovery_jsonpath.find(json)
            if isinstance(discovery_jsonpath, JSONPath)
            else []
        )
        for found_jsonpath in discovered_properties:
            if "metadata_path_id" in discovery_config.keys():
                # the key is read inside the discovered item, using metadata_path_id
                found_key_paths = string_to_jsonpath(
                    discovery_config["metadata_path_id"], force=True
                ).find(found_jsonpath.value)
                if not found_key_paths or isinstance(found_key_paths, int):
                    continue
                found_key = found_key_paths[0].value
                # NOTE(review): metadata_path_value is read unconditionally here, so it looks
                # required whenever metadata_path_id is configured - confirm
                used_jsonpath = Child(
                    found_jsonpath.full_path,
                    string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ),
                )
            else:
                # default key got from metadata_path
                found_key = found_jsonpath.path.fields[-1]
                used_jsonpath = found_jsonpath.full_path
            # keep the discovered item only if its key matches the pattern, is not already a
            # property, and its path was not already consumed by the mapping
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and used_jsonpath not in used_jsonpaths
            ):
                if "metadata_path_value" in discovery_config.keys():
                    found_value_path = string_to_jsonpath(
                        discovery_config["metadata_path_value"], force=True
                    ).find(found_jsonpath.value)
                    properties[found_key] = (
                        found_value_path[0].value
                        if found_value_path and not isinstance(found_value_path, int)
                        else NOT_AVAILABLE
                    )
                else:
                    # default value got from metadata_path
                    properties[found_key] = found_jsonpath.value
                # properties as python objects when possible (format_metadata returns only strings)
                try:
                    properties[found_key] = ast.literal_eval(properties[found_key])
                except Exception:
                    pass
    return properties
def properties_from_xml(
    xml_as_text: AnyStr,
    mapping: Any,
    empty_ns_prefix: str = "ns",
    discovery_config: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Extract properties from a provider xml result.

    :param xml_as_text: The representation of a provider result as xml
    :param mapping: A mapping between :class:`~eodag.api.product._product.EOProduct`'s metadata
                    keys and the location of the values of these properties in the xml
                    representation, expressed as a
                    `xpath <https://www.w3schools.com/xml/xml_xpath.asp>`_.
                    Each value is a ``(conversion, path_or_text)`` pair, or a list whose
                    second item is such a pair
    :param empty_ns_prefix: (optional) The name to give to the default namespace of `xml_as_text`.
                            This is a technical workaround for the limitation of lxml
                            not supporting empty namespace prefix. The
                            xpath in `mapping` must use this value to be able to
                            correctly reach empty-namespace prefixed elements
    :param discovery_config: (optional) metadata discovery configuration dict, accepting among other items
                             `metadata_pattern` (Regex pattern for metadata key discovery, e.g. "^[a-zA-Z]+$"),
                             `metadata_path` (String representation of xpath)
    :returns: the metadata of the :class:`~eodag.api.product._product.EOProduct`
    """
    properties: dict[str, Any] = {}
    # metadata whose mapped text contains `{...}` placeholders, resolved once the loop is over
    templates = {}
    # tags of the elements already extracted through the mapping, skipped by the discovery
    used_xpaths = []
    root = etree.XML(xml_as_text)
    for metadata, value in mapping.items():
        # Treat the case when the value is from a queryable metadata
        if isinstance(value, list):
            conversion_or_none, path_or_text = value[1]
        else:
            conversion_or_none, path_or_text = value
        try:
            # the default (empty) namespace prefix is exposed to xpath as empty_ns_prefix
            extracted_value = root.xpath(
                path_or_text,
                namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
            )
            if len(extracted_value) <= 1:
                if len(extracted_value) < 1:
                    # If there is no matched value (empty list), mark the metadata as not
                    # available
                    extracted_value = [NOT_AVAILABLE]
                else:
                    # store element tag in used_xpaths
                    # (the xpath is evaluated again without its `/text()` part; None is
                    # stored when the first result has no `tag` attribute)
                    used_xpaths.append(
                        getattr(
                            root.xpath(
                                path_or_text.replace("/text()", ""),
                                namespaces={
                                    k or empty_ns_prefix: v
                                    for k, v in root.nsmap.items()
                                },
                            )[0],
                            "tag",
                            None,
                        )
                    )
                if conversion_or_none is None:
                    properties[metadata] = extracted_value[0]
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # NOTE(review): unlike the multiple-matches branch below, variables used
                    # in the conversion are not formatted here - confirm this is intended
                    properties[metadata] = format_metadata(
                        "{%s%s%s}" % (metadata, SEP, conversion_or_none),
                        **{metadata: extracted_value[0]},
                    )
            # If there are multiple matches, consider the result as a list, doing a
            # formatting if any
            else:
                if conversion_or_none is None:
                    properties[metadata] = extracted_value
                else:
                    # reformat conversion_or_none as metadata#converter(args) or metadata#converter
                    if (
                        len(conversion_or_none) > 1
                        and isinstance(conversion_or_none, list)
                        and conversion_or_none[1] is not None
                    ):
                        conversion_or_none = "%s(%s)" % (
                            conversion_or_none[0],
                            conversion_or_none[1],
                        )
                    elif isinstance(conversion_or_none, list):
                        conversion_or_none = conversion_or_none[0]
                    # check if conversion uses variables to format
                    if re.search(r"({[^{}:]+})+", conversion_or_none):
                        conversion_or_none = conversion_or_none.format(**properties)
                    # each matched item is converted separately
                    properties[metadata] = [
                        format_metadata(
                            "{%s%s%s}"
                            % (
                                metadata,
                                SEP,
                                conversion_or_none,
                            ),  # Re-build conversion format identifier
                            **{metadata: extracted_value_item},
                        )
                        for extracted_value_item in extracted_value
                    ]
        except XPathEvalError:
            # Assume the mapping is to be passed as is, in which case we readily
            # register it, or is a template, in which case we register it for later
            # formatting resolution using previously successfully resolved properties
            # Ignore any transformation specified. If a value is to be passed as is,
            # we don't want to transform it further
            if re.search(r"({[^{}:]+})+", path_or_text):
                templates[metadata] = path_or_text
            else:
                properties[metadata] = path_or_text
    # Resolve templates
    # NOTE(review): plain str.format is used here, so a placeholder naming a property that
    # was not extracted raises KeyError - confirm this is intended
    for metadata, template in templates.items():
        properties[metadata] = template.format(**properties)
    # adds missing discovered properties
    if not discovery_config:
        discovery_config = {}
    discovery_pattern = discovery_config.get("metadata_pattern")
    discovery_path = discovery_config.get("metadata_path")
    # discovery only runs when both the pattern and the path are configured
    if discovery_pattern and discovery_path:
        discovered_properties = root.xpath(
            discovery_path,
            namespaces={k or empty_ns_prefix: v for k, v in root.nsmap.items()},
        )
        for found_xpath in discovered_properties:
            # the key is the tag without its `{namespace}` prefix
            found_key = found_xpath.tag.rpartition("}")[-1]
            if (
                re.compile(discovery_pattern).match(found_key)
                and found_key not in properties.keys()
                and found_xpath.tag not in used_xpaths
            ):
                properties[found_key] = found_xpath.text
    return properties
def mtd_cfg_as_conversion_and_querypath(
    src_dict: dict[str, Any],
    dest_dict: dict[str, Any] = {},
    result_type: str = "json",
) -> dict[str, Any]:
    """Metadata configuration dictionary to querypath with conversion dictionary

    Transform every src_dict value from jsonpath_str to tuple `(conversion, jsonpath_object)`
    or from xpath_str to tuple `(conversion, xpath_str)`

    :param src_dict: Input dict containing jsonpath str as values
    :param dest_dict: (optional) Output dict containing jsonpath objects as values. When it is
                      empty, a copy of ``src_dict`` is converted and returned instead
    :param result_type: (optional) ``"json"`` to parse the paths as jsonpath objects; with any
                        other value the paths are kept as strings
    :returns: dest_dict, or the dict that was given if its configuration is already converted
    """
    # NOTE: the `{}` default is kept for interface compatibility. It is never mutated nor
    # returned, as an empty dest_dict is always replaced by src_dict or by a copy of it

    # check if the configuration has already been converted, looking at one of its values.
    # The `None` default makes an empty configuration go through the conversion (and come
    # out as an empty dict) instead of leaking a StopIteration to the caller
    some_configured_value = next(iter((dest_dict or src_dict).values()), None)
    if (
        isinstance(some_configured_value, list)
        and isinstance(some_configured_value[1], tuple)
        or isinstance(some_configured_value, tuple)
    ):
        return dest_dict or src_dict
    if not dest_dict:
        dest_dict = deepcopy(src_dict)
    for metadata in src_dict:
        if metadata not in dest_dict:
            dest_dict[metadata] = (None, NOT_MAPPED)
        else:
            conversion, path = get_metadata_path(dest_dict[metadata])
            if result_type == "json":
                parsed_path = string_to_jsonpath(path)
                if isinstance(parsed_path, str):
                    # not a jsonpath: assume the mapping is to be passed as is. Ignore any transformation specified.
                    # If a value is to be passed as is, we don't want to transform it further
                    conversion = None
            else:
                parsed_path = path
            # a 2-items list keeps its first item, only its second one is replaced
            if isinstance(dest_dict[metadata], list) and len(dest_dict[metadata]) == 2:
                dest_dict[metadata][1] = (conversion, parsed_path)
            else:
                dest_dict[metadata] = (conversion, parsed_path)
            # Put the updated mapping at the end
            dest_dict[metadata] = dest_dict.pop(metadata)
    return dest_dict
def format_query_params(
    product_type: str,
    config: PluginConfig,
    query_dict: dict[str, Any],
    error_context: str = "",
) -> dict[str, Any]:
    """Format the search parameters to query parameters

    :param product_type: The product type, whose ``metadata_mapping`` and ``discover_metadata``
                         settings found in ``config.products`` take precedence over the ones
                         found directly in ``config``
    :param config: The plugin configuration, from which ``metadata_mapping``, ``products`` and
                   optionally ``discover_metadata``, ``literal_search_params`` and
                   ``free_text_search_operations`` are read. It is not modified
    :param query_dict: The search parameters. Its ``raise_errors`` key, if any, is removed
                       in place
    :param error_context: (optional) Text given to ``_get_queryables`` for its error message
    :returns: The query parameters
    """
    query_dict.pop("raise_errors", None)
    # . not allowed in eodag_search_key, replaced with %2E
    query_dict = {k.replace(".", "%2E"): v for k, v in query_dict.items()}
    product_type_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(product_type, {}).get("metadata_mapping", {}),
    )
    # Raise error if non-queryables parameters are used and raise_mtd_discovery_error configured
    # (the product type setting is used when present, the provider one otherwise)
    if (
        raise_mtd_discovery_error := config.products.get(product_type, {})
        .get("discover_metadata", {})
        .get("raise_mtd_discovery_error")
    ) is None:
        raise_mtd_discovery_error = getattr(config, "discover_metadata", {}).get(
            "raise_mtd_discovery_error", False
        )
    query_params: dict[str, Any] = {}
    # Get all the search parameters that are recognised as queryables by the
    # provider (they appear in the queryables dictionary)
    queryables = _get_queryables(
        query_dict,
        config,
        product_type_metadata_mapping,
        raise_mtd_discovery_error,
        error_context,
    )
    for eodag_search_key, provider_search_param in queryables.items():
        user_input = query_dict[eodag_search_key]
        if provider_search_param == user_input:
            # means the mapping is to be passed as is, in which case we
            # readily register it
            if (
                eodag_search_key in query_params
                and isinstance(query_params[eodag_search_key], dict)
                and isinstance(user_input, dict)
            ):
                query_params[eodag_search_key].update(user_input)
            else:
                query_params[eodag_search_key] = user_input
            continue
        if COMPLEX_QS_REGEX.match(provider_search_param):
            parts = provider_search_param.split("=")
            if len(parts) == 1:
                formatted_query_param = format_metadata(
                    provider_search_param, product_type, **query_dict
                )
                formatted_query_param = formatted_query_param.replace("'", '"')
                if "{{" in provider_search_param:
                    # retrieve values from hashes where keys are given in the param
                    if "}[" in formatted_query_param:
                        formatted_query_param = _resolve_hashes(formatted_query_param)
                    # remove quotes around arrays
                    formatted_query_param = remove_str_array_quotes(
                        formatted_query_param
                    )
                    # json query string (for POST request)
                    update_nested_dict(
                        query_params,
                        orjson.loads(formatted_query_param),
                        extend_list_values=True,
                        allow_extend_duplicates=False,
                    )
                else:
                    query_params[eodag_search_key] = formatted_query_param
            else:
                provider_search_key, provider_value = parts
                query_params[provider_search_key] = format_metadata(
                    provider_value, product_type, **query_dict
                )
        else:
            query_params[provider_search_param] = user_input
    # Now get all the literal search params (i.e params to be passed "as is"
    # in the search request)
    # ignore additional_params if it isn't a dictionary
    literal_search_params = getattr(config, "literal_search_params", {})
    # work on a copy: updating the configured dict in place would carry the free text
    # search parameters of this query over to every following query
    literal_search_params = (
        dict(literal_search_params) if isinstance(literal_search_params, dict) else {}
    )
    # Now add formatted free text search parameters (this is for cases where a
    # complex query through a free text search parameter is available for the
    # provider and needed for the consumer)
    product_type_metadata_mapping = dict(
        config.metadata_mapping,
        **config.products.get(product_type, {}).get("metadata_mapping", {}),
    )
    literal_search_params.update(
        _format_free_text_search(config, product_type_metadata_mapping, **query_dict)
    )
    # literal values are always stored as lists, extending what was already set for the key
    for provider_search_key, provider_value in literal_search_params.items():
        if isinstance(provider_value, list):
            query_params.setdefault(provider_search_key, []).extend(provider_value)
        else:
            query_params.setdefault(provider_search_key, []).append(provider_value)
    return query_params
def _resolve_hashes(formatted_query_param: str) -> str:
    """
    Replace every hash lookup found in the formatted_query_param by the looked up value

    A lookup has the format {"a": "abc", "b": "cde"}["a"]: the whole structure is replaced
    by the value found in the hash for the given key (in this case "abc"). String values
    are written between double quotes, other values are dumped as json.
    """
    lookup_marker = '}["'
    # loop as long as there is still a hash to be resolved
    while lookup_marker in formatted_query_param:
        # the key is what stands between `}["` and the next `"]`
        lookup_pos = formatted_query_param.find(lookup_marker)
        lookup_end = formatted_query_param.find('"]', lookup_pos)
        # the hash opens with the `{` of the last `: {` found before the lookup
        mapping_pos = formatted_query_param[:lookup_pos].rfind(": {") + 2
        mapping = orjson.loads(formatted_query_param[mapping_pos : lookup_pos + 1])
        key_pos = formatted_query_param.find('"', lookup_pos) + 1
        selected = mapping[formatted_query_param[key_pos:lookup_end]]
        if isinstance(selected, str):
            replacement = '"' + selected + '"'
        else:
            replacement = json.dumps(selected)
        # swap the hash and its lookup for the selected value
        formatted_query_param = formatted_query_param.replace(
            formatted_query_param[mapping_pos : lookup_end + 2], replacement
        )
    return formatted_query_param
def _format_free_text_search(
    config: PluginConfig, metadata_mapping: dict[str, Any], **kwargs: Any
) -> dict[str, Any]:
    """Build the free text search parameter using the search parameters

    :param config: The plugin configuration, whose optional ``free_text_search_operations``
                   maps each query parameter to a dict with the ``union`` and ``operations``
                   keys, and optionally ``wrapper`` (defaults to ``"{}"``)
    :param metadata_mapping: The metadata mapping used to select the usable search parameters
    :param kwargs: The search parameters
    :returns: The free text query of each configured parameter for which at least one
              operation could be formatted
    """
    query_params: dict[str, Any] = {}
    if not getattr(config, "free_text_search_operations", None):
        return query_params
    for param, operations_config in config.free_text_search_operations.items():
        union = operations_config["union"]
        wrapper = operations_config.get("wrapper", "{}")
        formatted_query = []
        for operator, operands in operations_config["operations"].items():
            # The Operator string is the operator wrapped with spaces
            operator = " {} ".format(operator)
            # Build the operation string by joining the formatted operands together
            # using the operation string
            # An operand is only used if it refers (as `{kw}` or `{kw#...}`) to a search
            # parameter that has a value and whose mapping, if any, is a list
            operation_string = operator.join(
                format_metadata(operand, **kwargs)
                for operand in operands
                if any(
                    re.search(rf"{{{kw}[}}#]", operand)
                    and val is not None
                    and isinstance(metadata_mapping.get(kw, []), list)
                    for kw, val in kwargs.items()
                )
            )
            # Finally wrap the operation string as specified by the wrapper and add
            # it to the list of queries (only if the operation string is not empty)
            if operation_string:
                query = wrapper.format(operation_string)
                formatted_query.append(query)
        # Join the formatted query using the "union" config parameter, and then
        # wrap it with the Python format string specified in the "wrapper" config
        # parameter
        final_query = union.join(formatted_query)
        # only a union of several operations needs to be wrapped as a whole. The joined
        # query itself is wrapped: query_params[param] is not set yet at this point
        if len(formatted_query) > 1:
            final_query = wrapper.format(final_query)
        if final_query:
            query_params[param] = final_query
    return query_params
def _get_queryables(
    search_params: dict[str, Any],
    config: PluginConfig,
    metadata_mapping: dict[str, Any],
    raise_mtd_discovery_error: bool,
    error_context: str,
) -> dict[str, Any]:
    """Select, among the given search parameters, those that can be queried

    Parameters whose value is ``None`` are ignored. For the others:

    * when the parameter is mapped to a list in ``metadata_mapping``, the result of
      ``get_search_param`` on that mapping is kept (unless it is ``None``);
    * when the parameter is not mapped and ``config.discover_metadata`` enables
      ``auto_discovery``, the parameter is resolved from the ``discover_metadata``
      settings (``search_param_unparsed``, ``metadata_pattern``, ``search_param``).
      If ``search_param`` is a dict, its formatted content is merged into the
      matching attributes of ``config`` and nothing is added to the returned dict.

    :param search_params: search parameters with the values given by the user
    :param config: plugin configuration, read for ``discover_metadata`` and possibly
                   updated in place
    :param metadata_mapping: metadata-mapping configuration
    :param raise_mtd_discovery_error: whether a parameter which is not mapped to a
                                      list must raise an error
    :param error_context: text appended to the error message
    :returns: the query-able parameters, indexed by eodag search key
    :raises: :class:`~eodag.utils.exceptions.ValidationError`
    """
    logger.debug("Retrieving queryable metadata from metadata_mapping")
    queryables: dict[str, Any] = {}

    for eodag_search_key, user_input in search_params.items():
        if user_input is None:
            continue

        md_mapping = metadata_mapping.get(eodag_search_key, (None, NOT_MAPPED))
        has_query_mapping = isinstance(md_mapping, list)

        # a query param not allowed by the provider is rejected when requested
        if raise_mtd_discovery_error and not has_query_mapping:
            raise ValidationError(
                "Search parameters which are not queryable are disallowed for this product type on this provider: "
                f"please remove '{eodag_search_key}' from your search parameters. {error_context}",
                {eodag_search_key},
            )

        # the mapping is unpacked as a pair, whatever its type
        _, md_value = md_mapping

        # query param from defined metadata_mapping
        if has_query_mapping:
            search_param = get_search_param(md_mapping)
            if search_param is not None:
                queryables[eodag_search_key] = search_param
            continue

        # from here on, only metadata auto discovery can make the param queryable
        if md_value != NOT_MAPPED or not getattr(
            config, "discover_metadata", {}
        ).get("auto_discovery", False):
            continue

        discovery_cfg = config.discover_metadata
        pattern = re.compile(discovery_cfg.get("metadata_pattern", ""))
        search_param_cfg = discovery_cfg.get("search_param", "")
        search_param_unparsed_cfg = discovery_cfg.get("search_param_unparsed", [])

        # unparsed params are forwarded with the user value itself
        if search_param_unparsed_cfg and eodag_search_key in search_param_unparsed_cfg:
            queryables[eodag_search_key] = user_input
            continue

        if not pattern.match(eodag_search_key):
            continue

        if isinstance(search_param_cfg, str):
            queryables[eodag_search_key] = search_param_cfg.format(
                metadata=eodag_search_key
            )
        elif isinstance(search_param_cfg, dict):
            formatted_cfg = dict_items_recursive_apply(
                search_param_cfg,
                lambda k, v: v.format(metadata=eodag_search_key),
            )
            for config_key, config_update in formatted_cfg.items():
                current_value = getattr(config, config_key, None)
                if not current_value:
                    logger.warning(
                        "Could not use discover_metadata[search_param]: no entry for %s in plugin config",
                        config_key,
                    )
                    continue
                update_nested_dict(
                    current_value,
                    config_update,
                    extend_list_values=True,
                    allow_extend_duplicates=False,
                )

    return queryables
def get_queryable_from_provider(
    provider_queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Get EODAG configured queryable parameter from provider queryable parameter

    Only the entries of ``metadata_mapping`` configured as a non-empty list are
    considered, their first element being compared to ``provider_queryable``:

    * an entry whose first element is exactly ``provider_queryable`` is preferred;
    * otherwise the first entry whose first element contains ``provider_queryable``
      surrounded by double quotes is used.

    ``provider_queryable`` is always handled as literal text, never as a regular
    expression.

    :param provider_queryable: provider queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: EODAG configured queryable parameter or None
    """
    # keep queryable (list) entries only, so that non-queryable ones can never match
    queryable_entries = [
        (param, param_conf[0])
        for param, param_conf in metadata_mapping.items()
        if isinstance(param_conf, list) and param_conf
    ]

    # if 1:1 mapping exists privilege this one instead of other mapping
    # e.g. provider queryable = year -> use year and not date in which year also appears
    for param, query_template in queryable_entries:
        if query_template == provider_queryable:
            return Queryables.get_queryable_from_alias(param)

    # escaped: the provider parameter may contain regex metacharacters
    pattern = rf"\"{re.escape(provider_queryable)}\""
    for param, query_template in queryable_entries:
        if query_template and re.search(pattern, query_template):
            return Queryables.get_queryable_from_alias(param)
    return None
def get_provider_queryable_path(
    queryable: str, metadata_mapping: dict[str, Union[str, list[str]]]
) -> Optional[str]:
    """Get the first element of the mapping configured for an EODAG queryable

    :param queryable: eodag queryable parameter
    :param metadata_mapping: metadata-mapping configuration
    :returns: first element of the mapping when the parameter is mapped to a list,
              None when it is not mapped or mapped to anything else
    """
    conf = metadata_mapping.get(queryable)
    # only mappings configured as a list carry a queryable path
    return conf[0] if isinstance(conf, list) else None
def get_provider_queryable_key(
    eodag_key: str,
    provider_queryables: dict[str, Any],
    metadata_mapping: dict[str, Union[list[Any], str]],
) -> str:
    """Finds the provider queryable corresponding to the given eodag key based on the metadata mapping

    * if ``eodag_key`` is not in ``metadata_mapping``, an empty string is returned;
    * if it is mapped to something else than a list, ``eodag_key`` itself is returned;
    * if it is mapped to a list, the first key of ``provider_queryables`` found as a
      whole word in the first element of that list is returned, or an empty string
      when none is found.

    Provider queryable names are matched as literal text, never as regular
    expressions.

    :param eodag_key: key in eodag
    :param provider_queryables: queryables returned from the provider
    :param metadata_mapping: metadata mapping from which the keys are retrieved
    :returns: provider queryable key
    """
    if eodag_key not in metadata_mapping:
        return ""
    mapping_key = metadata_mapping[eodag_key]
    if not isinstance(mapping_key, list):
        return eodag_key
    for queryable in provider_queryables:
        # escaped: provider names may contain regex metacharacters
        pattern = rf"\b{re.escape(queryable)}\b"
        if re.search(pattern, mapping_key[0]):
            return queryable
    return ""
# Keys taken from the OpenSearch extension for Earth Observation:
# http://docs.opengeospatial.org/is/13-026r9/13-026r9.html
# For a metadata to be queryable, the way to query it must be specified in the
# provider metadata_mapping configuration parameter. eodag automatically detects
# it as queryable once this is done.
# Every key below is mapped to "$.properties.<key>", except "uid".
OSEO_METADATA_MAPPING = {
    # OpenSearch resource identifier within the search engine context (in our case
    # within the context of the data provider)
    "uid": "$.uid",
    **{
        oseo_key: f"$.properties.{oseo_key}"
        for oseo_key in (
            # OpenSearch Parameters for Collection Search (Table 3)
            "productType",
            "doi",
            "platform",
            "platformSerialIdentifier",
            "instrument",
            "sensorType",
            "compositeType",
            "processingLevel",
            "orbitType",
            "spectralRange",
            "wavelengths",
            "hasSecurityConstraints",
            "dissemination",
            # INSPIRE obligated OpenSearch Parameters for Collection Search (Table 4)
            "title",
            "topicCategory",
            "keyword",
            "abstract",
            "resolution",
            "organisationName",
            "organisationRole",
            "publicationDate",
            "lineage",
            "useLimitation",
            "accessConstraint",
            "otherConstraint",
            "classification",
            "language",
            "specification",
            # OpenSearch Parameters for Product Search (Table 5)
            "parentIdentifier",
            "productionStatus",
            "acquisitionType",
            "orbitNumber",
            "orbitDirection",
            "track",
            "frame",
            "swathIdentifier",
            "cloudCover",
            "snowCover",
            "lowestLocation",
            "highestLocation",
            "productVersion",
            "productQualityStatus",
            "productQualityDegradationTag",
            "processorName",
            "processingCenter",
            "creationDate",
            "modificationDate",
            "processingDate",
            "sensorMode",
            "archivingCenter",
            "processingMode",
            # OpenSearch Parameters for Acquisition Parameters Search (Table 6)
            "availabilityTime",
            "acquisitionStation",
            "acquisitionSubType",
            "startTimeFromAscendingNode",
            "completionTimeFromAscendingNode",
            "illuminationAzimuthAngle",
            "illuminationZenithAngle",
            "illuminationElevationAngle",
            "polarizationMode",
            "polarizationChannels",
            "antennaLookDirection",
            "minimumIncidenceAngle",
            "maximumIncidenceAngle",
            "dopplerFrequency",
            "incidenceAngleVariation",
        )
    },
}
# Default mapping: the OpenSearch EO keys of OSEO_METADATA_MAPPING, followed by the
# custom parameters below (not defined in the OpenSearch EO document)
DEFAULT_METADATA_MAPPING = {
    **OSEO_METADATA_MAPPING,
    # id differs from uid. The id is an identifier by which a product which is
    # distributed by many providers can be retrieved (a property that it has in common
    # in the catalogues of all the providers on which it is referenced)
    "id": "$.id",
    # The geographic extent of the product
    "geometry": "$.geometry",
    # The url of the quicklook
    "quicklook": "$.properties.quicklook",
    # The url to download the product "as is" (literal or as a template to be completed
    # either after the search result is obtained from the provider or during the eodag
    # download phase)
    "downloadLink": "$.properties.downloadLink",
}
