Source code for eodag.plugins.download.http

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import os
import re
import shutil
import tarfile
import zipfile
from email.message import Message
from itertools import chain
from json import JSONDecodeError
from pathlib import Path
from typing import (
    TYPE_CHECKING,
    Any,
    Iterator,
    Literal,
    Optional,
    TypedDict,
    Union,
    cast,
)
from urllib.parse import parse_qs, urlparse

import geojson
import requests
from lxml import etree
from requests import RequestException
from requests.auth import AuthBase
from requests.structures import CaseInsensitiveDict
from zipstream import ZipStream

from eodag.api.product.metadata_mapping import (
    NOT_AVAILABLE,
    OFFLINE_STATUS,
    ONLINE_STATUS,
    STAGING_STATUS,
    mtd_cfg_as_conversion_and_querypath,
    properties_from_json,
    properties_from_xml,
)
from eodag.plugins.download.base import Download
from eodag.utils import (
    DEFAULT_DOWNLOAD_TIMEOUT,
    DEFAULT_DOWNLOAD_WAIT,
    DEFAULT_STREAM_REQUESTS_TIMEOUT,
    HTTP_REQ_TIMEOUT,
    USER_AGENT,
    ProgressCallback,
    StreamResponse,
    flatten_top_directories,
    guess_extension,
    guess_file_type,
    parse_header,
    path_to_uri,
    rename_with_version,
    sanitize,
    string_to_jsonpath,
    uri_to_path,
)
from eodag.utils.exceptions import (
    AuthenticationError,
    DownloadError,
    MisconfiguredError,
    NotAvailableError,
    TimeOutError,
    ValidationError,
)

if TYPE_CHECKING:
    from jsonpath_ng import JSONPath
    from mypy_boto3_s3 import S3ServiceResource
    from requests import Response

    from eodag.api.product import Asset, EOProduct  # type: ignore
    from eodag.api.search_result import SearchResult
    from eodag.config import PluginConfig
    from eodag.types.download_args import DownloadConf
    from eodag.utils import DownloadedCallback, Unpack

logger = logging.getLogger("eodag.download.http")


class HTTPDownload(Download):
    """HTTPDownload plugin. Handles product download over HTTP protocol

    :param provider: provider name
    :param config: Download plugin configuration:

        * :attr:`~eodag.config.PluginConfig.type` (``str``) (**mandatory**): ``HTTPDownload``
        * :attr:`~eodag.config.PluginConfig.base_uri` (``str``): default endpoint url
        * :attr:`~eodag.config.PluginConfig.method` (``str``): HTTP request method for the download
          request (``GET`` or ``POST``); default: ``GET``
        * :attr:`~eodag.config.PluginConfig.extract` (``bool``): if the content of the downloaded file
          should be extracted; default: ``True``
        * :attr:`~eodag.config.PluginConfig.auth_error_code` (``int``): which error code is returned in
          case of an authentication error
        * :attr:`~eodag.config.PluginConfig.dl_url_params` (``dict[str, Any]``): parameters to be added
          to the query params of the request
        * :attr:`~eodag.config.PluginConfig.archive_depth` (``int``): level in extracted path tree where
          to find data; default: ``1``
        * :attr:`~eodag.config.PluginConfig.flatten_top_dirs` (``bool``): if the directory structure
          should be flattened; default: ``True``
        * :attr:`~eodag.config.PluginConfig.ignore_assets` (``bool``): ignore assets and download using
          downloadLink; default: ``False``
        * :attr:`~eodag.config.PluginConfig.timeout` (``int``): time to wait until request timeout in
          seconds; default: ``5``
        * :attr:`~eodag.config.PluginConfig.ssl_verify` (``bool``): if the ssl certificates should be
          verified in requests; default: ``True``
        * :attr:`~eodag.config.PluginConfig.no_auth_download` (``bool``): if the download should be done
          without authentication; default: ``False``
        * :attr:`~eodag.config.PluginConfig.order_enabled` (``bool``): if the product has to be ordered
          to download it; if this parameter is set to true, a mapping for the orderLink has to be added
          to the metadata mapping of the search plugin used for the provider; default: ``False``
        * :attr:`~eodag.config.PluginConfig.order_method` (``str``): HTTP request method for the order
          request (``GET`` or ``POST``); default: ``GET``
        * :attr:`~eodag.config.PluginConfig.order_headers` (``dict[str, str]``): headers to be added to
          the order request
        * :attr:`~eodag.config.PluginConfig.order_on_response`
          (:class:`~eodag.config.PluginConfig.OrderOnResponse`): a typed dictionary containing the key
          ``metadata_mapping`` which can be used to add new product properties based on the data in the
          response to the order request
        * :attr:`~eodag.config.PluginConfig.order_status` (:class:`~eodag.config.PluginConfig.OrderStatus`):
          configuration to handle the order status; contains information about which method to use, how
          the response data is interpreted, which status corresponds to success, ordered and error, and
          what should be done on success
        * :attr:`~eodag.config.PluginConfig.products` (``dict[str, dict[str, Any]]``): product type
          specific config; the keys are the product types, the values are dictionaries which can contain
          the key :attr:`~eodag.config.PluginConfig.extract` to overwrite the provider config for a
          specific product type
    """
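
    # Editorial sketch (not taken from a shipped provider configuration): a provider's
    # ``download`` section using this plugin could look like the following YAML, where the
    # keys mirror the attributes documented above and all values are illustrative.
    #
    #   download:
    #     type: HTTPDownload
    #     base_uri: 'https://download.example.org'
    #     method: GET
    #     extract: true
    #     flatten_top_dirs: true
    #     timeout: 5
    #     ssl_verify: true
    #     order_enabled: false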

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super(HTTPDownload, self).__init__(provider, config)
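
    # Editorial note: the public entry points below are ``download``/``download_all`` and
    # ``order``. For OFFLINE products, ``_order`` sends the order request, ``_order_status``
    # polls ``orderStatusLink`` until the product becomes ONLINE, and ``_stream_download``
    # (or ``_stream_download_assets`` for per-asset downloads) then fetches the data in chunks.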

    def _order(
        self,
        product: EOProduct,
        auth: Optional[AuthBase] = None,
        **kwargs: Unpack[DownloadConf],
    ) -> Optional[dict[str, Any]]:
        """Send product order request.

        It will be executed once before the download retry loop, if the product is OFFLINE
        and has `orderLink` in its properties.

        Product ordering can be configured using the following download plugin parameters:

            - :attr:`~eodag.config.PluginConfig.order_enabled`: Whether order is enabled or not
              (may not use this method if no `orderLink` exists)

            - :attr:`~eodag.config.PluginConfig.order_method`: (optional) HTTP request method, GET (default) or POST

            - :attr:`~eodag.config.PluginConfig.order_on_response`: (optional) things to do with obtained
              order response:

                - *metadata_mapping*: edit or add new product properties

        Product properties used for order:

            - **orderLink**: order request URL

        :param product: The EO product to order
        :param auth: (optional) authenticated object
        :param kwargs: download additional kwargs
        :returns: the returned json status response
        """
        product.properties["storageStatus"] = STAGING_STATUS

        order_method = getattr(self.config, "order_method", "GET").upper()
        ssl_verify = getattr(self.config, "ssl_verify", True)
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)

        OrderKwargs = TypedDict(
            "OrderKwargs", {"json": dict[str, Union[Any, list[str]]]}, total=False
        )
        order_kwargs: OrderKwargs = {}
        if order_method == "POST":
            # separate url & parameters
            parts = urlparse(str(product.properties["orderLink"]))
            query_dict = {}
            # `parts.query` may be a JSON with query strings as one of values. If `parse_qs` is
            # executed as first step, the resulting `query_dict` would be erroneous.
            try:
                query_dict = geojson.loads(parts.query)
            except JSONDecodeError:
                if parts.query:
                    query_dict = parse_qs(parts.query)
            order_url = parts._replace(query="").geturl()
            if query_dict:
                order_kwargs["json"] = query_dict
        else:
            order_url = product.properties["orderLink"]
            order_kwargs = {}

        headers = {**getattr(self.config, "order_headers", {}), **USER_AGENT}
        try:
            with requests.request(
                method=order_method,
                url=order_url,
                auth=auth,
                timeout=timeout,
                headers=headers,
                verify=ssl_verify,
                **order_kwargs,
            ) as response:
                logger.debug(f"{order_method} {order_url} {headers} {order_kwargs}")
                try:
                    response.raise_for_status()
                    ordered_message = response.text
                    logger.debug(ordered_message)
                    product.properties["storageStatus"] = STAGING_STATUS
                except RequestException as e:
                    self._check_auth_exception(e)
                    msg = f"{product.properties['title']} could not be ordered"
                    if e.response is not None and e.response.status_code == 400:
                        raise ValidationError.from_error(e, msg) from e
                    else:
                        raise DownloadError.from_error(e, msg) from e
                return self.order_response_process(response, product)
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=timeout) from exc

    def order_response_process(
        self, response: Response, product: EOProduct
    ) -> Optional[dict[str, Any]]:
        """Process order response

        :param response: The order response
        :param product: The ordered EO product
        :returns: the returned json status response
        """
        on_response_mm = getattr(self.config, "order_on_response", {}).get(
            "metadata_mapping", {}
        )
        if not on_response_mm:
            return None

        logger.debug("Parsing order response to update product metadata-mapping")

        on_response_mm_jsonpath = mtd_cfg_as_conversion_and_querypath(
            on_response_mm,
        )

        json_response = response.json()

        properties_update = properties_from_json(
            {"json": json_response, "headers": {**response.headers}},
            on_response_mm_jsonpath,
        )
        product.properties.update(
            {k: v for k, v in properties_update.items() if v != NOT_AVAILABLE}
        )
        # the job id becomes the product id for EcmwfSearch products
        if "ORDERABLE" in product.properties.get("id", ""):
            product.properties["id"] = product.properties.get(
                "orderId", product.properties["id"]
            )
            product.properties["title"] = (
                (product.product_type or product.provider).upper()
                + "_"
                + product.properties["id"]
            )
        if "downloadLink" in product.properties:
            product.remote_location = product.location = product.properties[
                "downloadLink"
            ]
            logger.debug(f"Product location updated to {product.location}")
        return json_response

    def _order_status(
        self,
        product: EOProduct,
        auth: Optional[AuthBase] = None,
    ) -> None:
        """Send product order status request.

        It will be executed before each download retry.

        Product order status request can be configured using the following download plugin parameters:

            - :attr:`~eodag.config.PluginConfig.order_status`: :class:`~eodag.config.PluginConfig.OrderStatus`

        Product properties used for order status:

            - **orderStatusLink**: order status request URL

        :param product: The ordered EO product
        :param auth: (optional) authenticated object
        """
        status_config = getattr(self.config, "order_status", {})
        success_code: Optional[int] = status_config.get("success", {}).get("http_code")
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)

        def _request(
            url: str,
            method: str = "GET",
            headers: Optional[dict[str, Any]] = None,
            json: Optional[Any] = None,
            timeout: int = HTTP_REQ_TIMEOUT,
        ) -> Response:
            """Send request and handle allow redirects"""
            logger.debug(f"{method} {url} {headers} {json}")
            try:
                response = requests.request(
                    method=method,
                    url=url,
                    auth=auth,
                    timeout=timeout,
                    headers={**(headers or {}), **USER_AGENT},
                    allow_redirects=False,  # Redirection is manually handled
                    json=json,
                )
                logger.debug(
                    f"Order download status request responded with {response.status_code}"
                )
                response.raise_for_status()  # Raise an exception if status code indicates an error

                # Handle redirection (if needed)
                if (
                    300 <= response.status_code < 400
                    and response.status_code != success_code
                ):
                    # cf: https://www.rfc-editor.org/rfc/rfc9110.html#name-303-see-other
                    if response.status_code == 303:
                        method = "GET"
                    if new_url := response.headers.get("Location"):
                        return _request(new_url, method, headers, json, timeout)

                return response
            except requests.exceptions.Timeout as exc:
                raise TimeOutError(exc, timeout=timeout) from exc

        status_request: dict[str, Any] = status_config.get("request", {})
        status_request_method = str(status_request.get("method", "GET")).upper()

        if status_request_method == "POST":
            # separate url & parameters
            parts = urlparse(str(product.properties["orderStatusLink"]))
            status_url = parts._replace(query="").geturl()
            query_dict = parse_qs(parts.query)
            if not query_dict and parts.query:
                query_dict = geojson.loads(parts.query)
            json_data = query_dict if query_dict else None
        else:
            status_url = product.properties["orderStatusLink"]
            json_data = None

        # check header for success before full status request
        skip_parsing_status_response = False
        status_dict: dict[str, Any] = {}
        config_on_success: dict[str, Any] = status_config.get("on_success", {})
        on_success_mm = config_on_success.get("metadata_mapping", {})
        status_response_content_needed = (
            False
            if not any([v.startswith("$.json.") for v in on_success_mm.values()])
            else True
        )
        if success_code:
            try:
                response = _request(
                    status_url,
                    "HEAD",
                    status_request.get("headers"),
                    json_data,
                    timeout,
                )
                if (
                    response.status_code == success_code
                    and not status_response_content_needed
                ):
                    # success and no need to get status response content
                    skip_parsing_status_response = True
            except RequestException as e:
                logger.debug(e)

        if not skip_parsing_status_response:
            # status request
            try:
                response = _request(
                    status_url,
                    status_request_method,
                    status_request.get("headers"),
                    json_data,
                    timeout,
                )
                if (
                    response.status_code == success_code
                    and not status_response_content_needed
                ):
                    # success and no need to get status response content
                    skip_parsing_status_response = True
            except RequestException as e:
                msg = (
                    f"{product.properties.get('title') or product.properties.get('id') or product} "
                    "order status could not be checked"
                )
                if e.response is not None and e.response.status_code == 400:
                    raise ValidationError.from_error(e, msg) from e
                else:
                    raise DownloadError.from_error(e, msg) from e

        if not skip_parsing_status_response:
            # status request
            json_response = response.json()
            if not isinstance(json_response, dict):
                raise RequestException("response content is not a dict")
            status_dict = json_response

            status_mm = status_config.get("metadata_mapping", {})
            status_mm_jsonpath = (
                mtd_cfg_as_conversion_and_querypath(
                    status_mm,
                )
                if status_mm
                else {}
            )
            logger.debug("Parsing order status response")
            status_dict = properties_from_json(
                {"json": response.json(), "headers": {**response.headers}},
                status_mm_jsonpath,
            )
            # display progress percentage
            if "percent" in status_dict:
                status_percent = str(status_dict["percent"])
                if status_percent.isdigit():
                    status_percent += "%"
                logger.info(
                    f"{product.properties['title']} order status: {status_percent}"
                )

            product.properties.update(
                {k: v for k, v in status_dict.items() if v != NOT_AVAILABLE}
            )
            product.properties["orderStatus"] = status_dict.get("status")
            status_message = status_dict.get("message")

            # handle status error
            errors: dict[str, Any] = status_config.get("error", {})
            if errors and errors.items() <= status_dict.items():
                raise DownloadError(
                    f"Provider {product.provider} returned: {status_dict.get('error_message', status_message)}"
                )

        product.properties["storageStatus"] = STAGING_STATUS

        success_status: dict[str, Any] = status_config.get("success", {}).get("status")
        # if not success
        if (success_status and success_status != status_dict.get("status")) or (
            success_code and success_code != response.status_code
        ):
            return None

        product.properties["storageStatus"] = ONLINE_STATUS

        if not config_on_success:
            # Nothing left to do
            return None

        # need search on success ?
        if config_on_success.get("need_search"):
            logger.debug(f"Search for new location: {product.properties['searchLink']}")
            try:
                response = _request(product.properties["searchLink"], timeout=timeout)
            except RequestException as e:
                logger.warning(
                    "%s order status could not be checked, request returned %s",
                    product.properties["title"],
                    e,
                )
                msg = f"{product.properties['title']} order status could not be checked"
                if e.response is not None and e.response.status_code == 400:
                    raise ValidationError.from_error(e, msg) from e
                else:
                    raise DownloadError.from_error(e, msg) from e

        result_type = config_on_success.get("result_type", "json")
        result_entry = config_on_success.get("results_entry")

        on_success_mm_querypath = (
            # append product.properties as input for on success response parsing
            mtd_cfg_as_conversion_and_querypath(
                dict(
                    {k: str(v) for k, v in product.properties.items()}, **on_success_mm
                ),
            )
            if on_success_mm
            else {}
        )
        try:
            if result_type == "xml":
                if not result_entry:
                    raise MisconfiguredError(
                        '"result_entry" is required with "result_type" "xml"'
                        ' in "order_status.on_success"'
                    )
                root_node = etree.fromstring(response.content)
                namespaces = {k or "ns": v for k, v in root_node.nsmap.items()}
                results = [
                    etree.tostring(entry)
                    for entry in root_node.xpath(
                        result_entry,
                        namespaces=namespaces,
                    )
                ]
                if len(results) != 1:
                    raise DownloadError(
                        "Could not get a single result after order success for "
                        f"{product.properties['searchLink']} request. "
                        f"Please search and download {product} again"
                    )
                assert isinstance(results, list), "results must be in a list"
                # single result
                result = results[0]
                if on_success_mm_querypath:
                    properties_update = properties_from_xml(
                        result,
                        on_success_mm_querypath,
                    )
                else:
                    properties_update = {}
            else:
                json_response = (
                    response.json()
                    if "application/json" in response.headers.get("Content-Type", "")
                    else {}
                )
                if result_entry:
                    entry_jsonpath: JSONPath = string_to_jsonpath(
                        result_entry, force=True
                    )
                    json_response = entry_jsonpath.find(json_response)
                    raise NotImplementedError(
                        'result_entry in config.on_success is not yet supported for result_type "json"'
                    )
                if on_success_mm_querypath:
                    logger.debug(
                        "Parsing on-success metadata-mapping using order status response"
                    )
                    properties_update = properties_from_json(
                        {"json": json_response, "headers": {**response.headers}},
                        on_success_mm_querypath,
                    )
                    # only keep properties to update (remove product.properties added for parsing)
                    properties_update = {
                        k: v for k, v in properties_update.items() if k in on_success_mm
                    }
                else:
                    properties_update = {}
        except Exception as e:
            if isinstance(e, DownloadError):
                raise
            logger.debug(e)
            raise DownloadError(
                f"Could not parse result after order success. Please search and download {product} again"
            ) from e

        # update product
        product.properties.update(properties_update)
        if "downloadLink" in properties_update:
            product.location = product.remote_location = product.properties[
                "downloadLink"
            ]
        else:
            self.order_response_process(response, product)

    def download(
        self,
        product: EOProduct,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ) -> Optional[str]:
        """Download a product using HTTP protocol.

        The downloaded product is assumed to be a Zip file. If it is not, the user is warned,
        it is renamed to remove the zip extension and no further treatment is done
        (no extraction)
        """
        if auth is not None and not isinstance(auth, AuthBase):
            raise MisconfiguredError(f"Incompatible auth plugin: {type(auth)}")

        if progress_callback is None:
            logger.info(
                "Progress bar unavailable, please call product.download() instead of plugin.download()"
            )
            progress_callback = ProgressCallback(disable=True)

        fs_path, record_filename = self._prepare_download(
            product,
            progress_callback=progress_callback,
            **kwargs,
        )

        if not fs_path or not record_filename:
            if fs_path:
                product.location = path_to_uri(fs_path)
            return fs_path

        # download assets if exist instead of remote_location
        if len(product.assets) > 0 and (
            not getattr(self.config, "ignore_assets", False)
            or kwargs.get("asset") is not None
        ):
            try:
                fs_path = self._download_assets(
                    product,
                    fs_path,
                    record_filename,
                    auth,
                    progress_callback,
                    **kwargs,
                )
                if kwargs.get("asset") is None:
                    product.location = path_to_uri(fs_path)
                return fs_path
            except NotAvailableError as e:
                if kwargs.get("asset") is not None:
                    raise NotAvailableError(e).with_traceback(e.__traceback__)
                else:
                    pass

        url = product.remote_location

        @self._order_download_retry(product, wait, timeout)
        def download_request(
            product: EOProduct,
            auth: AuthBase,
            progress_callback: ProgressCallback,
            wait: float,
            timeout: float,
            **kwargs: Unpack[DownloadConf],
        ) -> os.PathLike:
            is_empty = True
            chunk_iterator = self._stream_download(
                product, auth, progress_callback, **kwargs
            )
            if fs_path is not None:
                ext = Path(product.filename).suffix
                path = Path(fs_path).with_suffix(ext)
                if "ORDERABLE" in path.stem and product.properties.get("title"):
                    path = path.with_stem(sanitize(product.properties["title"]))
                with open(path, "wb") as fhandle:
                    for chunk in chunk_iterator:
                        is_empty = False
                        progress_callback(len(chunk))
                        fhandle.write(chunk)
                self.stream.close()  # Closing response stream
                if is_empty:
                    raise DownloadError(f"product {product.properties['id']} is empty")
                else:
                    # make sure storage status is online
                    product.properties["storageStatus"] = ONLINE_STATUS
                return path
            else:
                raise DownloadError(
                    f"download of product {product.properties['id']} failed"
                )

        path = download_request(
            product, auth, progress_callback, wait, timeout, **kwargs
        )

        with open(record_filename, "w") as fh:
            fh.write(url)
        logger.debug("Download recorded in %s", record_filename)

        if os.path.isfile(path) and not (
            zipfile.is_zipfile(path) or tarfile.is_tarfile(path)
        ):
            new_fs_path = os.path.join(
                os.path.dirname(path),
                sanitize(product.properties["title"]),
            )
            if os.path.isfile(new_fs_path):
                rename_with_version(new_fs_path)
            if not os.path.isdir(new_fs_path):
                os.makedirs(new_fs_path)
            shutil.move(path, new_fs_path)
            product.location = path_to_uri(new_fs_path)
            return new_fs_path

        product_path = self._finalize(
            str(path),
            progress_callback=progress_callback,
            **kwargs,
        )
        product.location = path_to_uri(product_path)
        return product_path

    def _check_stream_size(self, product: EOProduct) -> int:
        stream_size = int(self.stream.headers.get("content-length", 0))
        if (
            stream_size == 0
            and "storageStatus" in product.properties
            and product.properties["storageStatus"] != ONLINE_STATUS
        ):
            raise NotAvailableError(
                "%s(initially %s) ordered, got: %s"
                % (
                    product.properties["title"],
                    product.properties["storageStatus"],
                    self.stream.reason,
                )
            )
        return stream_size

    def _check_product_filename(self, product: EOProduct) -> str:
        filename = None
        asset_content_disposition = self.stream.headers.get("content-disposition")
        if asset_content_disposition:
            filename = cast(
                Optional[str],
                parse_header(asset_content_disposition).get_param("filename", None),
            )
        if not filename:
            # default filename extracted from path
            filename = str(os.path.basename(self.stream.url))
        filename_extension = os.path.splitext(filename)[1]
        if not filename_extension:
            if content_type := getattr(product, "headers", {}).get("Content-Type"):
                ext = guess_extension(content_type)
                if ext:
                    filename += ext
        return filename

    def _stream_download_dict(
        self,
        product: EOProduct,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        byte_range: tuple[Optional[int], Optional[int]] = (None, None),
        compress: Literal["zip", "raw", "auto"] = "auto",
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ) -> StreamResponse:
        r"""
        Returns dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments.
        It contains a generator to streamed download chunks and the response headers.

        :param product: The EO product to download
        :param auth: (optional) authenticated object
        :param progress_callback: (optional) A progress callback
        :param wait: (optional) If download fails, wait time in minutes between two download tries
        :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
                        to download
        :param kwargs: `output_dir` (str), `extract` (bool), `delete_archive` (bool)
                       and `dl_url_params` (dict) can be provided as additional kwargs
                       and will override any other values defined in a configuration
                       file or with environment variables.
        :returns: Dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments
        """
        if auth is not None and not isinstance(auth, AuthBase):
            raise MisconfiguredError(f"Incompatible auth plugin: {type(auth)}")

        # download assets if exist instead of remote_location
        if len(product.assets) > 0 and (
            not getattr(self.config, "ignore_assets", False)
            or kwargs.get("asset") is not None
        ):
            try:
                assets_values = product.assets.get_values(kwargs.get("asset"))
                assets_stream_list = self._stream_download_assets(
                    product,
                    auth,
                    None,
                    assets_values=assets_values,
                    **kwargs,
                )

                # single asset
                if len(assets_stream_list) == 1:
                    asset_stream = assets_stream_list[0]
                    if assets_values[0].get("type"):
                        asset_stream.headers["content-type"] = assets_values[0]["type"]
                    return asset_stream
                # multiple assets in zip
                else:
                    outputs_filename = (
                        sanitize(product.properties["title"])
                        if "title" in product.properties
                        else sanitize(product.properties.get("id", "download"))
                    )
                    # do not use global size if one of the assets has no size
                    missing_length = any(not (asset.size) for asset in assets_values)
                    zip_stream = (
                        ZipStream(sized=True) if not missing_length else ZipStream()
                    )
                    for asset_stream in assets_stream_list:
                        zip_stream.add(
                            asset_stream.content,
                            arcname=asset_stream.arcname,
                            size=asset_stream.size,
                        )
                    zip_length = len(zip_stream) if not missing_length else None

                    return StreamResponse(
                        content=zip_stream,
                        media_type="application/zip",
                        filename=f"{outputs_filename}.zip",
                        size=zip_length,
                    )
            except NotAvailableError as e:
                if kwargs.get("asset") is not None:
                    raise NotAvailableError(e).with_traceback(e.__traceback__)
                else:
                    pass

        chunk_iterator = self._stream_download(product, auth, None, **kwargs)
        # start reading chunks to set product.headers
        try:
            first_chunk = next(chunk_iterator)
        except StopIteration:
            # product is empty file
            logger.error("product %s is empty", product.properties["id"])
            raise NotAvailableError(f"product {product.properties['id']} is empty")

        return StreamResponse(
            content=chain(iter([first_chunk]), chunk_iterator),
            headers=product.headers,
            filename=getattr(product, "filename", None),
            size=getattr(product, "size", None),
        )

    def _check_auth_exception(self, e: Optional[RequestException]) -> None:
        # check if error is identified as auth_error in provider conf
        auth_errors = getattr(self.config, "auth_error_code", [None])
        if not isinstance(auth_errors, list):
            auth_errors = [auth_errors]
        response_text = (
            e.response.text.strip() if e is not None and e.response is not None else ""
        )
        if (
            e is not None
            and e.response is not None
            and e.response.status_code in auth_errors
        ):
            raise AuthenticationError(
                f"Please check your credentials for {self.provider}.",
                f"HTTP Error {e.response.status_code} returned.",
                response_text,
            )

    def _process_exception(
        self, e: Optional[RequestException], product: EOProduct, ordered_message: str
    ) -> None:
        self._check_auth_exception(e)
        response_text = (
            e.response.text.strip() if e is not None and e.response is not None else ""
        )
        # product not available
        if product.properties.get("storageStatus", ONLINE_STATUS) != ONLINE_STATUS:
            msg = (
                ordered_message
                if ordered_message and not response_text
                else response_text
            )
            raise NotAvailableError(
                "%s(initially %s) requested, returned: %s"
                % (
                    product.properties["title"],
                    product.properties["storageStatus"],
                    msg,
                )
            )
        else:
            import traceback as tb

            if e:
                logger.error(
                    "Error while getting resource :\n%s\n%s",
                    tb.format_exc(),
                    response_text,
                )
            else:
                logger.error("Error while getting resource :\n%s", tb.format_exc())

    def _order_request(
        self,
        product: EOProduct,
        auth: Optional[AuthBase],
    ) -> None:
        if (
            "orderLink" in product.properties
            and product.properties.get("storageStatus") == OFFLINE_STATUS
            and not product.properties.get("orderStatus")
        ):
            self._order(product=product, auth=auth)

        if (
            product.properties.get("orderStatusLink")
            and product.properties.get("storageStatus") != ONLINE_STATUS
        ):
            self._order_status(product=product, auth=auth)

    def order(
        self,
        product: EOProduct,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    ) -> None:
        """
        Order product and poll to check its status

        :param product: The EO product to download
        :param auth: (optional) authenticated object
        :param wait: (optional) Wait time in minutes between two order status check
        :param timeout: (optional) Maximum time in minutes before stop checking order status
        """
        self._order_download_retry(product, wait, timeout)(self._order_request)(
            product, auth
        )

    def _stream_download(
        self,
        product: EOProduct,
        auth: Optional[AuthBase] = None,
        progress_callback: Optional[ProgressCallback] = None,
        **kwargs: Unpack[DownloadConf],
    ) -> Iterator[Any]:
        """
        Fetches a zip file containing the assets of a given product as a stream
        and returns a generator yielding the chunks of the file

        :param product: product for which the assets should be downloaded
        :param auth: The configuration of a plugin of type Authentication
        :param progress_callback: A method or a callable object which takes a current size and a
                                  maximum size as inputs and handle progress bar creation and
                                  update to give the user a feedback on the download progress
        :param kwargs: additional arguments
        """
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        ssl_verify = getattr(self.config, "ssl_verify", True)

        ordered_message = ""
        # retry handled at download level
        self._order_request(product, auth)

        params = kwargs.pop("dl_url_params", None) or getattr(
            self.config, "dl_url_params", {}
        )

        req_method = (
            product.properties.get("downloadMethod", "").lower()
            or getattr(self.config, "method", "GET").lower()
        )
        url = product.remote_location
        if req_method == "post":
            # separate url & parameters
            parts = urlparse(url)
            query_dict = parse_qs(parts.query)
            if not query_dict and parts.query:
                query_dict = geojson.loads(parts.query)
            req_url = parts._replace(query="").geturl()
            req_kwargs: dict[str, Any] = {"json": query_dict} if query_dict else {}
        else:
            req_url = url
            req_kwargs = {}

        if req_url.startswith(NOT_AVAILABLE):
            raise NotAvailableError("Download link is not available")

        if getattr(self.config, "no_auth_download", False):
            auth = None

        s = requests.Session()
        try:
            self.stream = s.request(
                req_method,
                req_url,
                stream=True,
                auth=auth,
                params=params,
                headers=USER_AGENT,
                timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
                verify=ssl_verify,
                **req_kwargs,
            )
        except requests.exceptions.MissingSchema:
            # location is not a valid url -> product is not available yet
            raise NotAvailableError("Product is not available yet")

        try:
            self.stream.raise_for_status()
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT) from exc
        except RequestException as e:
            self._process_exception(e, product, ordered_message)
            raise DownloadError(
                f"download of {product.properties['id']} is empty"
            ) from e
        else:
            # check if product was ordered
            if getattr(
                self.stream, "status_code", None
            ) is not None and self.stream.status_code == getattr(
                self.config, "order_status", {}
            ).get(
                "ordered", {}
            ).get(
                "http_code"
            ):
                product.properties["storageStatus"] = "ORDERED"
                self._process_exception(None, product, ordered_message)
        stream_size = self._check_stream_size(product) or None

        product.headers = self.stream.headers
        filename = self._check_product_filename(product)
        content_type = product.headers.get("Content-Type")
        guessed_content_type = (
            guess_file_type(filename) if filename and not content_type else None
        )
        if guessed_content_type is not None:
            product.headers["Content-Type"] = guessed_content_type

        progress_callback.reset(total=stream_size)
        product.size = stream_size
        product.filename = filename
        return self.stream.iter_content(chunk_size=64 * 1024)

    def _stream_download_assets(
        self,
        product: EOProduct,
        auth: Optional[AuthBase] = None,
        progress_callback: Optional[ProgressCallback] = None,
        assets_values: list[Asset] = [],
        **kwargs: Unpack[DownloadConf],
    ) -> list[StreamResponse]:
        """Stream download assets as a zip file."""
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        # get extra parameters to pass to the query
        params = kwargs.pop("dl_url_params", None) or getattr(
            self.config, "dl_url_params", {}
        )

        total_size = self._get_asset_sizes(assets_values, auth, params) or None

        progress_callback.reset(total=total_size)

        # loop for assets paths and get common_subdir
        asset_rel_paths_list = []
        for asset in assets_values:
            asset_rel_path_parts = urlparse(asset["href"]).path.strip("/").split("/")
            asset_rel_path_parts_sanitized = [
                sanitize(part) for part in asset_rel_path_parts
            ]
            asset.rel_path = os.path.join(*asset_rel_path_parts_sanitized)
            asset_rel_paths_list.append(asset.rel_path)
        if asset_rel_paths_list:
            assets_common_subdir = os.path.commonpath(asset_rel_paths_list)

        # product conf overrides provider conf for "flatten_top_dirs"
        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", True)
        )

        ssl_verify = getattr(self.config, "ssl_verify", True)

        matching_url = (
            getattr(product.downloader_auth.config, "matching_url", "")
            if product.downloader_auth
            else ""
        )
        matching_conf = (
            getattr(product.downloader_auth.config, "matching_conf", None)
            if product.downloader_auth
            else None
        )

        def get_chunks_generator(asset: Asset) -> Iterator[bytes]:
            """Create a generator function that will be called by ZipStream when needed."""
            asset_href = asset.get("href")

            # This function will be called by zipstream when it needs the data
            if not asset_href or asset_href.startswith("file:"):
                logger.info(f"Local asset detected. Download skipped for {asset_href}")
                return

            # Determine auth
            if matching_conf or (matching_url and re.match(matching_url, asset_href)):
                auth_object = auth
            else:
                auth_object = None

            # Make the request inside the generator
            try:
                with requests.get(
                    asset_href,
                    stream=True,
                    auth=auth_object,
                    params=params,
                    headers=USER_AGENT,
                    timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
                    verify=ssl_verify,
                ) as stream:
                    stream.raise_for_status()

                    # Process asset path
                    asset_rel_path = (
                        asset.rel_path.replace(assets_common_subdir, "").strip(os.sep)
                        if flatten_top_dirs
                        else asset.rel_path
                    )
                    asset_rel_dir = os.path.dirname(asset_rel_path)

                    if not getattr(asset, "filename", None):
                        # try getting filename in GET header if was not found in HEAD result
                        asset_content_disposition = stream.headers.get(
                            "content-disposition"
                        )
                        if asset_content_disposition:
                            asset.filename = cast(
                                Optional[str],
                                parse_header(asset_content_disposition).get_param(
                                    "filename", None
                                ),
                            )

                    if not getattr(asset, "filename", None):
                        # default filename extracted from path
                        asset.filename = os.path.basename(asset.rel_path)

                    asset.rel_path = os.path.join(
                        asset_rel_dir, cast(str, asset.filename)
                    )

                    for chunk in stream.iter_content(chunk_size=64 * 1024):
                        if chunk:
                            progress_callback(len(chunk))
                            yield chunk
            except requests.exceptions.Timeout as exc:
                raise TimeOutError(
                    exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT
                ) from exc
            except RequestException as e:
                self._handle_asset_exception(e, asset)

        assets_stream_list = []

        # Process each asset
        for asset in assets_values:
            if not asset["href"] or asset["href"].startswith("file:"):
                logger.info(
                    f"Local asset detected. Download skipped for {asset['href']}"
                )
                continue

            asset_chunks = get_chunks_generator(asset)
            try:
                # start reading chunks to set assets attributes
                first_chunk = next(asset_chunks)
                asset_chunks = chain(iter([first_chunk]), asset_chunks)
            except StopIteration:
                # Empty generator
                asset_chunks = iter([])

            assets_stream_list.append(
                StreamResponse(
                    content=asset_chunks,
                    filename=getattr(asset, "filename", None),
                    arcname=getattr(asset, "rel_path", None),
                    size=getattr(asset, "size", 0) or None,
                )
            )

        return assets_stream_list

    def _download_assets(
        self,
        product: EOProduct,
        fs_dir_path: str,
        record_filename: str,
        auth: Optional[AuthBase] = None,
        progress_callback: Optional[ProgressCallback] = None,
        **kwargs: Unpack[DownloadConf],
    ) -> str:
        """Download product assets if they exist"""
        if progress_callback is None:
            logger.info("Progress bar unavailable, please call product.download()")
            progress_callback = ProgressCallback(disable=True)

        assets_urls = [
            a["href"] for a in getattr(product, "assets", {}).values() if "href" in a
        ]
        if not assets_urls:
            raise NotAvailableError("No assets available for %s" % product)

        assets_values = product.assets.get_values(kwargs.get("asset"))

        assets_stream_list = self._stream_download_assets(
            product, auth, progress_callback, assets_values=assets_values, **kwargs
        )

        # remove existing incomplete file
        if os.path.isfile(fs_dir_path):
            os.remove(fs_dir_path)
        # create product dest dir
        if not os.path.isdir(fs_dir_path):
            os.makedirs(fs_dir_path)

        # product conf overrides provider conf for "flatten_top_dirs"
        product_conf = getattr(self.config, "products", {}).get(
            product.product_type, {}
        )
        flatten_top_dirs = product_conf.get(
            "flatten_top_dirs", getattr(self.config, "flatten_top_dirs", True)
        )

        # count local assets
        local_assets_count = 0
        for asset in assets_values:
            if asset["href"].startswith("file:"):
                local_assets_count += 1
                continue

        for asset_stream in assets_stream_list:
            asset_chunks = asset_stream.content
            asset_path = cast(str, asset_stream.arcname)
            asset_abs_path = os.path.join(fs_dir_path, asset_path)
            asset_abs_path_temp = asset_abs_path + "~"
            # create asset subdir if not exist
            asset_abs_path_dir = os.path.dirname(asset_abs_path)
            if not os.path.isdir(asset_abs_path_dir):
                os.makedirs(asset_abs_path_dir)
            # remove temporary file
            if os.path.isfile(asset_abs_path_temp):
                os.remove(asset_abs_path_temp)
            if not os.path.isfile(asset_abs_path):
                logger.debug("Downloading to temporary file '%s'", asset_abs_path_temp)
                with open(asset_abs_path_temp, "wb") as fhandle:
                    for chunk in asset_chunks:
                        if chunk:
                            fhandle.write(chunk)
                logger.debug(
                    "Download completed. Renaming temporary file '%s' to '%s'",
                    os.path.basename(asset_abs_path_temp),
                    os.path.basename(asset_abs_path),
                )
                os.rename(asset_abs_path_temp, asset_abs_path)

        # only one local asset
        if local_assets_count == len(assets_urls) and local_assets_count == 1:
            # remove empty {fs_dir_path}
            shutil.rmtree(fs_dir_path)
            # and return assets_urls[0] path
            fs_dir_path = uri_to_path(assets_urls[0])
            # do not flatten dir
            flatten_top_dirs = False
        # several local assets
        elif local_assets_count == len(assets_urls) and local_assets_count > 0:
            common_path = os.path.commonpath([uri_to_path(uri) for uri in assets_urls])
            # remove empty {fs_dir_path}
            shutil.rmtree(fs_dir_path)
            # and return assets_urls common path
            fs_dir_path = common_path
            # do not flatten dir
            flatten_top_dirs = False
        # no assets downloaded but some should have been
        elif len(os.listdir(fs_dir_path)) == 0:
            raise NotAvailableError("No assets could be downloaded")

        # flatten directory structure
        if flatten_top_dirs:
            flatten_top_directories(fs_dir_path)

        if kwargs.get("asset") is None:
            # save hash/record file
            with open(record_filename, "w") as fh:
                fh.write(product.remote_location)
            logger.debug("Download recorded in %s", record_filename)

        return fs_dir_path

    def _handle_asset_exception(self, e: RequestException, asset: Asset) -> None:
        # check if error is identified as auth_error in provider conf
        auth_errors = getattr(self.config, "auth_error_code", [None])
        if not isinstance(auth_errors, list):
            auth_errors = [auth_errors]
        if e.response is not None and e.response.status_code in auth_errors:
            raise AuthenticationError(
                f"Please check your credentials for {self.provider}.",
                f"HTTP Error {e.response.status_code} returned.",
                e.response.text.strip(),
            )
        else:
            logger.error(
                "Unexpected error at download of asset %s: %s", asset["href"], e
            )
            raise DownloadError(e)

    def _get_asset_sizes(
        self,
        assets_values: list[Asset],
        auth: Optional[AuthBase],
        params: Optional[dict[str, str]],
        zipped: bool = False,
    ) -> int:
        total_size = 0
        timeout = getattr(self.config, "timeout", HTTP_REQ_TIMEOUT)
        ssl_verify = getattr(self.config, "ssl_verify", True)

        # loop for assets size & filename
        for asset in assets_values:
            if asset["href"] and not asset["href"].startswith("file:"):
                # HEAD request for size & filename
                try:
                    asset_headers_resp = requests.head(
                        asset["href"],
                        auth=auth,
                        params=params,
                        headers=USER_AGENT,
                        timeout=timeout,
                        verify=ssl_verify,
                    )
                    asset_headers_resp.raise_for_status()
                    asset_headers = asset_headers_resp.headers
                except RequestException as e:
                    logger.debug(f"HEAD request failed: {str(e)}")
                    asset_headers = CaseInsensitiveDict()

                if not getattr(asset, "size", 0):
                    # size from HEAD header / Content-length
                    asset.size = int(asset_headers.get("Content-length", 0))

                header_content_disposition = Message()
                if not getattr(asset, "size", 0) or not getattr(asset, "filename", 0):
                    # header content-disposition
                    header_content_disposition = parse_header(
                        asset_headers.get("content-disposition", "")
                    )
                if not getattr(asset, "size", 0):
                    # size from HEAD header / content-disposition / size
                    size_str = str(header_content_disposition.get_param("size", 0))
                    asset.size = int(size_str) if size_str.isdigit() else 0
                if not getattr(asset, "filename", 0):
                    # filename from HEAD header / content-disposition / size
                    asset_filename = header_content_disposition.get_param(
                        "filename", None
                    )
                    asset.filename = str(asset_filename) if asset_filename else None

                if not getattr(asset, "size", 0):
                    # GET request for size
                    with requests.get(
                        asset["href"],
                        stream=True,
                        auth=auth,
                        params=params,
                        headers=USER_AGENT,
                        timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
                        verify=ssl_verify,
                    ) as stream:
                        # size from GET header / Content-length
                        asset.size = int(stream.headers.get("Content-length", 0))
                        if not getattr(asset, "size", 0):
                            # size from GET header / content-disposition / size
                            size_str = str(
                                parse_header(
                                    stream.headers.get("content-disposition", "")
                                ).get_param("size", 0)
                            )
                            asset.size = int(size_str) if size_str.isdigit() else 0

                total_size += asset.size
        return total_size

    def download_all(
        self,
        products: SearchResult,
        auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
        downloaded_callback: Optional[DownloadedCallback] = None,
        progress_callback: Optional[ProgressCallback] = None,
        wait: float = DEFAULT_DOWNLOAD_WAIT,
        timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
        **kwargs: Unpack[DownloadConf],
    ):
        """
        Download all using parent (base plugin) method
        """
        return super(HTTPDownload, self).download_all(
            products,
            auth=auth,
            downloaded_callback=downloaded_callback,
            progress_callback=progress_callback,
            wait=wait,
            timeout=timeout,
            **kwargs,
        )
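
# Usage sketch (editorial, not part of the module): this plugin is normally not instantiated
# directly; it is resolved from the provider configuration and driven through the core API.
# A minimal example, assuming a provider (e.g. "peps") whose download plugin is HTTPDownload:
#
#   from eodag import EODataAccessGateway
#
#   dag = EODataAccessGateway()
#   products = dag.search(productType="S2_MSI_L1C", provider="peps")
#   paths = dag.download_all(products)  # delegates to HTTPDownload.download_all()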