Source code for tamr_toolbox.project.mastering.jobs

"""Tasks related to running jobs for Tamr Mastering projects"""
from typing import List
import logging

from tamr_unify_client.mastering.project import MasteringProject
from tamr_unify_client.operation import Operation
from tamr_toolbox.realtime.matching import update_realtime_match_data

from tamr_toolbox.models.project_type import ProjectType
from tamr_toolbox.utils import operation

LOGGER = logging.getLogger(__name__)


def _run_custom(
    project: MasteringProject,
    *,
    run_update_unified_dataset: bool = False,
    run_estimate_pair_counts: bool = False,
    run_generate_pairs: bool = False,
    run_apply_feedback: bool = False,
    run_update_pair_results: bool = False,
    run_update_high_impact_pairs: bool = False,
    run_update_cluster_results: bool = False,
    run_publish_clusters: bool = False,
    run_update_realtime_match: bool = False,
    process_asynchronously: bool = False,
) -> List[Operation]:
    """Executes specified steps of a mastering project.

    Args:
        project: The target mastering project
        run_update_unified_dataset: Whether refresh should be called on the unified dataset
        run_estimate_pair_counts: Whether an estimate pairs job should be run
        run_generate_pairs: Whether refresh should be called on the pairs dataset
        run_apply_feedback: Whether train should be called on the pair matching model
        run_update_pair_results: Whether predict should be called on the pair matching model
        run_update_high_impact_pairs: Whether refresh should be called on the high impact pairs
            dataset
        run_update_cluster_results: Whether refresh should be called on the record clusters dataset
        run_publish_clusters: Whether refresh should be called on the published record clusters
            dataset
        run_update_realtime_match: Whether to update match data with latest published clusters
        process_asynchronously: Whether or not to wait for the job to finish before returning
            - must be set to True for concurrent workflow

    Returns:
        The operations that were run

    Raises:
        TypeError: if the `project` is not a Mastering project
    """
    if ProjectType[project.type] != ProjectType.DEDUP:
        error_msg = f"Cannot use as a mastering project. Project type: {project.type}"
        LOGGER.error(error_msg)
        raise TypeError(error_msg)
    else:
        project = project.as_mastering()

    completed_operations = []
    if run_update_unified_dataset:
        LOGGER.info(
            f"Updating the unified dataset for project {project.name} (id={project.resource_id})."
        )
        op = project.unified_dataset().refresh(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)

    # Run pair-related operations
    completed_operations.extend(
        _run_custom_pair_operations(
            project,
            run_estimate_pair_counts=run_estimate_pair_counts,
            run_generate_pairs=run_generate_pairs,
            run_apply_feedback=run_apply_feedback,
            run_update_pair_results=run_update_pair_results,
            run_update_high_impact_pairs=run_update_high_impact_pairs,
            process_asynchronously=process_asynchronously,
        )
    )

    if run_update_cluster_results:
        LOGGER.info(
            f"Updating cluster prediction results for project {project.name} "
            f"(id={project.resource_id})."
        )
        op = project.record_clusters().refresh(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_publish_clusters:
        LOGGER.info(f"Publishing clusters for project {project.name} (id={project.resource_id}).")
        op = project.published_clusters().refresh(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_update_realtime_match:
        LOGGER.info(
            f"Updating match database for project {project.name} (id={project.resource_id})."
        )
        op = update_realtime_match_data(project=project, asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)

    return completed_operations


def _run_custom_pair_operations(
    project: MasteringProject,
    *,
    run_estimate_pair_counts: bool = False,
    run_generate_pairs: bool = False,
    run_apply_feedback: bool = False,
    run_update_pair_results: bool = False,
    run_update_high_impact_pairs: bool = False,
    process_asynchronously: bool = False,
) -> List[Operation]:
    """Executes specified steps of a mastering project.

    Args:
        project: The target mastering project
        run_estimate_pair_counts: Whether an estimate pairs job should be run
        run_generate_pairs: Whether refresh should be called on the pairs dataset
        run_apply_feedback: Whether train should be called on the pair matching model
        run_update_pair_results: Whether predict should be called on the pair matching model
        run_update_high_impact_pairs: Whether refresh should be called on the high impact pairs
            dataset
        process_asynchronously: Whether or not to wait for the job to finish before returning
            - must be set to True for concurrent workflow

    Returns:
        The operations that were run

    """

    completed_operations = []
    if run_estimate_pair_counts:
        LOGGER.info(f"Estimate pair counts for project {project.name} (id={project.resource_id}).")
        op = operation.safe_estimate_counts(project)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_generate_pairs:
        LOGGER.info(f"Generating pairs for project {project.name} (id={project.resource_id}).")
        op = project.pairs().refresh(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_apply_feedback:
        LOGGER.info(
            f"Applying feedback to the pairs model for project {project.name} "
            f"(id={project.resource_id})."
        )
        op = project.pair_matching_model().train(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_update_pair_results:
        LOGGER.info(
            f"Updating pair prediction results for project {project.name} "
            f"(id={project.resource_id})."
        )
        op = project.pair_matching_model().predict(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)
    if run_update_high_impact_pairs:
        LOGGER.info(
            f"Refreshing high impact pairs for project {project.name} (id={project.resource_id})."
        )
        op = project.high_impact_pairs().refresh(asynchronous=process_asynchronously)
        if not process_asynchronously:
            operation.enforce_success(op)
        completed_operations.append(op)

    return completed_operations


[docs]def run( project: MasteringProject, *, run_estimate_pair_counts: bool = False, run_apply_feedback: bool = False, run_update_realtime_match: bool = False, process_asynchronously: bool = False, ) -> List[Operation]: """Run the existing pipeline without training Args: project: Target mastering project run_estimate_pair_counts: Whether an estimate pairs job should be run run_apply_feedback: Whether train should be called on the pair matching model run_update_realtime_match: Whether to update RealTime match data after publishing clusters process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=True, run_estimate_pair_counts=run_estimate_pair_counts, run_generate_pairs=True, run_apply_feedback=run_apply_feedback, run_update_pair_results=True, run_update_high_impact_pairs=True, run_update_cluster_results=True, run_publish_clusters=True, run_update_realtime_match=run_update_realtime_match, process_asynchronously=process_asynchronously, )
[docs]def update_unified_dataset( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """Updates the unified dataset for a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=True, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=False, run_update_cluster_results=False, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def estimate_pair_counts( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """Estimates the number of pairs for a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=True, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=False, run_update_cluster_results=False, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def generate_pairs( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """Generates the pairs for a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=True, run_apply_feedback=False, run_update_pair_results=False, run_update_cluster_results=False, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def apply_feedback( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """ Applies feedback to update the model for a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=True, run_update_pair_results=False, run_update_cluster_results=False, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def update_pair_predictions( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """ Updates pair predictions only. Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=True, run_update_cluster_results=False, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def update_clusters( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """ Re-runs clustering only. Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=False, run_update_cluster_results=True, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def apply_feedback_and_update_results( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """Trains the model, predicts the pair labels, and updates the draft clusters of a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=True, run_update_pair_results=True, run_update_high_impact_pairs=True, run_update_cluster_results=True, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def update_results_only( project: MasteringProject, *, process_asynchronously: bool = False ) -> List[Operation]: """Predicts the pair labels based on the existing pair model and updates the draft clusters of a mastering project Args: project: Target mastering project process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=True, run_update_high_impact_pairs=True, run_update_cluster_results=True, run_publish_clusters=False, process_asynchronously=process_asynchronously, )
[docs]def publish_clusters( project: MasteringProject, *, run_update_realtime_match: bool = False, process_asynchronously: bool = False, ) -> List[Operation]: """Publishes the clusters of a mastering project Args: project: Target mastering project run_update_realtime_match: whether to update RealTime match data after publishing clusters process_asynchronously: Whether or not to wait for the job to finish before returning - must be set to True for concurrent workflow Returns: The operations that were run """ return _run_custom( project, run_update_unified_dataset=False, run_estimate_pair_counts=False, run_generate_pairs=False, run_apply_feedback=False, run_update_pair_results=False, run_update_cluster_results=False, run_publish_clusters=True, run_update_realtime_match=run_update_realtime_match, process_asynchronously=process_asynchronously, )