# Source code for tamr_toolbox.project._common.schema

"""Tasks related to schema mapping as part of Tamr projects"""
from tamr_unify_client.project.attribute_mapping.resource import (
    AttributeMappingSpec,
    AttributeMapping,
)
from tamr_unify_client.project.resource import Project
from tamr_unify_client.dataset.resource import Dataset

from typing import List
import logging
from json import JSONDecodeError

LOGGER = logging.getLogger(__name__)


def _get_mapping_spec_for_ud(
    *, source_ds_name: str, source_attr_name: str, unified_attr_name: str, unified_ds_name: str
) -> AttributeMappingSpec:
    """Builds an AttributeMappingSpec used when mapping attributes.

    Wraps the spec creation so its internals live in a single place.

    Args:
        source_ds_name: the name of the source dataset
        source_attr_name: the name of the source attribute
        unified_attr_name: the name of the unified attribute
        unified_ds_name: the name of the unified dataset

    Returns:
        AttributeMappingSpec containing the passed information
    """
    spec = AttributeMappingSpec.new()
    spec = spec.with_input_dataset_name(source_ds_name)
    spec = spec.with_input_attribute_name(source_attr_name)
    spec = spec.with_unified_dataset_name(unified_ds_name)
    spec = spec.with_unified_attribute_name(unified_attr_name)
    # The id fields can be left empty - Tamr will fill them in
    spec = spec.with_input_attribute_id("")
    spec = spec.with_relative_input_attribute_id("")
    spec = spec.with_unified_attribute_id("")
    spec = spec.with_relative_unified_attribute_id("")
    return spec


def map_attribute(
    project: Project,
    *,
    source_attribute_name: str,
    source_dataset_name: str,
    unified_attribute_name: str,
) -> AttributeMapping:
    """
    Maps source_attribute in source_dataset to unified_attribute in unified_dataset.
    If the mapping already exists it will log a warning and return
    the existing AttributeMapping from the project's collection.

    Args:
        source_attribute_name: Source attribute name to map
        source_dataset_name: Source dataset containing the source attribute
        unified_attribute_name: Unified attribute to which to map the source attribute
        project: The project in which to perform the mapping

    Returns:
        The created AttributeMapping

    Raises:
        ValueError: if input variables `source_attribute_name` or `source_dataset_name`
            or `unified_attribute_name` are set to empty strings; or if the dataset
            `source_dataset_name` is not found on Tamr; or if `source_attribute_name`
            is missing from the attributes of `source_dataset_name`
    """
    # simple validation, nothing should be empty.
    # NOTE: collect the *names* of the empty variables - joining their values
    # would produce a blank, useless error message since the values are ""
    variables = {
        "source_attribute_name": source_attribute_name,
        "source_dataset_name": source_dataset_name,
        "unified_attribute_name": unified_attribute_name,
    }
    empty_variables = [name for name, value in variables.items() if value == ""]
    if empty_variables:
        empty_variable_string = ", ".join(empty_variables)
        error_message = (
            f"The following variables are set to empty strings and "
            f"need to be filled in: {empty_variable_string} !"
        )
        LOGGER.error(error_message)
        raise ValueError(error_message)

    # also validate that the dataset exists and has this column
    try:
        source_dataset = project.client.datasets.by_name(source_dataset_name)
    except KeyError:
        error_msg = f"Dataset {source_dataset_name} not found!"
        LOGGER.error(error_msg)
        raise ValueError(error_msg)
    if source_attribute_name not in [x.name for x in source_dataset.attributes]:
        error_msg = f"Attribute {source_attribute_name} not found in {source_dataset_name}!"
        LOGGER.error(error_msg)
        raise ValueError(error_msg)

    # generate mapping spec
    mapping_spec = _get_mapping_spec_for_ud(
        source_attr_name=source_attribute_name,
        source_ds_name=source_dataset_name,
        unified_attr_name=unified_attribute_name,
        unified_ds_name=project.unified_dataset().name,
    )

    # add the mapping to the project's collection - this is what does the actual mapping
    try:
        return project.attribute_mappings().create(mapping_spec.to_dict())
    except JSONDecodeError as e:
        # can get a jsondecode error if the attribute is already mapped.
        # If it is, then log a warning and return the existing mapping;
        # if it is not already mapped break loudly
        m: AttributeMapping
        for m in project.attribute_mappings().stream():
            if (
                m.input_dataset_name == source_dataset_name
                and m.input_attribute_name == source_attribute_name
                and m.unified_attribute_name == unified_attribute_name
            ):
                # mapping exists, log warning and return existing mapping
                LOGGER.warning(
                    f"mapping of attribute {source_attribute_name} in dataset "
                    f"{source_dataset_name} to unified attribute {unified_attribute_name} "
                    f"already exists! Returning existing mapping spec"
                )
                return m
        # if haven't returned then throw the JSONDecodeError
        raise e
