Source code for eodag.api.core

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import datetime
import logging
import os
import re
import shutil
import tempfile
from importlib.metadata import version
from importlib.resources import files as res_files
from operator import itemgetter
from typing import TYPE_CHECKING, Any, Iterator, Optional, Union

import geojson
import yaml.parser

from eodag.api.product.metadata_mapping import (
    NOT_AVAILABLE,
    mtd_cfg_as_conversion_and_querypath,
)
from eodag.api.search_result import SearchResult
from eodag.config import (
    PLUGINS_TOPICS_KEYS,
    PluginConfig,
    SimpleYamlProxyConfig,
    credentials_in_auth,
    get_ext_product_types_conf,
    load_default_config,
    load_stac_provider_config,
    load_yml_config,
    override_config_from_env,
    override_config_from_file,
    override_config_from_mapping,
    provider_config_init,
    share_credentials,
)
from eodag.plugins.manager import PluginManager
from eodag.plugins.search import PreparedSearch
from eodag.plugins.search.build_search_result import MeteoblueSearch
from eodag.plugins.search.qssearch import PostJsonSearch
from eodag.types import model_fields_to_annotated
from eodag.types.queryables import CommonQueryables, QueryablesDict
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_ITEMS_PER_PAGE,
    DEFAULT_MAX_ITEMS_PER_PAGE,
    DEFAULT_PAGE,
    GENERIC_PRODUCT_TYPE,
    GENERIC_STAC_PROVIDER,
    get_geometry_from_various,
    makedirs,
    sort_dict,
    string_to_jsonpath,
    uri_to_path,
)
from eodag.utils.dates import rfc3339_str_to_datetime
from eodag.utils.env import is_env_var_true
from eodag.utils.exceptions import (
    AuthenticationError,
    NoMatchingProductType,
    PluginImplementationError,
    RequestError,
    UnsupportedProductType,
    UnsupportedProvider,
)
from eodag.utils.free_text_search import compile_free_text_query
from eodag.utils.stac_reader import fetch_stac_items

if TYPE_CHECKING:
    from shapely.geometry.base import BaseGeometry

    from eodag.api.product import EOProduct
    from eodag.plugins.apis.base import Api
    from eodag.plugins.crunch.base import Crunch
    from eodag.plugins.search.base import Search
    from eodag.types import ProviderSortables
    from eodag.types.download_args import DownloadConf
    from eodag.utils import DownloadedCallback, ProgressCallback, Unpack

logger = logging.getLogger("eodag.core")


