Data Input/Output

DF-Connect

Connecting to Tamr’s Auxiliary Service Df-Connect

Connecting to Tamr’s df-connect auxiliary service is straightforward with tamr-toolbox. The relevant module is tamr_toolbox/data_io/df_connect, and the main object is an instance of type ConnectInfo. The easiest way to generate such an object is by reading in a configuration file like the one below:

df_connect:
    host: $CONNECT_HOST
    protocol: "http"
    port: "9030"
    tamr_username: $TAMR_USERNAME
    tamr_password: $TAMR_PASSWORD
    jdbc:
        ingest:
            jdbc_url: $CONNECT_INGEST_DB_URL
            db_user: $CONNECT_INGEST_DB_USERNAME
            db_password: $CONNECT_INGEST_DB_PASSWORD
            fetch_size: 10000
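
The $-prefixed values above are intended to be resolved from environment variables of the same name when the configuration is loaded. A minimal sketch, assuming those environment variables are set (the values below are purely illustrative):

"""Minimal sketch: set the environment variables referenced in the config (values are
illustrative), then load the config and build a df-connect client from it."""
import os

import tamr_toolbox as tbox

# normally these are set outside of Python, e.g. in the shell or by a deployment tool
os.environ["CONNECT_HOST"] = "10.20.0.1"
os.environ["TAMR_USERNAME"] = "admin"
os.environ["TAMR_PASSWORD"] = "my_password"
os.environ["CONNECT_INGEST_DB_URL"] = "jdbc:postgresql://10.20.0.2:5432/ingest_db"
os.environ["CONNECT_INGEST_DB_USERNAME"] = "ingest_user"
os.environ["CONNECT_INGEST_DB_PASSWORD"] = "ingest_password"

# $-prefixed values in the yaml are substituted with the matching environment variables on load
my_config = tbox.utils.config.from_yaml("examples/resources/conf/connect.config.yaml")
my_connect = tbox.data_io.df_connect.client.from_config(my_config)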

Ingesting a table as a Tamr dataset

"""
The below code is an example that reads a config file, shows how to use it to create a
 `Client` object, and then streams an example table into Tamr. Relies on default values and
 everything being specified by config file.
"""
import tamr_toolbox as tbox


my_config = tbox.utils.config.from_yaml("examples/resources/conf/connect.config.yaml")
my_connect = tbox.data_io.df_connect.client.from_config(my_config)

# ingest table A to dataset A
tbox.data_io.df_connect.client.ingest_dataset(
    my_connect, dataset_name="dataset_A", query="SELECT * FROM TABLE_A"
)
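
Because the returned `Client` can be reused, several tables from the same source can be ingested in a loop. A short sketch continuing the example above; the table and dataset names are hypothetical:

# hypothetical table names; each query result becomes its own Tamr dataset
for table in ["TABLE_A", "TABLE_B", "TABLE_C"]:
    tbox.data_io.df_connect.client.ingest_dataset(
        my_connect, dataset_name=f"dataset_{table}", query=f"SELECT * FROM {table}"
    )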

Ingesting multiple tables from multiple sources

df_connect:
    host: $CONNECT_HOST
    protocol: "http"
    port: "9030"
    tamr_username: $TAMR_USERNAME
    tamr_password: $TAMR_PASSWORD
    jdbc:
        oracle:
            jdbc_url: $CONNECT_ORACLE_DB_URL
            db_user: $CONNECT_ORACLE_DB_USERNAME
            db_password: $CONNECT_ORACLE_DB_PASSWORD
            fetch_size: 10000
        postgres:
            jdbc_url: $CONNECT_POSTGRES_DB_URL
            db_user: $CONNECT_POSTGRES_DB_USERNAME
            db_password: $CONNECT_POSTGRES_DB_PASSWORD
            fetch_size: 10000
"""Each instance of `Client` is configured to connect to one jdbc source via the `jdbc_key`
parameter. The default value of *ingest* is what is used in the simple example.
However, the code below shows how to connect to multiple source databases using a more
complicated config file.
"""
import tamr_toolbox as tbox

# load example config file for multiple databases
my_connect_config = tbox.utils.config.from_yaml(
    "examples/resources/conf/connect_multi_ingest.config.yaml"
)

# stream table A into Tamr using 'oracle' db source
my_oracle_connect = tbox.data_io.df_connect.client.from_config(
    my_connect_config, jdbc_key="oracle"
)
tbox.data_io.df_connect.client.ingest_dataset(
    my_oracle_connect, dataset_name="source_A", query="SELECT * FROM schemaA.tableA"
)

# stream table B into Tamr using postgres db source
my_postgres_connect = tbox.data_io.df_connect.client.from_config(
    my_connect_config, jdbc_key="postgres"
)
tbox.data_io.df_connect.client.ingest_dataset(
    my_postgres_connect, dataset_name="source_B", query="SELECT * FROM schemaB.tableB"
)

Profiling tables and writing results to a Tamr dataset

"""
The below code is an example that reads in the above config file, shows how to use it to create a
 `Client` object, and then profiles example tables, writing the results into Tamr.
 Relies on default values and everything being specified by config file.
"""
import tamr_toolbox as tbox


my_config = tbox.utils.config.from_yaml("examples/resources/conf/connect.config.yaml")
my_connect = tbox.data_io.df_connect.client.from_config(my_config)

# profile tables A and B, and write results to Tamr dataset dfconnect_profiling
tbox.data_io.df_connect.client.profile_query_results(
    my_connect,
    dataset_name="dfconnect_profiling",
    queries=["SELECT * FROM TABLE_A", "SELECT * FROM TABLE_B"],
)
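
The profiling results are written to the Tamr dataset named in the call above. One way to review them is to stream that dataset into a pandas DataFrame, as shown later in this section. A sketch using placeholder connection details; `allow_dataset_refresh=True` lets the toolbox refresh the dataset if it is not yet streamable:

# illustrative follow-up: pull the profiling results back out of Tamr for review
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")
profiling_dataset = tamr.datasets.by_name("dfconnect_profiling")
profiling_df = tbox.data_io.dataframe.from_dataset(
    profiling_dataset, allow_dataset_refresh=True
)
print(profiling_df.head())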

Exporting a Tamr dataset to a single target

"""
Export data from Tamr using df-connect. An example where everything uses the defaults in the
config file, which means the exported data is written back to the same database it was
ingested from.
"""
import tamr_toolbox as tbox


my_config = tbox.utils.config.from_yaml("examples/resources/conf/connect.config.yaml")
my_connect = tbox.data_io.df_connect.client.from_config(my_config)

tbox.data_io.df_connect.client.export_dataset(
    my_connect, dataset_name="example_dataset", target_table_name="example_target_table",
)

Exporting a Tamr dataset to multiple targets

df_connect:
    host: $CONNECT_HOST
    protocol: "https"
    port: ""
    base_path: "df_connect"
    tamr_username: $TAMR_USERNAME
    tamr_password: $TAMR_PASSWORD
    jdbc:
        ingest:
            jdbc_url: $CONNECT_INGEST_DB_URL
            db_user: $CONNECT_INGEST_DB_USERNAME
            db_password: $CONNECT_INGEST_DB_PASSWORD
            fetch_size: 10000
        oracle:
            jdbc_url: $CONNECT_EXPORT_ORACLE_DB_URL
            db_user: $CONNECT_EXPORT_ORACLE_DB_USERNAME
            db_password: $CONNECT_EXPORT_ORACLE_DB_PASSWORD
            fetch_size: 0
        postgres:
            jdbc_url: $CONNECT_EXPORT_POSTGRES_DB_URL
            db_user: $CONNECT_EXPORT_POSTGRES_DB_USERNAME
            db_password: $CONNECT_EXPORT_POSTGRES_DB_PASSWORD
            fetch_size: 0
"""
An example script to demonstrate how to export datasets from Tamr using df_connect
sending multiple datasets to multiple different databases with multiple different
parameters/behaviors
"""
import tamr_toolbox as tbox

# load example multi config
my_config = tbox.utils.config.from_yaml("examples/resources/conf/connect_multi_export.yaml")

# stream dataset A to Oracle with default export values from config file
my_connect_oracle = tbox.data_io.df_connect.client.from_config(my_config, jdbc_key="oracle")
tbox.data_io.df_connect.client.export_dataset(
    my_connect_oracle, dataset_name="dataset_A", target_table_name="target_A", jdbc_key="oracle"
)

# stream dataset A to Oracle target table B, while truncating before loading and only 1k records
tbox.data_io.df_connect.client.export_dataset(
    my_connect_oracle,
    dataset_name="dataset_A",
    target_table_name="target_B",
    truncate_before_load=True,
    limit_records=1000,
)

# stream dataset A to Postgres, keeping all Tamr-generated columns
my_connect_postgres = tbox.data_io.df_connect.client.from_config(my_config, jdbc_key="postgres")
tbox.data_io.df_connect.client.export_dataset(
    my_connect_postgres,
    dataset_name="dataset_A",
    target_table_name="target_postgres_A",
    columns_exclude_regex="",
)

# stream dataset A to Postgres, flattening arrays into single string with comma separation
tbox.data_io.df_connect.client.export_dataset(
    my_connect_postgres,
    dataset_name="dataset_A",
    target_table_name="target_postgres_B",
    multi_value_delimiter=",",
)

Exporting a Tamr dataset to Hive

"""
Export data from Tamr to Hive using df-connect. Df-connect supports making *external* tables in
Hive see https://cwiki.apache.org/confluence/display/Hive/Managed+vs.+External+Tables
for some discussion on the difference between managed and external tables. The biggest reason to
take this route is performance. It is infeasible to generate a managed table and insert millions
of rows into Hive. Streaming the data to hdfs and then creating an external table that points at
that table is orders of magnitude faster.

Creating an external table consists of three steps

1) stream the data to HDFS as an avro file (note: this assumes that df-connect itself has been
setup to talk the HDFS) into its own directory

2) stream the avro schema file to HDFS in some other directory than the data file
    (steps 1 and 2 are re-orderable)

3) Create a table in Hive telling Hive about the location of the data and the schema file
(Hive needs both where the data is and what the schema is in order to create a table,
this is messy with avro unless you use an avro schema file, see
https://community.cloudera.com/t5/Support-Questions/Is-there-a-way-to-create-Hive-table-based-on-Avro-data/td-p/119473
for some discussion


WARNING: The jdbc execute endpoint is purposefully designed for flexibility of the engineer to
accomplish data pipeline tasks. As such it allows arbitrary SQL to be run. You should take care to
ensure that you neither run any SQL deleterious to the database. Related,
DO NOT RUN ARBITRARY SQL PASSED FROM UNKNOWN SOURCES.
Doing so opens you up to SQL-injection vulnerabilities. https://xkcd.com/327/
"""
from typing import List
from tamr_toolbox.models.data_type import JsonDict
from tamr_toolbox.data_io.file_system_type import FileSystemType

import tamr_toolbox as tbox

import click


@click.command()
@click.option(
    "--config_file_path",
    default="examples/resources/conf/connect.config.yaml",
    help="the path to a config file containing df-connect connection information.",
)
@click.option(
    "--dataset_name", default="my_dataset", help="the name of the dataset to stream from Tamr."
)
@click.option(
    "--table_name",
    default="my_hive_db.my_hive_table",
    help="the name of the table to create in Hive for this dataset",
)
@click.option("--hdfs_base_path", help="the location in hdfs to use to stream the data/schema")
def main(
    config_file_path: str, dataset_name: str, table_name: str, hdfs_base_path: str,
) -> List[JsonDict]:
    """"Takes a config file (to setup df connect client), locations for both where to put the avro
     file and where to put the avro schema file, along with the hive statement to execute
     for creating a table.

    Args:
        config_file_path: the path to your config file
        dataset_name: the name of the dataset to stream from Tamr to create the table
        table_name: the name of the target table in Hive to create
        hdfs_base_path: the location in hdfs where to put this data. Note that the avro file will
         be in <hdfs_base_path>/data/my_dataset.avro while the schema will be in
         <hdfs_base_path>/schema/my_dataset.avsc

    Returns:
        List of json objects returned by each request that is made
    """

    # setup df_connect client
    my_config = tbox.utils.config.from_yaml(config_file_path)
    my_connect = tbox.data_io.df_connect.client.from_config(my_config)

    # create needed variables
    hdfs_data_path = hdfs_base_path + "/data/"
    hdfs_file_url = hdfs_data_path + f"{dataset_name}.avro"
    hdfs_schema_url = hdfs_base_path + f"/schema/{dataset_name}.avsc"

    # now generate the create table statement - pay attention to the LOCATION keyword and
    # notice it is the directory of the avro file, not the file itself

    # **** SEE WARNING IN TOP LEVEL COMMENTS ****
    hive_create_table_statement = (
        f"CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} ROW FORMAT SERDE "
        + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED AS INPUTFORMAT "
        + "'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT "
        + "'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION "
        + f"'{hdfs_data_path}' TBLPROPERTIES "
        + f"('avro.schema.url'='{hdfs_schema_url}')"
    )

    response_jsons = []
    # export the schema first
    schema_response = tbox.data_io.df_connect.client.export_dataset_avro_schema(
        my_connect, url=hdfs_schema_url, dataset_name=dataset_name, fs_type=FileSystemType.HDFS
    )
    response_jsons.append(schema_response)

    # export the data next
    data_response = tbox.data_io.df_connect.client.export_dataset_as_avro(
        my_connect, url=hdfs_file_url, dataset_name=dataset_name, fs_type=FileSystemType.HDFS
    )
    response_jsons.append(data_response)

    # now create the table via execute endpoint
    execute_response = tbox.data_io.df_connect.client.execute_statement(
        my_connect, statement=hive_create_table_statement
    )
    response_jsons.append(execute_response)

    return response_jsons


if __name__ == "__main__":
    main()

Dataframe

tamr-toolbox provides functions to create a pandas DataFrame from a Tamr dataset and optionally flatten it. It can also perform basic profiling and validation of pandas DataFrames, intended to be used for data quality checks before upserting records into Tamr.

Create Dataframe from Tamr Dataset

"""Snippet for exporting a dataset from Tamr as a pandas.Dataframe"""
import tamr_toolbox as tbox

# Read config, make Tamr Client
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")

# get dataframe from Tamr Dataset
dataset = tamr.datasets.by_resource_id("my_tamr_dataset_id")
# default will stream all rows and not apply any flattening
df = tbox.data_io.dataframe.from_dataset(dataset)
# get with lists flattened to strings and a subset of columns and rows
df1 = tbox.data_io.dataframe.from_dataset(
    dataset, flatten_delimiter="|", columns=["tamr_id", "last_name", "first_name"], nrows=5
)
# if the Tamr dataset is not streamable, pass this option to allow refreshing it
df2 = tbox.data_io.dataframe.from_dataset(dataset, nrows=5, allow_dataset_refresh=True)

# a dataframe can also be flattened after creation
# default will attempt to flatten all columns
df3 = tbox.data_io.dataframe.flatten(df)
# flatten only a subset of columns, and force non-string inner array types to strings
df4 = tbox.data_io.dataframe.flatten(df, delimiter="|", columns=["last_name"], force=True)

Validate a Dataframe before upserting records to a Tamr Dataset

"""Snippet for validating the contents of a pandas.Dataframe followed by ingestion into a
Tamr dataset"""
import tamr_toolbox as tbox
import pandas as pd

# Read config, make Tamr Client
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")

# create DataFrame. Typically this would come from a CSV file
df = pd.read_csv("my_file.csv", dtype="str")

# perform validation
tbox.data_io.dataframe.validate(
    df,
    require_present_columns=["primary_key", "column_1", "column_2"],
    require_unique_columns=["primary_key"],
    require_nonnull_columns=["primary_key", "column_1"],
)

# can also have validation return a boolean and a dict of failed columns
result = tbox.data_io.dataframe.validate(
    df,
    raise_error=False,
    require_present_columns=["primary_key", "column_1", "column_2"],
    require_unique_columns=["primary_key"],
    require_nonnull_columns=["primary_key", "column_1"],
)

# result bool will be True if all tests succeed, then proceed to upsert the records from the
# CSV dataset to Tamr
if result.passed:
    tamr_dataset = tamr.datasets.by_resource_id("my_tamr_dataset_id")
    tamr_dataset.upsert_from_dataframe(df, primary_key_name="primary_key")

# if any checks have failed, get the details
else:
    print(f"Validation failed, failing checks: {result.details}")

CSV

Export CSV from Tamr Dataset to a designated filepath

# dataset.config.yaml

logging_dir: $TAMR_PROJECT_LOGGING_DIR # Example: "/home/users/jane/my-project/logs"

my_tamr_instance:
    host: $TAMR_HOST # Example: "1.2.3.4"
    protocol: "http"
    port: "9100"
    username: "admin"
    password: $TAMR_PASSWORD  # Example: "abc123"

datasets:
    my_mastering_project_dataset:
        name: "my_mastering_project_unified_dataset_published_clusters_with_data"
        id: "5"
        export_file_path: "/path/to/data/output/my_mastering_project_unified_dataset_published_clusters_with_data.csv"
    my_categorization_project_dataset:
        name: "my_categorization_project_unified_dataset_classifications_with_data"
        id: "12"
        export_file_path: "/path/to/data/output/my_categorization_project_unified_dataset_classifications_with_data.csv"
"""Example script for exporting a dataset from Tamr as a CSV"""
import argparse
from typing import Dict, Any

import tamr_toolbox as tbox


def main(
    *, instance_connection_info: Dict[str, Any], dataset_id: str, export_file_path: str
) -> None:
    """Exports a csv from a dataset

    Args:
        instance_connection_info: Information for connecting to Tamr (host, port, username etc)
        dataset_id: The id of the dataset to export
        export_file_path: Path to write the csv file to
    """

    # Create the tamr client
    tamr_client = tbox.utils.client.create(**instance_connection_info)

    dataset = tamr_client.datasets.by_resource_id(dataset_id)

    # Export the dataset using default settings
    tbox.data_io.csv.from_dataset(dataset, export_file_path=export_file_path)

    # Export after adjusting some of the default parameters including csv delimiter,
    # flattening delimiter for recording multi-values, limiting and ordering the columns, limiting
    # the number of rows, and adjusting the buffer size that determines at what interval records
    # are written to disk
    tbox.data_io.csv.from_dataset(
        dataset,
        export_file_path,
        csv_delimiter=";",
        flatten_delimiter="#",
        columns=["tamr_id", "last_name", "first_name"],
        nrows=1000,
        buffer_size=100,
    )

    LOGGER.info("Writing CSV is complete")


if __name__ == "__main__":
    # Set up command line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", help="path to a YAML configuration file", required=False)
    args = parser.parse_args()

    # Load the configuration from the file path provided or the default file path specified
    CONFIG = tbox.utils.config.from_yaml(
        path_to_file=args.config, default_path_to_file="/path/to/my/conf/dataset.config.yaml"
    )

    # Use the configuration to create a global logger
    LOGGER = tbox.utils.logger.create(__name__, log_directory=CONFIG["logging_dir"])

    # Run the main function
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        dataset_id=CONFIG["datasets"]["my_mastering_project_dataset"]["id"],
        export_file_path=CONFIG["datasets"]["my_mastering_project_dataset"]["export_file_path"],
    )
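
    # Illustrative extension: the config file above also defines a categorization project
    # dataset, so a second call to main would export that dataset as well
    main(
        instance_connection_info=CONFIG["my_tamr_instance"],
        dataset_id=CONFIG["datasets"]["my_categorization_project_dataset"]["id"],
        export_file_path=CONFIG["datasets"]["my_categorization_project_dataset"][
            "export_file_path"
        ],
    )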

Export CSV from taxonomy to a designated filepath

"""Snippet for exporting taxonomy of a Tamr Categorization project to a csv file"""
import csv
import tamr_toolbox as tbox

# Create the Tamr client
tamr = tbox.utils.client.create(username="user", password="pw", host="localhost")

# Get a Tamr categorization project by ID
my_project = tamr.projects.by_resource_id("2")

# Export the taxonomy to a csv file
export_file_path = "path_to_the_csv_file/file_name.csv"
records_written = tbox.data_io.csv.from_taxonomy(
    my_project,
    export_file_path,
    csv_delimiter=",",
    flatten_delimiter="|",
    quote_character='"',
    quoting=csv.QUOTE_MINIMAL,
    overwrite=False,
)
print(f"Wrote {records_written} categories to {export_file_path}")