# -*- coding: utf-8 -*-
# Copyright 2022, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import hashlib
import logging
import re
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from types import MethodType
from typing import TYPE_CHECKING, Annotated, Any, Optional, Union
from urllib.parse import quote_plus, unquote_plus
import geojson
import orjson
from dateutil.parser import isoparse
from dateutil.tz import tzutc
from dateutil.utils import today
from pydantic import Field
from pydantic.fields import FieldInfo
from requests.auth import AuthBase
from shapely.geometry.base import BaseGeometry
from typing_extensions import get_args  # noqa: F401
from eodag.api.product import EOProduct
from eodag.api.product.metadata_mapping import (
    DEFAULT_GEOMETRY,
    NOT_AVAILABLE,
    OFFLINE_STATUS,
    STAGING_STATUS,
    format_metadata,
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
)
from eodag.api.search_result import RawSearchResult
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.qssearch import PostJsonSearch, QueryStringSearch
from eodag.types import json_field_definition_to_python  # noqa: F401
from eodag.types.queryables import Queryables, QueryablesDict
from eodag.utils import (
    DEFAULT_MISSION_START_DATE,
    DEFAULT_SEARCH_TIMEOUT,
    deepcopy,
    dict_items_recursive_sort,
    get_geometry_from_various,
)
from eodag.utils.cache import instance_cached_method
from eodag.utils.dates import is_range_in_range
from eodag.utils.exceptions import DownloadError, NotAvailableError, ValidationError
from eodag.utils.requests import fetch_json
if TYPE_CHECKING:
    from eodag.config import PluginConfig
logger = logging.getLogger("eodag.search.build_search_result")
ECMWF_PREFIX = "ecmwf:"
# keywords from ECMWF keyword database + "dataset" (not part of database but exists)
# database: https://confluence.ecmwf.int/display/UDOC/Keywords+in+MARS+and+Dissemination+requests
ECMWF_KEYWORDS = {
    "dataset",
    "accuracy",
    "activity",
    "anoffset",
    "bitmap",
    "block",
    "channel",
    "class",
    "database",
    "date",
    "diagnostic",
    "direction",
    "domain",
    "duplicates",
    "expect",
    "expver",
    "fcmonth",
    "fcperiod",
    "fieldset",
    "filter",
    "feature",
    "format",
    "frame",
    "frequency",
    "generation",
    "grid",
    "hdate",
    "ident",
    "interpolation",
    "intgrid",
    "iteration",
    "latitude",
    "levelist",
    "levtype",
    "longitude",
    "lsm",
    "method",
    "number",
    "obsgroup",
    "obstype",
    "origin",
    "packing",
    "padding",
    "param",
    "priority",
    "product",
    "range",
    "realization",
    "refdate",
    "reference",
    "reportype",
    "repres",
    "resolution",
    "rotation",
    "section",
    "source",
    "step",
    "stream",
    "system",
    "target",
    "time",
    "truncation",
    "type",
    "use",
}
# additional keywords from copernicus services
COP_DS_KEYWORDS = {
    "aerosol_type",
    "altitude",
    "product_type",
    "band",
    "cdr_type",
    "data_format",
    "dataset_type",
    "day",
    "download_format",
    "ensemble_member",
    "experiment",
    "forcing_type",
    "gcm",
    "hday",
    "hmonth",
    "horizontal_resolution",
    "hydrological_model",
    "hydrological_year",
    "hyear",
    "input_observations",
    "leadtime_hour",
    "leadtime_month",
    "level",
    "location",
    "model",
    "model_level",
    "model_levels",
    "month",
    "nominal_day",
    "originating_centre",
    "period",
    "pressure_level",
    "processing_level",
    "processing_type",
    "product_version",
    "quantity",
    "rcm",
    "region",
    "release_version",
    "satellite",
    "sensor",
    "sensor_and_algorithm",
    "soil_level",
    "sky_type",
    "statistic",
    "system_version",
    "temporal_aggregation",
    "time_aggregation",
    "time_reference",
    "time_step",
    "variable",
    "variable_type",
    "version",
    "year",
}
ALLOWED_KEYWORDS = ECMWF_KEYWORDS | COP_DS_KEYWORDS
END = "completionTimeFromAscendingNode"
START = "startTimeFromAscendingNode"
def ecmwf_mtd() -> dict[str, Any]:
    """
    Make metadata mapping dict from a list of defined ECMWF Keywords
    We automatically add the #to_geojson convert to prevent modification of entries by eval() in the metadata mapping.
    keyword:
        - keyword
        - $."keyword"#to_geojson
    :return: metadata mapping dict
    """
    return {k: [k, f'{{$."{k}"#to_geojson}}'] for k in ALLOWED_KEYWORDS}
def _update_properties_from_element(
    prop: dict[str, Any], element: dict[str, Any], values: list[str]
) -> None:
    """updates a property dict with the given values based on the information from the element dict
    e.g. the type is set based on the type of the element
    """
    # multichoice elements are transformed into array
    if element["type"] in ("StringListWidget", "StringListArrayWidget"):
        prop["type"] = "array"
        if values:
            prop["items"] = {"type": "string", "enum": sorted(values)}
    # single choice elements are transformed into string
    elif element["type"] in (
        "StringChoiceWidget",
        "DateRangeWidget",
        "FreeformInputWidget",
    ):
        prop["type"] = "string"
        if values:
            prop["enum"] = sorted(values)
    # a bbox element
    elif element["type"] in ["GeographicExtentWidget", "GeographicExtentMapWidget"]:
        prop.update(
            {
                "type": "array",
                "minItems": 4,
                "additionalItems": False,
                "items": [
                    {
                        "type": "number",
                        "maximum": 180,
                        "minimum": -180,
                        "description": "West border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 90,
                        "minimum": -90,
                        "description": "South border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 180,
                        "minimum": -180,
                        "description": "East border of the bounding box",
                    },
                    {
                        "type": "number",
                        "maximum": 90,
                        "minimum": -90,
                        "description": "North border of the bounding box",
                    },
                ],
            }
        )
    # DateRangeWidget is a calendar date picker
    if element["type"] == "DateRangeWidget":
        prop["description"] = "date formatted like yyyy-mm-dd/yyyy-mm-dd"
    if description := element.get("help"):
        prop["description"] = description
