Utils

Contents

Utils#

This section provides an overview of the utility functions and classes available in the eodag library. These utilities are designed to assist with various tasks such as logging, handling callbacks, performing free text searches, working with Jupyter notebooks, interacting with S3 storage, and processing xarray data. Each subsection below details the specific utilities and their usage.

Logging#

eodag.utils.logging.get_logging_verbose()[source]#

Get logging verbose level

>>> from eodag import setup_logging
>>> get_logging_verbose()
>>> setup_logging(verbose=0)
>>> get_logging_verbose()
0
>>> setup_logging(verbose=1)
>>> get_logging_verbose()
1
>>> setup_logging(verbose=2)
>>> get_logging_verbose()
2
>>> setup_logging(verbose=3)
>>> get_logging_verbose()
3
Return type:

int | None

Returns:

Verbose level in [0, 1, 2, 3] or None if not set

eodag.utils.logging.setup_logging(verbose, no_progress_bar=False)[source]#

Define logging level

Parameters:
  • verbose (int) –

    Accepted values:

    • 0: no logging with muted progress bars

    • 1: no logging but still displays progress bars

    • 2: INFO level

    • 3: DEBUG level

  • no_progress_bar (bool, default: False) – (optional) Disable progress bars

Return type:

None

Callbacks#

class eodag.utils.DownloadedCallback[source]#

Example class for callback after each download in download_all()

__call__(product)[source]#

Callback

Parameters:

product (EOProduct) – The downloaded EO product

Return type:

None

eodag.utils.ProgressCallback(*args, **kwargs)[source]#

A callable used to render progress to users for long running processes.

It inherits from tqdm.auto.tqdm, and accepts the same arguments on instantiation: iterable, desc, total, leave, file, ncols, mininterval, maxinterval, miniters, ascii, disable, unit, unit_scale, dynamic_ncols, smoothing, bar_format, initial, position, postfix, unit_divisor.

It can be globally disabled using eodag.utils.logging.setup_logging(0) or eodag.utils.logging.setup_logging(level, no_progress_bar=True), and individually disabled using disable=True.

Parameters:

Dates#

eodag.rest.dates methods that must be importable without eodag[server] installeds

eodag.utils.dates.datetime_range(start, end)[source]#

Generator function for all dates in-between start and end date.

Parameters:
Return type:

Iterator[datetime]

eodag.utils.dates.get_date(date)[source]#

Check if the input date can be parsed as a date

Examples:
>>> from eodag.utils.exceptions import ValidationError
>>> get_date("2023-09-23")
'2023-09-23T00:00:00'
>>> get_date(None) is None
True
>>> get_date("invalid-date")  
Traceback (most recent call last):
    ...
ValidationError
Parameters:

date (str | None)

Return type:

str | None

eodag.utils.dates.get_datetime(arguments)[source]#

Get start and end dates from a dict containing / separated dates in datetime item

Parameters:

arguments (dict[str, Any]) – dict containing a single date or / separated dates in datetime item

Return type:

tuple[str | None, str | None]

Returns:

Start date and end date from datetime string (duplicate value if only one date as input)

Examples:
>>> get_datetime({"datetime": "2023-03-01/2023-03-31"})
('2023-03-01T00:00:00', '2023-03-31T00:00:00')
>>> get_datetime({"datetime": "2023-03-01"})
('2023-03-01T00:00:00', '2023-03-01T00:00:00')
>>> get_datetime({"datetime": "../2023-03-31"})
(None, '2023-03-31T00:00:00')
>>> get_datetime({"datetime": "2023-03-01/.."})
('2023-03-01T00:00:00', None)
>>> get_datetime({"dtstart": "2023-03-01", "dtend": "2023-03-31"})
('2023-03-01T00:00:00', '2023-03-31T00:00:00')
>>> get_datetime({})
(None, None)
eodag.utils.dates.get_timestamp(date_time)[source]#

Return the Unix timestamp of an ISO8601 date/datetime in seconds.

If the datetime has no offset, it is assumed to be an UTC datetime.

Parameters:

date_time (str) – The datetime string to return as timestamp

Return type:

float

Returns:

The timestamp corresponding to the date_time string in seconds

Examples:
>>> get_timestamp("2023-09-23T12:34:56Z")  
1695472496.0
>>> get_timestamp("2023-09-23T12:34:56+02:00")  
1695465296.0
>>> get_timestamp("2023-09-23")  
1695427200.0
eodag.utils.dates.is_range_in_range(valid_range, check_range)[source]#

Check if the check_range is completely within the valid_range.

This function checks if both the start and end dates of the check_range are within the start and end dates of the valid_range.

