Source code for eodag.plugins.authentication.openid_connect

# -*- coding: utf-8 -*-
# Copyright 2018, CS GROUP - France, https://www.csgroup.eu/
#
# This file is part of EODAG project
#     https://www.github.com/CS-SI/EODAG
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
import re
import string
from datetime import datetime, timedelta, timezone
from random import SystemRandom
from typing import TYPE_CHECKING, Any, Optional
from urllib.parse import parse_qs, urlparse

import jwt
import requests
from lxml import etree
from requests.auth import AuthBase

from eodag.plugins.authentication import Authentication
from eodag.utils import (
    DEFAULT_TOKEN_EXPIRATION_MARGIN,
    HTTP_REQ_TIMEOUT,
    USER_AGENT,
    repeatfunc,
)
from eodag.utils.exceptions import (
    AuthenticationError,
    MisconfiguredError,
    RequestError,
    TimeOutError,
)

if TYPE_CHECKING:
    from requests import PreparedRequest, Response

    from eodag.config import PluginConfig


logger = logging.getLogger("eodag.auth.openid_connect")


class OIDCRefreshTokenBase(Authentication):
    """OIDC refresh token base class, to be used through specific OIDC flows plugins;
    Common mechanism to handle refresh token from all OIDC auth plugins;
    Plugins inheriting from this base class must implement the methods ``_request_new_token()`` and
    ``_get_token_with_refresh_token()``. Depending on the implementation of these methods they can have
    different configuration parameters.
    """

    jwks_client: jwt.PyJWKClient
    access_token: str
    access_token_expiration: datetime
    refresh_token: str
    refresh_token_expiration: datetime
    token_endpoint: str
    authorization_endpoint: str

    def __init__(self, provider: str, config: PluginConfig) -> None:
        """Initialise the token state and load the OIDC endpoints.

        The document located at ``config.oidc_config_url`` is fetched and its
        ``jwks_uri``, ``token_endpoint``, ``authorization_endpoint`` and
        ``id_token_signing_alg_values_supported`` entries are stored on the instance.

        :param provider: provider name
        :param config: Authentication plugin configuration
        :raises: :class:`~eodag.utils.exceptions.TimeOutError` if the request times out
        :raises: :class:`~eodag.utils.exceptions.MisconfiguredError` if the request
                 returns an HTTP error status
        """
        super(OIDCRefreshTokenBase, self).__init__(provider, config)
        self.session = requests.Session()

        # Tokens start empty and already expired, so that the first call to
        # _get_access_token() triggers a new token request.
        self.access_token = ""
        self.access_token_expiration = datetime.min.replace(tzinfo=timezone.utc)
        self.refresh_token = ""
        self.refresh_token_expiration = datetime.min.replace(tzinfo=timezone.utc)

        try:
            # A timeout is required here: without it an unresponsive server
            # would block the plugin instantiation indefinitely.
            response = requests.get(
                self.config.oidc_config_url, timeout=HTTP_REQ_TIMEOUT
            )
            response.raise_for_status()
            auth_config = response.json()
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
        except requests.HTTPError as e:
            raise MisconfiguredError(
                f"Cannot obtain OIDC endpoints from {self.config.oidc_config_url}. "
                f"Request returned {e.response.text}."
            ) from e

        self.jwks_client = jwt.PyJWKClient(auth_config["jwks_uri"])
        self.token_endpoint = auth_config["token_endpoint"]
        self.authorization_endpoint = auth_config["authorization_endpoint"]
        self.algorithms = auth_config["id_token_signing_alg_values_supported"]

    def decode_jwt_token(self, token: str) -> dict[str, Any]:
        """Decode JWT token.

        The signing key is looked up through the JWKS client and the token is
        verified with the algorithms read from the OIDC configuration. The audience
        is only checked when ``allowed_audiences`` is set in the plugin configuration.

        :param token: encoded JWT
        :returns: the decoded token claims
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if the signing
                 key cannot be retrieved or if the token is invalid
        """
        decode_kwargs: dict[str, Any] = {}
        allowed_audiences = getattr(self.config, "allowed_audiences", None)
        if allowed_audiences:
            # NOTE: Audience validation MUST match audience claim if set in token
            # (https://pyjwt.readthedocs.io/en/stable/changelog.html?highlight=audience#id40)
            decode_kwargs["audience"] = allowed_audiences

        try:
            key = self.jwks_client.get_signing_key_from_jwt(token).key
            return jwt.decode(
                token, key, algorithms=self.algorithms, **decode_kwargs
            )
        except (
            jwt.exceptions.InvalidTokenError,
            # raised by the JWKS client when the signing key cannot be obtained
            jwt.exceptions.PyJWKClientError,
        ) as e:
            raise AuthenticationError(e) from e

    def _get_access_token(self) -> str:
        """Return a valid access token, renewing it when needed.

        The stored access token is reused while it remains valid for longer than the
        expiration margin. Otherwise it is renewed with the refresh token when that
        one is still valid, or through a new token request.

        :returns: the access token
        """
        now = datetime.now(timezone.utc)
        expiration_margin = timedelta(
            seconds=getattr(
                self.config, "token_expiration_margin", DEFAULT_TOKEN_EXPIRATION_MARGIN
            )
        )

        if self.access_token and self.access_token_expiration - now > expiration_margin:
            logger.debug(
                f"Existing access_token is still valid until {self.access_token_expiration.isoformat()}."
            )
            return self.access_token
        elif (
            self.refresh_token
            and self.refresh_token_expiration - now > expiration_margin
        ):
            logger.debug(
                "access_token expired, fetching new access_token using refresh_token"
            )
            response = self._get_token_with_refresh_token()
        else:
            logger.debug("access_token expired or not available yet, new token request")
            response = self._request_new_token()

        self.access_token = response[getattr(self.config, "token_key", "access_token")]
        self.access_token_expiration = datetime.fromtimestamp(
            self.decode_jwt_token(self.access_token)["exp"], timezone.utc
        )
        self.refresh_token = response.get(
            getattr(self.config, "refresh_token_key", "refresh_token"), ""
        )

        # "refresh_expires_in" may be missing from the response, or be given as a
        # string: normalise it to an int, a missing/empty/zero value meaning that no
        # lifetime was provided.
        refresh_expires_in = int(response.get("refresh_expires_in") or 0)
        if self.refresh_token and refresh_expires_in:
            self.refresh_token_expiration = now + timedelta(seconds=refresh_expires_in)
        else:
            # refresh token does not expire but will be changed at each request
            self.refresh_token_expiration = now + timedelta(days=1000)

        return self.access_token

    def _request_new_token(self) -> dict[str, str]:
        """Fetch the access token with a new authentication"""
        raise NotImplementedError(
            "Incomplete OIDC refresh token retrieval mechanism implementation"
        )

    def _request_new_token_error(self, e: requests.RequestException) -> dict[str, str]:
        """Handle RequestException raised by `self._request_new_token()`

        :param e: the exception raised by the token request
        :returns: the previously retrieved access token, if there is one
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if no access
                 token was previously retrieved
        """
        if self.access_token:
            # try using already retrieved token if authenticate() fails (OTP use-case)
            return {
                "access_token": self.access_token,
                "expires_in": self.access_token_expiration.isoformat(),
            }

        response_text = getattr(e.response, "text", "").strip()

        # check if error is identified as auth_error in provider conf
        auth_errors = getattr(self.config, "auth_error_code", [None])
        if not isinstance(auth_errors, list):
            auth_errors = [auth_errors]

        # The response must be compared to None: the truth value of a
        # requests.Response is its `ok` property, which is False for error statuses.
        if (
            e.response is not None
            and hasattr(e.response, "status_code")
            and e.response.status_code in auth_errors
        ):
            raise AuthenticationError(
                f"Please check your credentials for {self.provider}.",
                f"HTTP Error {e.response.status_code} returned.",
                response_text,
            ) from e

        # other error
        import traceback as tb

        logger.error(
            f"Provider {self.provider} returned {getattr(e.response, 'status_code', '')}: {response_text}"
        )
        raise AuthenticationError(
            "Something went wrong while trying to get access token:\n{}".format(
                tb.format_exc()
            )
        ) from e

    def _get_token_with_refresh_token(self) -> dict[str, str]:
        """Fetch the access token with the refresh token"""
        raise NotImplementedError(
            "Incomplete OIDC refresh token retrieval mechanism implementation"
        )