def ecmwf_format(v: str) -> str:
    """Add ECMWF prefix to value v if v is a ECMWF keyword."""
    return ECMWF_PREFIX + v if v in ALLOWED_KEYWORDS else v
def get_min_max(
    value: Optional[Union[str, list[str]]] = None,
) -> tuple[Optional[str], Optional[str]]:
    """Returns the min and max from a list of strings or the same string if a single string is given."""
    if isinstance(value, list):
        sorted_values = sorted(value)
        return sorted_values[0], sorted_values[-1]
    return value, value
def append_time(input_date: date, time: Optional[str]) -> datetime:
    """
    Parses a time string in format HHMM and appends it to a date.
    if the time string is in format HH:MM or HH_MM we convert it to HHMM
    """
    if not time:
        time = "0000"
    time = re.sub(":|_", "", time)
    if time == "2400":
        time = "0000"
    dt = datetime.combine(input_date, datetime.strptime(time, "%H%M").time())
    dt.replace(tzinfo=timezone.utc)
    return dt
def parse_date(
    date: str, time: Optional[Union[str, list[str]]]
) -> tuple[datetime, datetime]:
    """Parses a date string in formats YYYY-MM-DD, YYYMMDD, solo or in start/end or start/to/end intervals."""
    if "to" in date:
        start_date_str, end_date_str = date.split("/to/")
    elif "/" in date:
        dates = date.split("/")
        start_date_str = dates[0]
        end_date_str = dates[-1]
    else:
        start_date_str = end_date_str = date
    # Update YYYYMMDD formatted dates
    if re.match(r"^\d{8}$", start_date_str):
        start_date_str = (
            f"{start_date_str[:4]}-{start_date_str[4:6]}-{start_date_str[6:]}"
        )
    if re.match(r"^\d{8}$", end_date_str):
        end_date_str = f"{end_date_str[:4]}-{end_date_str[4:6]}-{end_date_str[6:]}"
    start_date = datetime.fromisoformat(start_date_str.rstrip("Z"))
    end_date = datetime.fromisoformat(end_date_str.rstrip("Z"))
    if time:
        start_t, end_t = get_min_max(time)
        start_date = append_time(start_date.date(), start_t)
        end_date = append_time(end_date.date(), end_t)
    return start_date, end_date
def parse_year_month_day(
    year: Union[str, list[str]],
    month: Optional[Union[str, list[str]]] = None,
    day: Optional[Union[str, list[str]]] = None,
    time: Optional[Union[str, list[str]]] = None,
) -> tuple[datetime, datetime]:
    """Extracts and returns the year, month, day, and time from the parameters."""
    def build_date(year, month=None, day=None, time=None) -> datetime:
        """Datetime from default_date with updated year, month, day and time."""
        updated_date = datetime(int(year), 1, 1).replace(
            month=int(month) if month is not None else 1,
            day=int(day) if day is not None else 1,
        )
        if time is not None:
            updated_date = append_time(updated_date.date(), time)
        return updated_date
    start_y, end_y = get_min_max(year)
    start_m, end_m = get_min_max(month)
    start_d, end_d = get_min_max(day)
    start_t, end_t = get_min_max(time)
    start_date = build_date(start_y, start_m, start_d, start_t)
    end_date = build_date(end_y, end_m, end_d, end_t)
    return start_date, end_date
def ecmwf_temporal_to_eodag(
    params: dict[str, Any]
) -> tuple[Optional[str], Optional[str]]:
    """
    Converts ECMWF temporal parameters to EODAG temporal parameters.
    ECMWF temporal parameters:
        - **year** or **hyear**: Union[str, list[str]] — Year(s) as a string or list of strings.
        - **month** or **hmonth**: Union[str, list[str]] — Month(s) as a string or list of strings.
        - **day** or **hday**: Union[str, list[str]] — Day(s) as a string or list of strings.
        - **time**: str — A string representing the time in the format `HHMM` (e.g., `0200`, `0800`, `1400`).
        - **date**: str — A string in one of the formats:
            - `YYYY-MM-DD`
            - `YYYY-MM-DD/YYYY-MM-DD`
            - `YYYY-MM-DD/to/YYYY-MM-DD`
    :param params: Dictionary containing ECMWF temporal parameters.
    :return: A tuple with:
        - **start**: A string in the format `YYYY-MM-DDTHH:MM:SSZ`.
        - **end**: A string in the format `YYYY-MM-DDTHH:MM:SSZ`.
    """
    start = end = None
    if date := params.get("date"):
        if isinstance(date, list):
            date = "/".join(date)
        start, end = parse_date(date, params.get("time"))
    elif year := (params.get("year") or params.get("hyear")):
        month = params.get("month") or params.get("hmonth")
        day = params.get("day") or params.get("hday")
        time = params.get("time")
        start, end = parse_year_month_day(year, month, day, time)
    if start and end:
        return start.strftime("%Y-%m-%dT%H:%M:%SZ"), end.strftime("%Y-%m-%dT%H:%M:%SZ")
    else:
        return None, None
