Source code for tamr_toolbox.utils.operation

"""Tasks related to Tamr operations (or jobs)"""
import logging
from typing import Union, List, Optional
from time import sleep, time as now

from tamr_unify_client import Client
from tamr_unify_client.operation import Operation
from tamr_toolbox.models.data_type import JsonDict
from tamr_toolbox.models.operation_state import OperationState

# Module-level logger, named after this module via __name__.
LOGGER = logging.getLogger(__name__)


def enforce_success(operation: Operation) -> None:
    """Raises an error if an operation did not succeed

    Args:
        operation: A Tamr operation

    Raises:
        RuntimeError: If `operation.succeeded()` is falsy; the message includes the operation's
            id, description and status
    """
    if not operation.succeeded():
        # The two f-string pieces are concatenated implicitly, so the space separating the
        # description from the status has to be written explicitly.
        raise RuntimeError(
            f"Operation {operation.resource_id} failed. Description: {operation.description}. "
            f"Status: {operation.status}"
        )
def from_resource_id(tamr: Client, *, job_id: Union[int, str]) -> Operation:
    """Build an operation object for the given job id

    A job id of -1 is special-cased. Tamr can return it when everything the requested task
    would produce is already up-to-date, so there is no real job behind it. For that id a
    placeholder operation built from `_dummy_no_op_response()` is returned.

    Args:
        tamr: A Tamr client
        job_id: A job ID, given as an int or a string

    Returns:
        A Tamr operation
    """
    resource_id = str(job_id)
    if resource_id != "-1":
        return Operation.from_resource_id(tamr, resource_id)
    return Operation.from_json(tamr, _dummy_no_op_response())
def _dummy_no_op_response(code: str = "job ID -1") -> JsonDict:
    """Build the JSON for a placeholder 'no-operation' operation

    Intended for cases where every result an operation would produce is already up-to-date,
    so there is nothing for Tamr to do.

    Args:
        code: response code to mention in the description. Default 'job ID -1'

    Returns:
        dict from which a dummy operation object can be constructed
    """
    # Placeholder timestamp: no real start/end/creation time applies to a no-op.
    never = "0000-00-00T00:00:00.000Z"
    audit = {"username": "", "time": never, "version": "-1"}
    return {
        "id": "-1",
        "type": "NOOP",
        "description": (
            f"Tamr returned {code} for this operation, indicating that all results that "
            "would be produced by the operation are already up-to-date."
        ),
        "status": {"state": "SUCCEEDED", "startTime": never, "endTime": never, "message": ""},
        # Separate dict copies so "created" and "lastModified" are distinct objects.
        "created": dict(audit),
        "lastModified": dict(audit),
        "relativeId": "operations/-1",
    }
def get_latest(tamr: Client) -> Operation:
    """Get the latest operation

    Returns the first element of the list produced by `get_all`.
    NOTE(review): this relies on Tamr listing operations newest-first — confirm against the API.

    Args:
        tamr: A Tamr client

    Returns:
        The latest job

    Raises:
        IndexError: If `get_all` returns no operations
    """
    all_operations = get_all(tamr)
    return all_operations[0]
def get_details(*, operation: Operation) -> str:
    """Describe an operation's host, id, description and state as a short multi-line text

    Args:
        operation: A Tamr operation

    Returns:
        A text with one line each for the host, job id, description and status
    """
    lines = [
        f"Host: {operation.client.host} ",
        f" Job: {operation.resource_id} ",
        f" Description: {operation.description} ",
        f" Status: {operation.state} ",
    ]
    return "\n".join(lines)
def get_all(tamr: Client) -> List[Operation]:
    """Get a list of all jobs or operations.

    Args:
        tamr: A Tamr client

    Returns:
        A list of Operation objects.

    Raises:
        requests.HTTPError: If Tamr responds to the listing request with an error status
    """
    response = tamr.get(
        "/api/versioned/v1/operations", headers={"Accept": "application/json"}, stream=True
    )
    # Fail fast on an error status. Otherwise an error body would be passed to `.json()` and
    # iterated as though it were a list of operations, failing confusingly further down.
    response.raise_for_status()
    return [Operation.from_json(tamr, item) for item in response.json()]
