Utils#
This section provides an overview of the utility functions and classes available in the eodag library. These utilities are designed to assist with various tasks such as logging, handling callbacks, performing free text searches, working with Jupyter notebooks, interacting with S3 storage, and processing xarray data. Each subsection below details the specific utilities and their usage.
Logging#
- eodag.utils.logging.get_logging_verbose()[source]#
Get logging verbose level
>>> from eodag import setup_logging
>>> get_logging_verbose()
>>> setup_logging(verbose=0)
>>> get_logging_verbose()
0
>>> setup_logging(verbose=1)
>>> get_logging_verbose()
1
>>> setup_logging(verbose=2)
>>> get_logging_verbose()
2
>>> setup_logging(verbose=3)
>>> get_logging_verbose()
3
Callbacks#
- class eodag.utils.DownloadedCallback[source]#
Example class for a callback run after each download in download_all()
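As a sketch of what such a callback class could look like (a hypothetical, pure-Python example: the `__call__(product)` signature and the `downloaded_callback` keyword shown in the comment are assumptions, not taken from the eodag source):

```python
class LoggingDownloadedCallback:
    """Hypothetical callback: records and logs each downloaded product.

    Assumes the callback is invoked with the downloaded product as its
    single argument, mirroring DownloadedCallback's documented role.
    """

    def __init__(self):
        self.downloaded = []

    def __call__(self, product):
        # Record the product after each successful download
        self.downloaded.append(product)
        print(f"downloaded: {product}")

# Hypothetical usage: download_all(..., downloaded_callback=callback)
callback = LoggingDownloadedCallback()
for p in ["product-a", "product-b"]:  # stand-in for real downloads
    callback(p)
```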
- eodag.utils.ProgressCallback(*args, **kwargs)[source]#
A callable used to render progress to users for long running processes.
It inherits from tqdm.auto.tqdm, and accepts the same arguments on instantiation: iterable, desc, total, leave, file, ncols, mininterval, maxinterval, miniters, ascii, disable, unit, unit_scale, dynamic_ncols, smoothing, bar_format, initial, position, postfix, unit_divisor.
It can be globally disabled using eodag.utils.logging.setup_logging(0) or eodag.utils.logging.setup_logging(level, no_progress_bar=True), and individually disabled using disable=True.
Dates#
eodag.rest.dates methods that must be importable without eodag[server] installed
- eodag.utils.dates.datetime_range(start, end)[source]#
Generator function for all dates in-between start and end date.
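The generator can be pictured as a simple day-stepping loop; a minimal pure-Python sketch of the documented behavior (not the eodag implementation; the one-day step size is an assumption):

```python
from datetime import datetime, timedelta

def datetime_range_sketch(start: datetime, end: datetime):
    """Yield every date from start to end inclusive, one day at a time."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)

days = list(datetime_range_sketch(datetime(2023, 3, 1), datetime(2023, 3, 4)))
```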
- eodag.utils.dates.get_date(date)[source]#
Check if the input date can be parsed as a date
- Examples:
>>> from eodag.utils.exceptions import ValidationError
>>> get_date("2023-09-23")
'2023-09-23T00:00:00'
>>> get_date(None) is None
True
>>> get_date("invalid-date")
Traceback (most recent call last):
    ...
ValidationError
- eodag.utils.dates.get_datetime(arguments)[source]#
Get start and end dates from a dict containing / separated dates in datetime item
- Parameters:
arguments (dict[str, Any]) – dict containing a single date or / separated dates in datetime item
- Return type:
- Returns:
Start date and end date from datetime string (duplicate value if only one date as input)
- Examples:
>>> get_datetime({"datetime": "2023-03-01/2023-03-31"})
('2023-03-01T00:00:00', '2023-03-31T00:00:00')
>>> get_datetime({"datetime": "2023-03-01"})
('2023-03-01T00:00:00', '2023-03-01T00:00:00')
>>> get_datetime({"datetime": "../2023-03-31"})
(None, '2023-03-31T00:00:00')
>>> get_datetime({"datetime": "2023-03-01/.."})
('2023-03-01T00:00:00', None)
>>> get_datetime({"dtstart": "2023-03-01", "dtend": "2023-03-31"})
('2023-03-01T00:00:00', '2023-03-31T00:00:00')
>>> get_datetime({})
(None, None)
- eodag.utils.dates.get_timestamp(date_time)[source]#
Return the Unix timestamp of an ISO8601 date/datetime in seconds.
If the datetime has no offset, it is assumed to be a UTC datetime.
- Parameters:
date_time (str) – The datetime string to return as timestamp
- Return type:
- Returns:
The timestamp corresponding to the date_time string in seconds
- Examples:
>>> get_timestamp("2023-09-23T12:34:56Z")
1695472496.0
>>> get_timestamp("2023-09-23T12:34:56+02:00")
1695465296.0
>>> get_timestamp("2023-09-23")
1695427200.0
- eodag.utils.dates.is_range_in_range(valid_range, check_range)[source]#
Check if the check_range is completely within the valid_range.
This function checks if both the start and end dates of the check_range are within the start and end dates of the valid_range.
- Parameters:
- Return type:
- Returns:
True if check_range is within valid_range, otherwise False.
- Examples:
>>> is_range_in_range("2023-01-01/2023-12-31", "2023-03-01/2023-03-31")
True
>>> is_range_in_range("2023-01-01/2023-12-31", "2022-12-01/2023-03-31")
False
>>> is_range_in_range("2023-01-01/2023-12-31", "2023-11-01/2024-01-01")
False
>>> is_range_in_range("2023-01-01/2023-12-31", "invalid-range")
False
>>> is_range_in_range("invalid-range", "2023-03-01/2023-03-31")
False
- eodag.utils.dates.rfc3339_str_to_datetime(s)[source]#
Convert a string conforming to RFC 3339 to a datetime.datetime.
- Parameters:
s (str) – The string to convert to datetime.datetime
- Return type:
- Returns:
The datetime represented by the ISO8601 (RFC 3339) formatted string
- Raises:
ValidationError
- Examples:
>>> from eodag.utils.exceptions import ValidationError
>>> rfc3339_str_to_datetime("2023-09-23T12:34:56Z")
datetime.datetime(2023, 9, 23, 12, 34, 56, tzinfo=datetime.timezone.utc)
>>> rfc3339_str_to_datetime("invalid-date")
Traceback (most recent call last):
    ...
ValidationError
Free text search#
- eodag.utils.free_text_search.compile_free_text_query(query)[source]#
Compiles a free-text logical search query into a dictionary evaluator function.
The evaluator checks whether the concatenated string values of a dictionary (case-insensitive) satisfy the given logical expression.
- Processing steps:
Tokenize the query into words, quoted phrases, wildcards, and operators.
Convert infix tokens into postfix notation using the Shunting Yard algorithm.
Build an evaluator function that applies the expression to dictionary fields.
- Supported features:
Logical operators: AND, OR, NOT
Grouping with parentheses: (, )
Exact phrases in quotes: "foo bar" (case-insensitive substring match)
- Wildcards inside tokens:
* → matches zero or more characters
? → matches exactly one character
Plain tokens without wildcards → matched as whole words (word boundary aware)
Case-insensitive matching across all tokens and phrases
- Parameters:
query (str) – A logical search expression (e.g., '("foo bar" OR baz*) AND NOT qux').
- Return type:
- Returns:
A function that takes a dict[str, str] and returns True if it matches.
- Example:
>>> evaluator = compile_free_text_query('("FooAndBar" OR BAR) AND "FOOBAR collection"')
>>> evaluator({
...     "title": "titleFOOBAR - Lorem FOOBAR collection",
...     "abstract": "abstract FOOBAR - This is FOOBAR. FooAndBar"
... })
True
>>> evaluator({
...     "title": "collection FOOBAR",
...     "abstract": "abstract FOOBAR - This is FOOBAR. FooAndBar"
... })
False
>>> evaluator({
...     "title": "titleFOOBAR - Lorem FOOBAR ",
...     "abstract": "abstract FOOBAR - This is FOOBAR."
... })
False
>>> evaluator({"title": "Only Bar here"})
False
Wildcard example:
>>> evaluator = compile_free_text_query('foo*')
>>> evaluator({"title": "this is foobar"})
True
>>> evaluator({"title": "something with fooo"})
True
>>> evaluator({"title": "bar only"})
False
Notebook#
S3#
- class eodag.utils.s3.S3FileInfo(size, key, bucket_name, zip_filepath=None, data_start_offset=0, data_type='application/octet-stream', rel_path=None, file_start_offset=0, futures=<factory>, buffers=<factory>, next_yield=0)[source]#
Describe an S3 object with basic file info and its download state.
- Parameters:
- buffers: dict[int, bytes]#
Buffers for downloaded data chunks, mapping start byte offsets to the actual data. This allows for partial downloads and efficient memory usage. The key is the start byte offset, and the value is the bytes data for that offset. This is used to yield data in the correct order during streaming. It is updated as chunks are downloaded.
- data_type: str = 'application/octet-stream'#
MIME type of the file, defaulting to application/octet-stream. It can be updated based on the file extension or content type.
- futures: dict#
Mapping of futures to their start byte offsets, used to track download progress. Each future corresponds to a chunk of data being downloaded. The key is the future object, and the value is the start byte offset of that chunk in the logical file stream.
- next_yield: int = 0#
The next offset to yield in the file, used to track progress during downloading and yielding chunks. It starts at 0 and is updated as data is yielded. This allows the streaming process to continue from where it left off, ensuring that all data is eventually yielded without duplication.
- eodag.utils.s3.fetch_range(bucket_name, key_name, start, end, client_s3)[source]#
Range-fetches an S3 key.
- eodag.utils.s3.file_position_from_s3_zip(s3_bucket, object_key, s3_client, target_filepath)[source]#
Get the start position and size of a specific file inside a ZIP archive stored in S3. This function assumes the file is uncompressed (ZIP_STORED).
The returned tuple contains:
file_data_start: The byte offset where the file data starts in the ZIP archive.
file_size: The size of the file in bytes.
- Parameters:
- Return type:
- Returns:
A tuple (file_data_start, file_size)
- Raises:
FileNotFoundError – If the target file is not found in the ZIP archive.
NotImplementedError – If the file is not uncompressed (ZIP_STORED)
- eodag.utils.s3.list_files_in_s3_zipped_object(bucket_name, key_name, s3_client)[source]#
List files in s3 zipped object, without downloading it.
See https://stackoverflow.com/questions/41789176/how-to-count-files-inside-zip-in-aws-s3-without-downloading-it; Based on https://stackoverflow.com/questions/51351000/read-zip-files-from-s3-without-downloading-the-entire-file
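The trick behind listing without downloading is that a ZIP file keeps its index at the end: fetch only the tail, locate the End Of Central Directory (EOCD) record, and read the entry count. A local sketch of that EOCD parsing, using an in-memory ZIP as a stand-in for S3 range requests (the helper name is illustrative, not from eodag):

```python
import io
import struct
import zipfile

def count_entries_from_tail(tail: bytes) -> int:
    """Find the EOCD signature in the tail bytes and return the
    total number of entries in the central directory."""
    pos = tail.rfind(b"PK\x05\x06")  # EOCD signature
    if pos < 0:
        raise ValueError("EOCD not found in tail")
    # Offset 10 in the EOCD record: total central-directory entries (uint16 LE)
    return struct.unpack_from("<H", tail, pos + 10)[0]

# Build a small ZIP in memory to stand in for the S3 object
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("a.txt", "A")
    zf.writestr("b.txt", "B")
data = buf.getvalue()

# Only the last 64KB would be range-fetched from S3
n = count_entries_from_tail(data[-65536:])
```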
- eodag.utils.s3.open_s3_zipped_object(bucket_name, key_name, s3_client, zip_size=None, partial=True)[source]#
Fetches the central directory and EOCD (End Of Central Directory) from an S3 object and opens a ZipFile in memory.
This function retrieves the ZIP file’s central directory and EOCD by performing range requests on the S3 object. It supports partial fetching (only the central directory and EOCD) for efficiency, or full ZIP download if needed.
- Parameters:
bucket_name (
str) – Name of the S3 bucket containing the ZIP file.key_name (
str) – Key (path) of the ZIP file in the S3 bucket.s3_client – S3 client instance used to perform range requests.
zip_size (
int|None, default:None) – Size of the ZIP file in bytes. If None, it will be determined via a HEAD request.partial (
bool, default:True) – If True, only fetch the central directory and EOCD. If False, fetch the entire ZIP file.
- Return type:
- Returns:
Tuple containing the opened ZipFile object and the central directory bytes.
- Raises:
InvalidDataError – If the EOCD signature is not found in the last 64KB of the file.
- eodag.utils.s3.stream_download_from_s3(s3_client, files_info, byte_range=(None, None), compress='auto', zip_filename='archive', range_size=8388608, max_workers=8)[source]#
Stream data from one or more S3 objects in chunks, with support for global byte ranges.
This function provides efficient streaming download of S3 objects with support for:
Single file streaming with direct MIME type detection
Multiple file streaming as ZIP archives
Byte range requests for partial content
Files within ZIP archives (using .zip! notation)
Concurrent chunk downloading for improved performance
Memory-efficient streaming without loading entire files
The response format depends on the compress parameter and number of files:
Single file + compress="raw" or "auto": streams file directly with detected MIME type
Multiple files + compress="zip" or "auto": creates ZIP archive containing all files
compress="zip": always creates ZIP archive regardless of file count
For files stored within ZIP archives, use the .zip! notation in the S3FileInfo.key: "path/to/archive.zip!internal/file.txt"
- Parameters:
s3_client (S3Client) – Boto3 S3 client instance for making requests
files_info (list[S3FileInfo]) – List of S3FileInfo objects describing files to download. Each object must contain at minimum: bucket_name, key, and size. Optional fields include: data_type, rel_path, zip_filepath.
byte_range (tuple[int | None, int | None], default: (None, None)) – Global byte range to download as (start, end) tuple. None values indicate open-ended ranges. Applied across the logical concatenation of all files.
compress (Literal['zip', 'raw', 'auto'], default: 'auto') – Output format control:
"zip": Always create ZIP archive
"raw": Stream files directly (single) or as multipart (multiple)
"auto": ZIP for multiple files, raw for single file
zip_filename (str, default: 'archive') – Base filename for ZIP archives (without .zip extension). Only used when creating ZIP archives.
range_size (int, default: 8388608) – Size of each download chunk in bytes. Larger chunks reduce request overhead but use more memory. Default: 8MB.
max_workers (int, default: 8) – Maximum number of concurrent download threads. Higher values improve throughput for multiple ranges.
- Returns:
StreamResponse object containing:
content: Iterator of bytes for the streaming response
media_type: MIME type ("application/zip" for archives, detected type for single files)
headers: HTTP headers including Content-Disposition for downloads
- Return type:
- Raises:
InvalidDataError – If ZIP file structures are malformed
NotAvailableError – If S3 objects cannot be accessed
AuthenticationError – If S3 credentials are invalid
NotImplementedError – If compressed files within ZIP archives are encountered
Example usage:
import boto3
from eodag.utils.s3 import stream_download_from_s3, S3FileInfo

# Create S3 client
s3_client = boto3.client('s3')

# Single file download
files = [S3FileInfo(bucket_name="bucket", key="file.txt", size=1024)]
response = stream_download_from_s3(s3_client, files)

# Multiple files as ZIP archive
files = [
    S3FileInfo(bucket_name="bucket", key="file1.txt", size=1024),
    S3FileInfo(bucket_name="bucket", key="file2.txt", size=2048)
]
response = stream_download_from_s3(s3_client, files, compress="zip")

# File within ZIP archive
files = [S3FileInfo(
    bucket_name="bucket",
    key="archive.zip!internal.txt",
    size=512
)]
response = stream_download_from_s3(s3_client, files)

# Process streaming response
for chunk in response.content:
    # Handle chunk data
    pass
xarray#
Warning
These functions will only be available with eodag-cube installed.
Xarray-related utilities
- eodag_cube.utils.xarray.guess_engines(file)[source]#
Guess matching xarray engines for fsspec fsspec.core.OpenFile
- eodag_cube.utils.xarray.try_open_dataset(file, **xarray_kwargs)[source]#
Try opening xarray dataset from fsspec OpenFile
- Parameters:
file (OpenFile) – fsspec https OpenFile
xarray_kwargs (Any) – (optional) keyword arguments passed to xarray.open_dataset()
- Return type:
- Returns:
opened xarray dataset
Misc#
Miscellaneous utilities to be used throughout eodag.
Everything that does not fit into one of the specialised categories of utilities in this package should go here
- class eodag.utils.FloatRange(min=None, max=None)[source]#
A parameter that works similarly to click.FLOAT but restricts the value to fit into a range. Fails if the value doesn't fit into the range.
- name: str = 'percentage'#
the descriptive name of this type
- class eodag.utils.MockResponse(json_data=None, status_code=200, headers=None)[source]#
Fake requests response
- Parameters:
- class eodag.utils.StreamResponse(content, filename=None, size=None, headers=None, media_type=None, status_code=None, arcname=None)[source]#
Represents a streaming response
- Parameters:
- eodag.utils.cached_parse(str_to_parse)[source]#
Cached jsonpath_ng.ext.parse()
>>> cached_parse.cache_clear()
>>> cached_parse("$.foo")
Child(Root(), Fields('foo'))
>>> cached_parse.cache_info()
CacheInfo(hits=0, misses=1, maxsize=128, currsize=1)
>>> cached_parse("$.foo")
Child(Root(), Fields('foo'))
>>> cached_parse.cache_info()
CacheInfo(hits=1, misses=1, maxsize=128, currsize=1)
>>> cached_parse("$.bar")
Child(Root(), Fields('bar'))
>>> cached_parse.cache_info()
CacheInfo(hits=1, misses=2, maxsize=128, currsize=2)
- Parameters:
str_to_parse (str) – string to parse as jsonpath_ng.JSONPath
- Return type:
JSONPath
- Returns:
parsed jsonpath_ng.JSONPath
- eodag.utils.cached_yaml_load_all(config_path)[source]#
Cached yaml.load_all()
Load all configurations stored in the configuration file as separate yaml documents
- eodag.utils.cast_scalar_value(value, new_type)[source]#
Convert a scalar (not nested) value type to the given one
>>> cast_scalar_value('1', int)
1
>>> cast_scalar_value(1, str)
'1'
>>> cast_scalar_value('false', bool)
False
- eodag.utils.deepcopy(sth)[source]#
Customized and faster deepcopy inspired by https://stackoverflow.com/a/45858907
_copy_list and _copy_dict dispatchers available for the moment
- eodag.utils.dict_items_recursive_apply(config_dict, apply_method, **apply_method_parameters)[source]#
Recursive apply method to dict elements
>>> dict_items_recursive_apply(
...     {"foo": {"bar": "baz"}, "qux": ["a", "b"]},
...     lambda k, v, x: v.upper() + x, **{"x": "!"}
... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']}
True
- eodag.utils.dict_items_recursive_sort(config_dict)[source]#
Recursive sort dict elements
>>> dict_items_recursive_sort(
...     {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]},
... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}}
True
- eodag.utils.dict_md5sum(input_dict)[source]#
Hash nested dictionary
>>> hd = dict_md5sum({"b": {"c": 1, "a": 2, "b": 3}, "a": 4})
>>> hd
'a195bcef1bb3b419e9e74b7cc5db8098'
>>> assert(dict_md5sum({"a": 4, "b": {"b": 3, "c": 1, "a": 2}}) == hd)
- eodag.utils.flatten_top_directories(nested_dir_root, common_subdirs_path=None)[source]#
Flatten directory structure, removing common empty sub-directories
- eodag.utils.format_dict_items(config_dict, **format_variables)[source]#
Recursively apply str.format() with **format_variables on config_dict values
>>> format_dict_items(
...     {"foo": {"bar": "{a}"}, "baz": ["{b}?", "{b}!"]},
...     **{"a": "qux", "b": "quux"},
... ) == {"foo": {"bar": "qux"}, "baz": ["quux?", "quux!"]}
True
- eodag.utils.format_pydantic_error(e)[source]#
Format Pydantic ValidationError
- Parameters:
e (PydanticValidationError) – A Pydantic ValidationError object
- Return type:
- eodag.utils.format_string(key, str_to_format, **format_variables)[source]#
Format "{foo}"-like string
>>> format_string(None, "foo {bar}, {baz} ?", **{"bar": "qux", "baz": "quux"})
'foo qux, quux ?'
- eodag.utils.get_bucket_name_and_prefix(url, bucket_path_level=None)[source]#
Extract bucket name and prefix from URL
- eodag.utils.get_geometry_from_various(locations_config=[], **query_args)[source]#
Creates a shapely.geometry using given query kwargs arguments
- eodag.utils.get_ssl_context(ssl_verify)[source]#
Returns an SSL context based on the ssl_verify argument.
- Parameters:
ssl_verify (bool) – ssl_verify parameter
- Return type:
- Returns:
An SSL context object.
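A minimal stdlib sketch of what such a helper typically does (an assumption about the behavior based on the documented signature, not the eodag implementation):

```python
import ssl

def make_ssl_context(ssl_verify: bool) -> ssl.SSLContext:
    """Return a default SSL context, with certificate verification
    disabled when ssl_verify is False."""
    ctx = ssl.create_default_context()
    if not ssl_verify:
        # check_hostname must be disabled before setting verify_mode
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx

ctx = make_ssl_context(False)
```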
- eodag.utils.guess_extension(type)[source]#
Guess extension from mime type, using eodag extended mimetypes definition
>>> guess_extension('image/tiff')
'.tiff'
>>> guess_extension('application/x-grib')
'.grib'
- eodag.utils.guess_file_type(file)[source]#
Guess the mime type of a file or URL based on its extension, using eodag extended mimetypes definition
>>> guess_file_type('foo.tiff')
'image/tiff'
>>> guess_file_type('foo.grib')
'application/x-grib'
- eodag.utils.items_recursive_apply(input_obj, apply_method, **apply_method_parameters)[source]#
Recursive apply method to items contained in input object (dict or list)
>>> items_recursive_apply(
...     {"foo": {"bar": "baz"}, "qux": ["a", "b"]},
...     lambda k, v, x: v.upper() + x, **{"x": "!"}
... ) == {'foo': {'bar': 'BAZ!'}, 'qux': ['A!', 'B!']}
True
>>> items_recursive_apply(
...     [{"foo": {"bar": "baz"}}, "qux"],
...     lambda k, v, x: v.upper() + x,
...     **{"x": "!"})
[{'foo': {'bar': 'BAZ!'}}, 'QUX!']
>>> items_recursive_apply(
...     "foo",
...     lambda k, v, x: v.upper() + x,
...     **{"x": "!"})
'foo'
- eodag.utils.items_recursive_sort(input_obj)[source]#
Recursive sort dict items contained in input object (dict or list)
>>> items_recursive_sort(
...     {"b": {"b": "c", "a": 0}, "a": ["b", {2: 0, 0: 1, 1: 2}]},
... ) == {"a": ["b", {0: 1, 1: 2, 2: 0}], "b": {"a": 0, "b": "c"}}
True
>>> items_recursive_sort(["b", {2: 0, 0: 1, 1: 2}])
['b', {0: 1, 1: 2, 2: 0}]
>>> items_recursive_sort("foo")
'foo'
- eodag.utils.jsonpath_parse_dict_items(jsonpath_dict, values_dict)[source]#
Recursively parse jsonpath_ng.JSONPath elements in dict
>>> from jsonpath_ng.ext import parse
>>> jsonpath_parse_dict_items(
...     {"foo": {"bar": parse("$.a.b")}, "qux": [parse("$.c"), parse("$.c")]},
...     {"a": {"b": "baz"}, "c": "quux"}
... ) == {'foo': {'bar': 'baz'}, 'qux': ['quux', 'quux']}
True
- eodag.utils.list_items_recursive_apply(config_list, apply_method, **apply_method_parameters)[source]#
Recursive apply method to list elements
>>> list_items_recursive_apply(
...     [{"foo": {"bar": "baz"}}, "qux"],
...     lambda k, v, x: v.upper() + x,
...     **{"x": "!"})
[{'foo': {'bar': 'BAZ!'}}, 'QUX!']
- eodag.utils.list_items_recursive_sort(config_list)[source]#
Recursive sort dicts in list elements
>>> list_items_recursive_sort(["b", {2: 0, 0: 1, 1: 2}])
['b', {0: 1, 1: 2, 2: 0}]
- eodag.utils.maybe_generator(obj)[source]#
Generator function that gets an arbitrary object and generates values from it if the object is a generator.
- eodag.utils.md5sum(file_path)[source]#
Get file MD5 checksum
>>> import os
>>> md5sum(os.devnull)
'd41d8cd98f00b204e9800998ecf8427e'
- eodag.utils.merge_mappings(mapping1, mapping2)[source]#
Merge two mappings with string keys, values from mapping2 overriding values from mapping1.
Do its best to detect the key in mapping1 to override. For example:
>>> mapping2 = {"keya": "new"}
>>> mapping1 = {"keyA": "obsolete"}
>>> merge_mappings(mapping1, mapping2)
>>> mapping1
{'keyA': 'new'}
If mapping2 has a key that cannot be detected in mapping1, this new key is added to mapping1 as is.
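The key-detection step can be sketched as a case-insensitive lookup (a simplified pure-Python model of the documented behavior; the real function may detect keys in additional ways):

```python
def merge_mappings_sketch(mapping1: dict, mapping2: dict) -> None:
    """Merge mapping2 into mapping1 in place, matching keys
    case-insensitively so e.g. 'keya' overrides 'keyA'."""
    lowered = {k.lower(): k for k in mapping1}
    for key, value in mapping2.items():
        # Reuse the existing spelling of the key if one matches,
        # otherwise add the new key as is
        target = lowered.get(key.lower(), key)
        mapping1[target] = value

m1 = {"keyA": "obsolete", "other": 1}
merge_mappings_sketch(m1, {"keya": "new", "extra": 2})
```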
- eodag.utils.mutate_dict_in_place(func, mapping)[source]#
Apply func to values of mapping.
The mapping object’s values are modified in-place. The function is recursive, allowing to also modify values of nested dicts that may be level-1 values of mapping.
- eodag.utils.nested_pairs2dict(pairs)[source]#
Create a dict using nested pairs
>>> nested_pairs2dict([["foo", [["bar", "baz"]]]])
{'foo': {'bar': 'baz'}}
- eodag.utils.obj_md5sum(data)[source]#
Get MD5 checksum from JSON serializable object
>>> obj_md5sum(None)
'37a6259cc0c1dae299a7866489dff0bd'
- eodag.utils.parse_header(header)[source]#
Parse HTTP header
>>> parse_header(
...     'Content-Disposition: form-data; name="field2"; filename="example.txt"'
... ).get_param("filename")
'example.txt'
- eodag.utils.parse_jsonpath(key, jsonpath_obj, **values_dict)[source]#
Parse jsonpath in jsonpath_obj using values_dict
>>> from jsonpath_ng.ext import parse
>>> parse_jsonpath(None, parse("$.foo.bar"), **{"foo": {"bar": "baz"}})
'baz'
- eodag.utils.parse_le_uint16(data)[source]#
Parse little-endian unsigned 2-byte integer.
>>> parse_le_uint16(b'\x01\x00')
1
>>> parse_le_uint16(b'\xff\xff')
65535
- eodag.utils.parse_le_uint32(data)[source]#
Parse little-endian unsigned 4-byte integer.
>>> parse_le_uint32(b'\x01\x00\x00\x00')
1
>>> parse_le_uint32(b'\xff\xff\xff\xff')
4294967295
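Both helpers are equivalent to little-endian unpacking with the stdlib struct module; a sketch (the function names are illustrative, not the eodag ones):

```python
import struct

def parse_le_uint16_sketch(data: bytes) -> int:
    """Little-endian unsigned 16-bit integer, as struct format '<H'."""
    return struct.unpack("<H", data[:2])[0]

def parse_le_uint32_sketch(data: bytes) -> int:
    """Little-endian unsigned 32-bit integer, as struct format '<I'."""
    return struct.unpack("<I", data[:4])[0]

val16 = parse_le_uint16_sketch(b"\x01\x00")
val32 = parse_le_uint32_sketch(b"\xff\xff\xff\xff")
```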
- eodag.utils.remove_str_array_quotes(input_str)[source]#
Remove quotes around arrays to avoid json parsing errors
- Parameters:
input_str (str) – string to format
- Return type:
- Returns:
string without quotes surrounding array brackets
>>> remove_str_array_quotes('"a":"["a", "b"]"')
'"a":["a", "b"]'
>>> remove_str_array_quotes('{"a":"["a", "b"]", "b": ["c", "d"]}')
'{"a":["a", "b"], "b": ["c", "d"]}'
- eodag.utils.rename_subfolder(dirpath, name)[source]#
Rename first subfolder found in dirpath with given name, raise RuntimeError if no subfolder can be found
- Parameters:
- Raises:
- Return type:
Example:
>>> import os
>>> import tempfile
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     somefolder = os.path.join(tmpdir, "somefolder")
...     otherfolder = os.path.join(tmpdir, "otherfolder")
...     os.makedirs(somefolder)
...     assert os.path.isdir(somefolder) and not os.path.isdir(otherfolder)
...     rename_subfolder(tmpdir, "otherfolder")
...     assert not os.path.isdir(somefolder) and os.path.isdir(otherfolder)
Before:
$ tree <tmp-folder>
<tmp-folder>
└── somefolder
    └── somefile
After:
$ tree <tmp-folder>
<tmp-folder>
└── otherfolder
    └── somefile
- eodag.utils.rename_with_version(file_path, suffix='old')[source]#
Renames a file by appending and incrementing a version number if a conflict exists.
- Parameters:
- Return type:
- Returns:
new file path with the version appended or incremented
Example:
>>> import tempfile
>>> from pathlib import Path
>>> with tempfile.TemporaryDirectory() as tmpdir:
...     file_path = (Path(tmpdir) / "foo.txt")
...     file_path.touch()
...     (Path(tmpdir) / "foo_old1.txt").touch()
...     expected = str(Path(tmpdir) / "foo_old2.txt")
...     assert expected == rename_with_version(str(file_path))
- eodag.utils.sanitize(value)[source]#
Sanitize string to be used as a name of a directory.
>>> sanitize('productName')
'productName'
>>> sanitize('name with multiple spaces')
'name_with_multiple_spaces'
>>> sanitize('âtre fête île alcôve bûche çà génèse où Noël ovoïde capharnaüm')
'atre_fete_ile_alcove_buche_ca_genese_ou_Noel_ovoide_capharnaum'
>>> sanitize('replace,ponctuation:;signs!?byunderscorekeeping-hyphen.dot_and_underscore')
'replace_ponctuation_signs_byunderscorekeeping-hyphen.dot_and_underscore'
- eodag.utils.slugify(value, allow_unicode=False)[source]#
Copied from Django Source code, only modifying last line (no need for safe strings).
source: django/django
Convert to ASCII if allow_unicode is False. Convert spaces to hyphens. Remove characters that aren't alphanumerics, underscores, or hyphens. Convert to lowercase. Also strip leading and trailing whitespace.
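The documented steps roughly correspond to the Django implementation the docstring cites; a sketch (details may differ between Django versions, so treat this as an approximation):

```python
import re
import unicodedata

def slugify_sketch(value: str, allow_unicode: bool = False) -> str:
    """ASCII-fold (unless allow_unicode), lowercase, drop everything but
    alphanumerics/underscores/hyphens, collapse whitespace to hyphens."""
    if allow_unicode:
        value = unicodedata.normalize("NFKC", value)
    else:
        # Decompose accents, then drop the non-ASCII combining marks
        value = (
            unicodedata.normalize("NFKD", value)
            .encode("ascii", "ignore")
            .decode("ascii")
        )
    value = re.sub(r"[^\w\s-]", "", value.lower())
    return re.sub(r"[-\s]+", "-", value).strip("-_")

slug = slugify_sketch("Héllo, Wörld!")
```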
- eodag.utils.sort_dict(input_dict)[source]#
Recursively sorts a dict by keys.
- Parameters:
- Return type:
- Returns:
sorted dict
>>> sort_dict({"b": {"c": 1, "a": 2, "b": 3}, "a": 4})
{'a': 4, 'b': {'a': 2, 'b': 3, 'c': 1}}
- eodag.utils.string_to_jsonpath(*args, force=False)[source]#
Get jsonpath_ng.JSONPath for "$.foo.bar"-like string
>>> string_to_jsonpath(None, "$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath("$.foo.bar")
Child(Child(Root(), Fields('foo')), Fields('bar'))
>>> string_to_jsonpath('$.foo[0][*]')
Child(Child(Child(Root(), Fields('foo')), Index(index=0)), Slice(start=None,end=None,step=None))
>>> string_to_jsonpath("foo")
'foo'
>>> string_to_jsonpath("foo", force=True)
Fields('foo')
- eodag.utils.strip_accents(s)[source]#
Strip accents of a string.
>>> strip_accents('productName')
'productName'
>>> strip_accents('génèse')
'genese'
>>> strip_accents('preserve-punct-special-chars:;,?!§%$£œ')
'preserve-punct-special-chars:;,?!§%$£œ'
- eodag.utils.update_nested_dict(old_dict, new_dict, extend_list_values=False, allow_empty_values=False, allow_extend_duplicates=True)[source]#
Update recursively old_dict items with new_dict ones
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": 10}}
... ) == {'a': {'a.a': 10, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=True
... ) == {'a': {'a.a': [1, 2, 10, 2]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": [1, 2]}},
...     {"a": {"a.a": [10, 2]}},
...     extend_list_values=True,
...     allow_extend_duplicates=False
... ) == {'a': {'a.a': [1, 2, 10]}}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
... ) == {'a': {'a.a': 1, 'a.b': 2}, 'b': 3}
True
>>> update_nested_dict(
...     {"a": {"a.a": 1, "a.b": 2}, "b": 3},
...     {"a": {"a.a": None}},
...     allow_empty_values=True
... ) == {'a': {'a.a': None, 'a.b': 2}, 'b': 3}
True