Source code for eodag.plugins.download.aws

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import os
import re
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast

import boto3
import requests
from botocore.exceptions import ClientError
from lxml import etree
from requests.auth import AuthBase

from eodag.api.product.metadata_mapping import (
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
    properties_from_xml,
)
from eodag.plugins.authentication.aws_auth import raise_if_auth_error
from eodag.plugins.download.base import Download
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    HTTP_REQ_TIMEOUT,
    USER_AGENT,
    ProgressCallback,
    StreamResponse,
    flatten_top_directories,
    get_bucket_name_and_prefix,
    path_to_uri,
    rename_subfolder,
    sanitize,
)
from eodag.utils.exceptions import (
    AuthenticationError,
    DownloadError,
    MisconfiguredError,
    NoMatchingProductType,
    NotAvailableError,
    TimeOutError,
)
from eodag.utils.s3 import S3FileInfo, open_s3_zipped_object, stream_download_from_s3

if TYPE_CHECKING:
    from mypy_boto3_s3 import S3ServiceResource
    from mypy_boto3_s3.client import S3Client

    from eodag.api.product import EOProduct
    from eodag.api.search_result import SearchResult
    from eodag.config import PluginConfig
    from eodag.types.download_args import DownloadConf
    from eodag.utils import DownloadedCallback, Unpack


# Module-level logger, registered under the fixed name "eodag.download.aws"
# (an explicit string, not ``__name__``).
logger = logging.getLogger("eodag.download.aws")

# Patterns used to identify AWS chunk (object key) paths.
#
# Every pattern is anchored with ``^`` ... ``$`` and exposes the path components
# as named groups. All "tile" patterns share the same key prefix:
#   tiles/<tile1>/<tile2>/<tile3>/<year>/<month>/<day>/<num>/
# where tile1, year, month, day and num are digits and tile2/tile3 are
# upper-case letters.

# S2 L2A Tile files -----------------------------------------------------------
# Image stored in a resolution sub-directory: .../R<res>/<file>.jp2
# ``res`` keeps its trailing "m"; ``file`` does NOT include the ".jp2" extension.
S2L2A_TILE_IMG_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/R(?P<res>[0-9]+m)/(?P<file>[A-Z0-9_]+)\.jp2$"
)
# File under .../auxiliary/ whose name starts with "AUX_"
S2L2A_TILE_AUX_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/auxiliary/(?P<file>AUX_.+)$"
)
# S2 L2A QI Masks
# File under .../qi/ named <file_base>_<file_suffix>, where ``file_suffix`` is
# "<digits>m.jp2" (extension included in the group).
S2_TILE_QI_MSK_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/(?P<file_base>.+)_(?P<file_suffix>[0-9]+m\.jp2)$"
)
# S2 L2A QI PVI
# Matches only the literal file name .../qi/L2A_PVI.jp2 (no ``file`` group).
S2_TILE_QI_PVI_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/L2A_PVI\.jp2$"
)
# S2 Tile files ---------------------------------------------------------------
# All patterns below match keys of the form
#   tiles/<tile1>/<tile2>/<tile3>/<year>/<month>/<day>/<num>/<...>
# Several of them overlap: S2_TILE_REGEX matches everything the more specific
# tile patterns match, so the specific ones presumably have to be tested first
# (the matching order is decided by code outside this section - confirm there).

# Image located directly in the tile directory; ``file`` includes the ".jp2"
# extension.
S2_TILE_IMG_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>[A-Z0-9_]+\.jp2)$"
)
# Any file under the .../preview/ directory
S2_TILE_PREVIEW_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/preview/(?P<file>.+)$"
)
# Any file under the .../auxiliary/ directory (also matches "AUX_"-prefixed names)
S2_TILE_AUX_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/auxiliary/(?P<file>.+)$"
)
# Any file under the .../qi/ directory (also matches "<base>_<digits>m.jp2" and
# "L2A_PVI.jp2" names)
S2_TILE_QI_DIR_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/qi/(?P<file>.+)$"
)
# Thumbnail: a "preview.<ext>" FILE directly in the tile directory (not the
# preview/ directory handled by S2_TILE_PREVIEW_DIR_REGEX)
S2_TILE_THUMBNAIL_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>preview\.\w+)$"
)
# Tile-level metadata.xml
S2_TILE_MTD_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>metadata\.xml)$"
)
# S2 Tile generic
# Catch-all: ``file`` is everything after <num>/, sub-directories included.
S2_TILE_REGEX = re.compile(
    r"^tiles/(?P<tile1>[0-9]+)/(?P<tile2>[A-Z]+)/(?P<tile3>[A-Z]+)/(?P<year>[0-9]+)/(?P<month>[0-9]+)/"
    + r"(?P<day>[0-9]+)/(?P<num>[0-9]+)/(?P<file>.+)$"
)
# S2 Product files ------------------------------------------------------------
# All patterns below match keys of the form
#   products/<year>/<month>/<day>/<title>/<...>
# where ``title`` is made of upper-case letters, digits and underscores.