Parameters:
  • valid_range (str) – The valid date range in the format ‘YYYY-MM-DD/YYYY-MM-DD’.

  • check_range (str) – The date range to check in the format ‘YYYY-MM-DD/YYYY-MM-DD’.

Return type:

bool

Returns:

True if check_range is within valid_range, otherwise False.

Examples:
>>> is_range_in_range("2023-01-01/2023-12-31", "2023-03-01/2023-03-31")
True
>>> is_range_in_range("2023-01-01/2023-12-31", "2022-12-01/2023-03-31")
False
>>> is_range_in_range("2023-01-01/2023-12-31", "2023-11-01/2024-01-01")
False
>>> is_range_in_range("2023-01-01/2023-12-31", "invalid-range")
False
>>> is_range_in_range("invalid-range", "2023-03-01/2023-03-31")
False
eodag.utils.dates.rfc3339_str_to_datetime(s)[source]#

Convert a string conforming to RFC 3339 to a datetime.datetime.

Parameters:

s (str) – The string to convert to datetime.datetime

Return type:

datetime

Returns:

The datetime represented by the ISO8601 (RFC 3339) formatted string

Raises:

ValidationError

Examples:
>>> from eodag.utils.exceptions import ValidationError
>>> rfc3339_str_to_datetime("2023-09-23T12:34:56Z")
datetime.datetime(2023, 9, 23, 12, 34, 56, tzinfo=datetime.timezone.utc)
>>> rfc3339_str_to_datetime("invalid-date")  
Traceback (most recent call last):
    ...
ValidationError

Notebook#

class eodag.utils.notebook.NotebookWidgets[source]#

Display / handle ipython widgets

clear_html()[source]#

Clear HTML message

Return type:

None

display_html(html_value)[source]#

Display HTML message

Parameters:

html_value (str)

Return type:

None

eodag.utils.notebook.check_ipython()[source]#

Check if called from ipython

Return type:

bool

eodag.utils.notebook.check_notebook()[source]#

Check if called from a notebook

Return type:

bool

S3#

class eodag.utils.s3.S3FileInfo(size, key, bucket_name, zip_filepath=None, data_start_offset=0, data_type='application/octet-stream', rel_path=None, file_start_offset=0, futures=<factory>, buffers=<factory>, next_yield=0)[source]#

Describe a S3 object with basic f_info and its download state.

Parameters:
  • size (int)

  • key (str)

  • bucket_name (str)

  • zip_filepath (str | None, default: None)

  • data_start_offset (int, default: 0)

  • data_type (str, default: 'application/octet-stream')

  • rel_path (str | None, default: None)

  • file_start_offset (int, default: 0)

  • futures (dict, default: <factory>)

  • buffers (dict[int, bytes], default: <factory>)

  • next_yield (int, default: 0)

buffers: dict[int, bytes]#

Buffers for downloaded data chunks, mapping start byte offsets to the actual data. This allows for partial downloads and efficient memory usage. The key is the start byte offset, and the value is the bytes data for that offset. This is used to yield data in the correct order during streaming. It is updated as chunks are downloaded.

data_start_offset: int = 0#

Offset in the ZIP archive where the file data starts.

data_type: str = 'application/octet-stream'#

MIME type of the file, defaulting to application/octet-stream. It can be updated based on the file extension or content type.

file_start_offset: int = 0#

Offset in the logical (global) file stream where this file starts.

futures: dict#

Mapping of futures to their start byte offsets, used to track download progress. Each future corresponds to a chunk of data being downloaded. The key is the future object, and the value is the start byte offset of that chunk in the logical file stream.

next_yield: int = 0#

The next offset to yield in the file, used to track progress during downloading and yielding chunks. It starts at 0 and is updated as data is yielded. This allows the streaming process to continue from where it left off, ensuring that all data is eventually yielded without duplication.

rel_path: str | None = None#

Relative path of the file, if applicable (e.g., inside a ZIP archive).

zip_filepath: str | None = None#

Path inside the ZIP archive if the file is stored inside a ZIP.

eodag.utils.s3.fetch_range(bucket_name, key_name, start, end, client_s3)[source]#

Range-fetches a S3 key.

Parameters:
  • bucket_name (str) – Bucket name of the object to fetch

  • key_name (str) – Key name of the object to fetch

  • start (int) – Start byte position to fetch

  • end (int) – End byte position to fetch

  • client_s3 (S3Client) – s3 client used to fetch the object

Return type:

bytes

Returns:

Object bytes

eodag.utils.s3.file_position_from_s3_zip(s3_bucket, object_key, s3_client, target_filepath)[source]#

Get the start position and size of a specific file inside a ZIP archive stored in S3. This function assumes the file is uncompressed (ZIP_STORED).

