"""Tasks related to editing the taxonomy for a tamr categorization project"""
import logging
import os
from tamr_unify_client import Client
import json
import copy
LOGGER = logging.getLogger(__name__)
# Building our documentation requires access to all dependencies, including optional ones
# This environments variable is set automatically when `invoke docs` is used
BUILDING_DOCS = os.environ.get("TAMR_TOOLBOX_DOCS") == "1"
if BUILDING_DOCS:
# Import relevant optional dependencies
import pandas as pd
def _get_children_nodes(all_categories: list, target_node: str):
"""
Gets all the children nodes for a given node ID from the full taxonomy
Args:
all_categories: List of all categories of the taxonomy returned by categories API
target_node: Full internal ID of target node to get children for
Returns: List of children nodes including non-leaf nodes under the given target node
"""
first_pass_children = [cat for cat in all_categories if cat["parent"] == target_node]
if len(first_pass_children) == 0:
# Node is a leaf node:
children_nodes = []
else:
children_nodes = first_pass_children
# Iterate to get children at subsequent tiers:
while True:
new_children_all = []
for node in first_pass_children:
new_children = [cat for cat in all_categories if cat["parent"] == node["id"]]
if len(new_children) > 0:
children_nodes.extend(new_children)
new_children_all.extend(new_children)
if len(new_children_all) == 0:
# No deeper tiers found.
break
else:
# Look for deeper tiers among the nodes found in the last iteration:
first_pass_children = new_children_all
return children_nodes
[docs]def delete_node(client: Client, project_id: str, path: list, force_delete: bool = False):
"""
Deletes a node from a taxonomy.
Args:
client: Tamr client connected to target instance
project_id: ID of the categorization project
path: Full path of the node to be deleted
force_delete: Optional flag. Default is false. If true, deletes even if there are still
records assigned to that category. If false, the operation fails with an error.
Returns: None
"""
# Get all categories to extract category ID:
cat_response = client.get(f"projects/{project_id}/taxonomy/categories")
all_cats = json.loads(cat_response.content)
target_cat = [cat for cat in all_cats if cat["path"] == path][0]
target_cat_full_id = target_cat["id"]
target_cat_id = target_cat_full_id.split("/")[-1]
# Check if there are any children nodes to the target node:
children_nodes = _get_children_nodes(all_cats, target_cat_full_id)
all_children_ids = [cat["id"] for cat in children_nodes]
all_children_ids.append(target_cat_full_id)
# Check if there are any records verified to the category to be deleted:
verified_cats_response = client.get(f"projects/{project_id}/categorizations/labels/records")
node_records = []
for line in verified_cats_response.iter_lines():
record = json.loads(line)
if "verified" in record:
if record["verified"]["category"]["categoryId"] in all_children_ids:
node_records.append(record)
if len(node_records) == 0:
LOGGER.info(f"No records found for node {path} or any children nodes. Deleting node.")
client.delete(f"projects/{project_id}/taxonomy/categories/{target_cat_id}")
else:
LOGGER.info(
f"There are verified records under node {path}. Deleting the node without "
f"moving these records will result in loss of work. If you wish to proceed "
f"set force_delete flag to True."
)
if force_delete:
LOGGER.info(f"Force Delete is set to true. Deleting node {path}")
client.delete(f"projects/{project_id}/taxonomy/categories/{target_cat_id}")
else:
delete_node_error = "Force delete is set to false. Exiting without deletion."
LOGGER.error(delete_node_error)
raise RuntimeError(delete_node_error)
return
[docs]def rename_node(client: Client, project_id: str, new_name: str, path: list):
"""
Renames an existing node in the taxonomy.
Args:
client: Tamr client connected to target instance
project_id: ID of the categorization project
new_name: New name to assign to the leaf node
path: Full path of the existing leaf node to rename
Returns: None
"""
body = {"path": path, "name": new_name}
# Get all categories to extract category ID:
cat_response = client.get(f"projects/{project_id}/taxonomy/categories")
all_cats = json.loads(cat_response.content)
target_cat = [cat for cat in all_cats if cat["path"] == path][0]
target_cat_id = target_cat["id"].split("/")[-1]
LOGGER.info(f"Renaming category id {target_cat_id} in project {project_id} to {new_name}")
response = client.put(f"projects/{project_id}/taxonomy/categories/{target_cat_id}", json=body)
# Log an error if the operation failed:
if not response.ok:
content = json.loads(response.content)
rename_node_error = (
f"Renaming node {target_cat_id} failed with message " f"{content['message']}"
)
LOGGER.error(rename_node_error)
raise RuntimeError(rename_node_error)
return
[docs]def create_node(client: Client, project_id: str, path: list):
"""
Creates a category with the specified path in the project taxonomy.
Args:
client: Tamr client connected to target instance
project_id: ID of the categorization project
path: Full path of the new category to be added
Returns: None
"""
body = {"name": path[-1], "path": path}
LOGGER.info(f"Creating new category {path[-1]} in project {project_id}")
response = client.post(f"projects/{project_id}/taxonomy/categories", json=body)
# Log an error if the operation failed:
if not response.ok:
content = json.loads(response.content)
create_node_error = f"Creating node {path[-1]} failed with message {content['message']}"
LOGGER.error(create_node_error)
raise RuntimeError(create_node_error)
return
[docs]def get_taxonomy_as_dataframe(client: Client, project_id: str) -> "pd.DataFrame":
"""
Returns the taxonomy for a project given the project ID.
Args:
client: Tamr client connected to target instance
project_id: ID of the categorization project
Returns:
Current taxonomy categories as a dataframe
Raises:
RuntimeError: if project is not a categorization project or if the taxonomy does not exist
"""
# This function requires pandas, an optional dependency
import pandas as pd
LOGGER.info(f"Retrieving taxonomy for project ID {project_id}")
response = client.get(f"projects/{project_id}/taxonomy/categories")
# Check for bad or empty response:
if not response.ok or (response.text == "[]"):
empty_response_error = (
f"There was no taxonomy found for project {project_id}. Check if the project_id is "
f"correct and if there is an existing taxonomy."
)
LOGGER.error(empty_response_error)
raise RuntimeError(empty_response_error)
max_depth = 0
categories = []
for blob in response.json():
category = blob["path"]
max_depth = max(max_depth, len(category))
categories.append(category)
tiers = [f"T{i}" for i in range(1, max_depth + 1)]
taxonomy_dict = {}
for tier in tiers:
taxonomy_dict[tier] = []
for category in categories:
for tier_index, tier_name in enumerate(tiers):
node_name = None
if len(category) > tier_index:
node_name = category[tier_index]
taxonomy_dict[tier_name].append(node_name)
return pd.DataFrame(taxonomy_dict)
def _create_action(record_id: str, category_path: list):
"""
Internal function to create the actions to POST to the updateRecords API.
Args:
record_id: The ID of the record to be labeled.
category_path: The path of the category to assign to the record.
Returns: Action in correct format.
"""
action = {
"action": "CREATE",
"recordId": record_id,
"record": {"verified": {"category": {"path": category_path}}},
}
json_row = json.dumps(action).encode("utf-8")
return json_row
def _batch(iterable, n=500):
"""
Internal function to batch payload to POST categorizations API
"""
total_len = len(iterable)
for ndx in range(0, total_len, n):
yield iterable[ndx : min(ndx + n, total_len)]
[docs]def move_node(
client: Client,
project_id: str,
old_node_path: list,
new_node_path: list,
move_verifications: bool = True,
):
"""
Function to move a node in a taxonomy to a new path. By default, the function will also move
any verified categorizations under the old node to the new paths.
Args:
client: Tamr client connected to the target instance.
project_id: Project ID of categorization project.
old_node_path: List of the full path for the node to be moved.
new_node_path: List of the full path for where the node is to be moved to.
move_verifications: Optional boolean argument to move verifications to the new path.
Setting to false may result in loss of work.
Returns: None
"""
# First we need to get all the paths of nodes to be moved:
taxonomy_df = get_taxonomy_as_dataframe(client, project_id)
all_paths = taxonomy_df.agg(lambda x: list(x.dropna()), axis=1).tolist()
paths_to_move = [path for path in all_paths if all(x in path for x in old_node_path)]
paths_to_move.sort()
# Now we need to generate the new paths:
new_paths = copy.deepcopy(paths_to_move)
length_to_replace = len(old_node_path)
for path in new_paths:
path[:length_to_replace] = new_node_path
# Next we want to create all the new paths. This operation is safe if the paths exist:
for path in new_paths:
LOGGER.info("Creating the new nodes")
create_node(client, project_id, path)
# Then, we want to move any existing verifications from the old to new paths:
if move_verifications:
payload = []
# First get all the verified records:
verified_cats_response = client.get(
f"projects/{project_id}/categorizations/labels/records"
)
all_records = []
for line in verified_cats_response.iter_lines():
all_records.append(json.loads(line))
# Check if there are any verified responses in the stream:
verified_recs = [record for record in all_records if "verified" in record]
# Next, for each node, check if there are any verified responses:
if len(verified_recs) > 0:
for i in range(len(paths_to_move)):
sub_records = [
record
for record in all_records
if record["verified"]["category"]["path"] == paths_to_move[i]
]
if len(sub_records) == 0:
continue
else:
# If there are records, create actions to move them:
for record in sub_records:
action = _create_action(record["recordId"], new_paths[i])
payload.append(action)
# Post verified records in batches:
LOGGER.info(f"Moving verified records from {old_node_path} to {new_node_path}")
for batch in _batch(payload):
client.post(
f"projects/{project_id}/categorizations/labels:updateRecords",
data=(row for row in batch),
)
# Finally, delete the original node (all children will also be deleted):
delete_node(client, project_id, old_node_path)
LOGGER.info(f"Successfully moved {old_node_path} to {new_node_path}")
return