class OIDCAuthorizationCodeFlowAuth(OIDCRefreshTokenBase):
    """Implement the authorization code flow of the OpenIDConnect authorization specification.

    The `OpenID Connect <http://openid.net/specs/openid-connect-core-1_0.html>`_ specification
    adds an authentication layer on top of oauth 2.0. This plugin implements the
    `authorization code flow <http://openid.net/specs/openid-connect-core-1_0.html#Authentication>`_
    option of this specification.

    The particularity of this plugin is that it proceeds to a headless (not involving the user)
    interaction with the OpenID provider (if necessary) to authenticate a
    registered user with its username and password on the server and then granting to eodag the
    necessary rights. It does that using the client ID of the eodag provider that use it.

    If the client secret of the eodag provider using this plugin is known, it is used in conjunction
    with the client ID to do a BASIC Auth during the token exchange request.
    The headless interaction is fully configurable, and rely on XPATH to retrieve all the necessary
    information.

    :param provider: provider name
    :param config: Authentication plugin configuration:

        * :attr:`~eodag.config.PluginConfig.type` (``str``) (**mandatory**): OIDCAuthorizationCodeFlowAuth
        * :attr:`~eodag.config.PluginConfig.redirect_uri` (``str``) (**mandatory**): The callback url
          that will handle the code given by the OIDC provider
        * :attr:`~eodag.config.PluginConfig.oidc_config_url` (``str``) (**mandatory**): The url to
          get the OIDC Provider's endpoints
        * :attr:`~eodag.config.PluginConfig.client_id` (``str``) (**mandatory**): The OIDC provider's
          client ID of the eodag provider
        * :attr:`~eodag.config.PluginConfig.user_consent_needed` (``bool``) (mandatory): Whether a
          user consent is needed during the authentication
        * :attr:`~eodag.config.PluginConfig.token_exchange_post_data_method` (``str``) (**mandatory**):
          One of: ``json``, ``data`` or ``params``. This is the way to pass the data to the POST
          request that is made to the token server. They correspond to the recognised keywords
          arguments of the Python `requests <http://docs.python-requests.org/>`_ library
        * :attr:`~eodag.config.PluginConfig.token_key` (``str``): The key pointing to the token in
          the json response to the POST request to the token server
        * :attr:`~eodag.config.PluginConfig.token_provision` (``str``) (**mandatory**): One of ``qs``
          or ``header``. This is how the token obtained will be used to authenticate the user on
          protected requests. If ``qs`` is chosen, then ``token_qs_key`` is mandatory
        * :attr:`~eodag.config.PluginConfig.login_form_xpath` (``str``) (**mandatory**): The xpath to
          the HTML form element representing the user login form
        * :attr:`~eodag.config.PluginConfig.authentication_uri_source` (``str``) (**mandatory**):
          Where to look for the authentication_uri. One of ``config`` (in the configuration) or
          ``login-form`` (use the 'action' URL found in the login form retrieved with
          login_form_xpath). If the value is ``config``, authentication_uri config param is mandatory
        * :attr:`~eodag.config.PluginConfig.authentication_uri` (``str``): (**mandatory if
          authentication_uri_source=config**) The URL of the authentication backend of the OIDC
          provider
        * :attr:`~eodag.config.PluginConfig.user_consent_form_xpath` (``str``): The xpath to the user
          consent form. The form is searched in the content of the response to the authorization
          request
        * :attr:`~eodag.config.PluginConfig.user_consent_form_data` (``dict[str, str]``): The data
          that will be passed with the POST request on the form 'action' URL. The data are given as
          key value pairs, the keys representing the data key and the value being either a
          'constant' string value, or a string of the form 'xpath(<path-to-a-value-to-be-retrieved>)'
          and representing a value to be retrieved in the user consent form. The xpath must resolve
          directly to a string value, not to an HTML element. Example:
          ``xpath(//input[@name="sessionDataKeyConsent"]/@value)``
        * :attr:`~eodag.config.PluginConfig.additional_login_form_data` (``dict[str, str]``): A
          mapping giving additional data to be passed to the login POST request. The value follows
          the same rules as with user_consent_form_data
        * :attr:`~eodag.config.PluginConfig.exchange_url_error_pattern` (``dict[str, str]``):
          Key/value pairs of patterns/messages. If exchange_url contains the given pattern, the
          associated message will be sent in an AuthenticationError
        * :attr:`~eodag.config.PluginConfig.client_secret` (``str``): The OIDC provider's client
          secret of the eodag provider
        * :attr:`~eodag.config.PluginConfig.token_exchange_params` (``dict[str, str]``): mandatory
          keys for the dict: redirect_uri, client_id; A mapping between OIDC url query string and
          token handler query string params (only necessary if they are not the same as for OIDC).
          This is eodag provider dependant
        * :attr:`~eodag.config.PluginConfig.token_qs_key` (``str``): (mandatory when
          token_provision=qs) Refers to the name of the query param to be used in the query request
        * :attr:`~eodag.config.PluginConfig.refresh_token_key` (``str``): The key pointing to the
          refresh_token in the json response to the POST request to the token server
        * :attr:`~eodag.config.PluginConfig.token_expiration_margin` (``int``): The margin of time
          (in seconds) before a token is considered expired. Default: 60 seconds.
    """

    SCOPE = "openid"
    RESPONSE_TYPE = "code"
    CONFIG_XPATH_REGEX = re.compile(r"^xpath\((?P<xpath_value>.+)\)$")

    def __init__(self, provider: str, config: PluginConfig) -> None:
        super(OIDCAuthorizationCodeFlowAuth, self).__init__(provider, config)

    def validate_config_credentials(self) -> None:
        """Validate configured credentials

        :raises: :class:`~eodag.utils.exceptions.MisconfiguredError` if
                 ``token_provision`` is not one of ``qs`` or ``header``, or if it is
                 ``qs`` and ``token_qs_key`` is not set
        """
        super(OIDCAuthorizationCodeFlowAuth, self).validate_config_credentials()
        if getattr(self.config, "token_provision", None) not in ("qs", "header"):
            raise MisconfiguredError(
                'Provider config parameter "token_provision" must be one of "qs" or "header"'
            )
        if self.config.token_provision == "qs" and not getattr(
            self.config, "token_qs_key", ""
        ):
            raise MisconfiguredError(
                'Provider config parameter "token_provision" with value "qs" must have '
                '"token_qs_key" config parameter as well'
            )

    def authenticate(self) -> CodeAuthorizedAuth:
        """Authenticate

        :returns: a ``requests`` auth object carrying the current access token
        """
        self._get_access_token()

        return CodeAuthorizedAuth(
            self.access_token,
            self.config.token_provision,
            key=getattr(self.config, "token_qs_key", None),
        )

    def _request_new_token(self) -> dict[str, str]:
        """Fetch the access token with a new authentication

        :returns: the json content of the token server response, or the result of
                  ``_request_new_token_error()`` if the token request failed
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if the user
                 could not be authenticated
        """
        logger.debug("Fetching access token from %s", self.token_endpoint)
        state = self.compute_state()
        authentication_response = self.authenticate_user(state)
        exchange_url = authentication_response.url

        for err_pattern, err_message in getattr(
            self.config, "exchange_url_error_pattern", {}
        ).items():
            if err_pattern in exchange_url:
                raise AuthenticationError(err_message)

        # a successful authentication ends on the configured redirect_uri
        if not exchange_url.startswith(self.config.redirect_uri):
            if "Invalid username or password" in authentication_response.text:
                raise AuthenticationError("Invalid username or password")
            raise AuthenticationError(
                f"Could not authenticate user with provider {self.provider}.",
                "Please verify your credentials",
            )

        if self.config.user_consent_needed:
            user_consent_response = self.grant_user_consent(authentication_response)
            exchange_url = user_consent_response.url

        try:
            token_response = self.exchange_code_for_token(exchange_url, state)
            token_response.raise_for_status()
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
        except requests.RequestException as e:
            return self._request_new_token_error(e)
        return token_response.json()

    def _get_token_with_refresh_token(self) -> dict[str, str]:
        """Fetch the access token with the refresh token

        If the refresh request fails for another reason than a timeout, a new token
        request is executed instead.

        :returns: the json content of the token server response
        :raises: :class:`~eodag.utils.exceptions.TimeOutError` if the request times out
        """
        logger.debug(
            "Fetching access token with refresh token from %s.", self.token_endpoint
        )
        token_data: dict[str, Any] = {
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
        }
        token_data = self._prepare_token_post_data(token_data)
        post_request_kwargs: Any = {
            self.config.token_exchange_post_data_method: token_data
        }
        ssl_verify = getattr(self.config, "ssl_verify", True)
        try:
            token_response = self.session.post(
                self.token_endpoint,
                timeout=HTTP_REQ_TIMEOUT,
                verify=ssl_verify,
                **post_request_kwargs,
            )
            token_response.raise_for_status()
        except requests.exceptions.Timeout as exc:
            raise TimeOutError(exc, timeout=HTTP_REQ_TIMEOUT) from exc
        except requests.RequestException as exc:
            logger.error(
                "Could not fetch access token with refresh token, executing new token request, error: %s",
                getattr(exc.response, "text", ""),
            )
            return self._request_new_token()
        return token_response.json()

    def authenticate_user(self, state: str) -> Response:
        """Authenticate user

        The authorization endpoint is requested first. When the returned page contains
        a login form, the credentials (and the configured additional data) are posted
        to the authentication uri; otherwise the authorization response is returned
        as is.

        :param state: the state value sent with the authorization request
        :returns: the response to the login request, or to the authorization request
                  when no login form was found
        :raises: :class:`~eodag.utils.exceptions.MisconfiguredError` if the
                 authentication uri cannot be determined
        :raises: :class:`~eodag.utils.exceptions.RequestError` if a request fails
        :raises: :class:`TimeoutError` if a request times out
        """
        self.validate_config_credentials()
        params = {
            "client_id": self.config.client_id,
            "response_type": self.RESPONSE_TYPE,
            "scope": self.SCOPE,
            "state": state,
            "redirect_uri": self.config.redirect_uri,
        }
        ssl_verify = getattr(self.config, "ssl_verify", True)
        try:
            authorization_response = self.session.get(
                self.authorization_endpoint,
                params=params,
                headers=USER_AGENT,
                timeout=HTTP_REQ_TIMEOUT,
                verify=ssl_verify,
            )
        except requests.exceptions.Timeout as exc:
            # NOTE(review): this is the builtin TimeoutError, whereas
            # _request_new_token() raises eodag's TimeOutError - confirm which one
            # callers expect before harmonising.
            raise TimeoutError(exc, "The authentication request timed out.") from exc
        except requests.RequestException as exc:
            raise RequestError.from_error(
                exc, "An error occurred while authenticating the user."
            ) from exc

        login_document = etree.HTML(authorization_response.text)
        login_forms = login_document.xpath(self.config.login_form_xpath)

        if not login_forms:
            # we assume user is already logged in
            # no form found because we got redirected to the redirect_uri
            return authorization_response

        login_form = login_forms[0]

        # Get the form data to pass to the login form from config or from the login form
        login_data = {
            key: self._constant_or_xpath_extracted(value, login_form)
            for key, value in getattr(
                self.config, "additional_login_form_data", {}
            ).items()
        }
        # Add the credentials
        login_data.update(self.config.credentials)

        # Retrieve the authentication_uri from the login form if so configured
        if self.config.authentication_uri_source == "login-form":
            # Given that the login_form_xpath resolves to an HTML element, if suffices to add '/@action' to get
            # the value of its action attribute to this xpath
            auth_uri = login_form.xpath(
                self.config.login_form_xpath.rstrip("/") + "/@action"
            )
            if not auth_uri or not auth_uri[0]:
                raise MisconfiguredError(
                    f"Could not get auth_uri from {self.config.login_form_xpath}"
                )
            auth_uri = auth_uri[0]
        else:
            auth_uri = getattr(self.config, "authentication_uri", None)
            if not auth_uri:
                raise MisconfiguredError("authentication_uri is missing")

        try:
            return self.session.post(
                auth_uri,
                data=login_data,
                headers=USER_AGENT,
                timeout=HTTP_REQ_TIMEOUT,
                verify=ssl_verify,
            )
        except requests.exceptions.Timeout as exc:
            # builtin TimeoutError, see the note on the authorization request above
            raise TimeoutError(exc, "The authentication request timed out.") from exc
        except requests.RequestException as exc:
            raise RequestError.from_error(
                exc, "An error occurred while authenticating the user."
            ) from exc

    def grant_user_consent(self, authentication_response: Response) -> Response:
        """Grant user consent

        The first form matching ``user_consent_form_xpath`` in the authentication
        response is used to build the consent data, which are then posted to the
        authorization endpoint.

        :param authentication_response: the response returned by the authentication
        :returns: the response to the consent request
        :raises: :class:`~eodag.utils.exceptions.RequestError` if the request fails
        :raises: :class:`TimeoutError` if the request times out
        """
        user_consent_document = etree.HTML(authentication_response.text)
        # NOTE(review): an IndexError is raised here if no form matches the xpath
        user_consent_form = user_consent_document.xpath(
            self.config.user_consent_form_xpath
        )[0]
        # Get the form data to pass to the consent form from config or from the consent form
        user_consent_data = {
            key: self._constant_or_xpath_extracted(value, user_consent_form)
            for key, value in self.config.user_consent_form_data.items()
        }
        ssl_verify = getattr(self.config, "ssl_verify", True)
        try:
            return self.session.post(
                self.authorization_endpoint,
                data=user_consent_data,
                headers=USER_AGENT,
                timeout=HTTP_REQ_TIMEOUT,
                verify=ssl_verify,
            )
        except requests.exceptions.Timeout as exc:
            # NOTE(review): builtin TimeoutError, not eodag's TimeOutError
            raise TimeoutError(exc, "The authentication request timed out.") from exc
        except requests.RequestException as exc:
            raise RequestError.from_error(
                exc, "An error occurred while authenticating the user."
            ) from exc

    def _prepare_token_post_data(self, token_data: dict[str, Any]) -> dict[str, Any]:
        """Prepare the common data to post to the token URI

        :param token_data: the request specific data, updated in place
        :returns: ``token_data`` completed with the redirect uri, the client id and,
                  when configured, the client secret
        """
        token_data.update(
            {
                "redirect_uri": self.config.redirect_uri,
                "client_id": self.config.client_id,
            }
        )
        # If necessary, change the keys of the form data that will be passed to the token exchange POST request
        custom_token_exchange_params = getattr(self.config, "token_exchange_params", {})
        if custom_token_exchange_params:
            token_data[custom_token_exchange_params["redirect_uri"]] = token_data.pop(
                "redirect_uri"
            )
            token_data[custom_token_exchange_params["client_id"]] = token_data.pop(
                "client_id"
            )
        # If the client_secret is known, the token exchange request must be authenticated with a BASIC Auth, using the
        # client_id and client_secret as username and password respectively
        # NOTE(review): the "auth" tuple is stored in the posted data, callers in this
        # class do not pass it as the `auth` argument of the request - confirm that
        # this is what the token servers expect.
        if getattr(self.config, "client_secret", None):
            token_data.update(
                {
                    "auth": (self.config.client_id, self.config.client_secret),
                    "client_secret": self.config.client_secret,
                }
            )
        return token_data

    def exchange_code_for_token(self, authorized_url: str, state: str) -> Response:
        """Get exchange code for token

        :param authorized_url: the url the user was redirected to once authorized,
                               holding the ``state`` and ``code`` query parameters
        :param state: the state value sent with the authorization request
        :returns: the token server response
        :raises: :class:`~eodag.utils.exceptions.AuthenticationError` if the state
                 does not match or if the url holds no authorization code
        :raises: :class:`~eodag.utils.exceptions.RequestError` if the request fails
        :raises: :class:`TimeoutError` if the request times out
        """
        qs = parse_qs(urlparse(authorized_url).query)
        # A url without the expected parameters is an authentication failure, it must
        # not end up as a bare KeyError.
        received_state = qs.get("state", [None])[0]
        if received_state != state:
            raise AuthenticationError(
                "The state received in the authorized url does not match initially computed state"
            )
        codes = qs.get("code")
        if not codes:
            raise AuthenticationError(
                "No authorization code found in the authorized url"
            )
        code = codes[0]

        token_exchange_data: dict[str, Any] = {
            "code": code,
            "state": state,
            "grant_type": "authorization_code",
        }
        token_exchange_data = self._prepare_token_post_data(token_exchange_data)
        post_request_kwargs: Any = {
            self.config.token_exchange_post_data_method: token_exchange_data
        }
        ssl_verify = getattr(self.config, "ssl_verify", True)
        try:
            return self.session.post(
                self.token_endpoint,
                headers=USER_AGENT,
                timeout=HTTP_REQ_TIMEOUT,
                verify=ssl_verify,
                **post_request_kwargs,
            )
        except requests.exceptions.Timeout as exc:
            # NOTE(review): builtin TimeoutError, not eodag's TimeOutError
            raise TimeoutError(exc, "The authentication request timed out.") from exc
        except requests.RequestException as exc:
            raise RequestError.from_error(
                exc, "An error occurred while authenticating the user."
            ) from exc

    def _constant_or_xpath_extracted(
        self, value: str, form_element: Any
    ) -> Optional[str]:
        """Resolve a configured form value.

        :param value: either a constant, or a string of the form ``xpath(<path>)``
        :param form_element: the element on which the xpath is evaluated
        :returns: ``value`` itself when it is a constant, else the single result of
                  the xpath evaluation, or ``None`` if the xpath does not resolve to
                  exactly one result
        """
        match = self.CONFIG_XPATH_REGEX.match(value)
        if not match:
            return value
        value_from_xpath = form_element.xpath(match.group("xpath_value"))
        if len(value_from_xpath) == 1:
            return value_from_xpath[0]
        return None

    @staticmethod
    def compute_state() -> str:
        """Compute state

        :returns: a string of 22 characters picked among digits and ASCII letters
                  with a :class:`~random.SystemRandom` generator
        """
        rand = SystemRandom()
        return "".join(
            repeatfunc(
                rand.choice,
                22,
                string.digits + string.ascii_lowercase + string.ascii_uppercase,
            )
        )