# S2 Product generic
# Catch-all: ``file`` is everything after <title>/, sub-directories included,
# so this pattern also matches every key matched by the patterns that follow.
S2_PROD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/(?P<file>.+)$"
)
# Datastrip metadata: .../datastrip/<num>/metadata.xml
S2_PROD_DS_MTD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/(?P<file>metadata\.xml)$"
)
# Any file under .../datastrip/<num>/qi/
S2_PROD_DS_QI_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/qi/(?P<file>.+)$"
)
# Datastrip QI report: .../datastrip/<num>/qi/<filename>_report.xml
# NOTE: the captured group is named ``filename`` here (not ``file``) and
# excludes the "_report.xml" suffix.
S2_PROD_DS_QI_REPORT_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/datastrip/"
    + r"(?P<num>.+)/qi/(?P<filename>.+)_report\.xml$"
)
# inspire.xml located directly in the product directory
S2_PROD_INSPIRE_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/"
    + r"(?P<file>inspire\.xml)$"
)
# S2 Product metadata
# metadata.xml located directly in the product directory (not a generic pattern)
S2_PROD_MTD_REGEX = re.compile(
    r"^products/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<title>[A-Z0-9_]+)/"
    + r"(?P<file>metadata\.xml)$"
)
# S1 files --------------------------------------------------------------------
# The specific patterns below match keys of the form
#   GRD/<year>/<month>/<day>/<beam>/<prod_pol>/<title>/<...>
# and split the file name on "-" into named groups instead of exposing a single
# ``file`` group.

# Calibration annotation: .../annotation/calibration/<file_prefix>-<file_beam>-<file_pol>.xml
S1_CALIB_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/annotation/calibration/"
    + r"(?P<file_prefix>[a-z]+)-(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.xml$"
)
# Annotation: .../annotation/<file_beam>-<file_pol>.xml
S1_ANNOT_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/annotation/"
    + r"(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.xml$"
)
# Measurement: .../measurement/<file_beam>-<file_pol>.<file_ext>
S1_MEAS_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/measurement/"
    + r"(?P<file_beam>[a-z]+)-(?P<file_pol>.+)\.(?P<file_ext>[a-z0-9]+)$"
)
# Report: "report-<word>.pdf" located directly in the product directory
S1_REPORT_REGEX = re.compile(
    r"^GRD/(?P<year>[0-9]+)/(?P<month>[0-9]+)/(?P<day>[0-9]+)/(?P<beam>[A-Z0-9]+)/(?P<prod_pol>[A-Z0-9]+)/"
    + r"(?P<title>[A-Z0-9_]+)/(?P<file>report-\w+\.pdf)$"
)
# S1 generic
# Catch-all: only ``title`` (which must start with "S1") and ``file`` are
# captured; the year must be exactly 4 digits.
S1_REGEX = re.compile(
    r"^GRD/[0-9]{4}/[0-9]+/[0-9]+/[A-Z0-9]+/[A-Z0-9]+/(?P<title>S1[A-Z0-9_]+)/(?P<file>.+)$"
)
# CBERS4 generic
# NOTE(review): this pattern is character-for-character identical to S1_REGEX
# (same "GRD/" prefix and "S1"-prefixed title), which looks like a copy-paste
# placeholder - confirm the intended CBERS4 key layout before relying on it.
CBERS4_REGEX = re.compile(
    r"^GRD/[0-9]{4}/[0-9]+/[0-9]+/[A-Z0-9]+/[A-Z0-9]+/(?P<title>S1[A-Z0-9_]+)/(?P<file>.+)$"
)

# S1 image number conf per polarization ---------------------------------------
# Maps a polarization code to ``{channel: image number}``. Image numbers are
# 1-based and follow the order in which the channels are listed for each code.
S1_IMG_NB_PER_POLAR = {
    polar_code: {
        channel: img_number for img_number, channel in enumerate(channels, start=1)
    }
    for polar_code, channels in (
        ("SH", ("HH",)),
        ("SV", ("VV",)),
        ("DH", ("HH", "HV")),
        ("DV", ("VV", "VH")),
        ("HH", ("HH",)),
        ("HV", ("HV",)),
        ("VV", ("VV",)),
        ("VH", ("VH",)),
    )
}


