Workflow

Run Backup Restore

"""Example script for managing backup and restore operations for a Tamr instance"""
import tamr_toolbox as tbox

# Create the Tamr client and the logger used by the rest of this script
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")

LOGGER = tbox.utils.logger.create("my-script")

# List the existing backups, logging each one at debug level
backups = tbox.workflow.backup.list_backups(tamr)
for tamr_backup in backups:
    LOGGER.debug(tamr_backup)

# optional: delete old sparkEventLogs before backup to reduce backup size
LOGGER.info("Deleting old sparkEventLogs")
tbox.workflow.backup.delete_old_spark_event_logs("/home/ubuntu", num_days_to_keep=14)

# Create a new backup and report the state and relative ID from the response
LOGGER.info("About to run backup")
op = tbox.workflow.backup.initiate_backup(tamr)
backup_id = op.json()["relativeId"]
state = op.json()["state"]
LOGGER.info(f"Completed backup with state {state} and relative ID {backup_id}")

# Restore to previous backup file
LOGGER.info("About to run restore to backup file")
backup_id = "1"  # update with the relativeID of your desired backup file
op = tbox.workflow.backup.initiate_restore(tamr, backup_id)
state = op.json()["state"]
LOGGER.info(f"Completed restore to backup file with ID {backup_id} with state {state}")

# Delete backups
# A single variable keeps the classified, cleaned-up and logged directory in agreement
backup_directory = "/path/to/my/backups/"  # update with the path to your backup directory
result = tbox.workflow.backup.classify_backups(backup_directory)
# Adjacent f-string pieces are concatenated as-is, so each piece that is followed by
# another one must end with a space
LOGGER.info(
    f"Backup directory {backup_directory} contains {result['succeeded']} successful backups "
    f"and {result['not_succeeded']} unsuccessful backups."
)
tbox.workflow.backup.delete_old_backups(
    backup_directory, num_successful_backups_to_keep=1, num_failed_backups_to_keep=1
)
# NOTE(review): the "-1" arithmetic assumes the values in `result` are counts and that one
# backup of each kind is kept -- confirm against the return type of classify_backups
LOGGER.info(
    f"Deleted {result['succeeded']-1} old successful backups and "
    f"{result['not_succeeded']-1} unsuccessful backups."
)

Run Multiple Projects Sequentially

# Directory handed to the logger as `log_directory` by the script below
logging_dir: $TAMR_PROJECT_LOGGING_DIR # Example: "/home/users/jane/my-project/logs"

# Connection settings; the script unpacks this mapping as keyword arguments
# when it creates the Tamr client
my_tamr_instance:
    host: $TAMR_HOST # Example: "1.2.3.4"
    protocol: "http"
    port: "9100"
    username: "admin"
    password: $TAMR_PASSWORD  # Example: "abc123"

# Project ids keyed by a descriptive name; the script looks each id up by its key
projects:
    my_mastering_project: "1"
    my_golden_records_project: "2"
    my_categorization_project: "3"
    my_schema_mapping_project: "4"
"""Example script for running a pipeline consisting of multiple Tamr projects"""
import argparse
from typing import List, Dict

from tamr_unify_client.operation import Operation

import tamr_toolbox as tbox


def main(*, instance_connection_info: Dict[str, str], project_ids: List[str]) -> List[Operation]:
    """Run the continuous steps of several Tamr projects of any type

    Args:
        instance_connection_info: Keyword arguments used to create the Tamr client
            (host, port, username etc)
        project_ids: Resource ids of the projects to run, in the order they are passed on

    Returns:
        The result of `tbox.workflow.jobs.run` for the retrieved projects

    """
    # Connect to Tamr
    client = tbox.utils.client.create(**instance_connection_info)

    # Look up every project by its resource id, keeping the requested order
    projects_to_run = []
    for project_id in project_ids:
        projects_to_run.append(client.projects.by_resource_id(project_id))

    LOGGER.info(f"About to run projects: {[p.name for p in projects_to_run]}")

    # Feedback application and pair-count estimation are both switched off for this run
    completed_operations = tbox.workflow.jobs.run(
        projects_to_run, run_apply_feedback=False, run_estimate_pair_counts=False
    )

    LOGGER.info(f"Tasks for {[p.name for p in projects_to_run]} complete")
    return completed_operations


if __name__ == "__main__":
    # Parse the optional --config command line argument
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = arg_parser.parse_args()

    # Read the configuration, passing both the supplied path and the default path
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/project.config.yaml"
    )

    # Module-level logger that main() relies on
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Configuration keys of the projects, listed in the order their ids are handed to main()
    project_run_order = [
        "my_schema_mapping_project",
        "my_categorization_project",
        "my_mastering_project",
        "my_golden_records_project",
    ]

    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        project_ids=[CONFIG["projects"][project_key] for project_key in project_run_order],
    )

Run Multiple Projects Concurrently

# Directory handed to the logger as `log_directory` by the script below
logging_dir: $TAMR_PROJECT_LOGGING_DIR # Example: "/home/users/jane/my-project/logs"

# Connection settings; the script unpacks this mapping as keyword arguments
# when it creates the Tamr client
my_tamr_instance:
    host: $TAMR_HOST # Example: "1.2.3.4"
    protocol: "http"
    port: "9100"
    username: "admin"
    password: $TAMR_PASSWORD  # Example: "abc123"

# Project ids keyed by a descriptive name; the script looks each id up by its key
projects:
    my_mastering_project: "1"
    my_golden_records_project: "2"
    my_categorization_project: "3"
    my_schema_mapping_project: "4"
"""Example script for running a pipeline consisting of multiple Tamr projects concurrently"""
import argparse
from typing import List, Dict

import tamr_toolbox as tbox