def unmap_attribute(
    project: Project,
    *,
    source_attribute_name: str,
    source_dataset_name: str,
    unified_attribute_name: str,
) -> None:
    """
    Unmaps a source attribute.

    Args:
        source_attribute_name: the name of the source attribute to unmap
        source_dataset_name: the name of the source dataset containing that source attribute
        unified_attribute_name: the unified attribute from which to unmap
        project: the project in which to unmap the attribute

    Returns:
        None
    """
    LOGGER.info(
        f"Trying to remove mapping of source attribute {source_attribute_name} in dataset "
        f"{source_dataset_name} from unified attribute {unified_attribute_name}"
    )
    # get mapping collection
    mapping_collection = project.attribute_mappings()

    # a mapping matches when source attribute, source dataset and unified attribute
    # are all equal; take the first match's resource id (None when nothing matches)
    resource_id_to_remove = next(
        (
            mapping.resource_id
            for mapping in mapping_collection.stream()
            if source_attribute_name == mapping.input_attribute_name
            and source_dataset_name == mapping.input_dataset_name
            and unified_attribute_name == mapping.unified_attribute_name
        ),
        None,
    )

    if resource_id_to_remove is None:
        # log warning if resource id wasn't found
        LOGGER.warning(
            f"Mapping of {source_attribute_name} in dataset {source_dataset_name} to "
            f"unified attribute {unified_attribute_name} not found!"
        )
    else:
        # if found remove it
        mapping_collection.delete_by_resource_id(resource_id_to_remove)
def bootstrap_dataset(
    project: Project, *, source_dataset: Dataset, force_add_dataset_to_project: bool = False
) -> List[AttributeMapping]:
    """
    Bootstraps a dataset (i.e. maps all source columns to themselves)

    Args:
        source_dataset: the source dataset (a Dataset object not a string)
        project: the project to do the mapping in
        force_add_dataset_to_project: boolean whether to add the dataset to the project
            if it is not already a part of it

    Returns:
        List of the AttributeMappings generated

    Raises:
        RuntimeError: if `source_dataset` is not part of the given `project`,
            set 'force_add_dataset_to_project' flag to True to automatically add it
    """
    # check if dataset is in the project - python doesn't handle comparison of Dataset objects
    # well so check on name
    if source_dataset.name not in [x.name for x in project.input_datasets()]:
        if force_add_dataset_to_project:
            LOGGER.info(f"adding dataset {source_dataset.name} to project {project.name}")
            project.add_input_dataset(source_dataset)
        else:
            # BUGFIX: the two concatenated fragments previously had no separating
            # space, producing "...!Set 'force_add...'" in the error message
            raise RuntimeError(
                f"dataset {source_dataset.name} not in project {project.name}! "
                + "Set 'force_add_dataset_to_project' flag to True to automatically add it"
            )

    # for each attribute map it to a unified attribute of the same name
    source_dataset_name = source_dataset.name
    completed_mappings = []
    for attribute in source_dataset.attributes:
        attribute_name = attribute.name
        mapping = map_attribute(
            source_attribute_name=attribute_name,
            source_dataset_name=source_dataset_name,
            unified_attribute_name=attribute_name,
            project=project,
        )
        completed_mappings.append(mapping)

    return completed_mappings
def unmap_dataset(
    project: Project,
    *,
    source_dataset: Dataset,
    remove_dataset_from_project: bool = False,
    skip_if_missing: bool = False,
) -> None:
    """
    Wholly unmaps a dataset and optionally removes it from a project.

    Args:
        source_dataset: the source dataset (Dataset object not a string) to unmap
        project: the project in which to unmap the dataset
        remove_dataset_from_project: boolean to also remove the dataset from the project
        skip_if_missing: boolean to skip if dataset is not in project. If set to false and
            dataset is not in project will raise a RuntimeError

    Returns:
        None

    Raises:
        RuntimeError: if `source_dataset` is not in `project` and `skip_if_missing`
            not set to True
    """
    # check to make sure dataset is in project and log a warning if it is not
    project_dataset_names = [x.name for x in project.input_datasets()]
    if source_dataset.name not in project_dataset_names:
        if not skip_if_missing:
            error_message = (
                f"Dataset to unmap {source_dataset.name} not in project "
                f"{project.name} and skip_if_missing not set to True so failing! "
            )
            LOGGER.error(error_message)
            raise RuntimeError(error_message)
        LOGGER.warning(
            f"Dataset to unmap {source_dataset.name} not in project {project.name}! "
            f"However skip_if_missing flag is set so will do nothing"
        )
        return None

    # the resource ids of attribute mappings unfortunately change when you delete one,
    # so delete a single mapping then re-fetch, until none remain for this dataset
    while True:
        remaining = [
            mapping
            for mapping in project.attribute_mappings().stream()
            if mapping.input_dataset_name == source_dataset.name
        ]
        # stop once no mappings are left for this dataset
        if not remaining:
            break
        # can only delete one per fetch of the collection
        project.attribute_mappings().delete_by_resource_id(remaining[0].resource_id)

    # optionally remove dataset from the project
    if remove_dataset_from_project:
        project.remove_input_dataset(source_dataset)