The returned tuple contains:

  • file_data_start: The byte offset where the file data starts in the ZIP archive.

  • file_size: The size of the file in bytes.

Parameters:
  • s3_bucket (str) – The S3 bucket name.

  • object_key (str) – The S3 object key for the ZIP file.

  • s3_client – The Boto3 S3 client.

  • target_filepath (str) – The file path inside the ZIP archive to locate.

Return type:

tuple[int, int]

Returns:

A tuple (file_data_start, file_size)

Raises:
eodag.utils.s3.list_files_in_s3_zipped_object(bucket_name, key_name, s3_client)[source]#

List files in s3 zipped object, without downloading it.

See https://stackoverflow.com/questions/41789176/how-to-count-files-inside-zip-in-aws-s3-without-downloading-it; Based on https://stackoverflow.com/questions/51351000/read-zip-files-from-s3-without-downloading-the-entire-file

Parameters:
  • bucket_name (str) – Bucket name of the object to fetch

  • key_name (str) – Key name of the object to fetch

  • s3_resource – s3 resource used to fetch the object

  • s3_client (S3Client)

Return type:

list[ZipInfo]

Returns:

List of files in zip

eodag.utils.s3.open_s3_zipped_object(bucket_name, key_name, s3_client, zip_size=None, partial=True)[source]#

Fetches the central directory and EOCD (End Of Central Directory) from an S3 object and opens a ZipFile in memory.

This function retrieves the ZIP file’s central directory and EOCD by performing range requests on the S3 object. It supports partial fetching (only the central directory and EOCD) for efficiency, or full ZIP download if needed.

Parameters:
  • bucket_name (str) – Name of the S3 bucket containing the ZIP file.

  • key_name (str) – Key (path) of the ZIP file in the S3 bucket.

  • s3_client – S3 client instance used to perform range requests.

  • zip_size (int | None, default: None) – Size of the ZIP file in bytes. If None, it will be determined via a HEAD request.

  • partial (bool, default: True) – If True, only fetch the central directory and EOCD. If False, fetch the entire ZIP file.

Return type:

tuple[ZipFile, bytes]

Returns:

Tuple containing the opened ZipFile object and the central directory bytes.

Raises:

InvalidDataError – If the EOCD signature is not found in the last 64KB of the file.

eodag.utils.s3.stream_download_from_s3(s3_client, files_info, byte_range=(None, None), compress='auto', zip_filename='archive', range_size=8388608, max_workers=8)[source]#

Stream data from one or more S3 objects in chunks, with support for global byte ranges.

This function provides efficient streaming download of S3 objects with support for:

  • Single file streaming with direct MIME type detection

  • Multiple file streaming as ZIP archives

  • Byte range requests for partial content

  • Files within ZIP archives (using .zip! notation)

  • Concurrent chunk downloading for improved performance

  • Memory-efficient streaming without loading entire files

The response format depends on the compress parameter and number of files:

  • Single file + compress="raw" or "auto": streams file directly with detected MIME type

  • Multiple files + compress="zip" or "auto": creates ZIP archive containing all files

  • compress="zip": always creates ZIP archive regardless of file count

For files stored within ZIP archives, use the .zip! notation in the S3FileInfo.key: "path/to/archive.zip!internal/file.txt"

Parameters:
  • s3_client (S3Client) – Boto3 S3 client instance for making requests

  • files_info (list[S3FileInfo]) – List of S3FileInfo objects describing files to download. Each object must contain at minimum: bucket_name, key, and size. Optional fields include: data_type, rel_path, zip_filepath.

  • byte_range (tuple[int | None, int | None], default: (None, None)) – Global byte range to download as (start, end) tuple. None values indicate open-ended ranges. Applied across the logical concatenation of all files.

  • compress (Literal['zip', 'raw', 'auto'], default: 'auto') –

    Output format control:

    • "zip": Always create ZIP archive

    • "raw": Stream files directly (single) or as multipart (multiple)

    • "auto": ZIP for multiple files, raw for single file

  • zip_filename (str, default: 'archive') – Base filename for ZIP archives (without .zip extension). Only used when creating ZIP archives.

  • range_size (int, default: 8388608) – Size of each download chunk in bytes. Larger chunks reduce request overhead but use more memory. Default: 8MB.

  • max_workers (int, default: 8) – Maximum number of concurrent download threads. Higher values improve throughput for multiple ranges.

Returns:

StreamResponse object containing:

  • content: Iterator of bytes for the streaming response

  • media_type: MIME type ("application/zip" for archives, detected type for single files)

  • headers: HTTP headers including Content-Disposition for downloads

Return type:

StreamResponse

Raises:

Example usage:

