# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
# https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import hashlib
import logging
import os
import shutil
import tarfile
import tempfile
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, TypeVar, Union
from eodag.api.product.metadata_mapping import ONLINE_STATUS
from eodag.plugins.base import PluginTopic
from eodag.utils import (
DEFAULT_DOWNLOAD_TIMEOUT,
DEFAULT_DOWNLOAD_WAIT,
ProgressCallback,
StreamResponse,
sanitize,
uri_to_path,
)
from eodag.utils.exceptions import (
AuthenticationError,
MisconfiguredError,
NotAvailableError,
)
from eodag.utils.notebook import NotebookWidgets
if TYPE_CHECKING:
from mypy_boto3_s3 import S3ServiceResource
from requests.auth import AuthBase
from eodag.api.product import EOProduct
from eodag.api.search_result import SearchResult
from eodag.config import PluginConfig
from eodag.types.download_args import DownloadConf
from eodag.utils import DownloadedCallback, Unpack
logger = logging.getLogger("eodag.download.base")
T = TypeVar("T")
# [docs]  (Sphinx build artifact, not executable code)
class Download(PluginTopic):
    """Base class for all Download plugins.

    Concrete Download plugins must provide two entry points:

    - ``download``: fetch a single :class:`~eodag.api.product._product.EOProduct`
    - ``download_all``: fetch every product of a :class:`~eodag.api.search_result.SearchResult`

    Implementations are expected to:

    - write data under the ``output_dir`` folder taken from the plugin's
      configuration or passed through kwargs
    - extract archived products when ``extract`` is ``True`` (the default)
    - store each product as an archive/directory (inside ``output_dir``) named
      after the product's ``title`` property
    - update the product's ``location`` attribute to a file URI once its data
      is downloaded (and possibly extracted), e.g. ``file:///tmp/product_folder``
      on Linux or ``file:///C:/Users/username/AppData/Local/Temp`` on Windows
    - write a *record* file in the directory ``{output_dir}/.downloaded`` named
      after the MD5 hash of the product's ``product_type`` and ``properties['id']``
      attributes (``hashlib.md5((product.product_type+"-"+product.properties['id']).encode("utf-8")).hexdigest()``)
      and whose content is the product's ``remote_location`` attribute itself
    - not try to download a product whose ``location`` attribute already points
      to an existing file/directory
    - not try to download a product when its *record* file exists together with
      the expected product file/directory. When only the *record* file remains,
      it must be deleted (the previous download most likely did not complete)

    :param provider: An eodag providers configuration dictionary
    :param config: The plugin configuration
    """

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super().__init__(provider, config)
        # whether this provider requires authentication before downloading
        self._authenticate = bool(getattr(self.config, "authenticate", False))