[docs]
class ECMWFSearch(PostJsonSearch):
    """ECMWF search plugin.
    This plugin builds a :class:`~eodag.api.search_result.SearchResult` containing a single product
    using given query parameters as product properties.
    The available configuration parameters inherits from parent classes, with some particular parameters
    for this plugin.
    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:
        * :attr:`~eodag.config.PluginConfig.remove_from_query` (``list[str]``): List of parameters
          used to parse metadata but that must not be included to the query
        * :attr:`~eodag.config.PluginConfig.end_date_excluded` (``bool``): Set to `False` if
          provider does not include end date to search
        * :attr:`~eodag.config.PluginConfig.dates_required` (``bool``): if date parameters are mandatory in the request
        * :attr:`~eodag.config.PluginConfig.discover_queryables`
          (:class:`~eodag.config.PluginConfig.DiscoverQueryables`): configuration to fetch the queryables from a
          provider queryables endpoint; It has the following keys:
          * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.fetch_url` (``str``): url to fetch the queryables valid
            for all product types
          * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.product_type_fetch_url` (``str``): url to fetch the
            queryables for a specific product type
          * :attr:`~eodag.config.PluginConfig.DiscoverQueryables.constraints_url` (``str``): url of the constraint file
            used to build queryables
    """
[docs]
    def __init__(self, provider: str, config: PluginConfig) -> None:
        config.metadata_mapping = {
            **ecmwf_mtd(),
            **{
                "id": "$.id",
                "title": "$.id",
                "storageStatus": OFFLINE_STATUS,
                "downloadLink": "$.null",
                "geometry": ["feature", "$.geometry"],
                "defaultGeometry": "POLYGON((180 -90, 180 90, -180 90, -180 -90, 180 -90))",
            },
            **config.metadata_mapping,
        }
        super().__init__(provider, config)
        # ECMWF providers do not feature any api_endpoint or next_page_query_obj.
        # Searched is faked by EODAG.
        self.config.__dict__.setdefault("api_endpoint", "")
        self.config.pagination.setdefault("next_page_query_obj", "{{}}")
        # defaut conf for accepting custom query params
        self.config.__dict__.setdefault(
            "discover_metadata",
            {
                "auto_discovery": False,
                "search_param": "{metadata}",
                "metadata_pattern": "^[a-zA-Z0-9][a-zA-Z0-9_]*$",
            },
        ) 
    def do_search(self, *args: Any, **kwargs: Any) -> list[dict[str, Any]]:
        """Should perform the actual search request.
        :param args: arguments to be used in the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        # no real search. We fake it all
        return [{}]
    def query(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> tuple[list[EOProduct], Optional[int]]:
        """Build ready-to-download SearchResult
        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information needed for the search
        :param kwargs: keyword arguments to be used in the search
        :returns: list of products and number of products (optional)
        """
        product_type = prep.product_type
        if not product_type:
            product_type = kwargs.get("productType")
        kwargs = self._preprocess_search_params(kwargs, product_type)
        result, num_items = super().query(prep, **kwargs)
        if prep.count and not num_items:
            num_items = 1
        return result, num_items
    def clear(self) -> None:
        """Clear search context"""
        super().clear()
    def build_query_string(
        self, product_type: str, query_dict: dict[str, Any]
    ) -> tuple[dict[str, Any], str]:
        """Build The query string using the search parameters
        :param product_type: product type id
        :param query_dict: keyword arguments to be used in the query string
        :return: formatted query params and encode query string
        """
        query_dict["_date"] = f"{query_dict.get(START)}/{query_dict.get(END)}"
        # Reorder kwargs to make sure year/month/day/time if set overwrite default datetime.
        priority_keys = [
            START,
            END,
        ]
        ordered_kwargs = {k: query_dict[k] for k in priority_keys if k in query_dict}
        ordered_kwargs.update(query_dict)
        return super().build_query_string(
            product_type=product_type, query_dict=ordered_kwargs
        )
    def _preprocess_search_params(
        self, params: dict[str, Any], product_type: Optional[str]
    ) -> dict[str, Any]:
        """Preprocess search parameters before making a request to the CDS API.
        This method is responsible for checking and updating the provided search parameters
        to ensure that required parameters like 'productType', 'startTimeFromAscendingNode',
        'completionTimeFromAscendingNode', and 'geometry' are properly set. If not specified
        in the input parameters, default values or values from the configuration are used.
        :param params: Search parameters to be preprocessed.
        :param product_type: (optional) product type id
        """
        _dc_qs = params.get("_dc_qs")
        if _dc_qs is not None:
            # if available, update search params using datacube query-string
            _dc_qp = geojson.loads(unquote_plus(unquote_plus(_dc_qs)))
            if "/to/" in _dc_qp.get("date", ""):
                params[START], params[END] = _dc_qp["date"].split("/to/")
            elif "/" in _dc_qp.get("date", ""):
                (params[START], params[END],) = _dc_qp[
                    "date"
                ].split("/")
            elif _dc_qp.get("date"):
                params[START] = params[END] = _dc_qp["date"]
            if "/" in _dc_qp.get("area", ""):
                params["geometry"] = _dc_qp["area"].split("/")
        params = {
            k.removeprefix(ECMWF_PREFIX): v for k, v in params.items() if v is not None
        }
        # dates
        # check if default dates have to be added
        if getattr(self.config, "dates_required", False):
            self._check_date_params(params, product_type)
        # adapt end date if it is midnight
        if END in params:
            end_date_excluded = getattr(self.config, "end_date_excluded", True)
            is_datetime = True
            try:
                end_date = datetime.strptime(params[END], "%Y-%m-%dT%H:%M:%SZ")
                end_date = end_date.replace(tzinfo=tzutc())
            except ValueError:
                try:
                    end_date = datetime.strptime(
                        params[END],
                        "%Y-%m-%dT%H:%M:%S.%fZ",
                    )
                    end_date = end_date.replace(tzinfo=tzutc())
                except ValueError:
                    end_date = isoparse(params[END])
                    is_datetime = False
            start_date = isoparse(params[START])
            if (
                not end_date_excluded
                and is_datetime
                and end_date > start_date
                and end_date
                == end_date.replace(hour=0, minute=0, second=0, microsecond=0)
            ):
                end_date += timedelta(days=-1)
                params[END] = end_date.isoformat()
        # geometry
        if "geometry" in params:
            params["geometry"] = get_geometry_from_various(geometry=params["geometry"])
        return params
    def _check_date_params(
        self, keywords: dict[str, Any], product_type: Optional[str]
    ) -> None:
        """checks if start and end date are present in the keywords and adds them if not"""
        if START and END in keywords:
            return
        product_type_conf = getattr(self.config, "metadata_mapping", {})
        if (
            product_type
            and product_type in self.config.products
            and "metadata_mapping" in self.config.products[product_type]
        ):
            product_type_conf = self.config.products[product_type]["metadata_mapping"]
        # start time given, end time missing
        if START in keywords:
            keywords[END] = (
                keywords[START]
                if END in product_type_conf
                else self.get_product_type_cfg_value(
                    "missionEndDate", today().isoformat()
                )
            )
            return
        if END in product_type_conf:
            mapping = product_type_conf[START]
            if not isinstance(mapping, list):
                mapping = product_type_conf[END]
            if isinstance(mapping, list):
                # if startTime is not given but other time params (e.g. year/month/(day)) are given,
                # no default date is required
                start, end = ecmwf_temporal_to_eodag(keywords)
                if start is None:
                    keywords[START] = self.get_product_type_cfg_value(
                        "missionStartDate", DEFAULT_MISSION_START_DATE
                    )
                    keywords[END] = (
                        keywords[START]
                        if END in product_type_conf
                        else self.get_product_type_cfg_value(
                            "missionEndDate", today().isoformat()
                        )
                    )
                else:
                    keywords[START] = start
                    keywords[END] = end
    def _get_product_type_queryables(
        self, product_type: Optional[str], alias: Optional[str], filters: dict[str, Any]
    ) -> QueryablesDict:
        """Override to set additional_properties to false."""
        default_values: dict[str, Any] = deepcopy(
            getattr(self.config, "products", {}).get(product_type, {})
        )
        default_values.pop("metadata_mapping", None)
        filters["productType"] = product_type
        queryables = self.discover_queryables(**{**default_values, **filters}) or {}
        return QueryablesDict(additional_properties=False, **queryables)
    def discover_queryables(
        self, **kwargs: Any
    ) -> Optional[dict[str, Annotated[Any, FieldInfo]]]:
        """Fetch queryables list from provider using its constraints file
        :param kwargs: additional filters for queryables (`productType` and other search
                       arguments)
        :returns: fetched queryable parameters dict
        """
        product_type = kwargs.pop("productType")
        pt_config = self.get_product_type_def_params(product_type)
        default_values = deepcopy(pt_config)
        default_values.pop("metadata_mapping", None)
        filters = {**default_values, **kwargs}
        if "start" in filters:
            filters[START] = filters.pop("start")
        if "end" in filters:
            filters[END] = filters.pop("end")
        # extract default datetime
        processed_filters = self._preprocess_search_params(
            deepcopy(filters), product_type
        )
        constraints_url = format_metadata(
            getattr(self.config, "discover_queryables", {}).get("constraints_url", ""),
            **filters,
        )
        constraints: list[dict[str, Any]] = self._fetch_data(constraints_url)
        form_url = format_metadata(
            getattr(self.config, "discover_queryables", {}).get("form_url", ""),
            **filters,
        )
        form: list[dict[str, Any]] = self._fetch_data(form_url)
        formated_filters = self.format_as_provider_keyword(
            product_type, processed_filters
        )
        # we re-apply kwargs input to consider override of year, month, day and time.
        for k, v in {**default_values, **kwargs}.items():
            key = k.removeprefix(ECMWF_PREFIX)
            if key not in ALLOWED_KEYWORDS | {
                START,
                END,
                "geom",
                "geometry",
            }:
                raise ValidationError(
                    f"'{key}' is not a queryable parameter for {self.provider}", {key}
                )
            formated_filters[key] = v
        # we use non empty filters as default to integrate user inputs
        # it is needed because pydantic json schema does not represent "value"
        # but only "default"
        non_empty_formated: dict[str, Any] = {
            k: v
            for k, v in formated_filters.items()
            if v and (not isinstance(v, list) or all(v))
        }
        required_keywords: set[str] = set()
        # calculate available values
        if constraints:
            # Apply constraints filtering
            available_values = self.available_values_from_constraints(
                constraints,
                non_empty_formated,
                form_keywords=[f["name"] for f in form],
            )
            # Pre-compute the required keywords (present in all constraint dicts)
            # when form, required keywords are extracted directly from form
            if not form:
                required_keywords = set.intersection(
                    *(map(lambda d: set(d.keys()), constraints))
                )
        else:
            values_url = getattr(self.config, "available_values_url", "")
            if not values_url:
                return self.queryables_from_metadata_mapping(product_type)
            if "{" in values_url:
                values_url = values_url.format(**filters)
            data = self._fetch_data(values_url)
            available_values = data["constraints"]
            required_keywords = data.get("required", [])
        # To check if all keywords are queryable parameters, we check if they are in the
        # available values or the product type config (available values calculated from the
        # constraints might not include all queryables)
        for keyword in filters:
            if (
                keyword
                not in available_values.keys()
                | pt_config.keys()
                | {
                    START,
                    END,
                    "geom",
                    "geometry",
                }
                and keyword not in [f["name"] for f in form]
                and keyword.removeprefix(ECMWF_PREFIX)
                not in set(list(available_values.keys()) + [f["name"] for f in form])
            ):
                raise ValidationError(
                    f"'{keyword}' is not a queryable parameter", {keyword}
                )
        # generate queryables
        if form:
            queryables = self.queryables_by_form(
                form,
                available_values,
                non_empty_formated,
            )
        else:
            queryables = self.queryables_by_values(
                available_values, list(required_keywords), non_empty_formated
            )
        # ecmwf:date is replaced by start and end.
        # start and end filters are supported whenever combinations of "year", "month", "day" filters exist
        if (
            queryables.pop(f"{ECMWF_PREFIX}date", None)
            or f"{ECMWF_PREFIX}year" in queryables
            or f"{ECMWF_PREFIX}hyear" in queryables
        ):
            queryables.update(
                {
                    "start": Queryables.get_with_default(
                        "start", processed_filters.get(START)
                    ),
                    "end": Queryables.get_with_default(
                        "end",
                        processed_filters.get(END),
                    ),
                }
            )
        # area is geom in EODAG.
        if queryables.pop("area", None):
            queryables["geom"] = Annotated[
                Union[str, dict[str, float], BaseGeometry],
                Field(
                    None,
                    description="Read EODAG documentation for all supported geometry format.",
                ),
            ]
        return queryables
    def available_values_from_constraints(
        self,
        constraints: list[dict[str, Any]],
        input_keywords: dict[str, Any],
        form_keywords: list[str],
    ) -> dict[str, list[str]]:
        """
        Filter constraints using input_keywords. Return list of available queryables.
        All constraint entries must have the same parameters.
        :param constraints: list of constraints received from the provider
        :param input_keywords: dict of input parameters given by the user
        :param form_keywords: list of keyword names from the provider form endpoint
        :return: dict with available values for each parameter
        """
        # get ordered constraint keywords
        constraints_keywords = list(
            OrderedDict.fromkeys(k for c in constraints for k in c.keys())
        )
        # prepare ordered input keywords formatted as provider's keywords
        # required to filter with constraints
        ordered_keywords = (
            [kw for kw in form_keywords if kw in constraints_keywords]
            if form_keywords
            else constraints_keywords
        )
        # filter constraint entries matching input keyword values
        filtered_constraints: list[dict[str, Any]]
        parsed_keywords: list[str] = []
        for keyword in ordered_keywords:
            values = input_keywords.get(keyword)
            if values is None:
                parsed_keywords.append(keyword)
                continue
            # we only compare list of strings.
            if isinstance(values, dict):
                raise ValidationError(
                    f"Parameter value as object is not supported: {keyword}={values}",
                    {keyword},
                )
            # We convert every single value to a list of string
            filter_v = values if isinstance(values, (list, tuple)) else [values]
            # We strip values of superfluous quotes (added by mapping converter to_geojson).
            # ECMWF accept values with /to/. We need to split it to an array
            # ECMWF accept values in format val1/val2. We need to split it to an array
            sep = re.compile(r"/to/|/")
            filter_v = [i for v in filter_v for i in sep.split(str(v))]
            # special handling for time 0000 converted to 0 by pre-formating with metadata_mapping
            if keyword.split(":")[-1] == "time":
                filter_v = ["0000" if str(v) == "0" else v for v in filter_v]
            # Collect missing values to report errors
            missing_values = set(filter_v)
            # Filter constraints and check for missing values
            filtered_constraints = []
            for entry in constraints:
                # Filter based on the presence of any value in filter_v
                entry_values = entry.get(keyword, [])
                # date constraint may be intervals. We identify intervals with a "/" in the value
                # we assume that if the first value is an interval, all values are intervals
                present_values = []
                if keyword == "date" and "/" in entry[keyword][0]:
                    input_range = values
                    if isinstance(values, list):
                        input_range = values[0]
                    if any(is_range_in_range(x, input_range) for x in entry[keyword]):
                        present_values = filter_v
                else:
                    present_values = [
                        value for value in filter_v if value in entry_values
                    ]
                # Remove present values from the missing_values set
                missing_values -= set(present_values)
                if present_values:
                    filtered_constraints.append(entry)
            # raise an error as no constraint entry matched the input keywords
            # raise an error if one value from input is not allowed
            if not filtered_constraints or missing_values:
                allowed_values = list(
                    {value for c in constraints for value in c.get(keyword, [])}
                )
                # restore ecmwf: prefix before raising error
                keyword = ECMWF_PREFIX + keyword
                all_keywords_str = ""
                if len(parsed_keywords) > 1:
                    keywords = [
                        f"{ECMWF_PREFIX + k}={pk}"
                        for k in parsed_keywords
                        if (pk := input_keywords.get(k))
                    ]
                    all_keywords_str = f" with {', '.join(keywords)}"
                raise ValidationError(
                    f"{keyword}={values} is not available"
                    f"{all_keywords_str}."
                    f" Allowed values are {', '.join(allowed_values)}.",
                    set(
                        [keyword] + [k for k in parsed_keywords if k in input_keywords]
                    ),
                )
            parsed_keywords.append(keyword)
            constraints = filtered_constraints
        available_values: dict[str, Any] = {k: set() for k in ordered_keywords}
        # we aggregate the constraint entries left
        for entry in constraints:
            for key, value in entry.items():
                available_values[key].update(value)
        return {k: list(v) for k, v in available_values.items()}
    def queryables_by_form(
        self,
        form: list[dict[str, Any]],
        available_values: dict[str, list[str]],
        defaults: dict[str, Any],
    ) -> dict[str, Annotated[Any, FieldInfo]]:
        """
        Generate Annotated field definitions from form entries and available values
        Used by Copernicus services like cop_cds, cop_ads, cop_ewds.
        :param form: data fetched from the form endpoint of the provider
        :param available_values: available values for each parameter
        :param defaults: default values for the parameters
        :return: dict of annotated queryables
        """
        queryables: dict[str, Annotated[Any, FieldInfo]] = {}
        required_list: list[str] = []
        for element in form:
            name: str = element["name"]
            # those are not parameter elements.
            if name in ("area_group", "global", "warning", "licences"):
                continue
            if "type" not in element or element["type"] == "FreeEditionWidget":
                # FreeEditionWidget used to select the whole available region
                # and to provide comments for the dataset
                continue
            # ordering done by id -> set id to high value if not present -> element will be last
            if "id" not in element:
                element["id"] = 100
            prop = {"title": element.get("label", name)}
            details = element.get("details", {})
            # add values from form if keyword was not in constraints
            values = (
                available_values[name]
                if name in available_values
                else details.get("values")
            )
            # updates the properties with the values given based on the information from the element
            _update_properties_from_element(prop, element, values)
            default = defaults.get(name)
            if details:
                fields = details.get("fields")
                if fields and (comment := fields[0].get("comment")):
                    prop["description"] = comment
            if name == "area" and isinstance(default, dict):
                default = list(default.values())
            # sometimes form returns default as array instead of string
            if default and prop.get("type") == "string" and isinstance(default, list):
                default = ",".join(default)
            is_required = bool(element.get("required")) and bool(
                available_values.get(name)
            )
            if is_required:
                required_list.append(name)
            queryables[ecmwf_format(name)] = Annotated[
                get_args(
                    json_field_definition_to_python(
                        prop,
                        default_value=default,
                        required=is_required,
                    )
                )
            ]
        return queryables
    def queryables_by_values(
        self,
        available_values: dict[str, list[str]],
        required_keywords: list[str],
        defaults: dict[str, Any],
    ) -> dict[str, Annotated[Any, FieldInfo]]:
        """
        Generate Annotated field definitions from available values.
        Used by ECMWF data providers like dedt_lumi.
        :param available_values: available values for each parameter
        :param required_keywords: list of required parameters
        :param defaults: default values for the parameters
        :return: dict of annotated queryables
        """
        # Rename keywords from form with metadata mapping.
        # Needed to map constraints like "xxxx" to eodag parameter "ecmwf:xxxx"
        required = [ecmwf_format(k) for k in required_keywords]  # noqa: F841
        queryables: dict[str, Annotated[Any, FieldInfo]] = {}
        for name, values in available_values.items():
            # Rename keywords from form with metadata mapping.
            # Needed to map constraints like "xxxx" to eodag parameter "ecmwf:xxxx"
            key = ecmwf_format(name)
            queryables[key] = Annotated[
                get_args(
                    json_field_definition_to_python(
                        {"type": "string", "title": name, "enum": values},
                        default_value=defaults.get(name),
                        required=bool(key in required),
                    )
                )
            ]
        return queryables
    def format_as_provider_keyword(
        self, product_type: str, properties: dict[str, Any]
    ) -> dict[str, Any]:
        """Return provider equivalent keyword names from EODAG keywords.
        :param product_type: product type id
        :param properties: dict of properties to be formatted
        :return: dict of formatted properties
        """
        properties["productType"] = product_type
        # provider product type specific conf
        product_type_def_params = self.get_product_type_def_params(
            product_type, format_variables=properties
        )
        # Add to the query, the queryable parameters set in the provider product type definition
        properties.update(
            {
                k: v
                for k, v in product_type_def_params.items()
                if k not in properties.keys()
                and k in self.config.metadata_mapping.keys()
                and isinstance(self.config.metadata_mapping[k], list)
            }
        )
        qp, _ = self.build_query_string(product_type, properties)
        return qp
    @instance_cached_method()
    def _fetch_data(self, url: str) -> Any:
        """
        fetches from a provider elements like constraints or forms.
        :param url: url from which the constraints can be fetched
        :returns: json file content fetched from the provider
        """
        if not url:
            return []
        auth = (
            self.auth
            if hasattr(self, "auth") and isinstance(self.auth, AuthBase)
            else None
        )
        timeout = getattr(self.config, "timeout", DEFAULT_SEARCH_TIMEOUT)
        return fetch_json(url, auth=auth, timeout=timeout)
    def normalize_results(
        self, results: RawSearchResult, **kwargs: Any
    ) -> list[EOProduct]:
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result
        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """
        product_type = kwargs.get("productType")
        result = results[0]
        # datacube query string got from previous search
        _dc_qs = kwargs.pop("_dc_qs", None)
        if _dc_qs is not None:
            qs = unquote_plus(unquote_plus(_dc_qs))
            sorted_unpaginated_qp = geojson.loads(qs)
        else:
            sorted_unpaginated_qp = dict_items_recursive_sort(results.query_params)
        # remove unwanted query params
        for param in getattr(self.config, "remove_from_query", []):
            sorted_unpaginated_qp.pop(param, None)
        if result:
            properties = result
            properties.update(result.pop("request_params", None) or {})
            properties = {k: v for k, v in properties.items() if not k.startswith("__")}
            properties["geometry"] = properties.get("area") or DEFAULT_GEOMETRY
            start, end = ecmwf_temporal_to_eodag(properties)
            properties["startTimeFromAscendingNode"] = start
            properties["completionTimeFromAscendingNode"] = end
        else:
            # use all available query_params to parse properties
            result_data: dict[str, Any] = {
                **results.product_type_def_params,
                **sorted_unpaginated_qp,
                **{"qs": sorted_unpaginated_qp},
            }
            # update result with product_type_def_params and search args if not None (and not auth)
            kwargs.pop("auth", None)
            result_data.update(results.product_type_def_params)
            result_data = {
                **result_data,
                **{k: v for k, v in kwargs.items() if v is not None},
            }
            properties = properties_from_json(
                result_data,
                self.config.metadata_mapping,
                discovery_config=getattr(self.config, "discover_metadata", {}),
            )
            query_hash = hashlib.sha1(str(result_data).encode("UTF-8")).hexdigest()
            properties["title"] = properties["id"] = (
                (product_type or kwargs.get("dataset", self.provider)).upper()
                + "_ORDERABLE_"
                + query_hash
            )
            # use product_type_config as default properties
            product_type_config = getattr(self.config, "product_type_config", {})
            properties = dict(product_type_config, **properties)
        qs = geojson.dumps(sorted_unpaginated_qp)
        # used by server mode to generate downloadlink href
        # TODO: to remove once the legacy server is removed
        properties["_dc_qs"] = quote_plus(qs)
        product = EOProduct(
            provider=self.provider,
            properties={ecmwf_format(k): v for k, v in properties.items()},
            **kwargs,
        )
        # backup original register_downloader to register_downloader_only
        product.register_downloader_only = product.register_downloader
        # patched register_downloader that will also update properties
        product.register_downloader = MethodType(patched_register_downloader, product)
        return [product]
    def count_hits(
        self, count_url: Optional[str] = None, result_type: Optional[str] = None
    ) -> int:
        """Count method that will always return 1.
        :param count_url: not used, only here because this method overwrites count_hits from the parent class
        :param result_type: not used, only here because this method overwrites count_hits from the parent class
        :return: always 1
        """
        return 1 
