Source code for tamr_toolbox.dataset.manage

from typing import List, Optional, Dict, Iterable
import logging

from tamr_unify_client import Client
from tamr_unify_client.dataset.resource import Dataset
from tamr_toolbox.models import attribute_type

from tamr_toolbox.models.data_type import JsonDict

LOGGER = logging.getLogger(__name__)


def exists(*, client: Client, dataset_name: str) -> bool:
    """Check if a dataset exists in a Tamr instance

    Args:
        client: Tamr python client object for the target instance
        dataset_name: The dataset name

    Returns:
        True if the dataset exists in the target instance, False otherwise
    """
    try:
        client.datasets.by_name(dataset_name)
    except KeyError:
        return False
    return True
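
# A minimal usage sketch for exists(), assuming `instance_connection_info` holds valid
# connection details for a reachable Tamr instance:
#
#     >>> import tamr_toolbox as tbox
#     >>> tamr_client = tbox.utils.client.create(**instance_connection_info)
#     >>> tbox.dataset.manage.exists(client=tamr_client, dataset_name="my_dataset")
#     True
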
def create(
    *,
    client: Client,
    dataset_name: str,
    dataset: Optional[Dataset] = None,
    primary_keys: Optional[List[str]] = None,
    attributes: Optional[Iterable[str]] = None,
    attribute_types: Optional[Dict[str, attribute_type.AttributeType]] = None,
    attribute_descriptions: Optional[Dict[str, str]] = None,
    description: Optional[str] = None,
    external_id: Optional[str] = None,
    tags: Optional[List[str]] = None,
) -> Dataset:
    """Flexibly create a source dataset in Tamr

    A template dataset object can be passed in to create a duplicate dataset with a new name.
    If a template dataset is not provided, primary_keys must be defined for the dataset to be
    created. Additional attributes can be added via the attributes argument. The default
    attribute type is ARRAY STRING; non-default attribute types can be specified in the
    attribute_types dictionary. Attribute descriptions can be specified in the
    attribute_descriptions dictionary.

    Args:
        client: TUC client object
        dataset_name: name for the new dataset being created
        dataset: optional dataset TUC object to use as a template for the new dataset
        primary_keys: one or more attributes for primary key(s) of the new dataset
        attributes: a list of attribute names to create in the new dataset
        attribute_types: dictionary for non-default types, attribute name is the key and
            AttributeType is the value
        attribute_descriptions: dictionary for attribute descriptions, attribute name is the
            key and the attribute description is the value
        description: description of the new dataset
        external_id: external_id for the dataset; if None, Tamr will create one for you
        tags: the list of tags for the new dataset

    Returns:
        Dataset created in Tamr

    Raises:
        requests.HTTPError: If any HTTP error is encountered
        ValueError: If neither dataset nor primary_keys is defined
        ValueError: If the dataset already exists
        TypeError: If the attributes argument is not an Iterable

    Example:
        >>> import tamr_toolbox as tbox
        >>> tamr_client = tbox.utils.client.create(**instance_connection_info)
        >>> tbox.dataset.manage.create(
        >>>     client=tamr_client,
        >>>     dataset_name="my_new_dataset",
        >>>     primary_keys=["unique_id"],
        >>>     attributes=["name", "address"],
        >>>     description="My new dataset",
        >>> )
    """
    if not dataset and not primary_keys:
        raise ValueError("dataset or primary_keys must be defined")

    # Get dataset information
    if dataset:
        # Get attributes from the template dataset object
        attribute_types, attribute_descriptions = {}, {}
        for attr in dataset.attributes.stream():
            attribute_types[attr.name] = attribute_type.from_json(attr.type.spec().to_dict())
            attribute_descriptions[attr.name] = attr.description
        attributes = attribute_types.keys()

        # Get dataset spec information
        description = dataset.description
        tags = dataset.tags
        primary_keys = dataset.key_attribute_names

    # Check input type is correct
    if attributes and not isinstance(attributes, Iterable):
        raise TypeError("attributes arg must be an Iterable")

    if not exists(client=client, dataset_name=dataset_name):
        creation_spec = {
            "name": dataset_name,
            "description": description,
            "keyAttributeNames": primary_keys,
            "externalId": external_id,
            "tags": tags,
        }
        client.datasets.create(creation_spec)
        LOGGER.info(f"A dataset with name {dataset_name} has been created")
    else:
        raise ValueError(f"A dataset with name '{dataset_name}' already exists")

    # Get the new dataset
    target_dataset = client.datasets.by_name(dataset_name)

    # Update attributes in the dataset, skipping the primary keys that already exist
    if attributes:
        filtered_attributes = [attr for attr in attributes if attr not in primary_keys]
        create_attributes(
            dataset=target_dataset,
            attributes=filtered_attributes,
            attribute_types=attribute_types,
            attribute_descriptions=attribute_descriptions,
        )

    return target_dataset
def update(
    dataset: Dataset,
    *,
    attributes: Optional[Iterable[str]] = None,
    attribute_types: Optional[Dict[str, attribute_type.AttributeType]] = None,
    attribute_descriptions: Optional[Dict[str, str]] = None,
    description: Optional[str] = None,
    tags: Optional[List[str]] = None,
    override_existing_types: bool = False,
) -> Dataset:
    """Flexibly update a source dataset in Tamr

    All the attributes that should exist in the dataset must be defined in the attributes
    argument. This function adds/removes attributes in the dataset until the dataset's
    attributes match the set of attributes passed in as an argument. The default attribute
    type is ARRAY STRING. To set non-default attribute types, they must be defined in the
    attribute_types dictionary. Attribute descriptions can be specified in the
    attribute_descriptions dictionary. By default, existing attribute types will not change
    unless override_existing_types is set to True. When False, attribute type updates are
    only logged.

    Args:
        dataset: An existing TUC dataset
        attributes: Complete list of attribute names that should exist in the updated dataset
        attribute_types: dictionary for non-default types, attribute name is the key and
            AttributeType is the value
        attribute_descriptions: dictionary for attribute descriptions, attribute name is the
            key and the attribute description is the value
        description: updated description of the dataset; if None, the description is not updated
        tags: updated tags for the dataset; if None, tags are not updated
        override_existing_types: boolean flag; when True, existing attributes' types may be
            altered

    Returns:
        Updated Dataset

    Raises:
        requests.HTTPError: If any HTTP error is encountered
        ValueError: If the dataset is not a source dataset
        TypeError: If the attributes argument is not an Iterable

    Example:
        >>> import tamr_toolbox as tbox
        >>> from tamr_toolbox.models import attribute_type
        >>> tamr_client = tbox.utils.client.create(**instance_connection_info)
        >>> dataset = tamr_client.datasets.by_name("my_dataset_name")
        >>> tbox.dataset.manage.update(
        >>>     dataset=dataset,
        >>>     attributes=["unique_id", "name", "address", "total_sales"],
        >>>     attribute_types={"total_sales": attribute_type.ARRAY(attribute_type.DOUBLE)},
        >>>     override_existing_types=True,
        >>> )
    """
    dataset_name = dataset.name
    if dataset.upstream_datasets():
        raise ValueError(f"{dataset_name} is not a source dataset")
    primary_keys = dataset.spec().to_dict()["keyAttributeNames"]

    # Check input type is correct
    if attributes and not isinstance(attributes, Iterable):
        raise TypeError("attributes arg must be an Iterable")

    # Update description and tags
    dataset_spec = dataset.spec()
    if description:
        dataset_spec = dataset_spec.with_description(description)
        LOGGER.info(f"Updating description for {dataset_name}")
    if tags:
        dataset_spec = dataset_spec.with_tags(tags)
        LOGGER.info(f"Updating tags for {dataset_name}")
    dataset_spec.put()

    if attributes:
        # Get current dataset attributes
        existing_attributes = [attr.name for attr in dataset.attributes]

        # Update attributes in the dataset
        for attribute_name in attributes:
            if attribute_name in primary_keys:
                continue
            elif attribute_name in existing_attributes:
                # This attribute already exists, update to the new type
                type_dict = {
                    attribute_name: (attribute_types or dict()).get(
                        attribute_name, attribute_type.DEFAULT
                    )
                }
                desc_dict = {
                    attribute_name: (attribute_descriptions or dict()).get(attribute_name)
                }
                edit_attributes(
                    dataset=dataset,
                    attribute_types=type_dict,
                    attribute_descriptions=desc_dict,
                    override_existing_types=override_existing_types,
                )
            else:
                # This attribute does not already exist, create it
                create_attributes(
                    dataset=dataset,
                    attributes=[attribute_name],
                    attribute_types=attribute_types,
                    attribute_descriptions=attribute_descriptions,
                )

        # Remove any attributes from the dataset that aren't in the new list of attributes
        for attribute_name in existing_attributes:
            if attribute_name not in attributes and attribute_name not in primary_keys:
                delete_attributes(dataset=dataset, attributes=[attribute_name])

    return dataset
def create_attributes(
    *,
    dataset: Dataset,
    attributes: Iterable[str],
    attribute_types: Optional[Dict[str, attribute_type.AttributeType]] = None,
    attribute_descriptions: Optional[Dict[str, str]] = None,
) -> Dataset:
    """Create new attributes in a dataset

    The default attribute type will be ARRAY STRING. To set non-default attribute types, they
    must be defined in the attribute_types dictionary. Attribute descriptions can be
    specified in the attribute_descriptions dictionary.

    Args:
        dataset: An existing TUC dataset
        attributes: list of attribute names to be added to the dataset
        attribute_types: dictionary for non-default types, attribute name is the key and
            AttributeType is the value
        attribute_descriptions: dictionary for attribute descriptions, attribute name is the
            key and the attribute description is the value

    Returns:
        Updated Dataset

    Raises:
        requests.HTTPError: If any HTTP error is encountered
        TypeError: If the attributes argument is not an Iterable
        ValueError: If the dataset is not a source dataset
        ValueError: If an attribute passed in already exists in the dataset
    """
    dataset_name = dataset.name
    if dataset.upstream_datasets():
        raise ValueError(f"{dataset_name} is not a source dataset")

    # Check input type is correct
    if not isinstance(attributes, Iterable):
        raise TypeError("attributes arg must be an Iterable")

    # Get current dataset attributes
    existing_attributes = [attr.name for attr in dataset.attributes]

    # Check that none of the new attribute names already exist
    for attribute_name in attributes:
        if attribute_name in existing_attributes:
            # This attribute already exists
            raise ValueError(
                f"An attribute with name '{attribute_name}' already exists in {dataset_name}"
            )

    # Add attributes to the dataset
    for attribute_name in attributes:
        attr_spec_dict = _make_spec_dict(
            attribute_name=attribute_name,
            attribute_types=attribute_types,
            attribute_descriptions=attribute_descriptions,
        )
        dataset.attributes.create(attr_spec_dict)
        LOGGER.info(f"Created attribute '{attribute_name}' in {dataset_name}")

    return dataset
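
# A usage sketch for create_attributes(), assuming the `tamr_client` from the sketch above,
# and that "my_dataset" exists without "phone" or "total_sales" attributes; ARRAY and DOUBLE
# come from tamr_toolbox.models.attribute_type:
#
#     >>> from tamr_toolbox.models import attribute_type
#     >>> dataset = tamr_client.datasets.by_name("my_dataset")
#     >>> tbox.dataset.manage.create_attributes(
#     >>>     dataset=dataset,
#     >>>     attributes=["phone", "total_sales"],
#     >>>     attribute_types={"total_sales": attribute_type.ARRAY(attribute_type.DOUBLE)},
#     >>>     attribute_descriptions={"phone": "Primary phone number"},
#     >>> )
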
def edit_attributes(
    *,
    dataset: Dataset,
    attribute_types: Optional[Dict[str, attribute_type.AttributeType]] = None,
    attribute_descriptions: Optional[Dict[str, str]] = None,
    override_existing_types: bool = True,
) -> Dataset:
    """Edit existing attributes in a dataset

    The attribute type and/or description can be updated to new values. Attributes to be
    updated must appear in the attribute_types dictionary, the attribute_descriptions
    dictionary, or both. The default attribute type will be ARRAY STRING. To set non-default
    attribute types, they must be defined in the attribute_types dictionary. Attribute
    descriptions can be specified in the attribute_descriptions dictionary. If only the
    attribute_descriptions dictionary is defined, the attribute type will not be updated.

    Args:
        dataset: An existing TUC dataset
        attribute_types: dictionary for non-default types, attribute name is the key and
            AttributeType is the value
        attribute_descriptions: dictionary for attribute descriptions, attribute name is the
            key and the attribute description is the value
        override_existing_types: bool flag; when True, existing attributes' types may be
            altered

    Returns:
        Updated Dataset

    Raises:
        requests.HTTPError: If any HTTP error is encountered
        ValueError: If the dataset is not a source dataset
        ValueError: If a passed attribute does not exist in the dataset
        ValueError: If a passed attribute is a primary key and can't be updated
        ValueError: If there are no updates to attributes in attribute_types or
            attribute_descriptions arguments
    """
    dataset_name = dataset.name
    if dataset.upstream_datasets():
        raise ValueError(f"{dataset_name} is not a source dataset")

    # Check that description or type changes are passed in
    if attribute_types is None and attribute_descriptions is None:
        raise ValueError(
            "Updates to attributes must be passed in via attribute_types or "
            "attribute_descriptions arguments"
        )

    # Get the set of attributes that need updating from the attribute_types and
    # attribute_descriptions dictionaries
    attributes = {attr for attr in attribute_types or list()} | {
        attr for attr in attribute_descriptions or list()
    }

    # Get current dataset attributes
    target_attribute_dict = {attr.name: attr for attr in dataset.attributes}
    existing_attributes = target_attribute_dict.keys()
    primary_keys = dataset.spec().to_dict()["keyAttributeNames"]

    # Check that all of the attribute names already exist in the dataset
    for attribute_name in attributes:
        if attribute_name not in existing_attributes:
            # This attribute does not exist
            raise ValueError(
                f"An attribute with name '{attribute_name}' does not exist in {dataset_name}"
            )
        elif attribute_name in primary_keys:
            # Can not edit a primary key
            raise ValueError(
                f"The attribute '{attribute_name}' is a primary key and can't be updated"
            )

    # Update attributes in the dataset
    for attribute_name in attributes:
        attr_spec_dict = _make_spec_dict(
            attribute_name=attribute_name,
            attribute_types=attribute_types,
            attribute_descriptions=attribute_descriptions,
        )
        existing_attribute_spec = target_attribute_dict[attribute_name].spec()
        if attribute_types is None or attribute_name not in attribute_types:
            new_type_class = attribute_type.from_json(existing_attribute_spec.to_dict()["type"])
        else:
            new_type_class = attribute_type.from_json(attr_spec_dict["type"])
        old_type_class = attribute_type.from_json(existing_attribute_spec.to_dict()["type"])

        if new_type_class == old_type_class:
            # Type is unchanged, update the description only
            if (
                attribute_descriptions is not None
                and attribute_name in attribute_descriptions.keys()
            ):
                existing_attribute_spec = existing_attribute_spec.with_description(
                    attribute_descriptions[attribute_name]
                )
                existing_attribute_spec.put()
            else:
                LOGGER.info(
                    f"There are no updates to the attribute '{attribute_name}' "
                    f"in {dataset_name}"
                )
        elif override_existing_types:
            # Update type
            new_attr_spec = existing_attribute_spec.to_dict()
            new_attr_spec["type"] = attr_spec_dict["type"]

            # Update description
            if "description" in attr_spec_dict.keys():
                new_attr_spec["description"] = attr_spec_dict["description"]

            # Remove and re-add the attribute with the new spec
            dataset.attributes.delete_by_resource_id(
                target_attribute_dict[attribute_name].resource_id
            )
            dataset.attributes.create(new_attr_spec)
            LOGGER.info(f"Updated attribute '{attribute_name}' in {dataset_name}")
        else:
            LOGGER.info(
                f"The attribute '{attribute_name}' in {dataset_name} currently has the type "
                f"'{str(old_type_class)}'. Set 'override_existing_types' to True to update "
                f"the type to '{str(new_type_class)}'"
            )

    return dataset
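
# A usage sketch for edit_attributes(), assuming the `dataset` and attribute_type import
# from the sketch above, with "total_sales" already present in the dataset; without
# override_existing_types=True the type change would only be logged:
#
#     >>> tbox.dataset.manage.edit_attributes(
#     >>>     dataset=dataset,
#     >>>     attribute_types={"total_sales": attribute_type.ARRAY(attribute_type.DOUBLE)},
#     >>>     attribute_descriptions={"total_sales": "Total sales in USD"},
#     >>>     override_existing_types=True,
#     >>> )
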
def delete_attributes(
    *, dataset: Dataset, attributes: Optional[Iterable[str]] = None
) -> Dataset:
    """Remove attributes from a dataset by attribute name

    Args:
        dataset: An existing TUC dataset
        attributes: list of attribute names to delete from the dataset

    Returns:
        Updated Dataset

    Raises:
        ValueError: If the dataset is not a source dataset
        ValueError: If a passed attribute does not exist in the dataset
        ValueError: If a passed attribute is a primary key and can't be removed
        TypeError: If the attributes argument is not an Iterable
    """
    dataset_name = dataset.name
    if dataset.upstream_datasets():
        raise ValueError(f"{dataset_name} is not a source dataset")

    # Check input type is correct
    if not isinstance(attributes, Iterable):
        raise TypeError("attributes arg must be an Iterable")

    # Get current dataset attributes
    target_attribute_dict = {attr.name: attr for attr in dataset.attributes}
    existing_attributes = target_attribute_dict.keys()
    primary_keys = dataset.spec().to_dict()["keyAttributeNames"]

    # Check all attributes exist before starting to remove any
    for attribute_name in attributes:
        if attribute_name not in existing_attributes:
            raise ValueError(
                f"The attribute '{attribute_name}' does not exist in {dataset_name}"
            )
        elif attribute_name in primary_keys:
            # Can not remove a primary key
            raise ValueError(
                f"The attribute '{attribute_name}' is a primary key and can't be removed"
            )

    # Remove attributes from the dataset
    for attribute_name in attributes:
        dataset.attributes.delete_by_resource_id(
            target_attribute_dict[attribute_name].resource_id
        )
        LOGGER.info(f"Deleted attribute '{attribute_name}' in {dataset_name}")

    return dataset
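
# A usage sketch for delete_attributes(), assuming the `dataset` from the sketches above
# contains a non-primary-key attribute named "legacy_code":
#
#     >>> tbox.dataset.manage.delete_attributes(dataset=dataset, attributes=["legacy_code"])
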
def update_records(
    dataset: Dataset,
    *,
    updates: Optional[list] = None,
    delete_all: bool = False,
    primary_keys: List[str],
    primary_key_name: str,
):
    """Flexibly update the records of a dataset

    The user supplies a list of primary keys for a subset of the dataset's records, along
    with a list of updates describing how each record should be altered. An update is either
    the string "delete" or a dictionary in "attribute: value" format. In the first case, the
    record with the corresponding primary key is deleted; in the second case, the data in the
    dictionary replaces the record with the corresponding primary key. If no such record
    exists, a new record is created. Alternatively, the user can set the delete_all flag to
    specify that all records indicated by the list of primary keys should be deleted.

    Args:
        dataset: An existing TUC dataset
        updates: List of updates to push to the dataset
        delete_all: Whether all indicated records should be deleted
        primary_keys: List of primary key values for all target records
        primary_key_name: Name of the primary key of the target dataset

    Returns:
        Updated dataset

    Raises:
        KeyError: If an indicated attribute does not exist
        TypeError: If an update in the list is not "delete" or a dict
        ValueError: If updates and primary_keys have differing lengths
    """
    # If delete_all, create an updates list of all deletes. Otherwise, ensure that the list
    # of updates and the list of primary keys have the same length.
    if delete_all:
        updates = ["delete"] * len(primary_keys)
    else:
        if updates is None or len(primary_keys) != len(updates):
            raise ValueError(
                "Arguments updates and primary_keys must exist and have equal length"
            )

    # Populate the list of primary keys for deletion and the records to upsert.
    deletions = []
    records = []
    attributes = [attribute.name for attribute in dataset.attributes]
    for i in range(len(updates)):
        if updates[i] == "delete":
            deletions.append(primary_keys[i])
        else:
            if not isinstance(updates[i], dict):
                raise TypeError(f"Invalid update at index {i}")
            if primary_key_name not in updates[i]:
                updates[i][primary_key_name] = primary_keys[i]
            for k in updates[i]:
                if k not in attributes:
                    raise KeyError(f"Key {k} is not an attribute of input dataset")
            records.append(updates[i])

    # Carry out any deletions.
    if deletions:
        dataset.delete_records_by_id(deletions)

    # Carry out any upserts.
    if records:
        dataset.upsert_records(records, primary_key_name)

    return dataset
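
# A usage sketch for update_records(), assuming a dataset keyed on "unique_id" with "name"
# among its attributes: the record with key "1" is upserted and the record with key "2" is
# deleted.
#
#     >>> tbox.dataset.manage.update_records(
#     >>>     dataset,
#     >>>     updates=[{"name": ["Jane Doe"]}, "delete"],
#     >>>     primary_keys=["1", "2"],
#     >>>     primary_key_name="unique_id",
#     >>> )
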
def _make_spec_dict(
    attribute_name: str,
    attribute_types: Optional[Dict[str, attribute_type.AttributeType]],
    attribute_descriptions: Optional[Dict[str, str]],
) -> JsonDict:
    """Create an attribute spec dictionary

    The default attribute type will be ARRAY STRING. Non-default attribute types can be
    specified in the attribute_types dictionary. Attribute descriptions can be specified in
    the attribute_descriptions dictionary.

    Args:
        attribute_name: name of the attribute
        attribute_types: dictionary for non-default types, attribute name is the key and
            AttributeType is the value
        attribute_descriptions: dictionary for attribute descriptions, attribute name is the
            key and the attribute description is the value

    Returns:
        Json Dict
    """
    if attribute_types is not None and attribute_name in attribute_types.keys():
        attr_type = attribute_types[attribute_name]
    else:
        attr_type = attribute_type.DEFAULT
    result = {"name": attribute_name, "type": attribute_type.to_json(attr_type=attr_type)}
    if attribute_descriptions is not None and attribute_name in attribute_descriptions.keys():
        result["description"] = attribute_descriptions[attribute_name]
    return result
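
# A sketch of the spec dict _make_spec_dict() produces for the default ARRAY STRING type;
# the exact JSON shape shown is assumed from attribute_type.to_json and Tamr's attribute
# type format:
#
#     >>> _make_spec_dict("name", attribute_types=None, attribute_descriptions=None)
#     {'name': 'name', 'type': {'baseType': 'ARRAY', 'innerType': {'baseType': 'STRING'}}}
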