Source code for tamr_toolbox.utils.client

"""Tasks related to connecting to a Tamr instance"""
import logging
import re
from base64 import b64decode

import requests
from json import dumps
from typing import Optional, Union
from time import sleep, time as now

from requests import Response
from tamr_unify_client import Client
from tamr_unify_client.auth import UsernamePasswordAuth
from tamr_unify_client.auth import JwtTokenAuth

from tamr_toolbox.utils.version import requires_tamr_version

LOGGER = logging.getLogger(__name__)

TAMR_JWT_RELEASE_VERSION = "2022.010.0"


def health_check(client: Client) -> bool:
    """Report whether every Tamr service reachable through the client is healthy.

    Requests the service health endpoint and inspects the ``healthy`` flag of
    each service listed in the JSON response. The full response is logged at
    INFO level when everything is healthy and at ERROR level otherwise.

    Args:
        client: the tamr client

    Returns:
        True if all services are healthy; False if any service is unhealthy or
        if the connection to the host fails
    """
    try:
        response = client.get(endpoint="/api/service/health")
        # Build the full list first so every service entry is inspected.
        service_flags = [service["healthy"] for service in response.json().values()]
        all_healthy = all(service_flags)
        report = dumps(response.json(), indent=2)
        if not all_healthy:
            LOGGER.error(f"Client is unhealthy: {report}")
        else:
            LOGGER.info(f"Client is healthy: {report}")
        return all_healthy
    except requests.exceptions.ConnectionError as e:
        # An unreachable host is reported as unhealthy rather than raised.
        LOGGER.error(f"Could not connect to {client.host}. Error: {e}")
        return False
def create(
    *,
    username: str,
    password: str,
    host: str,
    port: Optional[Union[str, int]] = 9100,
    protocol: str = "http",
    base_path: str = "/api/versioned/v1/",
    session: Optional[requests.Session] = None,
    store_auth_cookie: bool = False,
    enforce_healthy: bool = False,
) -> Client:
    """Build a Tamr client that authenticates with a username and password.

    Args:
        username: The username to access Tamr as
        password: The password for the user
        host: The ip address of Tamr
        port: The port of the Tamr UI. Pass a value of `None` to specify an address with no port
        protocol: https or http
        base_path: Optional argument to specify a different base path
        session: Optional argument to pass an existing requests Session
        store_auth_cookie: If true will allow Tamr authentication cookie to be stored and reused
        enforce_healthy: If true will enforce a healthy state upon creation

    Returns:
        Tamr client

    Raises:
        SystemError: if `enforce_healthy` is set and the health check fails
    """
    has_port = port is not None
    if has_port:
        full_address = f"{protocol}://{host}:{port}"
    else:
        full_address = f"{protocol}://{host}"
    LOGGER.info(f"Creating client as user {username} at {full_address}.")

    credentials = UsernamePasswordAuth(username=username, password=password)
    client = Client(
        auth=credentials,
        host=host,
        port=int(port) if has_port else None,
        protocol=protocol,
        base_path=base_path,
        session=session,
        store_auth_cookie=store_auth_cookie,
    )

    # The health check is only run when the caller asks for it.
    if enforce_healthy and not health_check(client):
        unhealthy_message = "Tamr is not healthy. Check logs and Tamr."
        LOGGER.error(unhealthy_message)
        raise SystemError(unhealthy_message)
    return client
@requires_tamr_version(min_version=TAMR_JWT_RELEASE_VERSION)
def create_with_jwt(
    *,
    token: str,
    host: str,
    port: Optional[Union[str, int]] = 9100,
    protocol: str = "http",
    base_path: str = "/api/versioned/v1/",
    session: Optional[requests.Session] = None,
    store_auth_cookie: bool = False,
    enforce_healthy: bool = False,
) -> Client:
    """Build a Tamr client that authenticates with a JWT token.

    This is the token-based counterpart of username/password client creation.
    Note that this feature is only available on v2022.010.0 or later.

    Args:
        token: A JWT token to authenticate the client
        host: The ip address of Tamr
        port: The port of the Tamr UI. Pass a value of `None` to specify an address with no port
        protocol: https or http
        base_path: Optional argument to specify a different base path
        session: Optional argument to pass an existing requests Session
        store_auth_cookie: If true will allow Tamr authentication cookie to be stored and reused
        enforce_healthy: If true will enforce a healthy state upon creation

    Returns:
        Tamr client

    Raises:
        SystemError: if `enforce_healthy` is set and the health check fails
    """
    has_port = port is not None
    if has_port:
        full_address = f"{protocol}://{host}:{port}"
    else:
        full_address = f"{protocol}://{host}"
    LOGGER.info(f"Creating client using JWT token at {full_address}.")

    token_auth = JwtTokenAuth(token=token)
    client = Client(
        auth=token_auth,
        host=host,
        port=int(port) if has_port else None,
        protocol=protocol,
        base_path=base_path,
        session=session,
        store_auth_cookie=store_auth_cookie,
    )

    # The health check is only run when the caller asks for it.
    if enforce_healthy and not health_check(client):
        unhealthy_message = "Tamr is not healthy. Check logs and Tamr."
        LOGGER.error(unhealthy_message)
        raise SystemError(unhealthy_message)
    return client
