"""Tasks related to backup and restore of Tamr instances"""
import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Generator, List, Optional, Union

import requests
from tamr_unify_client import Client

from tamr_toolbox import utils
from tamr_toolbox.filesystem import bash
from tamr_toolbox.models.data_type import JsonDict

LOGGER = logging.getLogger(__name__)


def list_backups(client: Client) -> Generator[JsonDict, None, None]:
"""Lists all backups available to Tamr client. Will list both succeeded and failed backups.
Args:
client: A client object
Returns:
A generator of json dict objects for the backups available to client."""
response = client.get("backups")
for backup in response.json():
yield backup
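

# Usage sketch for list_backups (a minimal example, not part of this module;
# the host and credentials below are hypothetical placeholders):
#
#     from tamr_unify_client import Client
#     from tamr_unify_client.auth import UsernamePasswordAuth
#
#     client = Client(UsernamePasswordAuth("user", "password"), host="localhost")
#     for backup in list_backups(client):
#         print(backup["relativeId"], backup["state"])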


def get_backup_by_id(client: Client, backup_id: str) -> JsonDict:
"""Fetches the json object for a given backup ID.
Args:
client: A Tamr client object.
backup_id: The relativeID corresponding to the desired backup.
Returns:
Json dict corresponding to the desired backup.
Raises:
ValueError: Raised if GET request to Tamr fails
"""
api_string = f"backups/{backup_id}"
response = client.get(api_string)
if not response.ok:
message = (
f"Received non-200 response code '{response.status_code}' "
f"with message '{response.json()['message']}': '{response.json()}'"
)
LOGGER.error(message)
raise ValueError(message)
return response.json()
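

# Usage sketch for get_backup_by_id ("client" as in the sketch above; the backup
# ID shown is hypothetical):
#
#     backup = get_backup_by_id(client, "2021-07-04_10-00-00-000000")
#     print(backup["state"])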


def initiate_backup(
client: Client,
*,
poll_interval_seconds: int = 30,
polling_timeout_seconds: Optional[int] = None,
connection_retry_timeout_seconds: int = 600,
) -> requests.Response:
"""Runs a backup of Tamr client and waits until it is finished.
Args:
client: A Tamr client object
poll_interval_seconds: Amount of time in between polls of job state.
polling_timeout_seconds: Amount of time before a timeout error is thrown.
connection_retry_timeout_seconds: Amount of time before timeout error is thrown during
connection retry
Returns:
Json dict of response from API request."""
response = client.post("backups")
if not response.ok:
message = f"Received non-200 response code '{response.status_code}': {response.json()}"
LOGGER.error(message)
raise RuntimeError(message)
backup_id = response.json()["relativeId"]
op = utils.client.poll_endpoint(
client=client,
api_endpoint=f"backups/{backup_id}",
poll_interval_seconds=poll_interval_seconds,
polling_timeout_seconds=polling_timeout_seconds,
connection_retry_timeout_seconds=connection_retry_timeout_seconds,
)
return op
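

# Usage sketch for initiate_backup ("client" as above; polls once a minute with
# no overall timeout). The "state" field is the same one checked by
# initiate_restore below:
#
#     response = initiate_backup(client, poll_interval_seconds=60)
#     print(response.json()["state"])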


def initiate_restore(
client: Client,
backup_id: str,
*,
polling_timeout_seconds: Optional[int] = None,
poll_interval_seconds: int = 30,
connection_retry_timeout_seconds: int = 600,
) -> requests.Response:
"""Restores the Tamr client to the state of the supplied backup.
Args:
client: A Tamr client object
backup_id: BackupId of the desired backup.
polling_timeout_seconds: Amount of time before a timeout error is thrown.
poll_interval_seconds: Amount of time in between polls of job state.
connection_retry_timeout_seconds: Amount of time before timeout error is thrown during
connection retry
Returns:
Json dict of response from API request.
Raises:
ValueError: Raised if the target backup contains errors
RuntimeError: Raised if the restore fails to start
"""
backup = get_backup_by_id(client=client, backup_id=backup_id)
backup_state = backup["state"]
    if backup_state != "SUCCEEDED":
        value_error_message1 = (
            f"Backup with ID {backup_id} did not succeed and has state {backup_state}"
        )
        LOGGER.error(value_error_message1)
        raise ValueError(value_error_message1)
    error_message = backup["errorMessage"]
    if error_message != "":
        value_error_message2 = (
            f"Backup with ID {backup_id} contains non-empty error message '{error_message}'"
        )
        LOGGER.error(value_error_message2)
        raise ValueError(value_error_message2)
response = client.post("instance/restore", data=backup_id)
if not response.ok:
runtime_error_message = (
f"Received non-200 response code '{response.status_code}' : {response.json()}"
)
LOGGER.error(runtime_error_message)
raise RuntimeError(runtime_error_message)
op = utils.client.poll_endpoint(
client=client,
api_endpoint="instance/restore",
poll_interval_seconds=poll_interval_seconds,
polling_timeout_seconds=polling_timeout_seconds,
connection_retry_timeout_seconds=connection_retry_timeout_seconds,
)
return op
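

# Usage sketch for initiate_restore ("client" and "backup_id" as in the sketches
# above). The target backup is validated before the restore is started:
#
#     response = initiate_restore(client, backup_id)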


def validate_backup(
directory: Union[Path, str], *, backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f"
) -> bool:
"""Validates that a directory is a valid backup. A valid backup has a
manifest file, a completion file (_SUCCEEDED, _FAILED, or _CANCELED),
the folder has a valid date format, and the date is prior to the current time
Args:
directory: path to backup directory
backup_datetime_format: String datetime format in backup folder name
Returns:
True if directory is a valid backup, otherwise False.
"""
if not isinstance(directory, Path):
directory = Path(directory)
backup_name = directory.stem
if not os.path.exists(directory / "manifest.json") and not os.path.exists(
directory / "manifest.yaml"
):
LOGGER.warning(
f"The following directory '{directory}' is not a valid Tamr Backup. "
f"Neither manifest.json nor manifest.yaml exist."
)
return False
    if not any(
        os.path.exists(directory / f"_{x}") for x in ["SUCCEEDED", "CANCELED", "FAILED"]
    ):
        LOGGER.warning(
            f"The following directory '{directory}' is not a valid Tamr Backup. "
            f"A _SUCCEEDED, _CANCELED, or _FAILED completion file does not exist."
        )
return False
try:
time = datetime.strptime(backup_name, backup_datetime_format)
except ValueError:
        LOGGER.warning(
            f"The following directory '{directory}' is not a valid Tamr Backup. "
            f"The directory name {backup_name} is not a valid datetime."
        )
return False
if datetime.now() > time > datetime(2010, 1, 1, 0, 0, 0, 0):
return True
else:
        LOGGER.warning(
            f"The following directory '{directory}' is not a valid Tamr Backup. "
            f"The datetime {time} is outside the valid range."
        )
return False
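

# Usage sketch for validate_backup (the path is hypothetical; the folder name
# must match backup_datetime_format, here the default "%Y-%m-%d_%H-%M-%S-%f"):
#
#     is_valid = validate_backup("/data/tamr/backups/2021-07-04_10-00-00-000000")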


def delete_backups(
*,
backups: List[str],
backup_directory: Union[Path, str],
backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f",
) -> List[str]:
"""Deletes backup folders recursively.
Args:
backups: list of backups to delete
backup_directory: Path to backup directory
backup_datetime_format: String datetime format in backup folder name
Returns:
list of deleted backup names
"""
if not isinstance(backup_directory, Path):
backup_directory = Path(backup_directory)
backups_to_delete = sorted(backups, reverse=True)
LOGGER.info(f"Deleting {len(backups_to_delete)} backup(s): '{backups_to_delete}'")
deleted_backups = []
    for backup in backups_to_delete:
backup_path = os.path.join(backup_directory, backup)
if validate_backup(Path(backup_path), backup_datetime_format=backup_datetime_format):
bash.remove_directories([backup_path], allow_recursive_deletes=True)
deleted_backups.append(backup)
else:
LOGGER.error(f"{backup} is not a valid backup to delete.")
return deleted_backups
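

# Usage sketch for delete_backups (path and backup name are hypothetical; only
# folders that pass validate_backup are removed):
#
#     deleted = delete_backups(
#         backups=["2021-01-01_00-00-00-000000"],
#         backup_directory="/data/tamr/backups",
#     )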


def classify_backups(
backup_directory: Union[Path, str], *, backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f"
) -> JsonDict:
"""Takes stock of successful and failed valid backups in the backup directory.
Args:
backup_directory: Path to backup directory
backup_datetime_format: String datetime format in backup folder name
Returns:
JSON dict with the keys "successful" (List of successful backups) and
"not_successful" (List of failed or cancelled backups)
Raises:
ValueError: if target backup file contains an error message
"""
if not isinstance(backup_directory, Path):
backup_directory = Path(backup_directory)
backups = os.listdir(backup_directory)
LOGGER.info(f"Found '{len(backups)}' entries in the directory {str(backup_directory)}")
not_succeeded = []
succeeded = []
for backup in backups:
if validate_backup(
backup_directory / backup, backup_datetime_format=backup_datetime_format
):
backup_path = backup_directory / backup
if os.path.exists(backup_path / "_SUCCEEDED"):
succeeded.append(backup)
with open(backup_path / "_SUCCEEDED") as f:
succeeded_file = json.load(f)
                    if succeeded_file["errorMessage"] is not None:
raise ValueError(
f"Successful backup file {backup_path}/_SUCCEEDED contains non-null "
f"error message '{succeeded_file['errorMessage']}'"
)
else:
not_succeeded.append(backup)
return {"succeeded": succeeded, "not_succeeded": not_succeeded}


def delete_old_backups(
backup_directory: Union[Path, str],
*,
num_successful_backups_to_keep: int,
num_failed_backups_to_keep: int,
backup_datetime_format: str = "%Y-%m-%d_%H-%M-%S-%f",
) -> Optional[List[str]]:
    """Deletes old backups, keeping the most recent num_successful_backups_to_keep
    successful backups and the most recent num_failed_backups_to_keep failed backups.

    Args:
        backup_directory: Path to backup directory
        num_successful_backups_to_keep: Number of successful backups to keep
        num_failed_backups_to_keep: Number of failed or canceled backups to keep
        backup_datetime_format: String datetime format in backup folder name

    Returns:
        A list of deleted backup names. Returns None if no backups are deleted.

    Raises:
        ValueError: if the number of backups to keep is less than 0
    """
if not isinstance(backup_directory, Path):
backup_directory = Path(backup_directory)
if num_successful_backups_to_keep < 0:
value_error_message_1 = (
f"Argument for num_successful_backups_to_keep must be greater than 0. "
f"Found {num_successful_backups_to_keep}"
)
LOGGER.error(value_error_message_1)
raise ValueError(value_error_message_1)
if num_failed_backups_to_keep < 0:
value_error_message_2 = (
f"Argument for num_failed_backups_to_keep must be greater than 0. "
f"Found '{num_failed_backups_to_keep}'"
)
LOGGER.error(value_error_message_2)
raise ValueError(value_error_message_2)
LOGGER.info(f"Fetching all backups from directory '{backup_directory}'")
result = classify_backups(backup_directory, backup_datetime_format=backup_datetime_format)
succeeded = result["succeeded"]
not_succeeded = result["not_succeeded"]
if (len(succeeded) < 1) and (len(not_succeeded) < 1):
LOGGER.info(f"Found zero backups in directory '{backup_directory}'")
return None
else:
deleted_failed_backups = []
deleted_successful_backups = []
if len(succeeded) <= num_successful_backups_to_keep:
LOGGER.info(
f"Found {len(succeeded)} successful backup. Keeping all successful backups"
)
else:
LOGGER.info(
f"Removing {len(succeeded) - num_successful_backups_to_keep} successful backups"
)
            succeeded = sorted(succeeded, reverse=False)
deleted_successful_backups = delete_backups(
backups=succeeded[: len(succeeded) - num_successful_backups_to_keep],
backup_directory=backup_directory,
backup_datetime_format=backup_datetime_format,
)
if len(not_succeeded) <= num_failed_backups_to_keep:
LOGGER.info(f"Found {len(not_succeeded)} failed backups. Keeping all failed backups")
else:
LOGGER.info(
f"Removing {len(not_succeeded) - num_failed_backups_to_keep} unsuccessful backups"
)
not_succeeded = sorted(not_succeeded, reverse=False)
deleted_failed_backups = delete_backups(
backups=not_succeeded[: len(not_succeeded) - num_failed_backups_to_keep],
backup_directory=backup_directory,
backup_datetime_format=backup_datetime_format,
)
return deleted_failed_backups + deleted_successful_backups
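

# Usage sketch for delete_old_backups (path and retention counts are
# hypothetical):
#
#     deleted = delete_old_backups(
#         "/data/tamr/backups",
#         num_successful_backups_to_keep=5,
#         num_failed_backups_to_keep=1,
#     )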


def delete_old_spark_event_logs(
tamr_home_directory: Union[Path, str], *, num_days_to_keep: int = 14
) -> List[str]:
"""
Deletes sparkEventLogs older than the specified number of days.
This assumes that Spark is running locally on the same VM as Tamr
and that the logs are on the local filesystem.
Args:
tamr_home_directory: Path to the Tamr home directory
num_days_to_keep: Number of days for which to keep logs
Returns:
A list of deleted sparkEventLogs files
Raises:
ValueError: if num_days_to_keep is less than 0
FileNotFoundError: if sparkEventLogs directory doesn't exist
"""
spark_event_log_directory = os.path.join(
tamr_home_directory, "tamr/unify-data/job/sparkEventLogs"
)
if not os.path.exists(spark_event_log_directory):
message = f"directory does not exist: {spark_event_log_directory}"
LOGGER.error(message)
raise FileNotFoundError(message)
return bash.delete_old_files(spark_event_log_directory, num_days_to_keep=num_days_to_keep)
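

# Usage sketch for delete_old_spark_event_logs (path hypothetical; assumes Spark
# writes its event logs on the same filesystem as Tamr, per the docstring above):
#
#     deleted_logs = delete_old_spark_event_logs("/data/tamr", num_days_to_keep=7)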