"""Source code for tamr_toolbox.workflow.concurrent.PlanNode"""

from dataclasses import dataclass, field
from dataclasses_json import dataclass_json
from typing import List, Union
import time
import logging

from tamr_unify_client.project.resource import Project
from tamr_unify_client.operation import Operation
from tamr_toolbox.workflow.concurrent.PlanNodeStatus import PlanNodeStatus, from_plan_node
from tamr_toolbox.models.project_type import ProjectType
from tamr_toolbox.models.project_steps import (
    SchemaMappingSteps,
    CategorizationSteps,
    MasteringSteps,
    GoldenRecordsSteps,
)
from tamr_toolbox.project import schema_mapping, categorization, mastering, golden_records

LOGGER = logging.getLogger(__name__)

WORKFLOW_MAP = {
    ProjectType.SCHEMA_MAPPING_RECOMMENDATIONS: {
        SchemaMappingSteps.UPDATE_UNIFIED_DATASET: schema_mapping.jobs.update_unified_dataset
    },
    ProjectType.DEDUP: {
        MasteringSteps.UPDATE_UNIFIED_DATASET: mastering.jobs.update_unified_dataset,
        MasteringSteps.GENERATE_PAIRS: mastering.jobs.generate_pairs,
        MasteringSteps.APPLY_FEEDBACK: mastering.jobs.apply_feedback,
        MasteringSteps.UPDATE_HIGH_IMPACT_PAIRS: mastering.jobs.update_pair_predictions,
        MasteringSteps.UPDATE_CLUSTERS: mastering.jobs.update_clusters,
        MasteringSteps.PUBLISH_CLUSTERS: mastering.jobs.publish_clusters,
    },
    ProjectType.CATEGORIZATION: {
        CategorizationSteps.UPDATE_UNIFIED_DATASET: categorization.jobs.update_unified_dataset,
        CategorizationSteps.APPLY_FEEDBACK: categorization.jobs.apply_feedback,
        CategorizationSteps.UPDATE_RESULTS_ONLY: categorization.jobs.update_results_only,
    },
    ProjectType.GOLDEN_RECORDS: {
        GoldenRecordsSteps.PROFILE_GOLDEN_RECORDS: golden_records.jobs.update_profiling_info,
        GoldenRecordsSteps.UPDATE_GOLDEN_RECORDS: golden_records.jobs.update_golden_records,
        GoldenRecordsSteps.PUBLISH_GOLDEN_RECORDS: golden_records.jobs.publish_golden_records,
    },
}
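

# Illustrative sketch (not part of the original module): WORKFLOW_MAP is a two-level lookup,
# ProjectType -> step enum -> job function. A caller that knows a project's type and the step it
# wants to run can dispatch the matching job as below. The helper name is hypothetical; `project`
# is assumed to be an already-fetched tamr_unify_client Project.
def _example_dispatch_step(
    project: Project,
    step: Union[SchemaMappingSteps, MasteringSteps, CategorizationSteps, GoldenRecordsSteps],
) -> Operation:
    project_type = ProjectType[project.type]
    job_function = WORKFLOW_MAP[project_type][step]
    # each mapped job function returns a list containing a single Operation
    return job_function(project, process_asynchronously=True)[0]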


@dataclass_json
@dataclass
class PlanNode:
    """
    Dataclass for a node of a Planner object - contains logic for setting the list of project
    steps to run based on project type
    """

    name: str
    operations: List[Operation]
    project: Project
    priority: int
    current_op: Operation
    status: PlanNodeStatus = PlanNodeStatus.PLANNED
    train: bool = False
    project_type: ProjectType = field(init=False)
    project_steps: Union[
        List[SchemaMappingSteps],
        List[MasteringSteps],
        List[GoldenRecordsSteps],
        List[CategorizationSteps],
    ] = field(init=False)
    current_step: Union[
        SchemaMappingSteps, MasteringSteps, CategorizationSteps, GoldenRecordsSteps
    ] = None
    steps_to_run: Union[
        List[SchemaMappingSteps],
        List[MasteringSteps],
        List[GoldenRecordsSteps],
        List[CategorizationSteps],
    ] = None

    def __post_init__(self):
        # set the project type
        self.project_type = ProjectType[self.project.type]

        # then set project steps based on project type
        # first for schema mapping projects
        if self.project_type == ProjectType.SCHEMA_MAPPING_RECOMMENDATIONS:
            self.project_steps = [SchemaMappingSteps.UPDATE_UNIFIED_DATASET]
        # now for mastering projects
        elif self.project_type == ProjectType.DEDUP:
            if self.train:
                self.project_steps = [
                    MasteringSteps.UPDATE_UNIFIED_DATASET,
                    MasteringSteps.GENERATE_PAIRS,
                    MasteringSteps.APPLY_FEEDBACK,
                    MasteringSteps.UPDATE_HIGH_IMPACT_PAIRS,
                    MasteringSteps.UPDATE_CLUSTERS,
                    MasteringSteps.PUBLISH_CLUSTERS,
                ]
            else:
                self.project_steps = [
                    MasteringSteps.UPDATE_UNIFIED_DATASET,
                    MasteringSteps.GENERATE_PAIRS,
                    MasteringSteps.UPDATE_HIGH_IMPACT_PAIRS,
                    MasteringSteps.UPDATE_CLUSTERS,
                    MasteringSteps.PUBLISH_CLUSTERS,
                ]
        # now for categorization projects
        elif self.project_type == ProjectType.CATEGORIZATION:
            if self.train:
                self.project_steps = [
                    CategorizationSteps.UPDATE_UNIFIED_DATASET,
                    CategorizationSteps.APPLY_FEEDBACK,
                    CategorizationSteps.UPDATE_RESULTS_ONLY,
                ]
            else:
                self.project_steps = [
                    CategorizationSteps.UPDATE_UNIFIED_DATASET,
                    CategorizationSteps.UPDATE_RESULTS_ONLY,
                ]
        # finally for golden record projects
        elif self.project_type == ProjectType.GOLDEN_RECORDS:
            self.project_steps = [
                GoldenRecordsSteps.PROFILE_GOLDEN_RECORDS,
                GoldenRecordsSteps.UPDATE_GOLDEN_RECORDS,
                GoldenRecordsSteps.PUBLISH_GOLDEN_RECORDS,
            ]
        else:
            raise NotImplementedError(
                f"Don't know how to create a PlanNode object for project type {self.project_type}"
            )
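

# Illustrative sketch (not part of the original module): constructing a PlanNode. Only the
# explicit fields are passed in; __post_init__ derives project_type and project_steps from the
# project itself. The helper name is hypothetical and the Project is assumed to have been fetched
# elsewhere (e.g. via a tamr_unify_client instance).
def _example_build_plan_node(project: Project) -> PlanNode:
    return PlanNode(
        name=project.name,
        operations=None,  # nothing has been run yet
        project=project,
        priority=0,
        current_op=None,  # set later by run_next_step
        train=False,  # True would include APPLY_FEEDBACK for mastering/categorization projects
    )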


def poll(plan_node: PlanNode) -> PlanNode:
    """
    Polls the status of the current_op object and returns a new PlanNode with updated status

    Args:
        plan_node: the PlanNode object to poll

    Returns:
        Copy of the original plan node with status updated based on the status of the current_op
    """
    # get current op and see if it is None (i.e. if this plan node hasn't been triggered)
    current_op = plan_node.current_op
    if current_op is None:
        # if this node hasn't been triggered just return the current status in case
        # it has been updated by upstream actions
        updated_plan_node_status = plan_node.status
        updated_current_op = None
        updated_operations = None
    else:
        # current op exists so update it
        updated_current_op = plan_node.current_op.poll()
        # also update the list of all ops - the rigamarole is to avoid polling the current op
        # twice, since it already exists in the plan_node.operations list
        updated_operations = [x.poll() for x in plan_node.operations if x != current_op]
        updated_operations.append(updated_current_op)
        # if the op status changed, set the plan node's current op to the updated one
        # and use from_plan_node to capture the logic around in-progress states
        if updated_current_op.state != current_op.state:
            # update the current op
            plan_node.current_op = updated_current_op
            plan_node.operations = updated_operations
            updated_plan_node_status = from_plan_node(plan_node)
        # if the op didn't change status neither will the node, so just pass through
        else:
            updated_plan_node_status = plan_node.status

    return PlanNode(
        name=plan_node.name,
        operations=updated_operations,
        project=plan_node.project,
        priority=plan_node.priority,
        current_op=updated_current_op,
        status=updated_plan_node_status,
        current_step=plan_node.current_step,
        steps_to_run=plan_node.steps_to_run,
    )
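

# Illustrative sketch (not part of the original module): poll returns a fresh PlanNode whose
# status reflects the latest state of its current operation, so callers typically rebind to the
# returned node rather than reusing the one they passed in. Names here are hypothetical.
def _example_refresh_node(node: PlanNode) -> PlanNode:
    refreshed = poll(node)
    if refreshed.status != node.status:
        LOGGER.info(f"project {refreshed.name}: {node.status} -> {refreshed.status}")
    return refreshed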


def run_next_step(plan_node: PlanNode) -> PlanNode:
    """
    Takes a plan node and runs the next step of its project

    Args:
        plan_node: the node to run

    Returns:
        updated plan node
    """
    # if current_step is None this node has never been run,
    # so set it and the steps_to_run from project_steps
    current_step = plan_node.current_step
    if current_step is None:
        steps_to_run = plan_node.project_steps[1:]
        next_step = plan_node.project_steps[0]
    # else we are on step 2+ so use and trim the list of steps_to_run
    else:
        LOGGER.debug(f"plan node in trigger next step: {plan_node}")
        next_step = plan_node.steps_to_run[0]
        steps_to_run = plan_node.steps_to_run[1:]

    LOGGER.info(f"running step {next_step.value} for project {plan_node.name}")
    # We only call methods that return a list with one and only one operation
    current_op = WORKFLOW_MAP[plan_node.project_type][next_step](
        plan_node.project, process_asynchronously=True
    )[0]

    # handle the case where operations is None (nothing has been run yet)
    if plan_node.operations is None:
        operations_list = [current_op]
    else:
        operations_list = [x for x in plan_node.operations]
        if current_op not in operations_list:
            operations_list.append(current_op)

    # don't forget to update the status - and first update the current op and operations list
    plan_node.current_op = current_op
    plan_node.operations = operations_list
    plan_node.current_step = next_step
    status = from_plan_node(plan_node)

    return PlanNode(
        name=plan_node.name,
        operations=operations_list,
        current_op=current_op,
        priority=plan_node.priority,
        project=plan_node.project,
        steps_to_run=steps_to_run,
        status=status,
        current_step=next_step,
    )
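

# Illustrative sketch (not part of the original module): kicking off a node's next project step
# and then blocking on the underlying Tamr operation via tamr_unify_client's Operation API. The
# helper name is hypothetical; concurrent pipelines would normally watch nodes via monitor()
# (below) instead of blocking on a single job.
def _example_run_one_step(node: PlanNode) -> PlanNode:
    node = run_next_step(node)  # submits the next step and records it as current_op/current_step
    finished_op = node.current_op.wait()  # block until the Tamr job reaches a terminal state
    if not finished_op.succeeded():
        raise RuntimeError(f"step {node.current_step} failed for project {node.name}")
    return poll(node)  # refresh the node's status from the completed operation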


def monitor(
    nodes: List[PlanNode], *, timeout: int = 12, polling_interval: int = 30
) -> List[PlanNode]:
    """
    Monitors the status of a list of PlanNodes; as soon as the status of any node on the list
    changes (e.g. to failed/succeeded/cancelled), the updated list is returned

    Args:
        nodes: list of nodes to monitor
        timeout: number of hours to poll for a change in job status before timing out
        polling_interval: the amount of time in seconds to wait between polling

    Returns:
        the same list of PlanNodes with updated statuses
    """
    # if empty then return immediately and log a warning
    if not nodes:
        LOGGER.warning("Jobs list is empty! Something is probably wrong.")
        return []

    start_time = time.time()
    timeout_in_seconds = 3600 * timeout
    # get initial statuses
    initial_statuses = {x.name: x.status for x in nodes}
    # get project names for logging
    running_projects = "\n".join([x.name for x in nodes])
    LOGGER.info(
        f"starting to monitor projects:\n {running_projects}\n with timeout of {timeout} hours"
    )

    # poll only as long as we haven't gotten past the timeout
    while (time.time() - start_time) < timeout_in_seconds:
        # sleep at the beginning of the loop for the polling interval
        time.sleep(polling_interval)
        # poll the jobs and get updated nodes
        updated_nodes = [poll(x) for x in nodes]
        # generate updated statuses
        updated_statuses = {x.name: x.status for x in updated_nodes}
        # if any status has changed, return the updated list
        if initial_statuses != updated_statuses:
            return updated_nodes

    # if we got here the monitoring timed out, so raise a runtime error
    error_message = f"Monitoring jobs:\n {running_projects}\n timed out after {timeout} hours!"
    LOGGER.error(error_message)
    raise RuntimeError(error_message)
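

# Illustrative sketch (not part of the original module): a minimal driver that starts the first
# step of several nodes and then waits for any of them to change status. Function name and the
# timeout/polling values are hypothetical choices; a full concurrent pipeline (the Planner object
# referenced in the PlanNode docstring) layers scheduling logic on top of this pattern.
def _example_start_and_watch(nodes: List[PlanNode]) -> List[PlanNode]:
    started = [run_next_step(node) for node in nodes]
    # blocks until at least one node's status changes, or raises RuntimeError after the timeout
    return monitor(started, timeout=2, polling_interval=60)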