def get_with_connection_retry(
    client: Client,
    api_endpoint: str,
    *,
    timeout_seconds: Optional[int] = 600,
    sleep_seconds: int = 20,
) -> requests.Response:
    """Issue a GET request, retrying while the connection to Tamr fails.

    This is used to handle connection issues when Tamr restarts due to a restore.
    Only connection errors are retried; any other exception raised by the
    request propagates immediately.

    Args:
        client: A Tamr client object
        api_endpoint: Tamr API endpoint
        timeout_seconds: Amount of time before a timeout error is thrown. Default is 600
            seconds. Pass `None` to retry indefinitely
        sleep_seconds: Amount of time in between attempts to connect to Tamr.

    Returns:
        A response object from API request.

    Raises:
        TimeoutError: if no attempt succeeded within `timeout_seconds`. The most recent
            connection error, if any, is attached as the cause
    """
    started = now()
    last_error = None
    while timeout_seconds is None or now() - started < timeout_seconds:
        try:
            return client.get(api_endpoint)
        except requests.exceptions.ConnectionError as e:
            # If we got for example a connection refused exception, try again later
            last_error = e
            LOGGER.warning(f"Caught exception in connect {e}")
            sleep(sleep_seconds)
    # Chain the last connection failure so the root cause is visible in the traceback.
    raise TimeoutError(
        f"Took longer than {timeout_seconds} seconds to connect to tamr."
    ) from last_error
def poll_endpoint(
    client: Client,
    api_endpoint: str,
    *,
    poll_interval_seconds: int = 3,
    polling_timeout_seconds: Optional[int] = None,
    connection_retry_timeout_seconds: int = 600,
) -> requests.Response:
    """Waits until job has a state of Canceled, Succeeded, or Failed.

    The endpoint is fetched (with connection retry) and the ``state`` field of its
    JSON body is inspected. A terminal state is returned immediately, even if the
    polling deadline has already passed. Any other state waits for
    `poll_interval_seconds` before the next poll.

    Args:
        client: A Tamr client object
        api_endpoint: Tamr API endpoint
        poll_interval_seconds: Amount of time in between polls of job state.
        polling_timeout_seconds: Amount of time before a timeout error is thrown.
            `None` (the default) polls without a deadline
        connection_retry_timeout_seconds: Amount of time before timeout error is thrown
            during connection retry.

    Returns:
        A response object from API request.

    Raises:
        TimeoutError: if the job has not reached a terminal state within
            `polling_timeout_seconds`, or if a connection retry times out
    """
    terminal_states = ("CANCELED", "SUCCEEDED", "FAILED")
    started = now()
    while True:
        op = get_with_connection_retry(
            client=client,
            api_endpoint=api_endpoint,
            timeout_seconds=connection_retry_timeout_seconds,
            sleep_seconds=poll_interval_seconds,
        )
        state = op.json()["state"]
        # Check the freshly fetched state before the deadline so a job that
        # finished on the final poll is returned instead of reported as timed out.
        if state in terminal_states:
            return op
        if polling_timeout_seconds is not None and now() - started >= polling_timeout_seconds:
            raise TimeoutError(
                f"Took longer than {polling_timeout_seconds} seconds for {api_endpoint} "
                f"to reach a terminal state. Last state: {state}."
            )
        # Wait on every non-terminal state (including unrecognised ones) so the
        # API is never polled in a tight loop.
        sleep(poll_interval_seconds)
def _from_response(response: Response) -> Client:
    """Creates a Tamr Client based on a previous api response

    The protocol, host and port are read from the URL of the request that produced
    the response. The username and password are recovered from that request's
    ``BasicCreds`` Authorization header.

    Args:
        response: The response to base the Client on

    Returns:
        New Tamr Client based on the previous response

    Raises:
        ValueError: if the request URL cannot be parsed, or the Authorization header
            is missing or is not a ``BasicCreds`` header in ``username:password`` form
    """
    request = response.request

    # Host is either a bracketed IPv6 literal or anything up to the first
    # ':', '/', '?' or '#'. The port is optional and may have any number of digits.
    url_matcher = re.match(
        r"(https?)://(\[[^\]]*\]|[^/:?#]+)(?::(\d+))?(?=[/?#]|$)", request.url or ""
    )
    if url_matcher is None:
        raise ValueError(f"Could not parse protocol, host and port from url: {request.url}")

    auth_prefix = "BasicCreds "
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith(auth_prefix):
        raise ValueError("Request has no 'BasicCreds' Authorization header to copy.")

    # NOTE(review): latin1 is kept from the original; confirm it matches the
    # encoding the client uses when it builds the header.
    decoded_creds = b64decode(auth_header[len(auth_prefix):]).decode("latin1")
    # Split on the FIRST colon so that passwords containing ':' stay intact.
    username, separator, password = decoded_creds.partition(":")
    if not separator:
        raise ValueError("Authorization header is not in 'username:password' form.")

    return create(
        username=username,
        password=password,
        host=url_matcher.group(2),
        # None when the URL has no explicit port; `create` then builds a port-less address.
        port=url_matcher.group(3),
        protocol=url_matcher.group(1),
    )