Enrichment

Translation

The tamr-toolbox provides functions to translate standardized data and store the results in dictionaries, ensuring that the same data is never translated twice. These translation capabilities can be applied to a Tamr dataset or a pandas DataFrame.
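
At its core, each example below follows the same flow: load (or create) a local dictionary, translate only the phrases it does not already contain, and persist the updated dictionary. A minimal sketch of that flow, with hypothetical credential paths, folder names, and phrases:

# A minimal sketch, not a full script; all paths and phrases are illustrative
import tamr_toolbox as tbox

google = tbox.enrichment.api_client.google.translation_client_from_json("credentials.json")
dictionary = tbox.enrichment.dictionary.load(
    dictionary_folder="dictionaries", target_language="en", source_language="fr"
)

# Phrases already present in the dictionary are not re-sent to the API
dictionary = tbox.enrichment.translate.from_list(
    all_phrases=["bonjour", "au revoir"],
    client=google,
    dictionary=dictionary,
    target_language="en",
    source_language="fr",
)

# Persist the updated dictionary so these phrases are never translated twice
tbox.enrichment.dictionary.save(
    translation_dictionary=dictionary,
    dictionary_folder="dictionaries",
    target_language="en",
    source_language="fr",
)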

Translate data within a pandas DataFrame and save dictionaries on disk

"""An example script to translate data from disk and save results on disk"""
from typing import List

import tamr_toolbox as tbox
import pandas as pd

import argparse


def main(
    json_credential_path: str,
    dictionary_folder: str,
    attributes_to_translate: List[str],
    path_to_csv_to_translate: str,
    path_to_translated_csv: str,
) -> None:
    """
    Translate data located on disk and save results to disk
    Args:
        json_credential_path: Path to the JSON file containing Google Translate API keys
        dictionary_folder: Path to the folder on disk where local versions of the
            dictionary are saved
        attributes_to_translate: List of attributes from the local CSV file to translate
        path_to_csv_to_translate: Path to the CSV file to translate
        path_to_translated_csv: Path to the CSV file where translated data is written

    """
    # Make Google API translation client
    google = tbox.enrichment.api_client.google.translation_client_from_json(json_credential_path)

    # Read CSV file from disk
    df = pd.read_csv(path_to_csv_to_translate, dtype=object)

    # Load the dictionary from disk
    LOGGER.info("Starting translation from French to English")
    dictionary = tbox.enrichment.dictionary.load(
        dictionary_folder=dictionary_folder, target_language="en", source_language="fr"
    )

    # Translate attribute by attribute
    for attribute in attributes_to_translate:
        LOGGER.info(f"Translating attribute: {attribute}")
        dictionary = tbox.enrichment.translate.from_list(
            all_phrases=df[attribute].unique().tolist(),
            client=google,
            dictionary=dictionary,
            target_language="en",
            source_language="fr",
            intermediate_save_every_n_chunks=100,
            intermediate_save_to_disk=True,
            intermediate_folder=dictionary_folder,
        )

    # Save the updated dictionary, with its newly added translations, to disk
    LOGGER.info("Finished translation from French to English")
    LOGGER.info("Saving updated dictionary to disk")
    tbox.enrichment.dictionary.save(
        translation_dictionary=dictionary,
        dictionary_folder=dictionary_folder,
        target_language="en",
        source_language="fr",
    )

    # Translate the dataframe in situ
    LOGGER.info("Translating dataframe from French to English")
    LOGGER.debug("Converting dictionary to mapping of original to translated phrases")
    dictionary = tbox.enrichment.dictionary.convert_to_mappings(dictionary)
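    # The result is a plain dict of standardized phrase -> translated phrase,
    # e.g. {"bonjour": "hello"} (illustrative values)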

    for attribute in attributes_to_translate:
        LOGGER.info(f"Translating attribute {attribute} from french to english")
        df[attribute + "_translated"] = df[attribute].map(dictionary)

    # Then save dataframe to disk
    df.to_csv(path_to_translated_csv, index=False)


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/dataset.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        json_credential_path=CONFIG["translation"]["json_credential_path"],
        dictionary_folder=CONFIG["path_to_dictionary_folder"],
        attributes_to_translate=CONFIG["translation"]["attributes"],
        path_to_csv_to_translate="path_to_my_data.csv",
        path_to_translated_csv="path_to_my_translated_data.csv",
    )
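
The scripts in this section read their settings from a YAML configuration file. An illustrative dataset.config.yaml is sketched below; all values, attribute names, and dataset ids are placeholders, and the Tamr-based scripts additionally use the my_tamr_instance and datasets entries:

logging_dir: "/path/to/logs"
path_to_dictionary_folder: "/path/to/dictionaries"
my_tamr_instance:
    host: "localhost"
    port: 9100
    protocol: "http"
    username: "my_user"
    password: "my_password"
translation:
    json_credential_path: "/path/to/google_credentials.json"
    attributes:
        - "name"
        - "description"
    my_dictionary_folder: "/path/to/dictionaries"
datasets:
    my_mastering_project_dataset:
        id: "123"
    my_dictionary:
        id: "456"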

Translate data from Tamr and update dictionary saved as a source dataset on Tamr

"""An example script to translate data from Tamr and save results in Tamr"""
from typing import Dict, Any

import tamr_toolbox as tbox

import argparse


def main(
    *,
    instance_connection_info: Dict[str, Any],
    unified_dataset_id: str,
    dictionary_dataset_id: str,
) -> None:
    """
    Translate data streamed from Tamr and save results on Tamr

    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        unified_dataset_id: id of the Tamr unified dataset containing the data to translate
        dictionary_dataset_id: id of the Tamr toolbox translation dictionary dataset

    """
    # Make a Tamr client and a Google API translation client
    tamr = tbox.utils.client.create(**instance_connection_info)
    google = tbox.enrichment.api_client.google.translation_client_from_json(
        json_credential_path=CONFIG["translation"]["json_credential_path"]
    )

    # List the attributes to translate
    attributes_to_translate = CONFIG["translation"]["attributes"]

    # Get dataframe from the Tamr unified dataset. For efficiency, it is best to pass a delta
    # dataset in which only untranslated data is kept. To do this, set up a schema mapping (SM)
    # project connected to your current translated unified dataset (UD) and filter to records
    # with null values in the translated attributes.
    dataset = tamr.datasets.by_resource_id(unified_dataset_id)
    df = tbox.data_io.dataframe.from_dataset(
        dataset, columns=attributes_to_translate, flatten_delimiter=" | "
    )

    # stream dictionary from Tamr. Dictionaries should follow the TranslationDictionary class of
    # the toolbox: "standardized_phrase" (str), "translated_phrase" (str),
    # "detected_language" (str), "original_phrases" (List[str])
    dictionary_dataset = tamr.datasets.by_resource_id(dictionary_dataset_id)
    dictionary = tbox.enrichment.dictionary.from_dataset(dictionary_dataset)

    for column in df.columns:
        LOGGER.info(f"Translating attribute: {column}")
        dictionary = tbox.enrichment.translate.from_list(
            all_phrases=df[column].unique().tolist(),
            client=google,
            dictionary=dictionary,
            source_language="fr",
            target_language="en",
        )

    # Update the dictionary on Tamr
    dataset_name = tbox.enrichment.dictionary.to_dataset(
        dictionary=dictionary, dataset=dictionary_dataset
    )
    LOGGER.info(f"Tamr dataset {dataset_name} updated with new translation data")
    LOGGER.info("Script complete.")


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/dataset.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        unified_dataset_id=CONFIG["datasets"]["my_mastering_project_dataset"]["id"],
        dictionary_dataset_id=CONFIG["datasets"]["my_dictionary"]["id"],
    )

Initiate a translation dictionary on Tamr

"""An example script to create an empty translation dictionary on Tamr"""
from typing import Dict, Any
import tamr_toolbox as tbox
import argparse


def main(
    *,
    instance_connection_info: Dict[str, Any],
    dictionary_folder: str,
    source_language: str,
    target_language: str,
) -> None:
    """
    Create an empty toolbox translation dictionary dataset on Tamr
    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        dictionary_folder: Path to the folder on disk where local versions of dictionary are saved
        source_language: Source language of the dictionary
        target_language: Target language of the dictionary

    """
    # Connect to Tamr
    tamr = tbox.utils.client.create(**instance_connection_info)

    # Load the existing local dictionary, or create an empty one, to push to Tamr
    LOGGER.info(
        f"Initiating empty translation dictionary from source language {source_language} "
        f"to target language {target_language}"
    )

    dictionary = tbox.enrichment.dictionary.load(
        dictionary_folder=dictionary_folder,
        target_language=target_language,
        source_language=source_language,
    )

    if len(dictionary) > 0:
        warning_message = (
            f"Warning: dictionary from {source_language} to {target_language} in "
            f"{dictionary_folder} already exists and is not empty"
        )
        LOGGER.warning(warning_message)

    dataset_name = tbox.enrichment.dictionary.to_dataset(
        dictionary=dictionary,
        datasets_collection=tamr.datasets,
        target_language=target_language,
        source_language=source_language,
        create_dataset=True,
    )
    LOGGER.info(f"{dataset_name} created as a source dataset on Tamr")


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    parser.add_argument("--source", help="source language to translate from", required=True)
    parser.add_argument("--target", help="target language to translate to", required=True)

    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/dataset.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        dictionary_folder=CONFIG["translation"]["my_dictionary_folder"],
        source_language=args.source,
        target_language=args.target,
    )

Address Validation

The tamr-toolbox provides functions to validate address data and cache the results (up to a configurable expiration time) so that the same addresses are not repeatedly validated. These validation capabilities can be applied to a Tamr dataset or to local data.
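
The core flow mirrors translation: load (or create) a local mapping of previously validated addresses, validate only the address tuples that are missing or expired, and persist the updated mapping. A minimal sketch, with a hypothetical API key, folder, and address:

# A minimal sketch, not a full script; all values are illustrative
import tamr_toolbox as tbox

maps_client = tbox.enrichment.api_client.google_address_validate.get_maps_client("my-api-key")
mapping = tbox.enrichment.address_mapping.load(addr_folder="address_mappings")

# Each address is an ordered tuple of its columns; cached entries are not re-validated
mapping = tbox.enrichment.address_validation.from_list(
    all_addresses=[("66 Church St", "Cambridge", "MA", "02138")],
    client=maps_client,
    dictionary=mapping,
    region_code="US",
)

tbox.enrichment.address_mapping.save(addr_mapping=mapping, addr_folder="address_mappings")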

Validate data within a local CSV and save results on disk

"""An example script to validate address data from disk and save results on disk."""
import argparse
from dataclasses import fields
from typing import List

import pandas as pd

import tamr_toolbox as tbox
from tamr_toolbox.enrichment.enrichment_utils import join_clean_tuple


def main(
    googlemaps_api_key: str,
    mapping_folder: str,
    address_columns: List[str],
    path_to_csv_to_validate: str,
    path_to_validated_csv: str,
) -> None:
    """Validate data located on disk and save results to disk.

    Uses the address validation mapping data found in the mapping folder, adding to it any new
    lookups needed. Adds columns corresponding to `AddressValidationMapping` fields to the data
    read from the csv, and saves it at `path_to_validated_csv`.

    Args:
        googlemaps_api_key: API key for the Google Maps address validation API
        mapping_folder: path to the folder on disk where local validation data is saved
        address_columns: ordered list of address columns from the local csv file
        path_to_csv_to_validate: Path to the CSV file to validate
        path_to_validated_csv: path to the CSV file augmented with validation data
    """
    # Make Google Maps API client
    maps_client = tbox.enrichment.api_client.google_address_validate.get_maps_client(
        googlemaps_api_key
    )

    # Read CSV file from disk
    dataframe = pd.read_csv(path_to_csv_to_validate, dtype=object)

    # Load any existing validation data
    LOGGER.info("Starting address validation.")
    mapping = tbox.enrichment.address_mapping.load(addr_folder=mapping_folder)

    # Convert each row's address columns into a tuple for validation
    tuples = tbox.enrichment.enrichment_utils.dataframe_to_tuples(
        dataframe=dataframe, columns_to_join=address_columns
    )

    LOGGER.info("Generated %s tuples; beginning API validation", len(tuples))
    mapping = tbox.enrichment.address_validation.from_list(
        all_addresses=list(set(tuples)),
        client=maps_client,
        dictionary=mapping,
        region_code="US",
        intermediate_save_every_n=100,
        intermediate_save_to_disk=True,
        intermediate_folder=mapping_folder,
    )

    # Save to disk new mapping with added validation data
    LOGGER.info("Saving updated address validation data to disk.")
    tbox.enrichment.address_mapping.save(addr_mapping=mapping, addr_folder=mapping_folder)

    # Augmenting dataframe in situ
    LOGGER.info("Augmenting dataframe with validation data")

    # Add empty columns for each field of the AddressValidationMapping
    for att in fields(tbox.enrichment.address_mapping.AddressValidationMapping):
        dataframe[att.name + "_from_address_validation"] = None

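    # Build the same lookup keys used in the validation mapping so each row can be joined
    # to its validation result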
    dataframe["lookup_key"] = [join_clean_tuple(tup) for tup in tuples]

    for ind, row in dataframe.iterrows():
        for att in fields(tbox.enrichment.address_mapping.AddressValidationMapping):
            col_name = att.name + "_from_address_validation"
            dataframe.at[ind, col_name] = getattr(mapping[row.lookup_key], att.name)

    # Then save dataframe to disk
    dataframe.to_csv(path_to_validated_csv, index=False)


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/address_validation.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        googlemaps_api_key=CONFIG["address_validation"]["googlemaps_api_key"],
        mapping_folder=CONFIG["address_validation"]["address_mapping_folder"],
        address_columns=CONFIG["address_validation"]["address_columns"],
        path_to_csv_to_validate="/path/to/data.csv",
        path_to_validated_csv="/path/to/augmented_data.csv",
    )
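
As with translation, these scripts load their settings from a YAML configuration file. An illustrative address_validation.config.yaml for the script above is sketched below; all values and column names are placeholders, and the Tamr-based scripts additionally reference the my_tamr_instance and datasets entries shown in their __main__ blocks:

logging_dir: "/path/to/logs"
address_validation:
    googlemaps_api_key: "my-api-key"
    address_mapping_folder: "/path/to/address_mappings"
    address_columns:
        - "street"
        - "city"
        - "state"
        - "zip"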

Validate data from Tamr and update validation data mapping saved as a source dataset on Tamr

"""An example script to validate address data from Tamr and save results in Tamr"""
import argparse
from datetime import timedelta
from typing import Any, Dict, List

import tamr_toolbox as tbox


def main(
    *,
    instance_connection_info: Dict[str, Any],
    dataset_id: str,
    dataset_addr_columns: List[str],
    mapping_dataset_id: str,
    googlemaps_api_key: str,
) -> None:
    """Validate address data streamed from Tamr and save results on Tamr.

    Note that this does not update the dataset corresponding to the input `dataset_id` -- it
    performs lookups based on data in that dataset, and updates the dataset corresponding to the
    `mapping_dataset_id` with the new data.

    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        dataset_id: id of the Tamr dataset containing the data to validate
        dataset_addr_columns: ordered list of columns in the unified dataset with address info
        mapping_dataset_id: id of the Tamr toolbox address validation mapping dataset
        googlemaps_api_key: API key for the Google Maps address validation API

    """
    # Make Tamr Client
    tamr = tbox.utils.client.create(**instance_connection_info)

    # Get dataframe from the Tamr dataset.
    # For large datasets, it is preferable to use a delta dataset with only unvalidated/expired
    # data. To do this, set up a schema mapping (SM) project connected to the current validated
    # dataset and filter to records with null/expired values in the validation columns
    dataset = tamr.datasets.by_resource_id(dataset_id)
    dataframe = tbox.data_io.dataframe.from_dataset(
        dataset, columns=dataset_addr_columns, flatten_delimiter=" | "
    )

    # Stream address mapping data from Tamr -- must match Toolbox AddressValidationMapping class
    mapping_dataset = tamr.datasets.by_resource_id(mapping_dataset_id)
    mapping = tbox.enrichment.address_mapping.from_dataset(mapping_dataset)

    LOGGER.info("Starting address validation.")
    maps_client = tbox.enrichment.api_client.google_address_validate.get_maps_client(
        googlemaps_api_key
    )

    tuples = tbox.enrichment.enrichment_utils.dataframe_to_tuples(
        dataframe=dataframe, columns_to_join=dataset_addr_columns
    )

    # Update the `region_code` below to match the expected region of your dataset, or remove it if
    # no `region_code` can be inferred
    # Update the expiration date buffer depending on the update frequency of your pipeline
    mapping = tbox.enrichment.address_validation.from_list(
        all_addresses=tuples,
        client=maps_client,
        dictionary=mapping,
        enable_usps_cass=False,
        region_code="US",
        expiration_date_buffer=timedelta(days=2),
    )

    # Update address validation mapping on Tamr
    dataset_name = tbox.enrichment.address_mapping.to_dataset(
        addr_mapping=mapping, dataset=mapping_dataset
    )
    LOGGER.info("Tamr dataset %s updated with new validation data", dataset_name)
    LOGGER.info("Script complete.")


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config,
        default_path_to_file="/path/to/my/conf/address_validation.config.yaml",
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        dataset_id=CONFIG["datasets"]["my_dataset_to_be_addr_validated"]["id"],
        dataset_addr_columns=CONFIG["datasets"]["my_dataset_to_be_addr_validated"][
            "address_columns"
        ],
        mapping_dataset_id=CONFIG["datasets"]["my_addr_validation_mapping"]["id"],
        googlemaps_api_key=CONFIG["address_validation"]["googlemaps_api_key"],
    )

Initiate an address validation mapping dataset on Tamr

"""An example script to create an address validation mapping on Tamr."""
import argparse
from typing import Dict, Any

import tamr_toolbox as tbox


def main(
    *, instance_connection_info: Dict[str, Any], existing_mapping_folder: str, dataset_name: str
) -> None:
    """Create a toolbox address validation mapping dataset on Tamr.

    If a mapping is found in the `existing_mapping_folder`, it will be loaded to Tamr; otherwise
    an empty dataset is created in Tamr (no file will be created on the local filesystem).

    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        existing_mapping_folder: Path to the folder on disk for existing validation data
        dataset_name: name for the new address validation dataset in Tamr
    """
    # Connect to Tamr
    tamr = tbox.utils.client.create(**instance_connection_info)

    LOGGER.info("Initializing address validation mapping dataset on Tamr.")
    # Load existing data. If the existing data is saved under a name other than the default
    # "address_validation_mapping.json", pass that filename to the `load` function here
    mapping = tbox.enrichment.address_mapping.load(addr_folder=existing_mapping_folder)

    if len(mapping) > 0:
        LOGGER.warning(
            "Alert: address validation mapping in %s already exists and is not empty",
            existing_mapping_folder,
        )

    dataset_name = tbox.enrichment.address_mapping.to_dataset(
        addr_mapping=mapping,
        datasets_collection=tamr.datasets,
        create_dataset=True,
        dataset_name=dataset_name,
    )
    LOGGER.info("Dataset %s created as a source dataset on Tamr", dataset_name)


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)

    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/address_validation.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    # Use config name for `my_addr_validation_mapping` if supplied, otherwise use default
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        existing_mapping_folder=CONFIG["address_validation"]["address_mapping_folder"],
        dataset_name=CONFIG.get("datasets", dict())
        .get("my_addr_validation_mapping", dict())
        .get("name", "address_validation_mapping"),
    )