def main(
    *,
    instance_connection_info: Dict[str, str],
    project_ids: List[str],
    concurrency_level: int = 3,
) -> None:
    """Runs the continuous steps of multiple projects of any type concurrently

    Builds a graph from the projects, creates a planner from that graph, executes the
    planner and logs the resulting plan status.

    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        project_ids: The ids of the target projects to run
        concurrency_level: Passed to the planner execution as `concurrency_level` and
            reported in the log as the number of concurrent jobs

    Returns:
        None. The plan status is logged rather than returned.

    """

    # Create the tamr client
    tamr_client = tbox.utils.client.create(**instance_connection_info)

    # Retrieve the projects
    my_projects = [tamr_client.projects.by_resource_id(p_id) for p_id in project_ids]
    LOGGER.info(f"About to run build graph for projects: {[p.name for p in my_projects]}")
    my_graph = tbox.workflow.concurrent.Graph.from_project_list(my_projects)

    # Plain string: there is nothing to interpolate here
    LOGGER.info("Building planner object")
    my_planner = tbox.workflow.concurrent.Planner.from_graph(my_graph, tamr_client=tamr_client)

    LOGGER.info(f"Executing concurrent workflow with {concurrency_level} concurrent jobs")
    # execute() returns a planner, which replaces the one built above
    my_planner = tbox.workflow.concurrent.Planner.execute(
        my_planner, tamr_client, concurrency_level=concurrency_level
    )

    plan_status = tbox.workflow.concurrent.PlanStatus.from_planner(my_planner)
    LOGGER.info(f"Status after running plan {plan_status} complete")

if __name__ == "__main__":
    # Accept an optional --config argument pointing at a YAML configuration file
    cli = argparse.ArgumentParser()
    cli.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = cli.parse_args()

    # Read the configuration, passing both the supplied path and the default path
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/project.config.yaml"
    )

    # Module-level logger that main() relies on
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Configuration keys of the projects, in the order their ids are handed to main()
    project_keys = (
        "my_schema_mapping_project",
        "my_categorization_project",
        "my_mastering_project",
        "my_golden_records_project",
    )

    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        project_ids=[CONFIG["projects"][key] for key in project_keys],
    )

Interactive Backup Cleanup

"""Example script for an interactive command prompt for Tamr backup management"""
from typing import Union
import argparse
from pathlib import Path
import os
from datetime import datetime
import pandas as pd

import tamr_toolbox as tbox


def _confirm_delete(prompt: str) -> bool:
    """Shows prompt and returns True when the user answers yes

    Keeps asking until the answer is one of 'Y', 'y', 'N' or 'n'.

    Args:
        prompt: Text shown for the first question

    Returns:
        True if the answer was 'Y' or 'y', False if it was 'N' or 'n'

    """
    response = input(prompt)
    while response not in ["Y", "y", "N", "n"]:
        response = input("Invalid response please enter 'Y' or 'N'")
    return response in ["Y", "y"]


def main(
    *, backup_directory: Union[Path, str], backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f"
) -> None:
    """Provides prompts to delete or keep files/directories in backup_directory

    Entries are visited in reverse-sorted name order. Entries that fail backup validation
    are removed as soon as the user confirms. Valid backups the user chooses to delete are
    collected and deleted together at the end.

    Args:
        backup_directory: Path to backup directory
        backup_datetime_format: String datetime format in backup folder name

    """
    if not isinstance(backup_directory, Path):
        backup_directory = Path(backup_directory)

    backups = sorted(os.listdir(backup_directory), reverse=True)

    backups_to_delete = []
    for backup_name in backups:
        backup_path = backup_directory / backup_name

        if not tbox.workflow.backup.validate_backup(
            backup_path, backup_datetime_format=backup_datetime_format
        ):
            if _confirm_delete(
                f"{backup_name} is an invalid backup. Would you like to delete (Y/N)?"
            ):
                # A Path is not iterable, so list(backup_path) raises TypeError;
                # wrap the single path in a list instead
                tbox.filesystem.bash.remove_directories(
                    [backup_path], allow_recursive_deletes=True
                )
            continue

        backup_time = datetime.strptime(backup_name, backup_datetime_format)
        succeeded_marker = backup_path / "_SUCCEEDED"
        if os.path.exists(succeeded_marker):
            # The marker file is read as a JSON record so its error message can be shown
            succeeded_file = pd.read_json(succeeded_marker, orient="records", typ="series")
            if succeeded_file["errorMessage"] is None:
                prompt = (
                    f"Backup {backup_name} completed successfully at {backup_time} "
                    f"with no error message. Would you like to delete (Y/N)?"
                )
            else:
                prompt = (
                    f"Backup {backup_name} completed successfully at {backup_time} "
                    f"with error message {succeeded_file['errorMessage']}. "
                    f"Would you like to delete (Y/N)?"
                )
        else:
            prompt = (
                f"Backup {backup_name} failed at {backup_time}. "
                f"Would you like to delete (Y/N)?"
            )

        if _confirm_delete(prompt):
            backups_to_delete.append(backup_name)

    tbox.workflow.backup.delete_backups(
        backups=backups_to_delete,
        backup_directory=backup_directory,
        backup_datetime_format=backup_datetime_format,
    )


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--backup_directory", help="path to a directory to cleanup", required=True)
    parser.add_argument(
        "--backup_datetime_format",
        help="String datetime format in backup folder name",
        required=False,
    )
    args = parser.parse_args()

    # argparse yields None for an omitted optional argument; forwarding that None would
    # override main's default format, so the argument is only passed when it was supplied
    main_kwargs = {"backup_directory": args.backup_directory}
    if args.backup_datetime_format is not None:
        main_kwargs["backup_datetime_format"] = args.backup_datetime_format

    # Run the main function
    main(**main_kwargs)