def get_active(tamr: Client) -> List[Operation]:
    """Get a list of the jobs that are still pending or running.

    Args:
        tamr: A Tamr client

    Returns:
        A list of Operations objects whose state is PENDING or RUNNING

    Raises:
        ValueError: If an operation reports a state that is not an `OperationState` value
    """
    unfinished = (OperationState.PENDING, OperationState.RUNNING)
    return [
        candidate
        for candidate in get_all(tamr)
        if OperationState(candidate.state) in unfinished
    ]
def wait(
    operation: Operation, *, poll_interval_seconds: int = 3, timeout_seconds: Optional[int] = None
) -> Operation:
    """Continuously polls for this operation's server-side state until it resolves.

    Args:
        operation: Operation to be polled.
        poll_interval_seconds: Time interval (in seconds) between subsequent polls.
        timeout_seconds: Time (in seconds) to wait for operation to resolve. ``None`` waits
            indefinitely.

    Returns:
        The most recently polled operation once its state is CANCELED, SUCCEEDED or FAILED,
        or immediately if it has no status.

    Raises:
        TimeoutError: If operation takes longer than `timeout_seconds` to resolve.
    """
    # The status dict holds the state as a raw string (e.g. "SUCCEEDED"). Compare it against the
    # enum *values*: comparing a string to the members themselves only matches if OperationState
    # mixes in `str`, which a plain `Enum` does not. That mismatch would make this loop spin
    # without sleeping and never return.
    resolved_states = (
        OperationState.CANCELED.value,
        OperationState.SUCCEEDED.value,
        OperationState.FAILED.value,
    )
    started = now()
    while timeout_seconds is None or now() - started < timeout_seconds:
        if operation.status is None or operation.status["state"] in resolved_states:
            return operation
        # Still PENDING/RUNNING (or an unrecognized state): back off before polling again
        # instead of re-polling in a tight loop.
        sleep(poll_interval_seconds)
        operation = operation.poll()
    raise TimeoutError(f"Waiting for operation took longer than {timeout_seconds} seconds.")
def monitor(
    operation: Operation,
    *,
    poll_interval_seconds: float = 1,
    timeout_seconds: Optional[float] = 300,
) -> Operation:
    """Continuously polls for this operation's server-side state and returns the operation
    when there is a state change

    NOTE: if the starting state is already resolved (e.g. SUCCEEDED), it is not expected to
    change, so this will typically run until `timeout_seconds` elapses.

    Args:
        operation: Operation to be monitored.
        poll_interval_seconds: Time interval (in seconds) between subsequent polls.
        timeout_seconds: Time (in seconds) to wait for a state change. ``None`` waits
            indefinitely.

    Returns:
        The first polled operation whose state differs from the starting state, or the
        operation as soon as it has no status.

    Raises:
        TimeoutError: If the state does not change within `timeout_seconds`.
    """
    # Check for a missing status *before* looking the state up in OperationState. The lookup
    # cannot succeed without a status, so checking afterwards would never be reached.
    if operation.status is None:
        return operation
    initial_state = OperationState[operation.state]
    started = now()
    while timeout_seconds is None or now() - started < timeout_seconds:
        operation = operation.poll()
        if operation.status is None:
            return operation
        if OperationState[operation.state] != initial_state:
            return operation
        sleep(poll_interval_seconds)
    raise TimeoutError(f"Waiting for operation took longer than {timeout_seconds} seconds.")
def safe_estimate_counts(project) -> Operation:
    """Refresh a mastering project's estimated pair counts and wait for the job to finish

    Posts directly to the project's ``estimatedPairCounts:refresh`` endpoint.
    NOTE(review): the previous docstring said this "works if it is the first" run —
    presumably it covers projects whose estimates were never computed; confirm.

    Args:
        project: A Tamr project object (converted with `as_mastering()`)

    Returns:
        An operation object for the estimate pairs job, after waiting on it
    """
    mastering_project = project.as_mastering()
    client = mastering_project.client
    refresh_response = client.post(f"{mastering_project.api_path}/estimatedPairCounts:refresh")
    refresh_operation = Operation.from_response(client, refresh_response)
    return refresh_operation.wait()