[docs] class EODataAccessGateway: """An API for downloading a wide variety of geospatial products originating from different types of providers. :param user_conf_file_path: (optional) Path to the user configuration file :param locations_conf_path: (optional) Path to the locations configuration file """ def __init__( self, user_conf_file_path: Optional[str] = None, locations_conf_path: Optional[str] = None, ) -> None: product_types_config_path = os.getenv("EODAG_PRODUCT_TYPES_CFG_FILE") or str( res_files("eodag") / "resources" / "product_types.yml" ) self.product_types_config = SimpleYamlProxyConfig(product_types_config_path) self.providers_config = load_default_config() env_var_cfg_dir = "EODAG_CFG_DIR" self.conf_dir = os.getenv( env_var_cfg_dir, default=os.path.join(os.path.expanduser("~"), ".config", "eodag"), ) try: makedirs(self.conf_dir) except OSError as e: logger.debug(e) tmp_conf_dir = os.path.join(tempfile.gettempdir(), ".config", "eodag") logger.warning( f"Cannot create configuration directory {self.conf_dir}. " + f"Falling back to temporary directory {tmp_conf_dir}." ) if os.getenv(env_var_cfg_dir) is None: logger.warning( "You can set the path of the configuration directory " + f"with the environment variable {env_var_cfg_dir}" ) self.conf_dir = tmp_conf_dir makedirs(self.conf_dir) self._plugins_manager = PluginManager(self.providers_config) # use updated providers_config self.providers_config = self._plugins_manager.providers_config # First level override: From a user configuration file if user_conf_file_path is None: env_var_name = "EODAG_CFG_FILE" standard_configuration_path = os.path.join(self.conf_dir, "eodag.yml") user_conf_file_path = os.getenv(env_var_name) if user_conf_file_path is None: user_conf_file_path = standard_configuration_path if not os.path.isfile(standard_configuration_path): shutil.copy( str( res_files("eodag") / "resources" / "user_conf_template.yml" ), standard_configuration_path, ) override_config_from_file(self.providers_config, user_conf_file_path) # Second level override: From environment variables override_config_from_env(self.providers_config) # share credentials between updated plugins confs share_credentials(self.providers_config) # init updated providers conf strict_mode = is_env_var_true("EODAG_STRICT_PRODUCT_TYPES") available_product_types = set(self.product_types_config.source.keys()) for provider in self.providers_config.keys(): provider_config_init( self.providers_config[provider], load_stac_provider_config(), ) self._sync_provider_product_types( provider, available_product_types, strict_mode ) # init product types configuration self._product_types_config_init() # re-build _plugins_manager using up-to-date providers_config self._plugins_manager.rebuild(self.providers_config) # store pruned providers configs self._pruned_providers_config: dict[str, Any] = {} # filter out providers needing auth that have no credentials set self._prune_providers_list() # Sort providers taking into account of possible new priority orders self._plugins_manager.sort_providers() # set locations configuration if locations_conf_path is None: locations_conf_path = os.getenv("EODAG_LOCS_CFG_FILE") if locations_conf_path is None: locations_conf_path = os.path.join(self.conf_dir, "locations.yml") if not os.path.isfile(locations_conf_path): # copy locations conf file and replace path example locations_conf_template = str( res_files("eodag") / "resources" / "locations_conf_template.yml" ) with ( open(locations_conf_template) as infile, open(locations_conf_path, "w") as outfile, ): # The template contains paths in the form of: # /path/to/locations/file.shp path_template = "/path/to/locations/" for line in infile: line = line.replace( path_template, os.path.join(self.conf_dir, "shp") + os.path.sep, ) outfile.write(line) # copy sample shapefile dir shutil.copytree( str(res_files("eodag") / "resources" / "shp"), os.path.join(self.conf_dir, "shp"), ) self.set_locations_conf(locations_conf_path) def _product_types_config_init(self) -> None: """Initialize product types configuration.""" for pt_id, pd_dict in self.product_types_config.source.items(): self.product_types_config.source[pt_id].setdefault("_id", pt_id) def _sync_provider_product_types( self, provider: str, available_product_types: set[str], strict_mode: bool, ) -> None: """ Synchronize product types for a provider based on strict or permissive mode. In strict mode, removes product types not in available_product_types. In permissive mode, adds empty product type configs for missing types. :param provider: The provider name whose product types should be synchronized. :param available_product_types: The set of available product type IDs. :param strict_mode: If True, remove unknown product types; if False, add empty configs for them. :returns: None """ provider_products = self.providers_config[provider].products products_to_remove: list[str] = [] products_to_add: list[str] = [] for product_id in provider_products: if product_id == GENERIC_PRODUCT_TYPE: continue if product_id not in available_product_types: if strict_mode: products_to_remove.append(product_id) continue empty_product = { "title": product_id, "abstract": NOT_AVAILABLE, } self.product_types_config.source[ product_id ] = empty_product # will update available_product_types products_to_add.append(product_id) if products_to_add: logger.debug( "Product types permissive mode, %s added (provider %s)", ", ".join(products_to_add), provider, ) if products_to_remove: logger.debug( "Product types strict mode, ignoring %s (provider %s)", ", ".join(products_to_remove), provider, ) for id in products_to_remove: del self.providers_config[provider].products[id] def get_version(self) -> str: """Get eodag package version""" return version("eodag")
[docs] def set_preferred_provider(self, provider: str) -> None: """Set max priority for the given provider. :param provider: The name of the provider that should be considered as the preferred provider to be used for this instance """ if provider not in self.available_providers(): raise UnsupportedProvider( f"This provider is not recognised by eodag: {provider}" ) preferred_provider, max_priority = self.get_preferred_provider() if preferred_provider != provider: new_priority = max_priority + 1 self._plugins_manager.set_priority(provider, new_priority)
[docs] def get_preferred_provider(self) -> tuple[str, int]: """Get the provider currently set as the preferred one for searching products, along with its priority. :returns: The provider with the maximum priority and its priority """ providers_with_priority = [ (provider, conf.priority) for provider, conf in self.providers_config.items() ] preferred, priority = max(providers_with_priority, key=itemgetter(1)) return preferred, priority
[docs] def update_providers_config( self, yaml_conf: Optional[str] = None, dict_conf: Optional[dict[str, Any]] = None, ) -> None: """Update providers configuration with given input. Can be used to add a provider to existing configuration or update an existing one. :param yaml_conf: YAML formated provider configuration :param dict_conf: provider configuration as dictionary in place of ``yaml_conf`` """ if dict_conf is not None: conf_update = dict_conf elif yaml_conf is not None: conf_update = yaml.safe_load(yaml_conf) else: return None # restore the pruned configuration for provider in list(self._pruned_providers_config.keys()): if provider in conf_update: logger.info( "%s: provider restored from the pruned configurations", provider, ) self.providers_config[provider] = self._pruned_providers_config.pop( provider ) override_config_from_mapping(self.providers_config, conf_update) # share credentials between updated plugins confs share_credentials(self.providers_config) for provider in conf_update.keys(): provider_config_init( self.providers_config[provider], load_stac_provider_config(), ) setattr(self.providers_config[provider], "product_types_fetched", False) # re-create _plugins_manager using up-to-date providers_config self._plugins_manager.build_product_type_to_provider_config_map()
[docs] def add_provider( self, name: str, url: Optional[str] = None, priority: Optional[int] = None, search: dict[str, Any] = {"type": "StacSearch"}, products: dict[str, Any] = { GENERIC_PRODUCT_TYPE: {"productType": "{productType}"} }, download: dict[str, Any] = {"type": "HTTPDownload", "auth_error_code": 401}, **kwargs: dict[str, Any], ): """Adds a new provider. ``search``, ``products`` & ``download`` already have default values that will be updated (not replaced), with user provided ones: * ``search`` : ``{"type": "StacSearch"}`` * ``products`` : ``{"GENERIC_PRODUCT_TYPE": {"productType": "{productType}"}}`` * ``download`` : ``{"type": "HTTPDownload", "auth_error_code": 401}`` :param name: Name of provider :param url: Provider url, also used as ``search["api_endpoint"]`` if not defined :param priority: Provider priority. If None, provider will be set as preferred (highest priority) :param search: Search :class:`~eodag.config.PluginConfig` mapping :param products: Provider product types mapping :param download: Download :class:`~eodag.config.PluginConfig` mapping :param kwargs: Additional :class:`~eodag.config.ProviderConfig` mapping """ conf_dict: dict[str, Any] = { name: { "url": url, "search": {"type": "StacSearch", **search}, "products": { GENERIC_PRODUCT_TYPE: {"productType": "{productType}"}, **products, }, "download": { "type": "HTTPDownload", "auth_error_code": 401, **download, }, **kwargs, } } if priority is not None: conf_dict[name]["priority"] = priority # if provided, use url as default search api_endpoint if ( url and conf_dict[name].get("search", {}) and not conf_dict[name]["search"].get("api_endpoint") ): conf_dict[name]["search"]["api_endpoint"] = url # api plugin usage: remove unneeded search/download/auth plugin conf if conf_dict[name].get("api"): for k in PLUGINS_TOPICS_KEYS: if k != "api": conf_dict[name].pop(k, None) self.update_providers_config(dict_conf=conf_dict) if priority is None: self.set_preferred_provider(name)
def _prune_providers_list(self) -> None: """Removes from config providers needing auth that have no credentials set.""" update_needed = False for provider in list(self.providers_config.keys()): conf = self.providers_config[provider] # remove providers using skipped plugins if [ v for v in conf.__dict__.values() if isinstance(v, PluginConfig) and getattr(v, "type", None) in self._plugins_manager.skipped_plugins ]: self.providers_config.pop(provider) logger.debug( f"{provider}: provider needing unavailable plugin has been removed" ) continue # check authentication if hasattr(conf, "api") and getattr(conf.api, "need_auth", False): credentials_exist = credentials_in_auth(conf.api) if not credentials_exist: # credentials needed but not found self._pruned_providers_config[provider] = self.providers_config.pop( provider ) update_needed = True logger.info( "%s: provider needing auth for search has been pruned because no credentials could be found", provider, ) elif hasattr(conf, "search") and getattr(conf.search, "need_auth", False): if not hasattr(conf, "auth") and not hasattr(conf, "search_auth"): # credentials needed but no auth plugin was found self._pruned_providers_config[provider] = self.providers_config.pop( provider ) update_needed = True logger.info( "%s: provider needing auth for search has been pruned because no auth plugin could be found", provider, ) continue credentials_exist = ( hasattr(conf, "search_auth") and credentials_in_auth(conf.search_auth) ) or ( not hasattr(conf, "search_auth") and hasattr(conf, "auth") and credentials_in_auth(conf.auth) ) if not credentials_exist: # credentials needed but not found self._pruned_providers_config[provider] = self.providers_config.pop( provider ) update_needed = True logger.info( "%s: provider needing auth for search has been pruned because no credentials could be found", provider, ) elif not hasattr(conf, "api") and not hasattr(conf, "search"): # provider should have at least an api or search plugin self._pruned_providers_config[provider] = self.providers_config.pop( provider ) logger.info( "%s: provider has been pruned because no api or search plugin could be found", provider, ) update_needed = True if update_needed: # rebuild _plugins_manager with updated providers list self._plugins_manager.rebuild(self.providers_config) def set_locations_conf(self, locations_conf_path: str) -> None: """Set locations configuration. This configuration (YML format) will contain a shapefile list associated to a name and attribute parameters needed to identify the needed geometry. You can also configure parent attributes, which can be used for creating a catalogs path when using eodag as a REST server. Example of locations configuration file content: .. code-block:: yaml shapefiles: - name: country path: /path/to/countries_list.shp attr: ISO3 - name: department path: /path/to/FR_departments.shp attr: code_insee parent: name: country attr: FRA :param locations_conf_path: Path to the locations configuration file """ if os.path.isfile(locations_conf_path): locations_config = load_yml_config(locations_conf_path) main_key = next(iter(locations_config)) main_locations_config = locations_config[main_key] logger.info("Locations configuration loaded from %s" % locations_conf_path) self.locations_config: list[dict[str, Any]] = main_locations_config else: logger.info( "Could not load locations configuration from %s" % locations_conf_path ) self.locations_config = []
[docs] def list_product_types( self, provider: Optional[str] = None, fetch_providers: bool = True ) -> list[dict[str, Any]]: """Lists supported product types. :param provider: (optional) The name of a provider that must support the product types we are about to list :param fetch_providers: (optional) Whether to fetch providers for new product types or not :returns: The list of the product types that can be accessed using eodag. :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider` """ if fetch_providers: # First, update product types list if possible self.fetch_product_types_list(provider=provider) product_types: list[dict[str, Any]] = [] providers_configs = ( list(self.providers_config.values()) if not provider else [ p for p in self.providers_config.values() if provider in [p.name, getattr(p, "group", None)] ] ) if provider and not providers_configs: raise UnsupportedProvider( f"The requested provider is not (yet) supported: {provider}" ) for p in providers_configs: for product_type_id in p.products: # type: ignore if product_type_id == GENERIC_PRODUCT_TYPE: continue config = self.product_types_config[product_type_id] if "alias" in config: product_type_id = config["alias"] product_type = {"ID": product_type_id, **config} if product_type not in product_types: product_types.append(product_type) # Return the product_types sorted in lexicographic order of their ID return sorted(product_types, key=itemgetter("ID"))
[docs] def fetch_product_types_list(self, provider: Optional[str] = None) -> None: """Fetch product types list and update if needed. If strict mode is enabled (by setting the ``EODAG_STRICT_PRODUCT_TYPES`` environment variable to a truthy value), this method will not fetch or update product types and will return immediately. :param provider: The name of a provider or provider-group for which product types list should be updated. Defaults to all providers (None value). """ strict_mode = is_env_var_true("EODAG_STRICT_PRODUCT_TYPES") if strict_mode: return providers_to_fetch = list(self.providers_config.keys()) # check if some providers are grouped under a group name which is not a provider name if provider is not None and provider not in self.providers_config: providers_to_fetch = [ p for p, pconf in self.providers_config.items() if provider == getattr(pconf, "group", None) ] if providers_to_fetch: logger.info( f"Fetch product types for {provider} group: {', '.join(providers_to_fetch)}" ) else: return None elif provider is not None: providers_to_fetch = [provider] # providers discovery confs that are fetchable providers_discovery_configs_fetchable: dict[str, Any] = {} # check if any provider has not already been fetched for product types already_fetched = True for provider_to_fetch in providers_to_fetch: provider_config = self.providers_config[provider_to_fetch] # get discovery conf if hasattr(provider_config, "search"): provider_search_config = provider_config.search elif hasattr(provider_config, "api"): provider_search_config = provider_config.api else: continue discovery_conf = getattr( provider_search_config, "discover_product_types", {} ) if discovery_conf.get("fetch_url"): providers_discovery_configs_fetchable[ provider_to_fetch ] = discovery_conf if not getattr(provider_config, "product_types_fetched", False): already_fetched = False if not already_fetched: # get ext_product_types conf ext_product_types_cfg_file = os.getenv("EODAG_EXT_PRODUCT_TYPES_CFG_FILE") if ext_product_types_cfg_file is not None: ext_product_types_conf = get_ext_product_types_conf( ext_product_types_cfg_file ) else: ext_product_types_conf = get_ext_product_types_conf() if not ext_product_types_conf: # empty ext_product_types conf ext_product_types_conf = ( self.discover_product_types(provider=provider) or {} ) # update eodag product types list with new conf self.update_product_types_list(ext_product_types_conf) # Compare current provider with default one to see if it has been modified # and product types list would need to be fetched # get ext_product_types conf for user modified providers default_providers_config = load_default_config() for ( provider, user_discovery_conf, ) in providers_discovery_configs_fetchable.items(): # default discover_product_types conf if provider in default_providers_config: default_provider_config = default_providers_config[provider] if hasattr(default_provider_config, "search"): default_provider_search_config = default_provider_config.search elif hasattr(default_provider_config, "api"): default_provider_search_config = default_provider_config.api else: continue default_discovery_conf = getattr( default_provider_search_config, "discover_product_types", {} ) # compare confs if default_discovery_conf["result_type"] == "json" and isinstance( default_discovery_conf["results_entry"], str ): default_discovery_conf_parsed = dict( default_discovery_conf, **{ "results_entry": string_to_jsonpath( default_discovery_conf["results_entry"], force=True ) }, **mtd_cfg_as_conversion_and_querypath( dict( generic_product_type_id=default_discovery_conf[ "generic_product_type_id" ] ) ), **dict( generic_product_type_parsable_properties=mtd_cfg_as_conversion_and_querypath( default_discovery_conf[ "generic_product_type_parsable_properties" ] ) ), **dict( generic_product_type_parsable_metadata=mtd_cfg_as_conversion_and_querypath( default_discovery_conf[ "generic_product_type_parsable_metadata" ] ) ), ) else: default_discovery_conf_parsed = default_discovery_conf if ( user_discovery_conf == default_discovery_conf or user_discovery_conf == default_discovery_conf_parsed ) and ( not default_discovery_conf.get("fetch_url") or "ext_product_types_conf" not in locals() or "ext_product_types_conf" in locals() and ( provider in ext_product_types_conf or len(ext_product_types_conf.keys()) == 0 ) ): continue # providers not skipped here should be user-modified # or not in ext_product_types_conf (if eodag system conf != eodag conf used for ext_product_types_conf) if not already_fetched: # discover product types for user configured provider provider_ext_product_types_conf = ( self.discover_product_types(provider=provider) or {} ) # update eodag product types list with new conf self.update_product_types_list(provider_ext_product_types_conf)
[docs] def discover_product_types( self, provider: Optional[str] = None ) -> Optional[dict[str, Any]]: """Fetch providers for product types :param provider: The name of a provider or provider-group to fetch. Defaults to all providers (None value). :returns: external product types configuration """ grouped_providers = [ p for p, provider_config in self.providers_config.items() if provider == getattr(provider_config, "group", None) ] if provider and provider not in self.providers_config and grouped_providers: logger.info( f"Discover product types for {provider} group: {', '.join(grouped_providers)}" ) elif provider and provider not in self.providers_config: raise UnsupportedProvider( f"The requested provider is not (yet) supported: {provider}" ) ext_product_types_conf: dict[str, Any] = {} providers_to_fetch = [ p for p in ( [ p for p in self.providers_config if p in grouped_providers + [provider] ] if provider else self.available_providers() ) ] kwargs: dict[str, Any] = {} for provider in providers_to_fetch: if hasattr(self.providers_config[provider], "search"): search_plugin_config = self.providers_config[provider].search elif hasattr(self.providers_config[provider], "api"): search_plugin_config = self.providers_config[provider].api else: return None if getattr(search_plugin_config, "discover_product_types", {}).get( "fetch_url", None ): search_plugin: Union[Search, Api] = next( self._plugins_manager.get_search_plugins(provider=provider) ) # check after plugin init if still fetchable if not getattr(search_plugin.config, "discover_product_types", {}).get( "fetch_url" ): continue # append auth to search plugin if needed if getattr(search_plugin.config, "need_auth", False): if auth := self._plugins_manager.get_auth( search_plugin.provider, getattr(search_plugin.config, "api_endpoint", None), search_plugin.config, ): kwargs["auth"] = auth else: logger.debug( f"Could not authenticate on {provider} for product types discovery" ) ext_product_types_conf[provider] = None continue ext_product_types_conf[provider] = search_plugin.discover_product_types( **kwargs ) return sort_dict(ext_product_types_conf)
[docs] def update_product_types_list( self, ext_product_types_conf: dict[str, Optional[dict[str, dict[str, Any]]]] ) -> None: """Update eodag product types list :param ext_product_types_conf: external product types configuration """ for provider, new_product_types_conf in ext_product_types_conf.items(): if new_product_types_conf and provider in self.providers_config: try: search_plugin_config = getattr( self.providers_config[provider], "search", None ) or getattr(self.providers_config[provider], "api", None) if search_plugin_config is None: continue if not getattr( search_plugin_config, "discover_product_types", {} ).get("fetch_url"): # conf has been updated and provider product types are no more discoverable continue provider_products_config = ( self.providers_config[provider].products or {} ) except UnsupportedProvider: logger.debug( "Ignoring external product types for unknown provider %s", provider, ) continue new_product_types: list[str] = [] for ( new_product_type, new_product_type_conf, ) in new_product_types_conf["providers_config"].items(): if new_product_type not in provider_products_config: for existing_product_type in provider_products_config.copy(): # compare parsed extracted conf (without metadata_mapping entry) unparsable_keys = ( search_plugin_config.discover_product_types.get( "generic_product_type_unparsable_properties", {} ).keys() ) new_parsed_product_types_conf = { k: v for k, v in new_product_type_conf.items() if k not in unparsable_keys } if ( new_parsed_product_types_conf.items() <= provider_products_config[ existing_product_type ].items() ): # new_product_types_conf is a subset on an existing conf break else: # new_product_type_conf does not already exist, append it # to provider_products_config provider_products_config[ new_product_type ] = new_product_type_conf # to self.product_types_config self.product_types_config.source.update( { new_product_type: {"_id": new_product_type} | new_product_types_conf["product_types_config"][ new_product_type ] } ) ext_product_types_conf[provider] = new_product_types_conf new_product_types.append(new_product_type) if new_product_types: logger.debug( f"Added {len(new_product_types)} product types for {provider}" ) elif provider not in self.providers_config: # unknown provider continue self.providers_config[provider].product_types_fetched = True # re-create _plugins_manager using up-to-date providers_config self._plugins_manager.build_product_type_to_provider_config_map()
[docs] def available_providers( self, product_type: Optional[str] = None, by_group: bool = False ) -> list[str]: """Gives the sorted list of the available providers or groups The providers or groups are sorted first by their priority level in descending order, and then alphabetically in ascending order for providers or groups with the same priority level. :param product_type: (optional) Only list providers configured for this product_type :param by_group: (optional) If set to True, list groups when available instead of providers, mixed with other providers :returns: the sorted list of the available providers or groups """ if product_type: providers = [ (v.group if by_group and hasattr(v, "group") else k, v.priority) for k, v in self.providers_config.items() if product_type in getattr(v, "products", {}).keys() ] else: providers = [ (v.group if by_group and hasattr(v, "group") else k, v.priority) for k, v in self.providers_config.items() ] # If by_group is True, keep only the highest priority for each group if by_group: group_priority: dict[str, int] = {} for name, priority in providers: if name not in group_priority or priority > group_priority[name]: group_priority[name] = priority providers = list(group_priority.items()) # Sort by priority (descending) and then by name (ascending) providers.sort(key=lambda x: (-x[1], x[0])) # Return only the names of the providers or groups return [name for name, _ in providers]
def get_product_type_from_alias(self, alias_or_id: str) -> str: """Return the ID of a product type by either its ID or alias :param alias_or_id: Alias of the product type. If an existing ID is given, this method will directly return the given value. :returns: Internal name of the product type. """ product_types = [ k for k, v in self.product_types_config.items() if v.get("alias") == alias_or_id ] if len(product_types) > 1: raise NoMatchingProductType( f"Too many matching product types for alias {alias_or_id}: {product_types}" ) if len(product_types) == 0: if alias_or_id in self.product_types_config: return alias_or_id else: raise NoMatchingProductType( f"Could not find product type from alias or ID {alias_or_id}" ) return product_types[0] def get_alias_from_product_type(self, product_type: str) -> str: """Return the alias of a product type by its ID. If no alias was defined for the given product type, its ID is returned instead. :param product_type: product type ID :returns: Alias of the product type or its ID if no alias has been defined for it. """ if product_type not in self.product_types_config: raise NoMatchingProductType(product_type) return self.product_types_config[product_type].get("alias", product_type)
[docs] def guess_product_type( self, free_text: Optional[str] = None, intersect: bool = False, instrument: Optional[str] = None, platform: Optional[str] = None, platformSerialIdentifier: Optional[str] = None, processingLevel: Optional[str] = None, sensorType: Optional[str] = None, keywords: Optional[str] = None, abstract: Optional[str] = None, title: Optional[str] = None, missionStartDate: Optional[str] = None, missionEndDate: Optional[str] = None, **kwargs: Any, ) -> list[str]: """ Find EODAG product type IDs that best match a set of search parameters. When using several filters, product types that match most of them will be returned at first. :param free_text: Free text search filter used to search accross all the following parameters. Handles logical operators with parenthesis (``AND``/``OR``/``NOT``), quoted phrases (``"exact phrase"``), ``*`` and ``?`` wildcards. :param intersect: Join results for each parameter using INTERSECT instead of UNION. :param instrument: Instrument parameter. :param platform: Platform parameter. :param platformSerialIdentifier: Platform serial identifier parameter. :param processingLevel: Processing level parameter. :param sensorType: Sensor type parameter. :param keywords: Keywords parameter. :param abstract: Abstract parameter. :param title: Title parameter. :param missionStartDate: start date for datetime filtering. Not used by free_text :param missionEndDate: end date for datetime filtering. Not used by free_text :returns: The best match for the given parameters. :raises: :class:`~eodag.utils.exceptions.NoMatchingProductType` """ if productType := kwargs.get("productType"): return [productType] filters: dict[str, str] = { k: v for k, v in { "instrument": instrument, "platform": platform, "platformSerialIdentifier": platformSerialIdentifier, "processingLevel": processingLevel, "sensorType": sensorType, "keywords": keywords, "abstract": abstract, "title": title, }.items() if v is not None } only_dates = ( True if (not free_text and not filters and (missionStartDate or missionEndDate)) else False ) free_text_evaluator = ( compile_free_text_query(free_text) if free_text else lambda _: True ) guesses_with_score: list[tuple[str, int]] = [] for pt_id, pt_dict in self.product_types_config.source.items(): if ( pt_id == GENERIC_PRODUCT_TYPE or pt_id not in self._plugins_manager.product_type_to_provider_config_map ): continue score = 0 # how many filters matched # free text search if free_text: match = free_text_evaluator(pt_dict) if match: score += 1 elif intersect: continue # must match all filters # individual filters if filters: filters_matching_method = all if intersect else any filters_evaluators = { filter_name: compile_free_text_query(value) for filter_name, value in filters.items() if value is not None } filter_matches = [ filters_evaluators[filter_name]({filter_name: pt_dict[filter_name]}) for filter_name, value in filters.items() if filter_name in pt_dict ] if filters_matching_method(filter_matches): # add number of True matches to score score += sum(filter_matches) elif intersect: continue # must match all filters if score == 0 and not only_dates: continue # datetime filtering if missionStartDate or missionEndDate: min_aware = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) max_aware = datetime.datetime.max.replace(tzinfo=datetime.timezone.utc) max_start = max( rfc3339_str_to_datetime(missionStartDate) if missionStartDate else min_aware, rfc3339_str_to_datetime(pt_dict["missionStartDate"]) if pt_dict.get("missionStartDate") else min_aware, ) min_end = min( rfc3339_str_to_datetime(missionEndDate) if missionEndDate else max_aware, rfc3339_str_to_datetime(pt_dict["missionEndDate"]) if pt_dict.get("missionEndDate") else max_aware, ) if not (max_start <= min_end): continue pt_alias = pt_dict.get("alias", pt_id) guesses_with_score.append((pt_alias, score)) if guesses_with_score: # sort by score descending, then pt_id for stability guesses_with_score.sort(key=lambda x: (-x[1], x[0])) return [pt_id for pt_id, _ in guesses_with_score] raise NoMatchingProductType()
[docs] def search( self, page: int = DEFAULT_PAGE, items_per_page: int = DEFAULT_ITEMS_PER_PAGE, raise_errors: bool = False, start: Optional[str] = None, end: Optional[str] = None, geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None, locations: Optional[dict[str, str]] = None, provider: Optional[str] = None, count: bool = False, validate: Optional[bool] = True, **kwargs: Any, ) -> SearchResult: """Look for products matching criteria on known providers. The default behaviour is to look for products on the provider with the highest priority supporting the requested product type. These priorities are configurable through user configuration file or individual environment variable. If the request to the provider with the highest priority fails or is empty, the data will be request from the provider with the next highest priority. Only if the request fails for all available providers, an error will be thrown. :param page: (optional) The page number to return :param items_per_page: (optional) The number of results that must appear in one single page :param raise_errors: (optional) When an error occurs when searching, if this is set to True, the error is raised :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param geom: (optional) Search area that can be defined in different ways: * with a Shapely geometry object: :class:`shapely.geometry.base.BaseGeometry` * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"): ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])`` * with a bounding box as list of float: ``[lonmin, latmin, lonmax, latmax]`` * with a WKT str :param locations: (optional) Location filtering by name using locations configuration ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use the geometry of the features having the property ISO3 starting with 'PA' such as Panama and Pakistan in the shapefile configured with name=country and attr=ISO3 :param provider: (optional) the provider to be used. If set, search fallback will be disabled. If not set, the configured preferred provider will be used at first before trying others until finding results. :param count: (optional) Whether to run a query with a count request or not :param validate: (optional) Set to True to validate search parameters before sending the query to the provider :param kwargs: Some other criteria that will be used to do the search, using paramaters compatibles with the provider :returns: A collection of EO products matching the criteria .. versionchanged:: v3.0.0b1 ``search()`` method now returns only a single :class:`~eodag.api.search_result.SearchResult` instead of a 2 values tuple. .. note:: The search interfaces, which are implemented as plugins, are required to return a list as a result of their processing. This requirement is enforced here. """ search_plugins, search_kwargs = self._prepare_search( start=start, end=end, geom=geom, locations=locations, provider=provider, **kwargs, ) if search_kwargs.get("id"): # Don't validate requests by ID. "id" is not queryable. return self._search_by_id( search_kwargs.pop("id"), provider=provider, raise_errors=raise_errors, validate=False, **search_kwargs, ) # remove datacube query string from kwargs which was only needed for search-by-id search_kwargs.pop("_dc_qs", None) search_kwargs.update( page=page, items_per_page=items_per_page, ) errors: list[tuple[str, Exception]] = [] # Loop over available providers and return the first non-empty results for i, search_plugin in enumerate(search_plugins): search_plugin.clear() search_results = self._do_search( search_plugin, count=count, raise_errors=raise_errors, validate=validate, **search_kwargs, ) errors.extend(search_results.errors) if len(search_results) == 0 and i < len(search_plugins) - 1: logger.warning( f"No result could be obtained from provider {search_plugin.provider}, " "we will try to get the data from another provider", ) elif len(search_results) > 0: search_results.errors = errors return search_results if i > 1: logger.error("No result could be obtained from any available provider") return SearchResult([], 0, errors) if count else SearchResult([], errors=errors)
[docs] def search_iter_page( self, items_per_page: int = DEFAULT_ITEMS_PER_PAGE, start: Optional[str] = None, end: Optional[str] = None, geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None, locations: Optional[dict[str, str]] = None, **kwargs: Any, ) -> Iterator[SearchResult]: """Iterate over the pages of a products search. :param items_per_page: (optional) The number of results requested per page :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param geom: (optional) Search area that can be defined in different ways: * with a Shapely geometry object: :class:`shapely.geometry.base.BaseGeometry` * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"): ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])`` * with a bounding box as list of float: ``[lonmin, latmin, lonmax, latmax]`` * with a WKT str :param locations: (optional) Location filtering by name using locations configuration ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use the geometry of the features having the property ISO3 starting with 'PA' such as Panama and Pakistan in the shapefile configured with name=country and attr=ISO3 :param kwargs: Some other criteria that will be used to do the search, using paramaters compatibles with the provider :returns: An iterator that yields page per page a collection of EO products matching the criteria """ search_plugins, search_kwargs = self._prepare_search( start=start, end=end, geom=geom, locations=locations, **kwargs ) for i, search_plugin in enumerate(search_plugins): try: return self.search_iter_page_plugin( items_per_page=items_per_page, search_plugin=search_plugin, **search_kwargs, ) except RequestError: if i < len(search_plugins) - 1: logger.warning( "No result could be obtained from provider %s, " "we will try to get the data from another provider", search_plugin.provider, ) else: logger.error( "No result could be obtained from any available provider" ) raise raise RequestError("No result could be obtained from any available provider")
def search_iter_page_plugin( self, search_plugin: Union[Search, Api], items_per_page: int = DEFAULT_ITEMS_PER_PAGE, **kwargs: Any, ) -> Iterator[SearchResult]: """Iterate over the pages of a products search using a given search plugin. :param items_per_page: (optional) The number of results requested per page :param kwargs: Some other criteria that will be used to do the search, using parameters compatibles with the provider :param search_plugin: search plugin to be used :returns: An iterator that yields page per page a collection of EO products matching the criteria """ iteration = 1 # Store the search plugin config pagination.next_page_url_tpl to reset it later # since it might be modified if the next_page_url mechanism is used by the # plugin. (same thing for next_page_query_obj, next_page_query_obj with POST reqs) pagination_config = getattr(search_plugin.config, "pagination", {}) prev_next_page_url_tpl = pagination_config.get("next_page_url_tpl") prev_next_page_query_obj = pagination_config.get("next_page_query_obj") # Page has to be set to a value even if use_next is True, this is required # internally by the search plugin (see collect_search_urls) kwargs.update( page=1, items_per_page=items_per_page, ) prev_product = None next_page_url = None next_page_query_obj = None number_matched = None while True: # if count is enabled, it will only be performed on 1st iteration if iteration == 2: kwargs["count"] = False if iteration > 1 and next_page_url: pagination_config["next_page_url_tpl"] = next_page_url if iteration > 1 and next_page_query_obj: pagination_config["next_page_query_obj"] = next_page_query_obj logger.info("Iterate search over multiple pages: page #%s", iteration) try: # remove unwanted kwargs for _do_search kwargs.pop("raise_errors", None) search_result = self._do_search( search_plugin, raise_errors=True, **kwargs ) # if count is enabled, it will only be performed on 1st iteration if iteration == 1: number_matched = search_result.number_matched except Exception: logger.warning( "error at retrieval of data from %s, for params: %s", search_plugin.provider, str(kwargs), ) raise finally: # we don't want that next(search_iter_page(...)) modifies the plugin # indefinitely. So we reset after each request, but before the generator # yields, the attr next_page_url (to None) and # config.pagination["next_page_url_tpl"] (to its original value). next_page_url = getattr(search_plugin, "next_page_url", None) next_page_query_obj = getattr(search_plugin, "next_page_query_obj", {}) next_page_merge = getattr(search_plugin, "next_page_merge", None) if next_page_url: search_plugin.next_page_url = None if prev_next_page_url_tpl: search_plugin.config.pagination[ "next_page_url_tpl" ] = prev_next_page_url_tpl if next_page_query_obj: if prev_next_page_query_obj: search_plugin.config.pagination[ "next_page_query_obj" ] = prev_next_page_query_obj # Update next_page_query_obj for next page req if next_page_merge: search_plugin.next_page_query_obj = dict( getattr(search_plugin, "query_params", {}), **next_page_query_obj, ) else: search_plugin.next_page_query_obj = next_page_query_obj if len(search_result) > 0: # The first products between two iterations are compared. If they # are actually the same product, it means the iteration failed at # progressing for some reason. This is implemented as a workaround # to some search plugins/providers not handling pagination. product = search_result[0] if ( prev_product and product.properties["id"] == prev_product.properties["id"] and product.provider == prev_product.provider ): logger.warning( "Iterate over pages: stop iterating since the next page " "appears to have the same products as in the previous one. " "This provider may not implement pagination.", ) last_page_with_products = iteration - 1 break # use count got from 1st iteration search_result.number_matched = number_matched yield search_result prev_product = product # Prevent a last search if the current one returned less than the # maximum number of items asked for. if len(search_result) < items_per_page: last_page_with_products = iteration break else: last_page_with_products = iteration - 1 break iteration += 1 kwargs["page"] = iteration logger.debug( "Iterate over pages: last products found on page %s", last_page_with_products, )
[docs] def search_all( self, items_per_page: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None, locations: Optional[dict[str, str]] = None, **kwargs: Any, ) -> SearchResult: """Search and return all the products matching the search criteria. It iterates over the pages of a search query and collects all the returned products into a single :class:`~eodag.api.search_result.SearchResult` instance. Requests are attempted to all providers of the product ordered by descending piority. :param items_per_page: (optional) The number of results requested internally per page. The maximum number of items than can be requested at once to a provider has been configured in EODAG for some of them. If items_per_page is None and this number is available for the searched provider, it is used to limit the number of requests made. This should also reduce the time required to collect all the products matching the search criteria. If this number is not available, a default value of 50 is used instead. items_per_page can also be set to any arbitrary value. :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param geom: (optional) Search area that can be defined in different ways: * with a Shapely geometry object: :class:`shapely.geometry.base.BaseGeometry` * with a bounding box (dict with keys: "lonmin", "latmin", "lonmax", "latmax"): ``dict.fromkeys(["lonmin", "latmin", "lonmax", "latmax"])`` * with a bounding box as list of float: ``[lonmin, latmin, lonmax, latmax]`` * with a WKT str :param locations: (optional) Location filtering by name using locations configuration ``{"<location_name>"="<attr_regex>"}``. For example, ``{"country"="PA."}`` will use the geometry of the features having the property ISO3 starting with 'PA' such as Panama and Pakistan in the shapefile configured with name=country and attr=ISO3 :param kwargs: Some other criteria that will be used to do the search, using parameters compatible with the provider :returns: An iterator that yields page per page a collection of EO products matching the criteria """ # Get the search plugin and the maximized value # of items_per_page if defined for the provider used. try: product_type = self.get_product_type_from_alias( self.guess_product_type(**kwargs)[0] ) except NoMatchingProductType: product_type = GENERIC_PRODUCT_TYPE else: # fetch product types list if product_type is unknown if ( product_type not in self._plugins_manager.product_type_to_provider_config_map.keys() ): logger.debug( f"Fetching external product types sources to find {product_type} product type" ) self.fetch_product_types_list() # remove unwanted count kwargs.pop("count", None) search_plugins, search_kwargs = self._prepare_search( start=start, end=end, geom=geom, locations=locations, **kwargs ) for i, search_plugin in enumerate(search_plugins): itp = ( items_per_page or getattr(search_plugin.config, "pagination", {}).get( "max_items_per_page" ) or DEFAULT_MAX_ITEMS_PER_PAGE ) logger.info( "Searching for all the products with provider %s and a maximum of %s " "items per page.", search_plugin.provider, itp, ) all_results = SearchResult([]) try: for page_results in self.search_iter_page_plugin( items_per_page=itp, search_plugin=search_plugin, count=False, **search_kwargs, ): all_results.data.extend(page_results.data) logger.info( "Found %s result(s) on provider '%s'", len(all_results), search_plugin.provider, ) return all_results except RequestError: if len(all_results) == 0 and i < len(search_plugins) - 1: logger.warning( "No result could be obtained from provider %s, " "we will try to get the data from another provider", search_plugin.provider, ) elif len(all_results) == 0: logger.error( "No result could be obtained from any available provider" ) raise elif len(all_results) > 0: logger.warning( "Found %s result(s) on provider '%s', but it may be incomplete " "as it ended with an error", len(all_results), search_plugin.provider, ) return all_results raise RequestError("No result could be obtained from any available provider")
def _search_by_id( self, uid: str, provider: Optional[str] = None, **kwargs: Any ) -> SearchResult: """Internal method that enables searching a product by its id. Keeps requesting providers until a result matching the id is supplied. The search plugins should be developed in the way that enable them to handle the support of a search by id by the providers. The providers are requested one by one, in the order defined by their priorities. Be aware that because of that, the search can be slow, if the priority order is such that the provider that contains the requested product has the lowest priority. However, you can always speed up a little the search by passing the name of the provider on which to perform the search, if this information is available :param uid: The uid of the EO product :param provider: (optional) The provider on which to search the product. This may be useful for performance reasons when the user knows this product is available on the given provider :param kwargs: Search criteria to help finding the right product :returns: A search result with one EO product or None at all """ product_type = kwargs.get("productType") if product_type is not None: try: product_type = self.get_product_type_from_alias(product_type) except NoMatchingProductType: logger.debug("product type %s not found", product_type) get_search_plugins_kwargs = dict(provider=provider, product_type=product_type) search_plugins = self._plugins_manager.get_search_plugins( **get_search_plugins_kwargs ) # datacube query string _dc_qs = kwargs.pop("_dc_qs", None) results = SearchResult([]) for plugin in search_plugins: logger.info( "Searching product with id '%s' on provider: %s", uid, plugin.provider ) logger.debug("Using plugin class for search: %s", plugin.__class__.__name__) plugin.clear() # adds maximal pagination to be able to do a search-all + crunch if more # than one result are returned items_per_page = plugin.config.pagination.get( "max_items_per_page", DEFAULT_MAX_ITEMS_PER_PAGE ) kwargs.update(items_per_page=items_per_page) if isinstance(plugin, PostJsonSearch): kwargs.update( items_per_page=items_per_page, _dc_qs=_dc_qs, ) else: kwargs.update( items_per_page=items_per_page, ) try: # if more than one results are found, try getting them all and then filter using crunch for page_results in self.search_iter_page_plugin( search_plugin=plugin, id=uid, **kwargs, ): results.data.extend(page_results.data) except Exception as e: if kwargs.get("raise_errors"): raise logger.warning(e) results.errors.append((plugin.provider, e)) continue # try using crunch to get unique result if ( len(results) > 1 and len(filtered := results.filter_property(id=uid)) == 1 ): results = filtered if len(results) == 1: if not results[0].product_type: # guess product type from properties guesses = self.guess_product_type(**results[0].properties) results[0].product_type = guesses[0] # reset driver results[0].driver = results[0].get_driver() results.number_matched = 1 return results elif len(results) > 1: logger.info( "Several products found for this id (%s). You may try searching using more selective criteria.", results, ) return SearchResult([], 0, results.errors) def _fetch_external_product_type(self, provider: str, product_type: str): plugins = self._plugins_manager.get_search_plugins(provider=provider) plugin = next(plugins) # check after plugin init if still fetchable if not getattr(plugin.config, "discover_product_types", {}).get("fetch_url"): return None kwargs: dict[str, Any] = {"productType": product_type} # append auth if needed if getattr(plugin.config, "need_auth", False): if auth := self._plugins_manager.get_auth( plugin.provider, getattr(plugin.config, "api_endpoint", None), plugin.config, ): kwargs["auth"] = auth product_type_config = plugin.discover_product_types(**kwargs) self.update_product_types_list({provider: product_type_config}) def _prepare_search( self, start: Optional[str] = None, end: Optional[str] = None, geom: Optional[Union[str, dict[str, float], BaseGeometry]] = None, locations: Optional[dict[str, str]] = None, provider: Optional[str] = None, **kwargs: Any, ) -> tuple[list[Union[Search, Api]], dict[str, Any]]: """Internal method to prepare the search kwargs and get the search plugins. Product query: * By id (plus optional 'provider') * By search params: * productType query: * By product type (e.g. 'S2_MSI_L1C') * By params (e.g. 'platform'), see guess_product_type * dates: 'start' and/or 'end' * geometry: 'geom' or 'bbox' or 'box' * search locations * TODO: better expose cloudCover * other search params are passed to Searchplugin.query() :param start: (optional) Start sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param end: (optional) End sensing time in ISO 8601 format (e.g. "1990-11-26", "1990-11-26T14:30:10.153Z", "1990-11-26T14:30:10+02:00", ...). If no time offset is given, the time is assumed to be given in UTC. :param geom: (optional) Search area that can be defined in different ways (see search) :param locations: (optional) Location filtering by name using locations configuration :param provider: provider to be used, if no provider is given or the product type is not available for the provider, the preferred provider is used :param kwargs: Some other criteria * id and/or a provider for a search by * search criteria to guess the product type * other criteria compatible with the provider :returns: Search plugins list and the prepared kwargs to make a query. """ product_type: Optional[str] = kwargs.get("productType") if product_type is None: try: guesses = self.guess_product_type(**kwargs) # guess_product_type raises a NoMatchingProductType error if no product # is found. Here, the supported search params are removed from the # kwargs if present, not to propagate them to the query itself. for param in ( "instrument", "platform", "platformSerialIdentifier", "processingLevel", "sensorType", ): kwargs.pop(param, None) # By now, only use the best bet product_type = guesses[0] except NoMatchingProductType: queried_id = kwargs.get("id") if queried_id is None: logger.info( "No product type could be guessed with provided arguments" ) else: return [], kwargs if product_type is not None: try: product_type = self.get_product_type_from_alias(product_type) except NoMatchingProductType: logger.info("unknown product type " + product_type) kwargs["productType"] = product_type if start is not None: kwargs["startTimeFromAscendingNode"] = start if end is not None: kwargs["completionTimeFromAscendingNode"] = end if "box" in kwargs or "bbox" in kwargs: logger.warning( "'box' or 'bbox' parameters are only supported for backwards " " compatibility reasons. Usage of 'geom' is recommended." ) if geom is not None: kwargs["geometry"] = geom box = kwargs.pop("box", None) box = kwargs.pop("bbox", box) if geom is None and box is not None: kwargs["geometry"] = box kwargs["locations"] = locations kwargs["geometry"] = get_geometry_from_various(self.locations_config, **kwargs) # remove locations_args from kwargs now that they have been used locations_dict = {loc["name"]: loc for loc in self.locations_config} for arg in locations_dict.keys(): kwargs.pop(arg, None) del kwargs["locations"] # fetch product types list if product_type is unknown if ( product_type not in self._plugins_manager.product_type_to_provider_config_map.keys() ): if provider and product_type: # Try to get specific product type from external provider logger.debug(f"Fetching {provider} to find {product_type} product type") self._fetch_external_product_type(provider, product_type) if not provider: # no provider or still not found -> fetch all external product types logger.debug( f"Fetching external product types sources to find {product_type} product type" ) self.fetch_product_types_list() preferred_provider = self.get_preferred_provider()[0] search_plugins: list[Union[Search, Api]] = [] for plugin in self._plugins_manager.get_search_plugins( product_type=product_type, provider=provider ): # exclude MeteoblueSearch plugins from search fallback for unknown product_type if ( provider != plugin.provider and preferred_provider != plugin.provider and product_type not in self.product_types_config and isinstance(plugin, MeteoblueSearch) ): continue search_plugins.append(plugin) if not provider: provider = preferred_provider providers = [plugin.provider for plugin in search_plugins] if provider not in providers: logger.debug( "Product type '%s' is not available with preferred provider '%s'.", product_type, provider, ) else: provider_plugin = list( filter(lambda p: p.provider == provider, search_plugins) )[0] search_plugins.remove(provider_plugin) search_plugins.insert(0, provider_plugin) # Add product_types_config to plugin config. This dict contains product # type metadata that will also be stored in each product's properties. for search_plugin in search_plugins: if product_type is not None: self._attach_product_type_config(search_plugin, product_type) return search_plugins, kwargs def _do_search( self, search_plugin: Union[Search, Api], count: bool = False, raise_errors: bool = False, validate: Optional[bool] = True, **kwargs: Any, ) -> SearchResult: """Internal method that performs a search on a given provider. :param search_plugin: A search plugin :param count: (optional) Whether to run a query with a count request or not :param raise_errors: (optional) When an error occurs when searching, if this is set to True, the error is raised :param kwargs: Some other criteria that will be used to do the search :param validate: (optional) Set to True to validate search parameters before sending the query to the provider :returns: A collection of EO products matching the criteria """ logger.info("Searching on provider %s", search_plugin.provider) max_items_per_page = getattr(search_plugin.config, "pagination", {}).get( "max_items_per_page", DEFAULT_MAX_ITEMS_PER_PAGE ) if ( kwargs.get("items_per_page", DEFAULT_ITEMS_PER_PAGE) > max_items_per_page and max_items_per_page > 0 ): logger.warning( "EODAG believes that you might have asked for more products/items " "than the maximum allowed by '%s': %s > %s. Try to lower " "the value of 'items_per_page' and get the next page (e.g. 'page=2'), " "or directly use the 'search_all' method.", search_plugin.provider, kwargs["items_per_page"], max_items_per_page, ) results: list[EOProduct] = [] total_results: Optional[int] = 0 if count else None errors: list[tuple[str, Exception]] = [] try: prep = PreparedSearch(count=count) # append auth if needed if getattr(search_plugin.config, "need_auth", False): if auth := self._plugins_manager.get_auth( search_plugin.provider, getattr(search_plugin.config, "api_endpoint", None), search_plugin.config, ): prep.auth = auth prep.page = kwargs.pop("page", None) prep.items_per_page = kwargs.pop("items_per_page", None) if validate: search_plugin.validate(kwargs, prep.auth) res, nb_res = search_plugin.query(prep, **kwargs) if not isinstance(res, list): raise PluginImplementationError( "The query function of a Search plugin must return a list of " "results, got {} instead".format(type(res)) ) # Filter and attach to each eoproduct in the result the plugin capable of # downloading it (this is done to enable the eo_product to download itself # doing: eo_product.download()). The filtering is done by keeping only # those eo_products that intersects the search extent (if there was no # search extent, search_intersection contains the geometry of the # eo_product) # WARNING: this means an eo_product that has an invalid geometry can still # be returned as a search result if there was no search extent (because we # will not try to do an intersection) for eo_product in res: # if product_type is not defined, try to guess using properties if eo_product.product_type is None: pattern = re.compile(r"[^\w,]+") try: guesses = self.guess_product_type( intersect=False, **{ k: pattern.sub("", str(v).upper()) for k, v in eo_product.properties.items() if k in [ "instrument", "platform", "platformSerialIdentifier", "processingLevel", "sensorType", "keywords", ] and v is not None }, ) except NoMatchingProductType: pass else: eo_product.product_type = guesses[0] try: if eo_product.product_type is not None: eo_product.product_type = self.get_product_type_from_alias( eo_product.product_type ) except NoMatchingProductType: logger.debug("product type %s not found", eo_product.product_type) if eo_product.search_intersection is not None: eo_product._register_downloader_from_manager(self._plugins_manager) results.extend(res) total_results = ( None if (nb_res is None or total_results is None) else total_results + nb_res ) if count and nb_res is not None: logger.info( "Found %s result(s) on provider '%s'", nb_res, search_plugin.provider, ) except Exception as e: if raise_errors: # Raise the error, letting the application wrapping eodag know that # something went bad. This way it will be able to decide what to do next raise else: logger.exception( "Error while searching on provider %s (ignored):", search_plugin.provider, ) errors.append((search_plugin.provider, e)) return SearchResult(results, total_results, errors)
[docs] def crunch(self, results: SearchResult, **kwargs: Any) -> SearchResult: """Apply the filters given through the keyword arguments to the results :param results: The results of a eodag search request :returns: The result of successively applying all the filters to the results """ search_criteria = kwargs.pop("search_criteria", {}) for cruncher_name, cruncher_args in kwargs.items(): cruncher = self._plugins_manager.get_crunch_plugin( cruncher_name, **cruncher_args ) results = results.crunch(cruncher, **search_criteria) return results
[docs] @staticmethod def group_by_extent(searches: list[SearchResult]) -> list[SearchResult]: """Combines multiple SearchResults and return a list of SearchResults grouped by extent (i.e. bounding box). :param searches: List of eodag SearchResult :returns: list of :class:`~eodag.api.search_result.SearchResult` """ # Dict with extents as keys, each extent being defined by a str # "{minx}{miny}{maxx}{maxy}" (each float rounded to 2 dec). products_grouped_by_extent: dict[str, Any] = {} for search in searches: for product in search: same_geom = products_grouped_by_extent.setdefault( "".join([str(round(p, 2)) for p in product.geometry.bounds]), [] ) same_geom.append(product) return [ SearchResult(products_grouped_by_extent[extent_as_str]) for extent_as_str in products_grouped_by_extent ]
[docs] def download_all( self, search_result: SearchResult, downloaded_callback: Optional[DownloadedCallback] = None, progress_callback: Optional[ProgressCallback] = None, wait: float = DEFAULT_DOWNLOAD_WAIT, timeout: float = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> list[str]: """Download all products resulting from a search. :param search_result: A collection of EO products resulting from a search :param downloaded_callback: (optional) A method or a callable object which takes as parameter the ``product``. You can use the base class :class:`~eodag.utils.DownloadedCallback` and override its ``__call__`` method. Will be called each time a product finishes downloading :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :param wait: (optional) If download fails, wait time in minutes between two download tries of the same product :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :param kwargs: Additional keyword arguments from the download plugin configuration class that can be provided to override any other values defined in a configuration file or with environment variables: * ``output_dir`` - where to store downloaded products, as an absolute file path (Default: local temporary directory) * ``output_extension`` - downloaded file extension * ``extract`` - whether to extract the downloaded products, only applies to archived products * ``dl_url_params`` - additional parameters to pass over to the download url as an url parameter * ``delete_archive`` - whether to delete the downloaded archives * ``asset`` - regex filter to identify assets to download :returns: A collection of the absolute paths to the downloaded products """ paths = [] if search_result: logger.info("Downloading %s products", len(search_result)) # Get download plugin using first product assuming product from several provider # aren't mixed into a search result download_plugin = self._plugins_manager.get_download_plugin( search_result[0] ) paths = download_plugin.download_all( search_result, downloaded_callback=downloaded_callback, progress_callback=progress_callback, wait=wait, timeout=timeout, **kwargs, ) else: logger.info("Empty search result, nothing to be downloaded !") return paths
[docs] @staticmethod def serialize( search_result: SearchResult, filename: str = "search_results.geojson" ) -> str: """Registers results of a search into a geojson file. :param search_result: A collection of EO products resulting from a search :param filename: (optional) The name of the file to generate :returns: The name of the created file """ with open(filename, "w") as fh: geojson.dump(search_result, fh) return filename
[docs] @staticmethod def deserialize(filename: str) -> SearchResult: """Loads results of a search from a geojson file. :param filename: A filename containing a search result encoded as a geojson :returns: The search results encoded in `filename` """ with open(filename, "r") as fh: return SearchResult.from_geojson(geojson.load(fh))
[docs] def deserialize_and_register(self, filename: str) -> SearchResult: """Loads results of a search from a geojson file and register products with the information needed to download itself :param filename: A filename containing a search result encoded as a geojson :returns: The search results encoded in `filename` """ products = self.deserialize(filename) for i, product in enumerate(products): if product.downloader is None: downloader = self._plugins_manager.get_download_plugin(product) auth = product.downloader_auth if auth is None: auth = self._plugins_manager.get_auth_plugin(downloader, product) products[i].register_downloader(downloader, auth) return products
[docs] def download( self, product: EOProduct, progress_callback: Optional[ProgressCallback] = None, wait: float = DEFAULT_DOWNLOAD_WAIT, timeout: float = DEFAULT_DOWNLOAD_TIMEOUT, **kwargs: Unpack[DownloadConf], ) -> str: """Download a single product. This is an alias to the method of the same name on :class:`~eodag.api.product._product.EOProduct`, but it performs some additional checks like verifying that a downloader and authenticator are registered for the product before trying to download it. If the metadata mapping for ``downloadLink`` is set to something that can be interpreted as a link on a local filesystem, the download is skipped (by now, only a link starting with ``file:/`` is supported). Therefore, any user that knows how to extract product location from product metadata on a provider can override the ``downloadLink`` metadata mapping in the right way. For example, using the environment variable: ``EODAG__CREODIAS__SEARCH__METADATA_MAPPING__DOWNLOADLINK="file:///{id}"`` will lead to all :class:`~eodag.api.product._product.EOProduct`'s originating from the provider ``creodias`` to have their ``downloadLink`` metadata point to something like: ``file:///12345-678``, making this method immediately return the later string without trying to download the product. :param product: The EO product to download :param progress_callback: (optional) A method or a callable object which takes a current size and a maximum size as inputs and handle progress bar creation and update to give the user a feedback on the download progress :param wait: (optional) If download fails, wait time in minutes between two download tries :param timeout: (optional) If download fails, maximum time in minutes before stop retrying to download :param kwargs: Additional keyword arguments from the download plugin configuration class that can be provided to override any other values defined in a configuration file or with environment variables: * ``output_dir`` - where to store downloaded products, as an absolute file path (Default: local temporary directory) * ``output_extension`` - downloaded file extension * ``extract`` - whether to extract the downloaded products, only applies to archived products * ``dl_url_params`` - additional parameters to pass over to the download url as an url parameter * ``delete_archive`` - whether to delete the downloaded archives * ``asset`` - regex filter to identify assets to download :returns: The absolute path to the downloaded product in the local filesystem :raises: :class:`~eodag.utils.exceptions.PluginImplementationError` :raises: :class:`RuntimeError` """ if product.location.startswith("file:/"): logger.info("Local product detected. Download skipped") return uri_to_path(product.location) self._setup_downloader(product) path = product.download( progress_callback=progress_callback, wait=wait, timeout=timeout, **kwargs ) return path
def _setup_downloader(self, product: EOProduct) -> None: if product.downloader is None: downloader = self._plugins_manager.get_download_plugin(product) auth = product.downloader_auth if auth is None: auth = self._plugins_manager.get_auth_plugin(downloader, product) product.register_downloader(downloader, auth)
[docs] def get_cruncher(self, name: str, **options: Any) -> Crunch: """Build a crunch plugin from a configuration :param name: The name of the cruncher to build :param options: The configuration options of the cruncher :returns: The cruncher named ``name`` """ plugin_conf = {"name": name} plugin_conf.update({key.replace("-", "_"): val for key, val in options.items()}) return self._plugins_manager.get_crunch_plugin(name, **plugin_conf)
[docs] def list_queryables( self, provider: Optional[str] = None, fetch_providers: bool = True, **kwargs: Any, ) -> QueryablesDict: """Fetch the queryable properties for a given product type and/or provider. :param provider: (optional) The provider. :param fetch_providers: If new product types should be fetched from the providers; default: True :param kwargs: additional filters for queryables (`productType` or other search arguments) :raises UnsupportedProductType: If the specified product type is not available for the provider. :returns: A :class:`~eodag.api.product.queryables.QuerybalesDict` containing the EODAG queryable properties, associating parameters to their annotated type, and a additional_properties attribute """ # only fetch providers if product type is not found available_product_types: list[str] = [ pt["ID"] for pt in self.list_product_types(provider=provider, fetch_providers=False) ] product_type: Optional[str] = kwargs.get("productType") pt_alias: Optional[str] = product_type if product_type: if product_type not in available_product_types: if fetch_providers: # fetch providers and try again available_product_types = [ pt["ID"] for pt in self.list_product_types( provider=provider, fetch_providers=True ) ] raise UnsupportedProductType(f"{product_type} is not available.") try: kwargs["productType"] = product_type = self.get_product_type_from_alias( product_type ) except NoMatchingProductType as e: raise UnsupportedProductType(f"{product_type} is not available.") from e if not provider and not product_type: return QueryablesDict( additional_properties=True, **model_fields_to_annotated(CommonQueryables.model_fields), ) additional_properties = False additional_information = [] queryable_properties: dict[str, Any] = {} for plugin in self._plugins_manager.get_search_plugins(product_type, provider): # attach product type config product_type_configs: dict[str, Any] = {} if product_type: self._attach_product_type_config(plugin, product_type) product_type_configs[product_type] = plugin.config.product_type_config else: for pt in available_product_types: self._attach_product_type_config(plugin, pt) product_type_configs[pt] = plugin.config.product_type_config # authenticate if required if getattr(plugin.config, "need_auth", False) and ( auth := self._plugins_manager.get_auth_plugin(plugin) ): try: plugin.auth = auth.authenticate() except AuthenticationError: logger.debug( "queryables from provider %s could not be fetched due to an authentication error", plugin.provider, ) plugin_queryables = plugin.list_queryables( kwargs, available_product_types, product_type_configs, product_type, pt_alias, ) if plugin_queryables.additional_information: additional_information.append( f"{plugin.provider}: {plugin_queryables.additional_information}" ) queryable_properties = {**plugin_queryables, **queryable_properties} additional_properties = ( additional_properties or plugin_queryables.additional_properties ) return QueryablesDict( additional_properties=additional_properties, additional_information=" | ".join(additional_information), **queryable_properties, )
[docs] def available_sortables(self) -> dict[str, Optional[ProviderSortables]]: """For each provider, gives its available sortable parameter(s) and its maximum number of them if it supports the sorting feature, otherwise gives None. :returns: A dictionary with providers as keys and dictionary of sortable parameter(s) and its (their) maximum number as value(s). :raises: :class:`~eodag.utils.exceptions.UnsupportedProvider` """ sortables: dict[str, Optional[ProviderSortables]] = {} provider_search_plugins = self._plugins_manager.get_search_plugins() for provider_search_plugin in provider_search_plugins: provider = provider_search_plugin.provider if not hasattr(provider_search_plugin.config, "sort"): sortables[provider] = None continue sortable_params = list( provider_search_plugin.config.sort.get("sort_param_mapping", {}).keys() ) if not provider_search_plugin.config.sort.get("max_sort_params"): sortables[provider] = { "sortables": sortable_params, "max_sort_params": None, } continue sortables[provider] = { "sortables": sortable_params, "max_sort_params": provider_search_plugin.config.sort[ "max_sort_params" ], } return sortables
def _attach_product_type_config(self, plugin: Search, product_type: str) -> None: """ Attach product_types_config to plugin config. This dict contains product type metadata that will also be stored in each product's properties. """ try: plugin.config.product_type_config = dict( [ p for p in self.list_product_types( plugin.provider, fetch_providers=False ) if p["_id"] == product_type ][0], **{"productType": product_type}, ) # If the product isn't in the catalog, it's a generic product type. except IndexError: # Construct the GENERIC_PRODUCT_TYPE metadata plugin.config.product_type_config = dict( ID=GENERIC_PRODUCT_TYPE, **self.product_types_config[GENERIC_PRODUCT_TYPE], productType=product_type, ) # Remove the ID since this is equal to productType. plugin.config.product_type_config.pop("ID", None)
[docs] def import_stac_items(self, items_urls: list[str]) -> SearchResult: """Import STAC items from a list of URLs and convert them to SearchResult. - Origin provider and download links will be set if item comes from an EODAG server. - If item comes from a known EODAG provider, result will be registered to it, ready to download and its metadata normalized. - If item comes from an unknown provider, a generic STAC provider will be used. :param items_urls: A list of STAC items URLs to import :returns: A SearchResult containing the imported STAC items """ json_items = [] for item_url in items_urls: json_items.extend(fetch_stac_items(item_url)) # add a generic STAC provider that might be needed to handle the items self.add_provider(GENERIC_STAC_PROVIDER) results = SearchResult([]) for json_item in json_items: if search_result := SearchResult._from_stac_item( json_item, self._plugins_manager ): results.extend(search_result) return results