# [docs]  (Sphinx build artifact, not executable code)
def download(
    self,
    product: EOProduct,
    auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
    progress_callback: Optional[ProgressCallback] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> Optional[str]:
    r"""Download a single product. Abstract here: every plugin must override it.

    :param product: The EO product to download
    :param auth: (optional) authenticated object
    :param progress_callback: (optional) A progress callback
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
        to download
    :param kwargs: ``output_dir`` (str), ``extract`` (bool), ``delete_archive`` (bool)
        and ``dl_url_params`` (dict) can be provided as additional kwargs
        and will override any other values defined in a configuration
        file or with environment variables.
    :returns: The absolute path to the downloaded product in the local filesystem
        (e.g. ``/tmp/product.zip`` on Linux or
        ``C:\\Users\\username\\AppData\\Local\\Temp\\product.zip`` on Windows)
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError("A Download plugin must implement a method named download")
def _stream_download_dict(
    self,
    product: EOProduct,
    auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
    byte_range: tuple[Optional[int], Optional[int]] = (None, None),
    compress: Literal["zip", "raw", "auto"] = "auto",
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> StreamResponse:
    r"""
    Base _stream_download_dict method. Not available, it must be defined for each plugin.

    :param product: The EO product to download
    :param auth: (optional) authenticated object
    :param byte_range: (optional) tuple of optional (start, end) byte bounds to
        stream; ``(None, None)`` presumably means the whole content — confirm
        against concrete plugin implementations
    :param compress: (optional) output packaging: ``"zip"``, ``"raw"`` or ``"auto"``
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
        to download
    :param kwargs: ``output_dir`` (str), ``extract`` (bool), ``delete_archive`` (bool)
        and ``dl_url_params`` (dict) can be provided as additional kwargs
        and will override any other values defined in a configuration
        file or with environment variables.
    :returns: Dictionary of :class:`~fastapi.responses.StreamingResponse` keyword-arguments
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError(
        "Download streaming must be implemented using a method named _stream_download_dict"
    )
def _prepare_download(
    self,
    product: EOProduct,
    progress_callback: Optional[ProgressCallback] = None,
    **kwargs: Unpack[DownloadConf],
) -> tuple[Optional[str], Optional[str]]:
    """Check if file has already been downloaded, and prepare product download

    :param product: The EO product to download
    :param progress_callback: (optional) A progress callback
    :returns: ``(fs_path, record_filename)``; ``record_filename`` is ``None``
        when no download is needed (product already available locally, or no
        download url); both are ``None`` when the url is missing
    """
    # location differing from remote_location means the product has already
    # been downloaded: location then points at the local copy
    if product.location != product.remote_location:
        fs_path = uri_to_path(product.location)
        # The fs path of a product is either a file (if 'extract' config is False) or a directory
        if os.path.isfile(fs_path) or os.path.isdir(fs_path):
            logger.info(
                f"Product already present on this platform. Identifier: {fs_path}",
            )
            # Do not download data if we are on site. Instead give back the absolute path to the data
            return fs_path, None

    url = product.remote_location
    if not url:
        logger.debug(
            f"Unable to get download url for {product}, skipping download",
        )
        return None, None
    logger.info(
        f"Download url: {url}",
    )

    # kwargs take precedence over the plugin configuration; fall back to the
    # system temp directory when neither provides an output dir
    output_dir = (
        kwargs.pop("output_dir", None)
        or getattr(self.config, "output_dir", tempfile.gettempdir())
        or tempfile.gettempdir()
    )
    output_extension = kwargs.get("output_extension") or getattr(
        self.config, "output_extension", ""
    )

    # Strong assumption made here: all products downloaded will be zip files
    # If they are not, the '.zip' extension will be removed when they are downloaded and returned as is
    prefix = os.path.abspath(output_dir)
    sanitized_title = sanitize(product.properties["title"])
    if sanitized_title == product.properties["title"]:
        collision_avoidance_suffix = ""
    else:
        # sanitizing changed the title: append the sanitized id so two distinct
        # products cannot collide on the same sanitized file name
        collision_avoidance_suffix = "-" + sanitize(product.properties["id"])
    fs_path = os.path.join(
        prefix,
        f"{sanitized_title}{collision_avoidance_suffix}{output_extension}",
    )
    # directory form of the path: only strip the extension when it really is
    # the suffix (str.replace would also drop an occurrence inside the name)
    fs_dir_path = (
        fs_path.removesuffix(output_extension) if output_extension else fs_path
    )
    download_records_dir = os.path.join(prefix, ".downloaded")
    try:
        # exist_ok=True silences the expected "directory already exists" case;
        # any other OSError is only logged (records are best-effort)
        os.makedirs(download_records_dir, exist_ok=True)
    except OSError:
        import traceback as tb  # local import: rare failure path only

        logger.warning(
            f"Unable to create records directory. Got:\n{tb.format_exc()}",
        )
    url_hash = hashlib.md5(url.encode("utf-8")).hexdigest()
    # migrate legacy record files (named after the url hash) to the
    # product-hash naming scheme produced by generate_record_hash
    old_record_filename = os.path.join(download_records_dir, url_hash)
    record_filename = os.path.join(
        download_records_dir, self.generate_record_hash(product)
    )
    if os.path.isfile(old_record_filename):
        os.rename(old_record_filename, record_filename)

    # path with or without extension: look for an already downloaded file
    # carrying any extension next to the expected location
    path_obj = Path(fs_path)
    matched_paths = list(path_obj.parent.glob(f"{path_obj.stem}.*"))
    # NOTE(review): the first glob match is used — assumes at most one
    # candidate file exists per product stem; confirm
    fs_path_with_ext = matched_paths[0] if matched_paths else fs_path
    if (
        os.path.isfile(record_filename)
        and fs_path_with_ext
        and os.path.isfile(fs_path_with_ext)
    ):
        logger.info(
            f"Product already downloaded: {fs_path_with_ext}",
        )
        return (
            self._finalize(
                str(fs_path_with_ext), progress_callback=progress_callback, **kwargs
            ),
            None,
        )
    elif os.path.isfile(record_filename) and os.path.isdir(fs_dir_path):
        logger.info(
            f"Product already downloaded: {fs_dir_path}",
        )
        return (
            self._finalize(
                fs_dir_path, progress_callback=progress_callback, **kwargs
            ),
            None,
        )
    # Remove the record file if fs_path is absent (e.g. it was deleted while record wasn't)
    elif os.path.isfile(record_filename):
        logger.debug(
            f"Record file found ({record_filename}) but not the actual file",
        )
        logger.debug(
            f"Removing record file : {record_filename}",
        )
        os.remove(record_filename)
    return fs_path, record_filename
# [docs]  (Sphinx build artifact, not executable code)
def generate_record_hash(self, product: EOProduct) -> str:
    """Compute the record-file hash of the given product.

    The hash is the MD5 digest of ``"<product_type>-<id>"``
    (``hashlib.md5((product.product_type+"-"+product.properties['id']).encode("utf-8")).hexdigest()``).

    :param product: The product to calculate the record hash
    :returns: The MD5 hash as a hex string
    """
    # the f-string str-converts both parts, guarding against a None
    # product_type or a non-str id (both occur in some unit tests)
    record_key = f"{product.product_type}-{product.properties['id']}"
    return hashlib.md5(record_key.encode("utf-8")).hexdigest()
def _resolve_archive_depth(self, product_path: str) -> str:
    """Descend into ``product_path`` according to the provider's ``archive_depth``.

    Some providers nest the product below intermediate directories inside the
    archive; ``archive_depth`` (default 1) gives that nesting level, and each
    extra level is resolved by entering the first listed subdirectory.

    WARNING: A strong assumption is made here: there is only one subdirectory
    per level

    :param product_path: The path to the extracted product
    :returns: The path to the extracted product with the right depth
    """
    depth = getattr(self.config, "archive_depth", 1)
    # descend (depth - 1) times, one directory level per iteration
    for _ in range(1, depth):
        product_path = os.path.join(product_path, os.listdir(product_path)[0])
    return product_path
def _finalize(
    self,
    fs_path: str,
    progress_callback: Optional[ProgressCallback] = None,
    **kwargs: Unpack[DownloadConf],
) -> str:
    """Finalize the download process.

    Depending on the ``extract``/``delete_archive`` settings (kwargs override
    the plugin configuration), extracts the downloaded archive (zip / tar /
    tar.gz) next to it and optionally deletes the archive afterwards.

    :param fs_path: The path to the local zip archive downloaded or already present
    :param progress_callback: (optional) A progress callback
    :returns: The absolute path to the product
    """
    # progress bar init
    if progress_callback is None:
        progress_callback = ProgressCallback(
            unit="file",
            unit_scale=False,
            position=2,
        )
        # one shot progress callback to close after download
        close_progress_callback = True
    else:
        # caller-owned bar: repurpose it but never close it here
        close_progress_callback = False
        progress_callback.unit = "file"
        progress_callback.unit_scale = False
        progress_callback.refresh()

    # kwargs take precedence over the plugin configuration
    extract = kwargs.pop("extract", None)
    extract = (
        extract if extract is not None else getattr(self.config, "extract", True)
    )
    if not extract:
        logger.info("Extraction not activated. The product is available as is.")
        progress_callback(1, total=1)
        return fs_path

    delete_archive = kwargs.pop("delete_archive", None)
    delete_archive = (
        delete_archive
        if delete_archive is not None
        else getattr(self.config, "delete_archive", True)
    )

    # extension-less path: the destination of the extracted product
    product_path, _ = os.path.splitext(fs_path)
    product_path_exists = os.path.exists(product_path)
    if product_path_exists and os.path.isfile(product_path):
        # a plain file at the destination is treated as a leftover partial
        # download and removed before extraction
        logger.info(
            f"Remove existing partially downloaded file: {product_path}"
            f" ({os.stat(product_path).st_size}/{os.stat(fs_path).st_size})"
        )
        os.remove(product_path)
    elif (
        product_path_exists
        and os.path.isdir(product_path)
        and len(os.listdir(product_path)) == 0
    ):
        logger.info(f"Remove existing empty destination directory: {product_path}")
        os.rmdir(product_path)
    elif (
        product_path_exists
        and os.path.isdir(product_path)
        and len(os.listdir(product_path)) > 0
    ):
        # a non-empty destination is assumed to be a previous complete
        # extraction: reuse it instead of extracting again
        logger.info(
            f"Extraction cancelled, destination directory already exists and is not empty: {product_path}"
        )
        progress_callback(1, total=1)
        return product_path

    # NOTE(review): unlike _prepare_download there is no getattr fallback here
    # — presumably self.config.output_dir is always set at this point; confirm
    output_dir = kwargs.pop("output_dir", None) or self.config.output_dir

    if not os.path.exists(product_path):
        logger.info("Extraction activated")
        progress_callback.desc = (
            f"Extracting files from {os.path.basename(fs_path)}"
        )
        progress_callback.refresh()
        # NOTE(review): product_path is absolute (derived from fs_path), so
        # os.path.join returns it unchanged and output_dir is effectively
        # ignored here — confirm intent
        product_dir = os.path.join(output_dir, product_path)
        # extract into a temporary directory first, then move into place
        tmp_dir = tempfile.TemporaryDirectory()
        extraction_dir = os.path.join(tmp_dir.name, os.path.basename(product_dir))
        if fs_path.endswith(".zip"):
            with zipfile.ZipFile(fs_path, "r") as zfile:
                fileinfos = zfile.infolist()
                # per-member progress for zip archives
                progress_callback.reset(total=len(fileinfos))
                for fileinfo in fileinfos:
                    zfile.extract(
                        fileinfo,
                        path=extraction_dir,
                    )
                    progress_callback(1)
            # in some cases, only a lone file is extracted without being in a directory
            # then, we create a directory in which we place this file
            product_extraction_path = self._resolve_archive_depth(extraction_dir)
            if os.path.isfile(product_extraction_path) and not os.path.isdir(
                product_dir
            ):
                os.makedirs(product_dir)
            shutil.move(product_extraction_path, product_dir)
        elif fs_path.endswith(".tar") or fs_path.endswith(".tar.gz"):
            # NOTE(review): extractall without a member filter trusts archive
            # paths (see tarfile "data" filter, Python 3.12+) — confirm the
            # provider archives are trusted
            with tarfile.open(fs_path, "r") as zfile:
                # single-step progress for tar archives
                progress_callback.reset(total=1)
                zfile.extractall(path=extraction_dir)
                progress_callback(1)
            # in some cases, only a lone file is extracted without being in a directory
            # then, we create a directory in which we place this file
            product_extraction_path = self._resolve_archive_depth(extraction_dir)
            if os.path.isfile(product_extraction_path) and not os.path.isdir(
                product_dir
            ):
                os.makedirs(product_dir)
            shutil.move(product_extraction_path, product_dir)
        else:
            # unsupported archive type: keep the file as-is
            progress_callback(1, total=1)
        tmp_dir.cleanup()
        if delete_archive and os.path.isfile(fs_path):
            logger.info(f"Deleting archive {os.path.basename(fs_path)}")
            os.unlink(fs_path)
        elif os.path.isfile(fs_path):
            logger.info(
                f"Archive deletion is deactivated, keeping {os.path.basename(fs_path)}"
            )
    else:
        # destination already exists (handled above): nothing to extract
        progress_callback(1, total=1)

    # close progress bar if needed
    if close_progress_callback:
        progress_callback.close()
    return product_path
# [docs]  (Sphinx build artifact, not executable code)
def download_all(
    self,
    products: SearchResult,
    auth: Optional[Union[AuthBase, S3ServiceResource]] = None,
    downloaded_callback: Optional[DownloadedCallback] = None,
    progress_callback: Optional[ProgressCallback] = None,
    wait: float = DEFAULT_DOWNLOAD_WAIT,
    timeout: float = DEFAULT_DOWNLOAD_TIMEOUT,
    **kwargs: Unpack[DownloadConf],
) -> list[str]:
    """
    Base download_all method.

    This specific implementation uses the :meth:`eodag.plugins.download.base.Download.download` method
    implemented by the plugin to **sequentially** attempt to download products.

    :param products: Products to download
    :param auth: (optional) authenticated object
    :param downloaded_callback: (optional) A method or a callable object which takes
        as parameter the ``product``. You can use the base class
        :class:`~eodag.utils.DownloadedCallback` and override
        its ``__call__`` method. Will be called each time a product
        finishes downloading
    :param progress_callback: (optional) A progress callback
    :param wait: (optional) If download fails, wait time in minutes between two download tries
    :param timeout: (optional) If download fails, maximum time in minutes before stop retrying
        to download
    :param kwargs: ``output_dir`` (str), ``extract`` (bool), ``delete_archive`` (bool)
        and ``dl_url_params`` (dict) can be provided as additional kwargs
        and will override any other values defined in a configuration
        file or with environment variables.
    :returns: List of absolute paths to the downloaded products in the local
        filesystem (e.g. ``['/tmp/product.zip']`` on Linux or
        ``['C:\\Users\\username\\AppData\\Local\\Temp\\product.zip']`` on Windows)
    """
    # Products are going to be removed one by one from this sequence once
    # downloaded.
    products = products[:]
    paths: list[str] = []
    # initiate retry loop
    start_time = datetime.now()
    stop_time = start_time + timedelta(minutes=timeout)
    nb_products = len(products)
    retry_count = 0
    # another output for notbooks
    nb_info = NotebookWidgets()
    # every product is immediately eligible for a first download attempt
    for product in products:
        product.next_try = start_time

    # progress bar init
    if progress_callback is None:
        progress_callback = ProgressCallback(
            total=nb_products,
            unit="product",
            desc="Downloaded products",
            unit_scale=False,
        )
        # no per-product bar when none was supplied by the caller
        product_progress_callback = None
    else:
        # keep a copy for per-product downloads, and repurpose the caller's
        # bar as the global "products downloaded" counter
        product_progress_callback = progress_callback.copy()
        progress_callback.reset(total=nb_products)
        progress_callback.unit = "product"
        progress_callback.desc = "Downloaded products"
        progress_callback.unit_scale = False
        progress_callback.refresh()

    with progress_callback as bar:
        while "Loop until all products are download or timeout is reached":
            # try downloading each product before retry
            # NOTE(review): products.remove() mutates the list being
            # enumerated, which skips the element after a removal until the
            # next while turn — presumably acceptable here; confirm
            for idx, product in enumerate(products):
                if datetime.now() >= product.next_try:
                    products[idx].next_try += timedelta(minutes=wait)
                    try:
                        # timeout=-1: single attempt per product here, the
                        # retries are driven by this outer while loop
                        paths.append(
                            product.download(
                                progress_callback=product_progress_callback,
                                wait=wait,
                                timeout=-1,
                                **kwargs,
                            )
                        )
                        if downloaded_callback:
                            downloaded_callback(product)
                        # product downloaded, to not retry it
                        products.remove(product)
                        bar(1)
                        # reset stop time for next product
                        stop_time = datetime.now() + timedelta(minutes=timeout)
                    except NotAvailableError as e:
                        # product not ready yet: keep it in the list for retry
                        logger.info(e)
                        continue
                    except (AuthenticationError, MisconfiguredError):
                        # credential problems affect every product: abort
                        logger.exception(
                            f"Stopped because of credentials problems with provider {self.provider}"
                        )
                        raise
                    except (RuntimeError, Exception):
                        import traceback as tb

                        logger.error(
                            f"A problem occurred during download of product: {product}. "
                            "Skipping it"
                        )
                        logger.debug(f"\n{tb.format_exc()}")
                        # product skipped, to not retry it
                        products.remove(product)
            if (
                len(products) > 0
                and datetime.now() < products[0].next_try
                and datetime.now() < stop_time
            ):
                # products remain and the earliest retry is in the future:
                # sleep until then
                wait_seconds = (products[0].next_try - datetime.now()).seconds
                retry_count += 1
                info_message = (
                    f"[Retry #{retry_count}, {nb_products - len(products)}/{nb_products} D/L] "
                    f"Waiting {wait_seconds}s until next download try (retry every {wait}' for {timeout}')"
                )
                logger.info(info_message)
                nb_info.display_html(info_message)
                sleep(wait_seconds + 1)
            elif len(products) > 0 and datetime.now() >= stop_time:
                # overall timeout reached with products still pending
                logger.warning(
                    f"{len(products)} products could not be downloaded: "
                    + str([prod.properties["title"] for prod in products])
                )
                break
            elif len(products) == 0:
                break
    return paths
def _order_download_retry(
    self, product: EOProduct, wait: float, timeout: float
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Order download retry decorator.

    Retries the wrapped order_download method after ``wait`` minutes if a
    ``NotAvailableError`` exception is thrown until ``timeout`` minutes.

    :param product: The EO product to download
    :param wait: If download fails, wait time in minutes between two download tries
    :param timeout: If download fails, maximum time in minutes before stop retrying
        to download
    :returns: decorator
    """

    def decorator(order_download: Callable[..., T]) -> Callable[..., T]:
        # Wraps order_download in a polling loop driven by product.next_try.
        def download_and_retry(*args: Any, **kwargs: Unpack[DownloadConf]) -> T:
            # initiate retry loop
            start_time = datetime.now()
            stop_time = start_time + timedelta(minutes=timeout)
            product.next_try = start_time
            retry_count = 0
            not_available_info = "The product could not be downloaded"
            # another output for notebooks
            nb_info = NotebookWidgets()

            while "Loop until products download succeeds or timeout is reached":
                datetime_now = datetime.now()

                if datetime_now >= product.next_try:
                    # schedule the next attempt, then try now
                    product.next_try += timedelta(minutes=wait)
                    try:
                        download = order_download(*args, **kwargs)
                    except NotAvailableError as e:
                        # keep the latest failure reason for logs / exceptions
                        not_available_info = str(e)
                    else:
                        # success path: return as soon as the product is
                        # ONLINE (or storageStatus is absent), or immediately
                        # when retrying is disabled (timeout <= 0)
                        if (
                            product.properties.get("storageStatus", ONLINE_STATUS)
                            == ONLINE_STATUS
                        ) or timeout <= 0:
                            return download

                if not getattr(self.config, "order_enabled", False):
                    # provider cannot order offline products: give up at once
                    raise NotAvailableError(
                        f"Product is not available for download and order is not supported for"
                        f" {self.provider}, {not_available_info}"
                    )

                if datetime_now >= product.next_try and datetime_now < stop_time:
                    # we already waited past the scheduled try: check again
                    # right away on the next loop turn
                    wait_seconds: Union[float, int] = (
                        datetime_now - product.next_try + timedelta(minutes=wait)
                    ).seconds
                    retry_count += 1
                    retry_info = (
                        f"[Retry #{retry_count}] Waited {wait_seconds}s, checking order status again"
                        f" (retry every {wait}' for {timeout}')"
                    )
                    logger.info(not_available_info)
                    # Retry-After info from Response header
                    if hasattr(self, "stream"):
                        retry_server_info = self.stream.headers.get(
                            "Retry-After", ""
                        )
                        if retry_server_info:
                            logger.debug(
                                f"[{self.provider} response] Retry-After: {retry_server_info}"
                            )
                    logger.debug(retry_info)
                    nb_info.display_html(retry_info)
                    # make the product eligible again immediately
                    product.next_try = datetime_now
                elif datetime_now < product.next_try and datetime_now < stop_time:
                    # next try is in the future: sleep precisely until then
                    # (seconds + fractional microseconds)
                    wait_seconds = (product.next_try - datetime_now).seconds + (
                        product.next_try - datetime_now
                    ).microseconds / 1e6
                    retry_count += 1
                    retry_info = (
                        f"[Retry #{retry_count}] Waiting {wait_seconds}s until next order status check"
                        f" (retry every {wait}' for {timeout}')"
                    )
                    logger.info(not_available_info)
                    # Retry-After info from Response header
                    if hasattr(self, "stream"):
                        retry_server_info = self.stream.headers.get(
                            "Retry-After", ""
                        )
                        if retry_server_info:
                            logger.debug(
                                f"[{self.provider} response] Retry-After: {retry_server_info}"
                            )
                    logger.debug(retry_info)
                    nb_info.display_html(retry_info)
                    sleep(wait_seconds)
                elif datetime_now >= stop_time and timeout > 0:
                    # overall timeout reached while retrying was enabled
                    if "storageStatus" not in product.properties:
                        product.properties["storageStatus"] = "N/A status"
                    logger.info(not_available_info)
                    raise NotAvailableError(
                        f"{product.properties['title']} is not available ({product.properties['storageStatus']})"
                        f" and order was not successfull, timeout reached"
                    )
                elif datetime_now >= stop_time:
                    raise NotAvailableError(not_available_info)
            # unreachable in practice (the loop only exits via return/raise);
            # kept as a final fallback
            return order_download(*args, **kwargs)

        return download_and_retry

    return decorator