import boto3
from eodag.utils.s3 import stream_download_from_s3, S3FileInfo

# Create S3 client
s3_client = boto3.client('s3')

# Single file download
files = [S3FileInfo(bucket_name="bucket", key="file.txt", size=1024)]
response = stream_download_from_s3(s3_client, files)

# Multiple files as ZIP archive
files = [
    S3FileInfo(bucket_name="bucket", key="file1.txt", size=1024),
    S3FileInfo(bucket_name="bucket", key="file2.txt", size=2048)
]
response = stream_download_from_s3(s3_client, files, compress="zip")

# File within ZIP archive
files = [S3FileInfo(
    bucket_name="bucket",
    key="archive.zip!internal.txt",
    size=512
)]
response = stream_download_from_s3(s3_client, files)

# Process streaming response
for chunk in response.content:
    # Handle chunk data
    pass
eodag.utils.s3.update_assets_from_s3(product, auth, s3_endpoint=None, content_url=None)[source]#

Update EOProduct.assets using content listed in its remote_location or given content_url.

If url points to a zipped archive, its content will also be be listed.

Parameters:
  • product (EOProduct) – product to update

  • auth (AwsAuth) – Authentication plugin

  • s3_endpoint (str | None, default: None) – s3 endpoint if not hosted on AWS

  • content_url (str | None, default: None) – s3 URL pointing to the content that must be listed (defaults to product.remote_location if empty)

Return type:

None

xarray#

Warning

These functions will only be available with eodag-cube installed.

Xarray-related utilities

eodag_cube.utils.xarray.guess_engines(file)[source]#

Guess matching xarray engines for fsspec fsspec.core.OpenFile

Parameters:

file (OpenFile) – fsspec https OpenFile

Return type:

list[str]

Returns:

engines list

eodag_cube.utils.xarray.try_open_dataset(file, **xarray_kwargs)[source]#

Try opening xarray dataset from fsspec OpenFile

Parameters:
Return type:

Dataset

Returns:

opened xarray dataset

Misc#

Miscellaneous utilities to be used throughout eodag.

Everything that does not fit into one of the specialised categories of utilities in this package should go here

class eodag.utils.FloatRange(min=None, max=None)[source]#

A parameter that works similar to click.FLOAT but restricts the value to fit into a range. Fails if the value doesn’t fit into the range.

Parameters:
convert(value, param, ctx)[source]#

Convert value

Parameters:
Return type:

Any

name: str = 'percentage'#

the descriptive name of this type

class eodag.utils.MockResponse(json_data=None, status_code=200, headers=None)[source]#

Fake requests response

Parameters:
json()[source]#

Return json data

Return type:

Any

raise_for_status()[source]#

raises an exception when the status is not ok

Return type:

None

class eodag.utils.StreamResponse(content, filename=None, size=None, headers=None, media_type=None, status_code=None, arcname=None)[source]#

Represents a streaming response

Parameters:
property filename: str | None#

Get the filename for the streaming response.

Returns:

The filename, or None if not set

property size: int | None#

Get the content size for the streaming response.

Returns:

The content size in bytes, or None if not set

eodag.utils.cached_parse(str_to_parse)[source]#

Cached jsonpath_ng.ext.parse()

>>> cached_parse.cache_clear()
>>> cached_parse("$.foo")
Child(Root(), Fields('foo'))
>>> cached_parse.cache_info()
CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)
>>> cached_parse("$.foo")
Child(Root(), Fields('foo'))
>>> cached_parse.cache_info()
CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
>>> cached_parse("$.bar")
Child(Root(), Fields('bar'))
>>> cached_parse.cache_info()
CacheInfo(hits=1, misses=2, maxsize=128, currsize=2)
Parameters:

str_to_parse (str) – string to parse as jsonpath_ng.JSONPath

Return type:

JSONPath

Returns:

parsed jsonpath_ng.JSONPath

eodag.utils.cached_yaml_load(config_path)[source]#

Cached yaml.load()

Parameters:

config_path (str) – path to the yaml configuration file

Return type:

dict[str, Any]

Returns:

loaded yaml configuration

eodag.utils.cached_yaml_load_all(config_path)[source]#

Cached yaml.load_all()

Load all configurations stored in the configuration file as separated yaml documents

Parameters:

config_path (str) – path to the yaml configuration file

Return type:

list[Any]

Returns:

list of configurations

eodag.utils.cast_scalar_value(value, new_type)[source]#

Convert a scalar (not nested) value type to the given one

>>> cast_scalar_value('1', int)
1
>>> cast_scalar_value(1, str)
'1'
>>> cast_scalar_value('false', bool)
False
Parameters:
  • value (Any) – the scalar value to convert

  • new_type (Any) – the wanted type