def _check_id(product: EOProduct) -> EOProduct:
    """Check if the id is the one of an existing job.
    If the job exists, poll it, otherwise, raise an error.
    :param product: The product to check the id for
    :raises: :class:`~eodag.utils.exceptions.ValidationError`
    """
    if not (product_id := product.search_kwargs.get("id")):
        return product
    if "ORDERABLE" in product_id:
        return product
    on_response_mm = getattr(product.downloader.config, "order_on_response", {}).get(
        "metadata_mapping", {}
    )
    if not on_response_mm:
        return product
    logger.debug(f"Update product properties using given orderId {product_id}")
    on_response_mm_jsonpath = mtd_cfg_as_conversion_and_querypath(
        on_response_mm,
    )
    properties_update = properties_from_json(
        {}, {**on_response_mm_jsonpath, **{"orderId": (None, product_id)}}
    )
    product.properties.update(
        {k: v for k, v in properties_update.items() if v != NOT_AVAILABLE}
    )
    auth = product.downloader_auth.authenticate() if product.downloader_auth else None
    # try to poll the job corresponding to the given id
    try:
        product.downloader._order_status(product=product, auth=auth)  # type: ignore
    # when a NotAvailableError is catched, it means the product is not ready and still needs to be polled
    except NotAvailableError:
        product.properties["storageStatus"] = STAGING_STATUS
    except Exception as e:
        if (
            isinstance(e, DownloadError) or isinstance(e, ValidationError)
        ) and "order status could not be checked" in e.args[0]:
            raise ValidationError(
                f"Requested data is not available on {product.provider} ({product_id})."
            ) from e
        raise ValidationError(e.args[0]) from e
    # update product id
    product.properties["id"] = product_id
    # update product type if needed
    if product.product_type is None:
        product.product_type = product.properties.get("ecmwf:dataset")
    # update product title
    product.properties["title"] = (
        (product.product_type or product.provider).upper() + "_" + product_id
    )
    # use NOT_AVAILABLE as fallback product_type to avoid using guess_product_type
    if product.product_type is None:
        product.product_type = NOT_AVAILABLE
    return product
