Workflow
Run Backup Restore
"""Example script for managing backup and restore operations for a Tamr instance"""
import tamr_toolbox as tbox
# Make the Tamr client and the logger used throughout this script
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")
LOGGER = tbox.utils.logger.create("my-script")

# Directory that is classified and pruned in the "Delete backups" step below.
# Kept in one place so the log messages always name the directory actually used.
backup_directory = "/path/to/my/backups/"

# List backups and create new backup
backups = tbox.workflow.backup.list_backups(tamr)
for tamr_backup in backups:
    LOGGER.debug(tamr_backup)

# optional: delete old sparkEventLogs before backup to reduce backup size
LOGGER.info("Deleting old sparkEventLogs")
tbox.workflow.backup.delete_old_spark_event_logs("/home/ubuntu", num_days_to_keep=14)

LOGGER.info("About to run backup")
op = tbox.workflow.backup.initiate_backup(tamr)
backup_info = op.json()  # parse the response body once and read both fields from it
backup_id = backup_info["relativeId"]
state = backup_info["state"]
LOGGER.info(f"Completed backup with state {state} and relative ID {backup_id}")

# Restore to previous backup file
LOGGER.info("About to run restore to backup file")
backup_id = "1"  # update with the relativeID of your desired backup file
op = tbox.workflow.backup.initiate_restore(tamr, backup_id)
state = op.json()["state"]
LOGGER.info(f"Completed restore to backup file with ID {backup_id} with state {state}")

# Delete backups
# NOTE(review): the keys 'succeeded'/'not_succeeded' and the arithmetic on their values
# are kept from the original example; confirm against classify_backups' actual return
# value (key names, and whether the values are counts or lists).
result = tbox.workflow.backup.classify_backups(backup_directory)
LOGGER.info(
    f"Backup directory {backup_directory} contains {result['succeeded']} successful backups "
    f"and {result['not_succeeded']} unsuccessful backups."
)
tbox.workflow.backup.delete_old_backups(
    backup_directory, num_successful_backups_to_keep=1, num_failed_backups_to_keep=1
)
LOGGER.info(
    f"Deleted {result['succeeded']-1} old successful backups and "
    f"{result['not_succeeded']-1} unsuccessful backups."
)
Run Multiple Projects Sequentially
logging_dir: $TAMR_PROJECT_LOGGING_DIR # Example: "/home/users/jane/my-project/logs"
my_tamr_instance:
    host: $TAMR_HOST # Example: "1.2.3.4"
    protocol: "http"
    port: "9100"
    username: "admin"
    password: $TAMR_PASSWORD # Example: "abc123"
projects:
    my_mastering_project: "1"
    my_golden_records_project: "2"
    my_categorization_project: "3"
    my_schema_mapping_project: "4"
"""Example script for running a pipeline consisting of multiple Tamr projects"""
import argparse
from typing import List, Dict
from tamr_unify_client.operation import Operation
import tamr_toolbox as tbox
def main(*, instance_connection_info: Dict[str, str], project_ids: List[str]) -> List[Operation]:
    """Run the continuous steps of several Tamr projects, one project after another.

    Logging goes through the module-level LOGGER, which this script creates in its
    ``__main__`` section before calling this function.

    Args:
        instance_connection_info: Keyword arguments used to create the Tamr client
            (host, port, username etc)
        project_ids: Resource ids of the projects to run, in the order they should run

    Returns:
        The operations returned by the job runner
    """
    client = tbox.utils.client.create(**instance_connection_info)

    # Resolve every id to its project, preserving the caller's ordering
    projects = []
    for project_id in project_ids:
        projects.append(client.projects.by_resource_id(project_id))

    LOGGER.info(f"About to run projects: {[p.name for p in projects]}")

    # sleep_interval=30: wait 30 seconds between jobs
    finished_operations = tbox.workflow.jobs.run(
        projects,
        run_apply_feedback=False,
        run_estimate_pair_counts=False,
        sleep_interval=30,
    )

    LOGGER.info(f"Tasks for {[p.name for p in projects]} complete")
    return finished_operations
if __name__ == "__main__":
    # Command line interface: a single optional path to a YAML configuration file
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=False, help="path to a YAML configuration file")
    args = parser.parse_args()

    # Read the configuration from --config, or from the default path when it is not given
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config,
        default_path_to_file="/path/to/my/conf/project.config.yaml",
    )

    # Global logger; main() logs through this name
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Config keys of the projects, listed in the order their ids are handed to main()
    project_run_order = (
        "my_schema_mapping_project",
        "my_categorization_project",
        "my_mastering_project",
        "my_golden_records_project",
    )
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        project_ids=[CONFIG["projects"][project_key] for project_key in project_run_order],
    )
Run Multiple Projects Concurrently
logging_dir: $TAMR_PROJECT_LOGGING_DIR # Example: "/home/users/jane/my-project/logs"
my_tamr_instance:
    host: $TAMR_HOST # Example: "1.2.3.4"
    protocol: "http"
    port: "9100"
    username: "admin"
    password: $TAMR_PASSWORD # Example: "abc123"
projects:
    my_mastering_project: "1"
    my_golden_records_project: "2"
    my_categorization_project: "3"
    my_schema_mapping_project: "4"