class AwsDownload(Download):
    """Download on AWS using S3 protocol.

    :param provider: provider name
    :param config: Download plugin configuration:

        * :attr:`~eodag.config.PluginConfig.type` (``str``) (**mandatory**): AwsDownload
        * :attr:`~eodag.config.PluginConfig.s3_endpoint` (``str``): s3 endpoint url
        * :attr:`~eodag.config.PluginConfig.flatten_top_dirs` (``bool``): if the directory structure
          should be flattened; default: ``True``
        * :attr:`~eodag.config.PluginConfig.ignore_assets` (``bool``): ignore assets and download
          using ``downloadLink``; default: ``False``
        * :attr:`~eodag.config.PluginConfig.ssl_verify` (``bool``): if the ssl certificates should
          be verified in requests; default: ``True``
        * :attr:`~eodag.config.PluginConfig.bucket_path_level` (``int``): at which level of the
          path part of the url the bucket can be found; If no bucket_path_level is given, the bucket
          is taken from the first element of the netloc part.
        * :attr:`~eodag.config.PluginConfig.products` (``dict[str, dict[str, Any]]``): product type
          specific config; the keys are the product types, the values are dictionaries which can
          contain the keys:

          * **default_bucket** (``str``): bucket where the product type can be found
          * **complementary_url_key** (``str``): keys to add additional urls
          * **build_safe** (``bool``): if a SAFE (Standard Archive Format for Europe) product should
            be created; used for Sentinel products; default: False
          * **fetch_metadata** (``dict[str, Any]``): config for metadata to be fetched for the SAFE
            product
    """

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super().__init__(provider, config)

    def download(
        self,
        product: EOProduct,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ) -> Optional[str]:
        """Download method for AWS S3 API.

        The product can be downloaded as it is, or as SAFE-formatted product.
        SAFE-build is configured for a given provider and product type.
        If the product title is configured to be updated during download and
        SAFE-formatted, its destination path will be:
        `{output_dir}/{title}`

        :param product: The EO product to download
        :param auth: (optional) authenticated object; not read by this method, which
                     authenticates through ``product.downloader_auth``
        :param progress_callback: (optional) A method or a callable object
                                  which takes a current size and a maximum
                                  size as inputs and handle progress bar
                                  creation and update to give the user a
                                  feedback on the download progress
        :param wait: accepted but not used by this implementation
        :param timeout: accepted but not used by this implementation
        :param kwargs: `output_dir` (str), `extract` (bool), `delete_archive` (bool)
                       and `dl_url_params` (dict) can be provided as additional kwargs
                       and will override any other values defined in a configuration
                       file or with environment variables.
        :returns: The absolute path to the downloaded product in the local filesystem
        :raises MisconfiguredError: if the product has no authentication plugin
        """
        if progress_callback is None:
            logger.info(
                "Progress bar unavailable, please call product.download() instead of plugin.download()"
            )
            progress_callback = ProgressCallback(disable=True)

        # prepare download & create dirs (before updating metadata)
        product_local_path, record_filename = self._download_preparation(
            product, progress_callback=progress_callback, **kwargs
        )
        if not record_filename or not product_local_path:
            return product_local_path

        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )

        # do not try to build SAFE if asset filter is used
        asset_filter = kwargs.get("asset")
        if asset_filter:
            build_safe = False
            ignore_assets = False
        else:
            build_safe = product_conf.get("build_safe", False)
            ignore_assets = getattr(self.config, "ignore_assets", False)

        # product conf overrides provider conf for "flatten_top_dirs"
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", True)
        )

        # xtra metadata needed for SAFE product
        self._configure_safe_build(build_safe, product)

        # bucket names and prefixes
        bucket_names_and_prefixes = self._get_bucket_names_and_prefixes(
            product,
            asset_filter,
            ignore_assets,
            product_conf.get("complementary_url_key", []),
        )

        # authenticate
        if not product.downloader_auth:
            raise MisconfiguredError(
                "Authentication plugin (AwsAuth) has to be configured if AwsDownload is used"
            )
        authenticated_objects = product.downloader_auth.authenticate_objects(
            bucket_names_and_prefixes
        )

        # files in zip
        updated_bucket_names_and_prefixes = self._download_file_in_zip(
            product, bucket_names_and_prefixes, product_local_path, progress_callback
        )
        # prevent nothing-to-download errors if download was performed in zip
        raise_error = len(updated_bucket_names_and_prefixes) == len(
            bucket_names_and_prefixes
        )

        # downloadable files
        unique_product_chunks = self._get_unique_products(
            updated_bucket_names_and_prefixes,
            authenticated_objects,
            asset_filter,
            ignore_assets,
            product,
            raise_error=raise_error,
        )

        # a zero total is passed as None to the progress callback
        total_size = sum(p.size for p in unique_product_chunks) or None

        # download
        if unique_product_chunks:
            progress_callback.reset(total=total_size)
        try:
            for product_chunk in unique_product_chunks:
                try:
                    chunk_rel_path = self.get_chunk_dest_path(
                        product,
                        product_chunk,
                        build_safe=build_safe,
                    )
                except NotAvailableError as e:
                    # out of SAFE format chunk
                    logger.warning(e)
                    continue

                chunk_abs_path = os.path.join(product_local_path, chunk_rel_path)
                os.makedirs(os.path.dirname(chunk_abs_path), exist_ok=True)

                bucket_objects = authenticated_objects.get(product_chunk.bucket_name)
                extra_args = (
                    getattr(bucket_objects, "_params", {}).copy()
                    if bucket_objects
                    else {}
                )
                # files already present locally are not downloaded again
                if not os.path.isfile(chunk_abs_path):
                    product_chunk.Bucket().download_file(
                        product_chunk.key,
                        chunk_abs_path,
                        ExtraArgs=extra_args,
                        Callback=progress_callback,
                    )
        except AuthenticationError as e:
            logger.warning("Unexpected error: %s", e)
        except ClientError as e:
            raise_if_auth_error(e, self.provider)
            logger.warning("Unexpected error: %s", e)

        # finalize safe product
        if build_safe and product.product_type and "S2_MSI" in product.product_type:
            self.finalize_s2_safe_product(product_local_path)
        # flatten directory structure
        elif flatten_top_dirs:
            flatten_top_directories(product_local_path)

        if build_safe:
            self.check_manifest_file_list(product_local_path)

        if asset_filter is None:
            # save hash/record file
            with open(record_filename, "w") as fh:
                fh.write(product.remote_location)
            logger.debug("Download recorded in %s", record_filename)

            product.location = path_to_uri(product_local_path)

        return product_local_path

    def _download_file_in_zip(
        self,
        product: EOProduct,
        bucket_names_and_prefixes: list[tuple[str, Optional[str]]],
        product_local_path: str,
        progress_callback: ProgressCallback,
    ) -> list[tuple[str, Optional[str]]]:
        """
        Download file in zip from a prefix like `foo/bar.zip!file.txt`

        :param product: product being downloaded, its ``downloader_auth`` provides the S3 client
        :param bucket_names_and_prefixes: bucket names and prefixes to inspect
        :param product_local_path: local directory in which the files are extracted
        :param progress_callback: progress callback, reset for each extracted file
        :returns: the bucket names and prefixes that were not handled here
        """
        if (
            not getattr(product, "downloader_auth", None)
            or product.downloader_auth.s3_resource is None
        ):
            logger.debug("Cannot check files in s3 zip without s3 resource")
            return bucket_names_and_prefixes

        s3_client = product.downloader_auth.get_s3_client()

        remaining: list[tuple[str, Optional[str]]] = []
        for pack in bucket_names_and_prefixes:
            bucket_name, prefix = pack
            # prefix may be None: such entries cannot point inside a zip
            if not prefix or ".zip!" not in prefix:
                remaining.append(pack)
                continue

            splitted_path = prefix.split(".zip!")
            zip_prefix = f"{splitted_path[0]}.zip"
            rel_path = splitted_path[-1]
            dest_file = os.path.join(product_local_path, rel_path)
            os.makedirs(os.path.dirname(dest_file), exist_ok=True)

            zip_file, _ = open_s3_zipped_object(
                bucket_name, zip_prefix, s3_client, partial=False
            )
            with zip_file:
                # file size
                file_info = zip_file.getinfo(rel_path)
                progress_callback.reset(total=file_info.file_size)
                with (
                    zip_file.open(rel_path) as extracted,
                    open(dest_file, "wb") as output_file,
                ):
                    # Read in 1MB chunks
                    for zchunk in iter(lambda: extracted.read(1024 * 1024), b""):
                        output_file.write(zchunk)
                        progress_callback(len(zchunk))

        return remaining

    def _download_preparation(
        self,
        product: EOProduct,
        progress_callback: ProgressCallback,
        **kwargs: Unpack[DownloadConf],
    ) -> tuple[Optional[str], Optional[str]]:
        """
        Preparation for the download:

        - check if file was already downloaded
        - get file path
        - create directories

        :param product: product to be downloaded
        :param progress_callback: progress callback to be used
        :param kwargs: additional arguments
        :return: local path and file name
        """
        product_local_path, record_filename = self._prepare_download(
            product, progress_callback=progress_callback, **kwargs
        )
        if not product_local_path or not record_filename:
            if product_local_path:
                product.location = path_to_uri(product_local_path)
            return product_local_path, None

        product_local_path = product_local_path.replace(".zip", "")
        # remove existing incomplete file
        if os.path.isfile(product_local_path):
            os.remove(product_local_path)
        # create product dest dir
        if not os.path.isdir(product_local_path):
            os.makedirs(product_local_path)
        return product_local_path, record_filename

    def _configure_safe_build(self, build_safe: bool, product: EOProduct):
        """
        Updates the product properties with fetch metadata if safe build is enabled

        :param build_safe: if safe build is enabled
        :param product: product to be updated
        :raises TimeOutError: if the metadata request times out
        """
        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )
        if not build_safe or "fetch_metadata" not in product_conf:
            return

        ssl_verify = getattr(self.config, "ssl_verify", True)
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)

        fetch_conf = product_conf["fetch_metadata"]
        fetch_format = fetch_conf["fetch_format"]
        update_metadata = fetch_conf["update_metadata"]
        fetch_url = fetch_conf["fetch_url"].format(**product.properties)

        logger.info("Fetching extra metadata from %s", fetch_url)
        try:
            resp = requests.get(
                fetch_url,
                headers=USER_AGENT,
                timeout=timeout,
                verify=ssl_verify,
            )
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=timeout) from exc

        update_metadata = mtd_cfg_as_conversion_and_querypath(update_metadata)
        if fetch_format == "json":
            json_resp = resp.json()
            update_metadata = properties_from_json(json_resp, update_metadata)
            product.properties.update(update_metadata)
        elif fetch_format == "xml":
            update_metadata = properties_from_xml(resp.content, update_metadata)
            product.properties.update(update_metadata)
        else:
            logger.warning(
                "SAFE metadata fetch format %s not implemented", fetch_format
            )

    def _get_bucket_names_and_prefixes(
        self,
        product: EOProduct,
        asset_filter: Optional[str],
        ignore_assets: bool,
        complementary_url_keys: list[str],
    ) -> list[tuple[str, Optional[str]]]:
        """
        Retrieves the bucket names and path prefixes for the assets

        :param product: product for which the assets shall be downloaded
        :param asset_filter: text for which the assets should be filtered
        :param ignore_assets: if product instead of individual assets should be used
        :param complementary_url_keys: product property names holding additional urls
        :return: tuples of bucket names and prefixes
        :raises NotAvailableError: if ``asset_filter`` matches no asset having an ``href``
        """
        # if assets are defined, use them instead of scanning product.location
        if len(product.assets) > 0 and not ignore_assets:
            if asset_filter:
                filter_regex = re.compile(asset_filter)
                assets_values = [
                    product.assets[a_key]
                    for a_key in product.assets.keys()
                    if filter_regex.fullmatch(a_key)
                    and "href" in product.assets[a_key]
                ]
                if not assets_values:
                    raise NotAvailableError(
                        rf"No asset key matching re.fullmatch(r'{asset_filter}') was found in {product}"
                    )
            else:
                assets_values = product.assets.values()

            bucket_names_and_prefixes = [
                self.get_product_bucket_name_and_prefix(
                    product, asset.get("href", "")
                )
                for asset in assets_values
            ]
        else:
            bucket_names_and_prefixes = [
                self.get_product_bucket_name_and_prefix(product)
            ]

        # add complementary urls; the first missing key stops the loop
        try:
            for complementary_url_key in complementary_url_keys or []:
                bucket_names_and_prefixes.append(
                    self.get_product_bucket_name_and_prefix(
                        product, product.properties[complementary_url_key]
                    )
                )
        except KeyError:
            logger.warning(
                "complementary_url_key %s is missing in %s properties",
                complementary_url_key,
                product.properties["id"],
            )
        return bucket_names_and_prefixes

    def _get_unique_products(
        self,
        bucket_names_and_prefixes: list[tuple[str, Optional[str]]],
        authenticated_objects: dict[str, Any],
        asset_filter: Optional[str],
        ignore_assets: bool,
        product: EOProduct,
        raise_error: bool = True,
    ) -> set[Any]:
        """
        Retrieve unique product chunks based on authenticated objects and asset filters

        :param bucket_names_and_prefixes: list of bucket names and corresponding path prefixes
        :param authenticated_objects: available objects per bucket
        :param asset_filter: text for which assets should be filtered
        :param ignore_assets: if product instead of individual assets should be used
        :param product: product that shall be downloaded
        :param raise_error: raise error if there is nothing to download
        :return: set of product chunks that can be downloaded
        :raises NotAvailableError: if the asset filter leaves nothing to download
        :raises NoMatchingProductType: if nothing to download was found
        """
        product_chunks: list[Any] = []
        for bucket_name, prefix in bucket_names_and_prefixes:
            # unauthenticated items filtered out
            if bucket_name in authenticated_objects:
                product_chunks.extend(
                    authenticated_objects[bucket_name].filter(Prefix=prefix)
                )

        unique_product_chunks = set(product_chunks)

        # if asset_filter is used with ignore_assets, apply filtering on listed prefixes
        if asset_filter and ignore_assets:
            filter_regex = re.compile(asset_filter)
            unique_product_chunks = {
                c
                for c in unique_product_chunks
                if filter_regex.search(os.path.basename(c.key))
            }
            if not unique_product_chunks and raise_error:
                # the basenames are tested with search(), not fullmatch()
                raise NotAvailableError(
                    rf"No file basename matching re.search(r'{asset_filter}') was found in {product.remote_location}"
                )

        if not unique_product_chunks and raise_error:
            raise NoMatchingProductType("No product found to download.")

        return unique_product_chunks

    def _stream_download_dict(
        self,
        product: EOProduct,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        byte_range: tuple[Optional[int], Optional[int]] = (None, None),
        compress: Literal["zip", "raw", "auto"] = "auto",
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ) -> StreamResponse:
        """
        Stream EO product data as a FastAPI-compatible `StreamResponse`, with support for partial downloads,
        asset filtering, and on-the-fly compression.

        This method streams data from one or more S3 objects that belong to a given EO product.
        It supports:

        - **Regex-based asset filtering** via `asset`, allowing partial product downloads.
        - **Byte-range requests** through the `byte_range` parameter, enabling partial download of data.
        - **Selective file extraction from ZIP archives**, for uncompressed entries (ZIP method: STORE only).
          This enables lazy access to individual files inside ZIPs without downloading the entire archive.

        Data is downloaded from S3 in parallel using HTTP range requests, which improves speed by downloading
        chunks concurrently using multiple concurrent **range requests**.

        #### Compression Behavior (`compress` parameter):

        - `"raw"`:
            - If there is only one file: returns a raw stream of that file.
            - For multiple files, streams them sequentially using an HTTP multipart/mixed response with proper MIME
              boundaries and per-file headers, allowing clients to parse each file independently.
        - `"auto"` (default):
            - Streams a single file as raw.
            - Streams multiple files as a ZIP archive.
        - `"zip"`:
            - Always returns a ZIP archive, whether one or many files are included.

        #### SAFE Archive Support:

        If the product type supports SAFE structure and no `asset_regex` is specified (i.e., full product download),
        the method attempts to reconstruct a valid SAFE archive layout in the streamed output.

        :param product: The EO product to download.
        :param asset: (optional) Regex pattern to filter which assets/files to include.
        :param auth: (optional) Authentication configuration (e.g., AWS credentials).
        :param byte_range: Tuple of (start, end) for a global byte range request. Either can be None for open-ended
                           ranges.
        :param compress: One of "zip", "raw", or "auto". Controls how output is compressed:

                         - "raw": single file is streamed directly; multiple files use a custom separator.
                         - "auto": raw for single file, zipped for multiple.
                         - "zip": always returns a ZIP archive.
        :param wait: accepted but not used by this implementation
        :param timeout: accepted but not used by this implementation
        :returns: A `StreamResponse` object containing the streamed download and appropriate headers.
        :raises MisconfiguredError: if the product has no authentication plugin
        """
        asset_regex = kwargs.get("asset")

        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )

        build_safe = (
            False if asset_regex is not None else product_conf.get("build_safe", False)
        )
        ignore_assets = getattr(self.config, "ignore_assets", False)

        self._configure_safe_build(build_safe, product)

        bucket_names_and_prefixes = self._get_bucket_names_and_prefixes(
            product,
            asset_regex,
            ignore_assets,
            product_conf.get("complementary_url_key", []),
        )

        # authenticate
        if not product.downloader_auth:
            raise MisconfiguredError(
                "Authentication plugin (AwsAuth) has to be configured if AwsDownload is used"
            )
        authenticated_objects = product.downloader_auth.authenticate_objects(
            bucket_names_and_prefixes
        )

        # downloadable files
        product_objects = self._get_unique_products(
            bucket_names_and_prefixes,
            authenticated_objects,
            asset_regex,
            ignore_assets,
            product,
        )
        if auth and isinstance(auth, boto3.resources.base.ServiceResource):
            s3_resource = auth
        else:
            s3_resource = boto3.resource(
                service_name="s3",
                endpoint_url=getattr(self.config, "s3_endpoint", None),
            )

        # product conf overrides provider conf for "flatten_top_dirs"
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", True)
        )
        common_path = (
            self._get_commonpath(product, product_objects, build_safe)
            if flatten_top_dirs
            else ""
        )
        if len(product_objects) == 1:
            # with a single file the common path is the file itself: keep its name
            common_path = os.path.dirname(common_path)

        # common_path is a literal path, not a pattern: escape it before matching
        common_path_regex = re.compile(rf"^{re.escape(common_path)}/?")

        assets_by_path = {
            a.get("href", "").split("s3://")[-1]: a
            for a in product.assets.get_values(asset_filter=asset_regex or "")
        }

        files_info = []
        for obj in product_objects:
            try:
                rel_path = self.get_chunk_dest_path(product, obj, build_safe=build_safe)
            except NotAvailableError as e:
                # out of SAFE format chunk
                logger.warning(e)
                continue

            if flatten_top_dirs:
                rel_path = os.path.join(
                    product.properties["title"],
                    common_path_regex.sub("", rel_path),
                )

            data_type = assets_by_path.get(f"{obj.bucket_name}/{obj.key}", {}).get(
                "type"
            )

            file_info = S3FileInfo(
                key=obj.key,
                size=obj.size,
                bucket_name=obj.bucket_name,
                rel_path=rel_path,
            )
            if data_type:
                file_info.data_type = data_type

            files_info.append(file_info)

        title = product.properties.get("title") or product.properties.get(
            "id", "download"
        )
        zip_filename = sanitize(title)

        return stream_download_from_s3(
            cast("S3Client", s3_resource.meta.client),
            files_info,
            byte_range,
            compress,
            zip_filename,
        )

    def _get_commonpath(
        self, product: EOProduct, product_chunks: set[Any], build_safe: bool
    ) -> str:
        """Get the longest common sub-path of the chunks destination paths

        :param product: product the chunks belong to
        :param product_chunks: chunks whose destination paths are compared
        :param build_safe: if destination paths should follow the SAFE layout
        :returns: the common path, or an empty string if no chunk has a destination path
        """
        chunk_paths = []
        for product_chunk in product_chunks:
            try:
                chunk_paths.append(
                    self.get_chunk_dest_path(
                        product, product_chunk, build_safe=build_safe
                    )
                )
            except NotAvailableError:
                # out of SAFE format chunk: it has no destination path to compare
                continue
        # os.path.commonpath() raises ValueError on an empty sequence
        return os.path.commonpath(chunk_paths) if chunk_paths else ""

    def get_product_bucket_name_and_prefix(
        self, product: EOProduct, url: Optional[str] = None
    ) -> tuple[str, Optional[str]]:
        """Extract bucket name and prefix from product URL

        :param product: The EO product to download
        :param url: (optional) URL to use as product.location
        :returns: bucket_name and prefix as str
        """
        if url is None:
            url = product.location

        bucket, prefix = get_bucket_name_and_prefix(
            url=url,
            bucket_path_level=getattr(self.config, "bucket_path_level", None),
        )

        if bucket is None:
            # fall back on the bucket configured for the product type
            bucket = (
                getattr(self.config, "products", {})
                .get(product.product_type, {})
                .get("default_bucket", "")
            )

        return bucket, prefix

    @staticmethod
    def _find_safe_dir(product_path: str) -> str:
        """Get the directory of the first manifest.safe found under the given path

        :param product_path: directory to walk through
        :returns: path of the directory containing manifest.safe
        :raises FileNotFoundError: if no manifest.safe could be found
        """
        safe_path = next(
            (
                dirpath
                for dirpath, _, filenames in os.walk(product_path)
                if "manifest.safe" in filenames
            ),
            None,
        )
        if safe_path is None:
            raise FileNotFoundError(
                f"No manifest.safe could be found in {product_path}"
            )
        return safe_path

    def check_manifest_file_list(self, product_path: str) -> None:
        """Checks if products listed in manifest.safe exist

        :param product_path: local path of the downloaded product
        :raises FileNotFoundError: if no manifest.safe could be found
        """
        safe_path = self._find_safe_dir(product_path)

        root = etree.parse(os.path.join(safe_path, "manifest.safe")).getroot()
        for safe_file in root.xpath("//fileLocation"):
            href = safe_file.get("href")
            safe_file_path = os.path.join(safe_path, href)
            if os.path.isfile(safe_file_path):
                continue
            if "HTML" in href:
                # add empty files for missing HTML/*
                Path(safe_file_path).touch()
            else:
                logger.warning("SAFE build: %s is missing", href)

    def finalize_s2_safe_product(self, product_path: str) -> None:
        """Add missing dirs to downloaded product

        :param product_path: local path of the downloaded product
        :raises DownloadError: if the SAFE product could not be finalized
        """
        try:
            logger.debug("Finalize SAFE product")
            safe_path = self._find_safe_dir(product_path)

            # create empty missing dirs
            for dirname in ("AUX_DATA", "HTML", "rep_info"):
                os.makedirs(os.path.join(safe_path, dirname), exist_ok=True)

            root = etree.parse(os.path.join(safe_path, "manifest.safe")).getroot()

            # granule tile dirname
            tile_id = cast(
                str,
                os.path.basename(
                    os.path.dirname(
                        root.xpath("//fileLocation[contains(@href,'MTD_TL.xml')]")[
                            0
                        ].get("href")
                    )
                ),
            )
            rename_subfolder(os.path.join(safe_path, "GRANULE"), tile_id)

            # datastrip scene dirname
            scene_id = cast(
                str,
                os.path.basename(
                    os.path.dirname(
                        root.xpath("//fileLocation[contains(@href,'MTD_DS.xml')]")[
                            0
                        ].get("href")
                    )
                ),
            )
            rename_subfolder(os.path.join(safe_path, "DATASTRIP"), scene_id)
        except Exception as e:
            logger.exception("Could not finalize SAFE product from downloaded data")
            raise DownloadError(e) from e

    def get_chunk_dest_path(
        self,
        product: EOProduct,
        chunk: Any,
        dir_prefix: Optional[str] = None,
        build_safe: bool = False,
    ) -> str:
        """Get chunk SAFE destination path

        :param product: product the chunk belongs to
        :param chunk: object exposing the S3 object key as ``key``
        :param dir_prefix: (optional) prefix to strip from the key when SAFE is not built
        :param build_safe: if the destination path should follow the SAFE layout
        :returns: destination path of the chunk, relative to the product directory
        :raises NotAvailableError: if SAFE is built and the key matches no known pattern
        """
        if not build_safe:
            if dir_prefix is None:
                dir_prefix = chunk.key
            product_path: str = chunk.key.split(dir_prefix.strip("/") + "/")[-1]
            logger.debug("Downloading %s to %s", chunk.key, product_path)
            return product_path

        title_date1: Optional[str] = None
        title_part3: Optional[str] = None
        ds_dir: Any = 0
        s2_processing_level: str = ""
        s1_title_suffix: Optional[str] = None
        # S2 common
        if product.product_type and "S2_MSI" in product.product_type:
            title_search: Optional[re.Match[str]] = re.search(
                r"^\w+_\w+_(\w+)_(\w+)_(\w+)_(\w+)_(\w+)$",
                product.properties["title"],
            )
            title_date1 = title_search.group(1) if title_search else None
            title_part3 = title_search.group(4) if title_search else None
            ds_dir_search = re.search(
                r"^.+_(DS_\w+_+\w+_\w+)_\w+.\w+$",
                product.properties.get("originalSceneID", ""),
            )
            ds_dir = ds_dir_search.group(1) if ds_dir_search else 0
            s2_processing_level = product.product_type.split("_")[-1]
        # S1 common
        elif product.product_type == "S1_SAR_GRD":
            s1_title_suffix_search = re.search(
                r"^.+_([A-Z0-9_]+_[A-Z0-9_]+_[A-Z0-9_]+_[A-Z0-9_]+)_\w+$",
                product.properties["title"],
            )
            s1_title_suffix = (
                s1_title_suffix_search.group(1).lower().replace("_", "-")
                if s1_title_suffix_search
                else None
            )

        # the first matching pattern wins: specific patterns come before generic ones
        # S2 L2A Tile files -----------------------------------------------
        if matched := S2L2A_TILE_IMG_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/IMG_DATA/R%s/T%s%s%s_%s_%s_%s.jp2" % (
                found_dict["num"],
                found_dict["res"],
                found_dict["tile1"],
                found_dict["tile2"],
                found_dict["tile3"],
                title_date1,
                found_dict["file"],
                found_dict["res"],
            )
        elif matched := S2L2A_TILE_AUX_DIR_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/AUX_DATA/%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        # S2 L2A QI Masks
        elif matched := S2_TILE_QI_MSK_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/QI_DATA/MSK_%sPRB_%s" % (
                found_dict["num"],
                found_dict["file_base"],
                found_dict["file_suffix"],
            )
        # S2 L2A QI PVI
        elif matched := S2_TILE_QI_PVI_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/QI_DATA/%s_%s_PVI.jp2" % (
                found_dict["num"],
                title_part3,
                title_date1,
            )
        # S2 Tile files ---------------------------------------------------
        elif matched := S2_TILE_PREVIEW_DIR_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/preview/%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        elif matched := S2_TILE_IMG_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/IMG_DATA/T%s%s%s_%s_%s" % (
                found_dict["num"],
                found_dict["tile1"],
                found_dict["tile2"],
                found_dict["tile3"],
                title_date1,
                found_dict["file"],
            )
        elif matched := S2_TILE_THUMBNAIL_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        elif matched := S2_TILE_MTD_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/MTD_TL.xml" % found_dict["num"]
        elif matched := S2_TILE_AUX_DIR_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/AUX_DATA/AUX_%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        elif matched := S2_TILE_QI_DIR_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/QI_DATA/%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        # S2 Tiles generic
        elif matched := S2_TILE_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "GRANULE/%s/%s" % (
                found_dict["num"],
                found_dict["file"],
            )
        # S2 Product files
        elif S2_PROD_DS_MTD_REGEX.match(chunk.key):
            product_path = "DATASTRIP/%s/MTD_DS.xml" % ds_dir
        elif matched := S2_PROD_DS_QI_REPORT_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "DATASTRIP/%s/QI_DATA/%s.xml" % (
                ds_dir,
                found_dict["filename"],
            )
        elif matched := S2_PROD_DS_QI_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "DATASTRIP/%s/QI_DATA/%s" % (
                ds_dir,
                found_dict["file"],
            )
        elif S2_PROD_INSPIRE_REGEX.match(chunk.key):
            product_path = "INSPIRE.xml"
        elif S2_PROD_MTD_REGEX.match(chunk.key):
            product_path = "MTD_MSI%s.xml" % s2_processing_level
        # S2 Product generic
        elif matched := S2_PROD_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "%s" % found_dict["file"]
        # S1 --------------------------------------------------------------
        elif matched := S1_CALIB_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "annotation/calibration/%s-%s-%s-grd-%s-%s-%03d.xml" % (
                found_dict["file_prefix"],
                product.properties["platformSerialIdentifier"].lower(),
                found_dict["file_beam"],
                found_dict["file_pol"],
                s1_title_suffix,
                S1_IMG_NB_PER_POLAR.get(product.properties["polarizationMode"], {}).get(
                    found_dict["file_pol"].upper(), 1
                ),
            )
        elif matched := S1_ANNOT_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "annotation/%s-%s-grd-%s-%s-%03d.xml" % (
                product.properties["platformSerialIdentifier"].lower(),
                found_dict["file_beam"],
                found_dict["file_pol"],
                s1_title_suffix,
                S1_IMG_NB_PER_POLAR.get(product.properties["polarizationMode"], {}).get(
                    found_dict["file_pol"].upper(), 1
                ),
            )
        elif matched := S1_MEAS_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "measurement/%s-%s-grd-%s-%s-%03d.%s" % (
                product.properties["platformSerialIdentifier"].lower(),
                found_dict["file_beam"],
                found_dict["file_pol"],
                s1_title_suffix,
                S1_IMG_NB_PER_POLAR.get(product.properties["polarizationMode"], {}).get(
                    found_dict["file_pol"].upper(), 1
                ),
                found_dict["file_ext"],
            )
        elif matched := S1_REPORT_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "%s.SAFE-%s" % (
                product.properties["title"],
                found_dict["file"],
            )
        # S1 generic
        elif matched := S1_REGEX.match(chunk.key):
            found_dict = matched.groupdict()
            product_path = "%s" % found_dict["file"]
        # out of SAFE format
        else:
            raise NotAvailableError(f"Ignored {chunk.key} out of SAFE matching pattern")

        logger.debug("Downloading %s to %s", chunk.key, product_path)
        return product_path

    def download_all(
        self,
        products: SearchResult,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        downloaded_callback: Optional[DownloadedCallback] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ) -> list[str]:
        """
        download_all using parent (base plugin) method
        """
        return super().download_all(
            products,
            auth=auth,
            downloaded_callback=downloaded_callback,
            progress_callback=progress_callback,
            wait=wait,
            timeout=timeout,
            **kwargs,
        )