def patched_register_downloader(self, downloader, authenticator):
    """Register product donwloader and update properties if searched by id.
    :param self: product to which information should be added
    :param downloader: The download method that it can use
                    :class:`~eodag.plugins.download.base.Download` or
                    :class:`~eodag.plugins.api.base.Api`
    :param authenticator: The authentication method needed to perform the download
                        :class:`~eodag.plugins.authentication.base.Authentication`
    """
    # register downloader
    self.register_downloader_only(downloader, authenticator)
    # and also update properties
    _check_id(self)
[docs]
class MeteoblueSearch(ECMWFSearch):
    """MeteoblueSearch search plugin.
    This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
    performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
    object.
    The available configuration parameters are inherited from parent classes, with some a particularity
    for pagination for this plugin.
    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:
        * :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
          (**mandatory**): The configuration of how the pagination is done on the provider. For
          this plugin it has the node:
          * :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
            additional parameters needed to perform search. These parameters won't be included in
            the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
            will be passed to a :meth:`str.format` method before being loaded as json.
    """
    def collect_search_urls(
        self,
        prep: PreparedSearch = PreparedSearch(),
        **kwargs: Any,
    ) -> tuple[list[str], int]:
        """Wraps PostJsonSearch.collect_search_urls to force product count to 1
        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments used in the search
        :return: list of search url and number of results
        """
        urls, _ = super().collect_search_urls(prep, **kwargs)
        return urls, 1
    def do_search(
        self, prep: PreparedSearch = PreparedSearch(items_per_page=None), **kwargs: Any
    ) -> list[dict[str, Any]]:
        """Perform the actual search request, and return result in a single element.
        :param prep: :class:`~eodag.plugins.search.PreparedSearch` object containing information for the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        prep.url = prep.search_urls[0]
        prep.info_message = f"Sending search request: {prep.url}"
        prep.exception_message = (
            f"Skipping error while searching for {self.provider}"
            f" {self.__class__.__name__} instance"
        )
        response = self._request(prep)
        return [response.json()]
    def build_query_string(
        self, product_type: str, query_dict: dict[str, Any]
    ) -> tuple[dict[str, Any], str]:
        """Build The query string using the search parameters
        :param product_type: product type id
        :param query_dict: keyword arguments to be used in the query string
        :return: formatted query params and encode query string
        """
        return QueryStringSearch.build_query_string(self, product_type, query_dict)
    def normalize_results(self, results, **kwargs):
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result
        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """
        product_type = kwargs.get("productType")
        result = results[0]
        # datacube query string got from previous search
        _dc_qs = kwargs.pop("_dc_qs", None)
        if _dc_qs is not None:
            qs = unquote_plus(unquote_plus(_dc_qs))
            sorted_unpaginated_query_params = geojson.loads(qs)
        else:
            next_page_query_obj = orjson.loads(
                self.config.pagination["next_page_query_obj"].format()
            )
            unpaginated_query_params = {
                k: v
                for k, v in results.query_params.items()
                if (k, v) not in next_page_query_obj.items()
            }
            # query hash, will be used to build a product id
            sorted_unpaginated_query_params = dict_items_recursive_sort(
                unpaginated_query_params
            )
        # use all available query_params to parse properties
        result = dict(
            result,
            **sorted_unpaginated_query_params,
            qs=sorted_unpaginated_query_params,
        )
        qs = geojson.dumps(sorted_unpaginated_query_params)
        query_hash = hashlib.sha1(str(qs).encode("UTF-8")).hexdigest()
        # update result with product_type_def_params and search args if not None (and not auth)
        kwargs.pop("auth", None)
        result.update(results.product_type_def_params)
        result = dict(result, **{k: v for k, v in kwargs.items() if v is not None})
        # parse properties
        parsed_properties = properties_from_json(
            result,
            self.config.metadata_mapping,
            discovery_config=getattr(self.config, "discover_metadata", {}),
        )
        properties = {
            # use product_type_config as default properties
            **getattr(self.config, "product_type_config", {}),
            **{ecmwf_format(k): v for k, v in parsed_properties.items()},
        }
        def slugify(date_str: str) -> str:
            return date_str.split("T")[0].replace("-", "")
        # build product id
        product_id = (product_type or self.provider).upper()
        start = properties.get(START, NOT_AVAILABLE)
        end = properties.get(END, NOT_AVAILABLE)
        if start != NOT_AVAILABLE:
            product_id += f"_{slugify(start)}"
            if end != NOT_AVAILABLE:
                product_id += f"_{slugify(end)}"
        product_id += f"_{query_hash}"
        properties["id"] = properties["title"] = product_id
        # used by server mode to generate downloadlink href
        properties["_dc_qs"] = quote_plus(qs)
        product = EOProduct(
            provider=self.provider,
            productType=product_type,
            properties=properties,
        )
        # use product_type_config as default properties
        product_type_config = getattr(self.config, "product_type_config", {})
        product.properties = dict(product_type_config, **product.properties)
        return [
            product,
        ] 
