Source code for tamr_toolbox.data_io.dataframe

"""Tasks related to moving data in or out of Tamr using pandas.DataFrames"""
import os
import logging

from typing import Optional, List, Callable, Any, Iterable, Tuple
from collections import OrderedDict, defaultdict
from functools import partial

from tamr_unify_client.dataset.resource import Dataset

from tamr_toolbox.data_io import common
from tamr_toolbox.models.validation_check import ValidationCheck
from tamr_toolbox.utils import operation


LOGGER = logging.getLogger(__name__)

# Building our documentation requires access to all dependencies, including optional ones
# This environment variable is set automatically when `invoke docs` is used
BUILDING_DOCS = os.environ.get("TAMR_TOOLBOX_DOCS") == "1"
if BUILDING_DOCS:
    # Import relevant optional dependencies
    import pandas


def _flatten_list_series(
    series: "pandas.Series",
    *,
    delimiter: str = "|",
    force: bool = False,
    columns: Optional[List] = None,
) -> "pandas.Series":
    """
    Flattens list types to strings. See ``_flatten_list`` for details.
    Operates on a pandas Series, and is intended to be applied to a pandas DataFrame via ``apply``.

    Args:
        series: pandas Series of values to flatten
        delimiter: string to use as delimiter for concatenating lists
        force: if True, will force non-string inner types to string
        columns: optional, list of columns to flatten

    Returns:
        Series of flattened values
    """
    func = partial(common._flatten_list, delimiter=delimiter, force=force)
    # if we haven't specified which columns, or this column is in the list, apply the function
    if columns is None or series.name in columns:
        return series.apply(func)
    # otherwise do nothing
    else:
        return series


def flatten(
    df: "pandas.DataFrame",
    *,
    delimiter: str = "|",
    columns: Optional[List[str]] = None,
    force: bool = False,
) -> "pandas.DataFrame":
    """
    Converts DataFrame columns with list type to strings and returns a copy of the DataFrame
    with this change. Tamr often produces datasets with columns of list type, which are often
    more easily consumed as single-valued columns.

    Args:
        df: DataFrame from Tamr dataset
        delimiter: string to use as delimiter for concatenating lists
        columns: optional, list of columns to flatten
        force: if True, will force non-string inner types to string

    Returns:
        flattened DataFrame
    """
    # apply a series-wise function that can operate on specific columns of the dataframe
    series_func = partial(_flatten_list_series, delimiter=delimiter, force=force, columns=columns)
    return df.apply(series_func)
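

# Example (illustrative sketch): flattening a DataFrame whose "phones" column holds lists of
# strings. The column names and values below are hypothetical.
#
#   import pandas as pd
#   from tamr_toolbox.data_io import dataframe
#
#   df = pd.DataFrame({"id": ["1", "2"], "phones": [["555-1234", "555-9876"], ["555-0000"]]})
#   flat_df = dataframe.flatten(df, delimiter="|")
#   # flat_df["phones"] now holds "555-1234|555-9876" and "555-0000"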


def from_dataset(
    dataset: Dataset,
    *,
    columns: Optional[List[str]] = None,
    flatten_delimiter: Optional[str] = None,
    flatten_columns: Optional[List[str]] = None,
    force_flatten: bool = False,
    nrows: Optional[int] = None,
    allow_dataset_refresh: bool = False,
) -> "pandas.DataFrame":
    """
    Creates a DataFrame from a Tamr Dataset

    Args:
        dataset: Tamr Dataset object
        columns: optional, ordered list of columns to keep
        flatten_delimiter: if set, flatten list types to strings by concatenating with this
            delimiter
        flatten_columns: optional, list of columns to flatten
        force_flatten: if False, arrays with inner types other than string will not be
            flattened. if True, will force all inner types to strings when flattening values.
            if True, `flatten_delimiter` must be specified.
        nrows: number of rows to read. default None will read all rows
        allow_dataset_refresh: if True, allows running a job to refresh dataset to make it
            streamable

    Returns:
        DataFrame

    Raises:
        ValueError: if `columns` or `flatten_columns` contain columns that are not present
            in `dataset`
    """
    # Validate arguments
    if force_flatten and flatten_delimiter is None:
        raise ValueError("`force_flatten` is True but no `flatten_delimiter` is specified.")

    # This function requires pandas, an optional dependency
    import pandas

    LOGGER.info(
        f"Streaming records to DataFrame for dataset {dataset.name} (id={dataset.resource_id})."
    )
    dataset_attrs = [attr for attr in dataset.attributes]
    attr_names = [attr.name for attr in dataset_attrs]
    # check that specified columns exist
    if columns is not None:
        common._check_columns_subset(
            input_list=columns, reference_list=attr_names, raise_error=True
        )
    # checks on columns to flatten
    if flatten_delimiter is not None:
        if flatten_columns is None:
            flatten_columns = list(attr_names)
        else:
            # check that specified columns exist
            common._check_columns_subset(
                input_list=flatten_columns, reference_list=attr_names, raise_error=True
            )
        # check types of flatten_columns
        for attr in dataset_attrs:
            if attr.name not in flatten_columns:
                continue
            attr_type = attr.spec().to_dict()["type"]
            if (
                attr_type["baseType"] == "ARRAY"
                and attr_type["innerType"]["baseType"] != "STRING"
            ):
                if force_flatten:
                    LOGGER.info(
                        f"Will force attribute to string: {attr.name}, with type: {attr_type}"
                    )
                else:
                    LOGGER.warning(
                        f"Will not flatten attribute: {attr.name}, with type: {attr_type}"
                    )
                    flatten_columns.remove(attr.name)

    if not dataset.status().is_streamable:
        if allow_dataset_refresh:
            LOGGER.info(f"Refreshing dataset {dataset.name} to make streamable.")
            op = dataset.refresh()
            operation.enforce_success(op)
        else:
            message = (
                f"Dataset {dataset.name} is not streamable. Refresh it first, or run"
                f" with allow_dataset_refresh=True"
            )
            LOGGER.error(message)
            raise RuntimeError(message)

    # if flattening, set the function applied to each record to _flatten_list
    # otherwise leave it unset (records are passed through unchanged)
    func = None
    if flatten_delimiter is not None:
        func = partial(common._flatten_list, delimiter=flatten_delimiter, force=force_flatten)
    df = pandas.DataFrame.from_records(
        common._yield_records(
            dataset, func=func, columns=columns, flatten_columns=flatten_columns
        ),
        columns=columns,
        nrows=nrows,
    )
    return df
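

# Example (illustrative sketch): streaming a Tamr dataset into a DataFrame. The connection
# details and the dataset name "my_mastered_dataset" are hypothetical; flatten_delimiter="|"
# collapses array attributes into pipe-delimited strings, and allow_dataset_refresh=True
# permits a refresh job if the dataset is not currently streamable.
#
#   from tamr_unify_client import Client
#   from tamr_unify_client.auth import UsernamePasswordAuth
#   from tamr_toolbox.data_io import dataframe
#
#   tamr = Client(UsernamePasswordAuth("username", "password"), host="localhost")
#   dataset = tamr.datasets.by_name("my_mastered_dataset")
#   df = dataframe.from_dataset(
#       dataset, flatten_delimiter="|", allow_dataset_refresh=True, nrows=1000
#   )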


def _count_null(s: "pandas.Series") -> int:
    """
    Returns the number of null values in a pandas Series

    Args:
        s: pandas Series

    Returns:
        int
    """
    return s.isna().sum()


def profile(df: "pandas.DataFrame") -> "pandas.DataFrame":
    """
    Computes profile statistics from an input DataFrame, and returns the statistics in another
    DataFrame. Intended to be used for validation checks on a DataFrame before upserting
    records to a Tamr Dataset

    Args:
        df: DataFrame

    Returns:
        DataFrame with profile statistics
    """
    # per-attribute metrics: aggregations that each return a Series indexed by the attribute names
    attr_agg_functions = OrderedDict()
    # number of unique values
    attr_agg_functions["DistinctValueCount"] = "nunique"
    # number of null values
    attr_agg_functions["EmptyValueCount"] = _count_null
    # aggregate to compute per-attribute metrics
    df_profile = df.agg(list(attr_agg_functions.values()), axis=0).T
    # rename columns
    df_profile.columns = attr_agg_functions.keys()
    # add summary level metrics
    df_profile["RecordCount"] = df.shape[0]
    return df_profile
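

# Example (illustrative sketch): the returned profile is indexed by column name, with
# "DistinctValueCount", "EmptyValueCount", and "RecordCount" columns that the validation
# helpers below look up via .loc. The data below is hypothetical.
#
#   import pandas as pd
#   from tamr_toolbox.data_io import dataframe
#
#   df = pd.DataFrame({"primary_key": ["a", "b", "c"], "value": ["x", None, "x"]})
#   df_profile = dataframe.profile(df)
#   df_profile.loc["primary_key", "DistinctValueCount"]  # 3
#   df_profile.loc["value", "EmptyValueCount"]           # 1
#   df_profile.loc["value", "RecordCount"]               # 3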


def _check_present_columns(
    df: "pandas.DataFrame", *, require_present_columns: Optional[List[str]] = None
) -> ValidationCheck:
    """
    Checks that a specified list of columns in a DataFrame are all present

    Args:
        df: DataFrame
        require_present_columns: list of columns that are checked to be present

    Returns:
        ValidationCheck object, with bool for whether check passed and dict of failing columns
    """
    failed_checks_dict = defaultdict(list)
    if require_present_columns is None:
        require_present_columns = []

    # check if columns are present
    for col in require_present_columns:
        if col not in df.columns:
            LOGGER.warning(f"column {col} is not present")
            failed_checks_dict["failed_present_columns"].append(col)

    passed = len(failed_checks_dict) == 0
    return ValidationCheck(passed, failed_checks_dict)


def _check_unique_columns(
    df_profile: "pandas.DataFrame", *, require_unique_columns: Optional[List[str]] = None
) -> ValidationCheck:
    """
    Checks that a specified list of columns in a DataFrame have all unique values

    Args:
        df_profile: DataFrame containing profile information
        require_unique_columns: list of columns that are checked to have all unique values,
            e.g. a primary key column

    Returns:
        ValidationCheck object, with bool for whether check passed and dict of failing columns
    """
    failed_checks_dict = defaultdict(list)
    if require_unique_columns is None:
        require_unique_columns = []

    # check for unique columns
    for col in require_unique_columns:
        num_unique = df_profile.loc[col, "DistinctValueCount"]
        num_records = df_profile.loc[col, "RecordCount"]
        if not num_unique == num_records:
            LOGGER.warning(
                f"column {col} has only {num_unique} unique values out of {num_records} records"
            )
            failed_checks_dict["failed_unique_columns"].append(col)

    passed = len(failed_checks_dict) == 0
    return ValidationCheck(passed, failed_checks_dict)


def _check_nonnull_columns(
    df_profile: "pandas.DataFrame", *, require_nonnull_columns: Optional[List[str]] = None
) -> ValidationCheck:
    """
    Checks that a specified list of columns in a DataFrame have all non-null values

    Args:
        df_profile: DataFrame containing profile information
        require_nonnull_columns: list of columns that are checked to have all non-null values

    Returns:
        ValidationCheck object, with bool for whether check passed and dict of failing columns
    """
    failed_checks_dict = defaultdict(list)
    if require_nonnull_columns is None:
        require_nonnull_columns = []

    # check for nonnull columns
    for col in require_nonnull_columns:
        num_null = df_profile.loc[col, "EmptyValueCount"]
        if num_null > 0:
            LOGGER.warning(f"column {col} has {num_null} null values")
            failed_checks_dict["failed_nonnull_columns"].append(col)

    passed = len(failed_checks_dict) == 0
    return ValidationCheck(passed, failed_checks_dict)


def _check_custom(
    df: "pandas.DataFrame",
    columns_to_check: Optional[List[str]],
    check_function: Callable[[Any], bool],
) -> ValidationCheck:
    """
    Checks that a custom function evaluates to True for every value in the specified columns

    Args:
        df: DataFrame
        columns_to_check: columns on which check_function will be applied
        check_function: function applied on columns_to_check

    Returns:
        ValidationCheck object, with bool for whether all checks passed and dict of failing
        columns
    """
    failed_checks_dict = defaultdict(list)
    df1 = df[columns_to_check].applymap(check_function)
    for col in columns_to_check:
        if not df1[col].all():
            LOGGER.warning(f"column {col} failed custom check {check_function.__name__}")
            failed_checks_dict[f"failed custom check {check_function.__name__}"].append(col)

    passed = len(failed_checks_dict) == 0
    return ValidationCheck(passed, failed_checks_dict)
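

# Example (illustrative sketch): a custom check function is applied element-wise (via
# applymap above), so it receives a single cell value and returns a bool. The function name
# and the length rule below are hypothetical; pass it to `validate` as part of a
# (function, columns) tuple in `custom_checks`.
#
#   def check_is_short_string(value) -> bool:
#       """Return True when the value is a string of at most 256 characters."""
#       return isinstance(value, str) and len(value) <= 256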


def validate(
    df: "pandas.DataFrame",
    *,
    raise_error: bool = True,
    require_present_columns: Optional[List[str]] = None,
    require_unique_columns: Optional[List[str]] = None,
    require_nonnull_columns: Optional[List[str]] = None,
    custom_checks: Iterable[Tuple[Callable[[Any], bool], List[str]]] = tuple(),
) -> ValidationCheck:
    """
    Performs validation checks on a DataFrame. Returns a ValidationCheck containing a dict of
    columns that fail each check, and optionally raises an error. Intended to be used on a
    DataFrame prior to upserting records into a Tamr dataset.

    Args:
        df: DataFrame
        raise_error: if True, will raise a ValueError on failing checks. if False, will log a
            warning and return the failing checks
        require_present_columns: list of columns that are checked to be present
        require_unique_columns: list of columns that are checked to have all unique values,
            e.g. a primary key column
        require_nonnull_columns: list of columns that are checked to have all non-null values
        custom_checks: collection of tuples, each containing a custom function and the list of
            columns on which to apply it

    Returns:
        ValidationCheck object, with bool for whether all checks passed and dict of failing
        columns

    Raises:
        ValueError: if `raise_error` is set to True and any checks fail
    """
    failed_checks_dict = defaultdict(list)
    if require_present_columns is None:
        require_present_columns = []
    if require_unique_columns is None:
        require_unique_columns = []
    if require_nonnull_columns is None:
        require_nonnull_columns = []

    # compute profile stats
    df_profile = profile(df)

    # check for present columns
    failed_checks_dict.update(
        _check_present_columns(df, require_present_columns=require_present_columns).details
    )
    # check for unique columns
    failed_checks_dict.update(
        _check_unique_columns(df_profile, require_unique_columns=require_unique_columns).details
    )
    # check for nonnull columns
    failed_checks_dict.update(
        _check_nonnull_columns(
            df_profile, require_nonnull_columns=require_nonnull_columns
        ).details
    )
    # apply each custom check to its list of columns
    for custom_check in custom_checks:
        failed_checks_dict.update(
            _check_custom(
                df, check_function=custom_check[0], columns_to_check=custom_check[1]
            ).details
        )

    failed_checks_dict = dict(failed_checks_dict)
    passed = len(failed_checks_dict) == 0
    if not passed and raise_error:
        raise ValueError(
            f"DataFrame validation failed. failed columns for each check: {failed_checks_dict}"
        )
    return ValidationCheck(passed, failed_checks_dict)
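

# Example (illustrative sketch): validating a DataFrame before upserting its records into a
# Tamr dataset. The DataFrame, column names, and check_is_short_string (sketched above) are
# hypothetical; raise_error=False returns the ValidationCheck instead of raising ValueError.
#
#   import pandas as pd
#   from tamr_toolbox.data_io import dataframe
#
#   df = pd.DataFrame({"primary_key": ["a", "b", "c"], "value": ["x", None, "x"]})
#   check = dataframe.validate(
#       df,
#       raise_error=False,
#       require_present_columns=["primary_key", "value"],
#       require_unique_columns=["primary_key"],
#       require_nonnull_columns=["value"],
#       custom_checks=((check_is_short_string, ["value"]),),
#   )
#   check.details  # e.g. {"failed_nonnull_columns": ["value"], ...}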