"""Example script for running a pipeline consisting of multiple Tamr projects concurrently"""
import argparse
from typing import List, Dict
import tamr_toolbox as tbox
def main(
    *,
    instance_connection_info: Dict[str, str],
    project_ids: List[str],
    concurrency_level: int = 3,
) -> None:
    """Run the continuous steps of multiple Tamr projects concurrently.

    Builds a graph from the given projects, creates a planner from that graph,
    executes the planner and logs the resulting plan status. Logging goes through
    the module-level LOGGER, which this script creates in its ``__main__`` section
    before calling this function.

    Args:
        instance_connection_info: Keyword arguments used to create the Tamr client
            (host, port, username etc)
        project_ids: Resource ids of the projects the graph is built from
        concurrency_level: Value passed to ``Planner.execute`` as ``concurrency_level``.
            Defaults to 3, the value this example previously hard-coded.

    Returns:
        None. The plan status after execution is logged, not returned.
    """
    # Create the tamr client
    tamr_client = tbox.utils.client.create(**instance_connection_info)

    # Retrieve the projects
    my_projects = [tamr_client.projects.by_resource_id(p_id) for p_id in project_ids]

    LOGGER.info(f"About to run build graph for projects: {[p.name for p in my_projects]}")
    my_graph = tbox.workflow.concurrent.Graph.from_project_list(my_projects, tamr_client)

    LOGGER.info("Building planner object")
    my_planner = tbox.workflow.concurrent.Planner.from_graph(my_graph, tamr_client=tamr_client)

    LOGGER.info(f"Executing concurrent workflow with {concurrency_level} concurrent jobs")
    # execute() returns a planner; keep that one, since the status is read from it below
    my_planner = tbox.workflow.concurrent.Planner.execute(
        my_planner, tamr_client, concurrency_level=concurrency_level
    )

    plan_status = tbox.workflow.concurrent.PlanStatus.from_planner(my_planner)
    LOGGER.info(f"Status after running plan {plan_status} complete")
if __name__ == "__main__":
    # Command line interface: a single optional path to a YAML configuration file
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", required=False, help="path to a YAML configuration file")
    args = parser.parse_args()

    # Read the configuration from --config, or from the default path when it is not given
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config,
        default_path_to_file="/path/to/my/conf/project.config.yaml",
    )

    # Global logger; main() logs through this name
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Config keys of the projects whose ids are handed to main(), in this order
    project_keys = (
        "my_schema_mapping_project",
        "my_categorization_project",
        "my_mastering_project",
        "my_golden_records_project",
    )
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        project_ids=[CONFIG["projects"][project_key] for project_key in project_keys],
    )
Interactive Backup Cleanup
"""Example script for an interactive command prompt for Tamr backup management"""
from typing import Union
import argparse
from pathlib import Path
import os
from datetime import datetime
import pandas as pd
import tamr_toolbox as tbox
def _confirm_delete(prompt: str) -> bool:
    """Ask a Y/N question, re-prompting until the answer is valid; True means delete."""
    response = input(prompt)
    while response not in ["Y", "y", "N", "n"]:
        response = input("Invalid response please enter 'Y' or 'N'")
    return response in ["Y", "y"]


def main(
    *, backup_directory: Union[Path, str], backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f"
) -> None:
    """Provides prompts to delete or keep files/directories in backup_directory

    Entries are visited in reverse-sorted name order. Entries that pass
    ``validate_backup`` are collected and handed to ``delete_backups`` in one call
    after all prompts; entries that fail validation are removed immediately when
    the user confirms.

    Args:
        backup_directory: Path to backup directory
        backup_datetime_format: String datetime format in backup folder name
    """
    if not isinstance(backup_directory, Path):
        backup_directory = Path(backup_directory)

    backups_to_delete = []
    for backup_name in sorted(os.listdir(backup_directory), reverse=True):
        backup_path = backup_directory / backup_name

        if not tbox.workflow.backup.validate_backup(
            backup_path, backup_datetime_format=backup_datetime_format
        ):
            if _confirm_delete(
                f"{backup_name} is an invalid backup. Would you like to delete (Y/N)?"
            ):
                # A Path is not iterable, so list(backup_path) raised TypeError;
                # pass a one-element list holding the path instead.
                tbox.filesystem.bash.remove_directories(
                    [str(backup_path)], allow_recursive_deletes=True
                )
            continue

        backup_time = datetime.strptime(backup_name, backup_datetime_format)
        succeeded_marker = backup_path / "_SUCCEEDED"
        if os.path.exists(succeeded_marker):
            succeeded_file = pd.read_json(succeeded_marker, orient="records", typ="series")
            # NOTE(review): assumes a JSON null errorMessage is read back as None —
            # confirm for the pandas version in use.
            if succeeded_file["errorMessage"] is None:
                prompt = (
                    f"Backup {backup_name} completed successfully at {backup_time} "
                    f"with no error message. Would you like to delete (Y/N)?"
                )
            else:
                prompt = (
                    f"Backup {backup_name} completed successfully at {backup_time} "
                    f"with error message {succeeded_file['errorMessage']}. "
                    f"Would you like to delete (Y/N)?"
                )
        else:
            # No _SUCCEEDED marker file in the backup folder: reported as a failed backup
            prompt = (
                f"Backup {backup_name} failed at {backup_time}. "
                f"Would you like to delete (Y/N)?"
            )

        if _confirm_delete(prompt):
            backups_to_delete.append(backup_name)

    tbox.workflow.backup.delete_backups(
        backups=backups_to_delete,
        backup_directory=backup_directory,
        backup_datetime_format=backup_datetime_format,
    )
if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--backup_directory", help="path to a directory to cleanup", required=True)
    parser.add_argument(
        "--backup_datetime_format",
        help="String datetime format in backup folder name",
        required=False,
    )
    args = parser.parse_args()

    # Forward the datetime format only when it was given on the command line.
    # argparse yields None for an omitted option, and passing that None explicitly
    # would override main()'s default format string.
    optional_arguments = {}
    if args.backup_datetime_format is not None:
        optional_arguments["backup_datetime_format"] = args.backup_datetime_format

    # Run the main function
    main(backup_directory=args.backup_directory, **optional_arguments)