Return type:

Any

Returns:

scalar value converted to new_type

eodag.utils.deepcopy(sth)[source]#

Customized and faster deepcopy inspired by https://stackoverflow.com/a/45858907

_copy_list and _copy_dict dispatchers available for the moment

Parameters:

sth (Any) – Object to copy

Return type:

Any

Returns:

Copied object

eodag.utils.dict_items_recursive_apply(config_dict, apply_method, **apply_method_parameters)[source]#

Recursive apply method to dict elements

>>> dict_items_recursive_apply(
...     {"foo": {"bar": "baz"}, "qux": ["a", "b"]},
...     lambda k, v, x: v.upper() + x, **{"x": "!"}
... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']}
True
Parameters:
  • config_dict (dict[Any, Any]) – Input nested dictionary

  • apply_method (Callable[..., Any]) – Method to be applied to dict elements

  • apply_method_parameters (Any) – Optional parameters passed to the method

Return type:

dict[Any, Any]

Returns:

Updated dict

eodag.utils.dict_items_recursive_sort(config_dict)[source]#

Recursive sort dict elements

>>> dict_items_recursive_sort(
...     {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]},
... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}}
True
Parameters:

config_dict (dict[Any, Any]) – Input nested dictionary

Return type:

dict[Any, Any]

Returns:

Updated dict

eodag.utils.dict_md5sum(input_dict)[source]#

Hash nested dictionary

Parameters:

input_dict (dict[str, Any]) – input dict

Return type:

str

Returns:

hash

>>> hd = dict_md5sum({"b": {"c": 1, "a": 2, "b": 3}, "a": 4})
>>> hd
'a195bcef1bb3b419e9e74b7cc5db8098'
>>> assert(dict_md5sum({"a": 4, "b": {"b": 3, "c": 1, "a": 2}}) == hd)
eodag.utils.flatten_top_directories(nested_dir_root, common_subdirs_path=None)[source]#

Flatten directory structure, removing common empty sub-directories

Parameters:
  • nested_dir_root (str) – Absolute path of the directory structure to flatten

  • common_subdirs_path (str | None, default: None) – (optional) Absolute path of the desired subdirectory to remove

Return type:

None

eodag.utils.format_dict_items(config_dict, **format_variables)[source]#

Recursively apply str.format() to **format_variables on config_dict values

>>> format_dict_items(
...     {"foo": {"bar": "{a}"}, "baz": ["{b}?", "{b}!"]},
...     **{"a": "qux", "b": "quux"},
... ) == {"foo": {"bar": "qux"}, "baz": ["quux?", "quux!"]}
True
Parameters:
  • config_dict (dict[str, Any]) – Dictionary having values that need to be parsed

  • format_variables (Any) – Variables used as args for parsing

Return type:

dict[Any, Any]

Returns:

Updated dict

eodag.utils.format_pydantic_error(e)[source]#

Format Pydantic ValidationError

Parameters:

e (PydanticValidationError) – A Pydantic ValidationError object

Return type:

str

eodag.utils.format_string(key, str_to_format, **format_variables)[source]#

Format "{foo}"-like string

>>> format_string(None, "foo {bar}, {baz} ?", **{"bar": "qux", "baz": "quux"})
'foo qux, quux ?'
Parameters:
  • key (str) – Input item key

  • str_to_format (Any) – Input item value, to be parsed

  • format_variables (Any)

Return type:

Any

Returns:

Parsed value

eodag.utils.get_bucket_name_and_prefix(url, bucket_path_level=None)[source]#

Extract bucket name and prefix from URL

Parameters:
  • url (str) – (optional) URL to use as product.location

  • bucket_path_level (int | None, default: None) – (optional) bucket location index in path.split(‘/’)

Return type:

tuple[str | None, str | None]

Returns:

bucket_name and prefix as str

eodag.utils.get_geometry_from_various(locations_config=[], **query_args)[source]#

Creates a shapely.geometry using given query kwargs arguments

Parameters:
  • locations_config (list[dict[str, Any]], default: []) – (optional) EODAG locations configuration

  • query_args (Any) – Query kwargs arguments from search()

Return type:

BaseGeometry

Returns:

shapely Geometry found

Raises:

ValueError

eodag.utils.get_ssl_context(ssl_verify)[source]#

Returns an SSL context based on ssl_verify argument.

Parameters:

ssl_verify (bool) – ssl_verify parameter

Return type:

SSLContext

Returns:

An SSL context object.

eodag.utils.guess_extension(type)[source]#

Guess extension from mime type, using eodag extended mimetypes definition

>>> guess_extension('image/tiff')
'.tiff'
>>> guess_extension('application/x-grib')
'.grib'
Parameters:

type (str) – mime type

Return type:

str | None

Returns:

guessed file extension

eodag.utils.guess_file_type(file)[source]#

Guess the mime type of a file or URL based on its extension, using eodag extended mimetypes definition

>>> guess_file_type('foo.tiff')
'image/tiff'
>>> guess_file_type('foo.grib')
'application/x-grib'
Parameters:

file (str) – file url or path

Return type:

str | None

Returns:

guessed mime type

eodag.utils.items_recursive_apply(input_obj, apply_method, **apply_method_parameters)[source]#

Recursive apply method to items contained in input object (dict or list)

>>> items_recursive_apply(
...     {"foo": {"bar":"baz"}, "qux": ["a","b"]},
...     lambda k,v,x: v.upper()+x, **{"x":"!"}
... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']}
True
>>> items_recursive_apply(
...     [{"foo": {"bar":"baz"}}, "qux"],
...     lambda k,v,x: v.upper()+x,
...     **{"x":"!"})
[{'foo': {'bar': 'BAZ!'}}, 'QUX!']
>>> items_recursive_apply(
...     "foo",
...     lambda k,v,x: v.upper()+x,
...     **{"x":"!"})
'foo'
Parameters:
  • input_obj (dict[Any, Any] | list[Any]) – Input object (dict or list)

  • apply_method (Callable[..., Any]) – Method to be applied to dict elements

  • apply_method_parameters (Any) – Optional parameters passed to the method

Return type:

dict[Any, Any] | list[Any]

Returns:

Updated object

eodag.utils.items_recursive_sort(input_obj)[source]#

Recursive sort dict items contained in input object (dict or list)

>>> items_recursive_sort(
...     {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]},
... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}}
True
>>> items_recursive_sort(["b", {2: 0, 0: 1, 1:2}])
['b', {0: 1, 1: 2, 2: 0}]
>>> items_recursive_sort("foo")
'foo'
Parameters:

input_obj (list[Any] | dict[Any, Any]) – Input object (dict or list)

Return type:

list[Any] | dict[Any, Any]

Returns:

Updated object

eodag.utils.jsonpath_parse_dict_items(jsonpath_dict, values_dict)[source]#

Recursively parse jsonpath_ng.JSONPath elements in dict

>>> import jsonpath_ng.ext as jsonpath
>>> jsonpath_parse_dict_items(
...     {"foo": {"bar": parse("$.a.b")}, "qux": [parse("$.c"), parse("$.c")]},
...     {"a":{"b":"baz"}, "c":"quux"}
... ) == {'foo': {'bar': 'baz'}, 'qux': ['quux', 'quux']}
True
Parameters:
  • jsonpath_dict (dict[str, Any]) – Dictionary having jsonpath_ng.JSONPath values that need to be parsed

  • values_dict (dict[str, Any]) – Values dict used as args for parsing

Return type:

dict[Any, Any]

Returns:

Updated dict

eodag.utils.list_items_recursive_apply(config_list, apply_method, **apply_method_parameters)[source]#

Recursive apply method to list elements

>>> list_items_recursive_apply(
...     [{"foo": {"bar": "baz"}}, "qux"],
...     lambda k, v, x: v.upper() + x,
...     **{"x": "!"})
[{'foo': {'bar': 'BAZ!'}}, 'QUX!']
Parameters:
  • config_list (list[Any]) – Input list containing nested lists/dicts

  • apply_method (Callable[..., Any]) – Method to be applied to list elements

  • apply_method_parameters (Any) – Optional parameters passed to the method

Return type:

list[Any]

Returns:

Updated list

eodag.utils.list_items_recursive_sort(config_list)[source]#

Recursive sort dicts in list elements

>>> list_items_recursive_sort(["b", {2: 0, 0: 1, 1: 2}])
['b', {0: 1, 1: 2, 2: 0}]
Parameters:

config_list (list[Any]) – Input list containing nested lists/dicts

Return type:

list[Any]

Returns:

Updated list

eodag.utils.makedirs(dirpath)[source]#

Create a directory in filesystem with parents if necessary

Parameters:

dirpath (str)

Return type:

None

eodag.utils.maybe_generator(obj)[source]#

Generator function that get an arbitrary object and generate values from it if the object is a generator.

Parameters:

obj (Any)

Return type:

Iterator[Any]

eodag.utils.md5sum(file_path)[source]#

Get file MD5 checksum

>>> import os
>>> md5sum(os.devnull)
'd41d8cd98f00b204e9800998ecf8427e'
Parameters:

file_path (str) – input file path

Return type:

str

Returns:

MD5 checksum

eodag.utils.merge_mappings(mapping1, mapping2)[source]#