[docs]
class WekeoECMWFSearch(ECMWFSearch):
    """
    WekeoECMWFSearch search plugin.
    This plugin, which inherits from :class:`~eodag.plugins.search.build_search_result.ECMWFSearch`,
    performs a POST request and uses its result to build a single :class:`~eodag.api.search_result.SearchResult`
    object. In contrast to ECMWFSearch or MeteoblueSearch, the products are only build with information
    returned by the provider.
    The available configuration parameters are inherited from parent classes, with some a particularity
    for pagination for this plugin.
    :param provider: An eodag providers configuration dictionary
    :param config: Search plugin configuration:
        * :attr:`~eodag.config.PluginConfig.pagination` (:class:`~eodag.config.PluginConfig.Pagination`)
          (**mandatory**): The configuration of how the pagination is done on the provider. For
          this plugin it has the node:
          * :attr:`~eodag.config.PluginConfig.Pagination.next_page_query_obj` (``str``): The
            additional parameters needed to perform search. These parameters won't be included in
            the result. This must be a json dict formatted like ``{{"foo":"bar"}}`` because it
            will be passed to a :meth:`str.format` method before being loaded as json.
    """
    def normalize_results(
        self, results: RawSearchResult, **kwargs: Any
    ) -> list[EOProduct]:
        """Build :class:`~eodag.api.product._product.EOProduct` from provider result
        :param results: Raw provider result as single dict in list
        :param kwargs: Search arguments
        :returns: list of single :class:`~eodag.api.product._product.EOProduct`
        """
        if kwargs.get("id") and "ORDERABLE" not in kwargs["id"]:
            # id is order id (only letters and numbers) -> use parent normalize results
            return super().normalize_results(results, **kwargs)
        # formating of orderLink requires access to the productType value.
        results.data = [
            {**result, **results.product_type_def_params} for result in results
        ]
        normalized = QueryStringSearch.normalize_results(self, results, **kwargs)
        if not normalized:
            return normalized
        # remove unwanted query params
        excluded_query_params = getattr(self.config, "remove_from_query", [])
        filtered_query_params = {
            k: v
            for k, v in results.query_params.items()
            if k not in excluded_query_params
        }
        for product in normalized:
            properties = {**product.properties, **results.query_params}
            properties["_dc_qs"] = quote_plus(orjson.dumps(filtered_query_params))
            product.properties = {ecmwf_format(k): v for k, v in properties.items()}
            # update product and title the same way as in parent class
            splitted_id = product.properties.get("title", "").split("-")
            dataset = "_".join(splitted_id[:-1])
            query_hash = splitted_id[-1]
            product.properties["title"] = product.properties["id"] = (
                (product.product_type or dataset or self.provider).upper()
                + "_ORDERABLE_"
                + query_hash
            )
        return normalized
    def do_search(self, *args: Any, **kwargs: Any) -> list[dict[str, Any]]:
        """Should perform the actual search request.
        :param args: arguments to be used in the search
        :param kwargs: keyword arguments to be used in the search
        :return: list containing the results from the provider in json format
        """
        if "id" in kwargs and "ORDERABLE" not in kwargs["id"]:
            # id is order id (only letters and numbers) -> use parent normalize results.
            # No real search. We fake it all, then check order status using given id
            return [{}]
        else:
            return QueryStringSearch.do_search(self, *args, **kwargs)