class CodeAuthorizedAuth(AuthBase):
    """CodeAuthorizedAuth custom authentication class to be used with requests module

    :param token: the access token
    :param where: where the token is put in the request: ``qs`` (query string) or
                  ``header`` (``Authorization`` header)
    :param key: name of the query parameter holding the token when ``where`` is ``qs``
    """

    def __init__(self, token: str, where: str, key: Optional[str] = None) -> None:
        self.token = token
        self.where = where
        self.key = key

    def __call__(self, request: PreparedRequest) -> PreparedRequest:
        """Perform the actual authentication

        :param request: the request to authenticate, modified in place
        :returns: the same request, carrying the token
        """
        if self.where == "qs":
            parts = urlparse(str(request.url))
            query_dict = parse_qs(parts.query)
            if self.key is not None:
                query_dict.update({self.key: [self.token]})
            url_without_args = parts._replace(query="").geturl()

            request.prepare_url(url_without_args, query_dict)

        elif self.where == "header":
            request.headers["Authorization"] = "Bearer {}".format(self.token)

        # The request dump is only built when it is going to be emitted
        if logger.isEnabledFor(logging.DEBUG):
            request_dump = re.sub(
                r"'Bearer [^']+'",
                r"'Bearer ***'",
                f"PreparedRequest: {request.__dict__}",
            )
            if self.where == "qs" and self.key is not None:
                # the token is part of the url in this mode: mask the value of its
                # query parameter so that it does not leak into the logs
                request_dump = re.sub(
                    rf"([?&]{re.escape(self.key)}=)[^&'\s]+",
                    r"\g<1>***",
                    request_dump,
                )
            logger.debug(request_dump)
        return request