Merge two mappings with string keys, values from mapping2 overriding values from mapping1.

Do its best to detect the key in mapping1 to override. For example:

>>> mapping2 = {"keya": "new"}
>>> mapping1 = {"keyA": "obsolete"}
>>> merge_mappings(mapping1, mapping2)
>>> mapping1
{'keyA': 'new'}

If mapping2 has a key that cannot be detected in mapping1, this new key is added to mapping1 as is.

Parameters:
  • mapping1 (dict[Any, Any]) – The mapping containing values to be overridden

  • mapping2 (dict[Any, Any]) – The mapping containing values that will override the first mapping

Return type:

None

eodag.utils.mutate_dict_in_place(func, mapping)[source]#

Apply func to values of mapping.

The mapping object’s values are modified in-place. The function is recursive, allowing to also modify values of nested dicts that may be level-1 values of mapping.

Parameters:
  • func (Callable[[Any], Any]) – A function to apply to each value of mapping which is not a dict object

  • mapping (dict[Any, Any]) – A Python dict object

Return type:

None

Returns:

None

eodag.utils.nested_pairs2dict(pairs)[source]#

Create a dict using nested pairs

>>> nested_pairs2dict([["foo", [["bar", "baz"]]]])
{'foo': {'bar': 'baz'}}
Parameters:

pairs (list[Any] | Any) – Pairs of key / value

Return type:

Any | dict[Any, Any]

Returns:

Created dict

eodag.utils.obj_md5sum(data)[source]#

Get MD5 checksum from JSON serializable object

>>> obj_md5sum(None)
'37a6259cc0c1dae299a7866489dff0bd'
Parameters:

data (Any) – JSON serializable input object

Return type:

str

Returns:

MD5 checksum

eodag.utils.parse_header(header)[source]#

Parse HTTP header

>>> parse_header(
...     'Content-Disposition: form-data; name="field2"; filename="example.txt"'
... ).get_param("filename")
'example.txt'
Parameters:

header (str) – header to parse

Return type:

Message

Returns:

parsed header

eodag.utils.parse_jsonpath(key, jsonpath_obj, **values_dict)[source]#

Parse jsonpah in jsonpath_obj using values_dict

>>> import jsonpath_ng.ext as jsonpath
>>> parse_jsonpath(None, parse("$.foo.bar"), **{"foo": {"bar": "baz"}})
'baz'
Parameters:
  • key (str) – Input item key

  • jsonpath_obj (str | Child) – Input item value, to be parsed

  • values_dict (dict[str, Any]) – Values used as args for parsing

Return type:

str | None

Returns:

Parsed value

eodag.utils.parse_le_uint16(data)[source]#

Parse little-endian unsigned 2-byte integer.

>>> parse_le_uint16(b'\x01\x00')
1
>>> parse_le_uint16(b'\xff\xff')
65535
Parameters:

data (bytes)

Return type:

int

eodag.utils.parse_le_uint32(data)[source]#

Parse little-endian unsigned 4-byte integer.

>>> parse_le_uint32(b'\x01\x00\x00\x00')
1
>>> parse_le_uint32(b'\xff\xff\xff\xff')
4294967295
Parameters:

data (bytes)

Return type:

int

eodag.utils.path_to_uri(path)[source]#

Convert a local absolute path to a file URI

Parameters:

path (str)

Return type:

str

eodag.utils.remove_str_array_quotes(input_str)[source]#

Remove quotes around arrays to avoid json parsing errors

Parameters:

input_str (str) – string to format

Return type:

str

Returns:

string without quotes surrounding array brackets

>>> remove_str_array_quotes('"a":"["a", "b"]"')
'"a":["a", "b"]'
>>> remove_str_array_quotes('{"a":"["a", "b"]", "b": ["c", "d"]}')
'{"a":["a", "b"], "b": ["c", "d"]}'
eodag.utils.rename_subfolder(dirpath, name)[source]#

Rename first subfolder found in dirpath with given name, raise RuntimeError if no subfolder can be found

Parameters:
  • dirpath (str) – path to the directory containing the subfolder

  • name (str) – new name of the subfolder

Raises:

RuntimeError

Return type:

None

Example:

>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     somefolder = os.path.join(tmpdir, "somefolder")
...     otherfolder = os.path.join(tmpdir, "otherfolder")
...     os.makedirs(somefolder)
...     assert os.path.isdir(somefolder) and not os.path.isdir(otherfolder)
...     rename_subfolder(tmpdir, "otherfolder")
...     assert not os.path.isdir(somefolder) and os.path.isdir(otherfolder)

Before:

$ tree <tmp-folder>
<tmp-folder>
└── somefolder
    └── somefile

After:

$ tree <tmp-folder>
<tmp-folder>
└── otherfolder
    └── somefile
eodag.utils.rename_with_version(file_path, suffix='old')[source]#

Renames a file by appending and incrementing a version number if a conflict exists.

Parameters:
  • file_path (str) – full path of the file to rename

  • suffix (str, default: 'old') – suffix preceding version number in case of name conflict

Return type:

str

Returns:

new file path with the version appended or incremented

Example:

>>> import tempfile
>>> from pathlib import Path
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     file_path = (Path(tmpdir) / "foo.txt")
...     file_path.touch()
...     (Path(tmpdir) / "foo_old1.txt").touch()
...     expected = str(Path(tmpdir) / "foo_old2.txt")
...     assert expected == rename_with_version(str(file_path))
eodag.utils.repeatfunc(func, n, *args)[source]#

Call func n times with args

Parameters:
Return type:

starmap

eodag.utils.sanitize(value)[source]#

Sanitize string to be used as a name of a directory.

>>> sanitize('productName')
'productName'
>>> sanitize('name with multiple  spaces')
'name_with_multiple_spaces'
>>> sanitize('âtre fête île alcôve bûche çà génèse où Noël ovoïde capharnaüm')
'atre_fete_ile_alcove_buche_ca_genese_ou_Noel_ovoide_capharnaum'
>>> sanitize('replace,ponctuation:;signs!?byunderscorekeeping-hyphen.dot_and_underscore')   # noqa
'replace_ponctuation_signs_byunderscorekeeping-hyphen.dot_and_underscore'
Parameters:

value (str)

Return type:

str

eodag.utils.slugify(value, allow_unicode=False)[source]#

Copied from Django Source code, only modifying last line (no need for safe strings).

source: django/django

Convert to ASCII if allow_unicode is False. Convert spaces to hyphens. Remove characters that aren’t alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip leading and trailing whitespace.

Parameters:
  • value (Any)

  • allow_unicode (bool, default: False)

Return type:

str

eodag.utils.sort_dict(input_dict)[source]#

Recursively sorts a dict by keys.

Parameters:

input_dict (dict[str, Any]) – input dict

Return type:

dict[str, Any]

Returns:

sorted dict

>>> sort_dict({"b": {"c": 1, "a": 2, "b": 3}, "a": 4})
{'a': 4, 'b': {'a': 2, 'b': 3, 'c': 1}}
eodag.utils.string_to_jsonpath(*args, force=False)[source]#

Get jsonpath_ng.JSONPath for $.foo.bar like string

>>> string_to_jsonpath(None, "$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath("$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath('$.foo[0][*]')
Child(Child(Child(Root(), Fields('foo')), Index(index=0)), Slice(start=None,end=None,step=None))
>>> string_to_jsonpath("foo")
'foo'
>>> string_to_jsonpath("foo", force=True)
Fields('foo')
Parameters:
  • args (Any) – Last arg as input string value, to be converted

  • force (bool, default: False) – force conversion even if input string is not detected as a jsonpath_ng.JSONPath

Return type:

str | JSONPath

Returns:

Parsed value

eodag.utils.strip_accents(s)[source]#

Strip accents of a string.

>>> strip_accents('productName')
'productName'
>>> strip_accents('génèse')
'genese'
>>> strip_accents('preserve-punct-special-chars:;,?!§%$£œ')
'preserve-punct-special-chars:;,?!§%$£œ'
Parameters:

s (str)

Return type:

str

eodag.utils.update_nested_dict(old_dict, new_dict, extend_list_values=False, allow_empty_values=False, allow_extend_duplicates=True)[source]#

Update recursively old_dict items with new_dict ones

>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": 10}}
... ) == {'a': {'a.a': 10, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=True
... ) == {'a': {'a.a': [1, 2, 10, 2]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=False
... ) == {'a': {'a.a': [1, 2, 10]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
... ) == {'a': {'a.a': 1, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
...     allow_empty_values=True
... ) == {'a': {'a.a': None, 'a.b': 2}, 'b': 3}
True
Parameters:
  • old_dict (dict[Any, Any]) – Dict to be updated

  • new_dict (dict[Any, Any]) – Incomming dict

  • extend_list_values (bool, default: False) – (optional) Extend old_dict value if both old/new values are lists

  • allow_empty_values (bool, default: False) – (optional) Allow update with empty values

  • allow_extend_duplicates (bool, default: True)

Return type:

dict[Any, Any]

Returns:

Updated dict

eodag.utils.uri_to_path(uri)[source]#

Convert a file URI (e.g. file:///tmp) to a local path (e.g. /tmp)

Parameters:

uri (str)

Return type:

str