Source code for tamr_toolbox.data_io.csv

"""Tasks related to moving data in or out of Tamr using delimited files"""
from typing import Optional, List, Union, Dict
from functools import partial
from pathlib import Path

import csv
import logging
import os
import requests

from tamr_unify_client.dataset.resource import Dataset
from tamr_unify_client.project.resource import Project

from tamr_toolbox.utils import operation
from tamr_toolbox.data_io import common

LOGGER = logging.getLogger(__name__)


def from_dataset(
    dataset: Dataset,
    export_file_path: Union[Path, str],
    *,
    csv_delimiter: str = ",",
    columns: Optional[List[str]] = None,
    column_name_dict: Optional[Dict[str, str]] = None,
    flatten_delimiter: str = "|",
    quote_character: str = '"',
    quoting: int = csv.QUOTE_MINIMAL,
    na_value: str = "NaN",
    nrows: Optional[int] = None,
    allow_dataset_refresh: bool = False,
    buffer_size: int = 10000,
    overwrite: bool = False,
    encoding: str = "utf-8",
) -> int:
    """
    Export a Tamr Dataset to a csv file. Records are streamed to disk and written according to
    a given buffer size. As a result this is more memory efficient than first reading to a
    pandas.DataFrame and writing to CSV.

    Args:
        dataset: Tamr Dataset object
        export_file_path: Path to the csv file where the dataset will be saved
        csv_delimiter: Delimiter of the csv file
        columns: Optional, ordered list of columns to write. If None, write all columns in
            arbitrary order.
        column_name_dict: Optional, dictionary in the format
            {<Tamr dataset column name> : <new csv column name>}, used to rename some or all
            columns in the output file.
        flatten_delimiter: Flatten list types to strings by concatenating with this delimiter
        quote_character: Character used to escape the csv delimiter when it appears in a value.
        quoting: The escape strategy to use according to the Python csv writer.
            See https://docs.python.org/3/library/csv.html#csv.QUOTE_MINIMAL
        na_value: Value to write that represents empty or missing data.
            See https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
            for the na_values supported by default in pandas.read_csv
        nrows: Optional, number of rows to write. If None, then write all rows.
        allow_dataset_refresh: If True, allows running a job to refresh the dataset to make it
            streamable. Otherwise a RuntimeError is raised if the dataset is not streamable.
        buffer_size: Number of records to store in memory before writing to disk
        overwrite: If True and export_file_path already exists, overwrite the file.
            Otherwise raise an error.
        encoding: The encoding to use in the written file.
            See https://docs.python.org/3/library/functions.html#open

    Returns:
        The total number of records written

    Raises:
        FileExistsError: if the csv file to which the dataset is to be streamed exists
            and `overwrite` is False
        RuntimeError: if `dataset` is not streamable and `allow_dataset_refresh` is False
        ValueError: if `columns` contains columns that are not present in `dataset`, if column
            renaming would yield duplicate column names, or if `csv_delimiter` and
            `flatten_delimiter` are identical
    """
    LOGGER.info(
        f"Streaming records to csv file {export_file_path} from dataset {dataset.name} "
        f"(id={dataset.resource_id})."
    )

    if os.path.exists(export_file_path):
        if not overwrite:
            message = (
                f"CSV file {export_file_path} already exists. "
                f"(Set 'overwrite' flag to True if you wish to overwrite)"
            )
            LOGGER.error(message)
            raise FileExistsError(message)
        else:
            LOGGER.warning(f"CSV file {export_file_path} already exists and will be overwritten")

    if csv_delimiter == flatten_delimiter:
        message = (
            f"The CSV delimiter '{csv_delimiter}' cannot be identical to the list "
            f"flattening delimiter '{flatten_delimiter}'"
        )
        LOGGER.error(message)
        raise ValueError(message)

    attribute_names = [attr.name for attr in dataset.attributes]

    # check that specified columns exist
    if columns is not None:
        common._check_columns_subset(
            input_list=columns, reference_list=attribute_names, raise_error=True
        )

    if not dataset.status().is_streamable:
        if allow_dataset_refresh:
            LOGGER.info(f"Refreshing dataset {dataset.name} to make streamable.")
            op = dataset.refresh()
            operation.enforce_success(op)
        else:
            message = (
                f"Dataset {dataset.name} is not streamable. Refresh it first, or "
                f"run with allow_dataset_refresh=True"
            )
            LOGGER.error(message)
            raise RuntimeError(message)

    func = partial(common._flatten_list, delimiter=flatten_delimiter, force=True)

    full_column_name_dict = common._get_column_mapping_dict(
        dataset_attribute_names=attribute_names, column_name_dict=column_name_dict, columns=columns
    )

    # Open CSV file and use newline='' as recommended by
    # https://docs.python.org/3/library/csv.html#csv.writer
    with open(export_file_path, "w", newline="", encoding=encoding) as csv_file:
        csv_writer = csv.writer(
            csv_file, delimiter=csv_delimiter, quotechar=quote_character, quoting=quoting
        )

        buffer = []
        header = None
        # Set record number to -1 in case the dataset streamed has no records
        record_number = -1
        for record_number, record in enumerate(
            common._yield_records(dataset, func=func, columns=columns)
        ):
            # Obtain and write the header information only on the first pass
            if header is None:
                header = full_column_name_dict.values()
                csv_writer.writerow(header)

            # Replace empty values with a specific null value
            # This also allows nulls to be treated differently from empty strings
            record = [
                na_value if record[k] is None else record[k] for k in full_column_name_dict.keys()
            ]
            buffer.append(record)

            at_max_buffer = buffer_size is not None and (len(buffer) >= buffer_size)
            at_max_rows = nrows is not None and record_number >= nrows - 1
            if at_max_buffer or at_max_rows:
                csv_writer.writerows(buffer)
                LOGGER.debug(f"Written dataset {dataset.name} up to record {record_number+1}")
                buffer = []
                if at_max_rows:
                    break

        # Write anything remaining
        # This will occur whenever the buffer is non-empty and the number of records
        # is not exactly divisible by the buffer size
        # For example, writing a dataset with 1100 records using a buffer size of 500
        # will write in 3 chunks: 2 x 500 above and the remaining 100 handled here
        if len(buffer) != 0:
            LOGGER.debug(f"Written dataset {dataset.name} up to record {record_number + 1}")
            csv_writer.writerows(buffer)

        if record_number == -1:
            # If record number is -1 then no records were streamed, possibly because the dataset
            # has no records. We therefore want to simply save the headers
            if columns is not None:
                csv_writer.writerow(columns)
            else:
                csv_writer.writerow(attribute_names)

    records_written = record_number + 1

    LOGGER.info(
        f"Wrote {records_written} records from dataset {dataset.name} (id={dataset.resource_id}) "
        f"to {export_file_path}"
    )

    return records_written
def from_taxonomy(
    project: Project,
    export_file_path: Union[Path, str],
    *,
    csv_delimiter: str = ",",
    flatten_delimiter: str = "|",
    quote_character: str = '"',
    quoting: int = csv.QUOTE_MINIMAL,
    overwrite: bool = False,
    encoding: str = "utf-8",
) -> int:
    """
    Export the taxonomy of a Tamr classification project to a csv file. Each category is
    written as one row containing its full path.

    Args:
        project: Tamr Project object
        export_file_path: Path to the csv file where the taxonomy will be saved
        csv_delimiter: Delimiter of the csv file
        flatten_delimiter: Flatten list types to strings by concatenating with this delimiter
        quote_character: Character used to escape the csv delimiter when it appears in a value.
        quoting: The escape strategy to use according to the Python csv writer.
            See https://docs.python.org/3/library/csv.html#csv.QUOTE_MINIMAL
        overwrite: If True and export_file_path already exists, overwrite the file.
            Otherwise raise an error.
        encoding: The encoding to use in the written file.
            See https://docs.python.org/3/library/functions.html#open

    Returns:
        The total number of records written

    Raises:
        FileExistsError: if `export_file_path` exists and `overwrite` is set to False
        IOError: if the specified filepath does not exist or cannot be accessed
        RuntimeError: if the classification project is not yet associated with a taxonomy
            or the taxonomy cannot be written to a csv file
        TypeError: if the project type is not classification
        ValueError: if `csv_delimiter` and `flatten_delimiter` are identical
    """
    LOGGER.info(
        f"Streaming taxonomy to csv file {export_file_path} from project {project.name} "
        f"(project id={project.resource_id})."
    )

    try:
        project = project.as_categorization()
    except TypeError:
        not_categorization_error = f"Project {project.name} is not a classification project."
        LOGGER.error(not_categorization_error)
        raise TypeError(not_categorization_error)

    if os.path.exists(export_file_path):
        if not overwrite:
            message = (
                f"CSV file {export_file_path} already exists. "
                f"(Set 'overwrite' flag to True if you wish to overwrite)"
            )
            LOGGER.error(message)
            raise FileExistsError(message)
        else:
            LOGGER.warning(f"CSV file {export_file_path} already exists and will be overwritten")

    if csv_delimiter == flatten_delimiter:
        message = (
            f"The CSV delimiter '{csv_delimiter}' cannot be identical to the list "
            f"flattening delimiter '{flatten_delimiter}'"
        )
        LOGGER.error(message)
        raise ValueError(message)

    try:
        taxonomy = project.as_categorization().taxonomy()
    except requests.exceptions.RequestException:
        no_taxonomy_error = f"Project {project.name} is not associated with any taxonomy yet."
        LOGGER.error(no_taxonomy_error)
        raise RuntimeError(no_taxonomy_error)

    # obtain categories and store in a list
    categories = taxonomy.categories()
    taxonomy_list = []
    for category in categories:
        taxonomy_list.append(category.path)

    # sort the categories
    taxonomy_list.sort()

    # Open CSV file and use newline='' as recommended by
    # https://docs.python.org/3/library/csv.html#csv.writer
    try:
        f = open(export_file_path, "w", newline="", encoding=encoding)
    except (FileNotFoundError, IOError, PermissionError):
        cannot_open_error = f"File path {export_file_path} could not be opened for writing."
        LOGGER.error(cannot_open_error)
        raise IOError(cannot_open_error)
    else:
        try:
            csv_writer = csv.writer(
                f, delimiter=csv_delimiter, quotechar=quote_character, quoting=quoting
            )
            csv_writer.writerows(taxonomy_list)
        except csv.Error as e:
            general_error = (
                "Encountered an error while writing taxonomy categories to "
                f"{export_file_path}: {e}"
            )
            f.close()
            LOGGER.error(general_error)
            raise RuntimeError(general_error)
        finally:
            f.close()

    records_written = len(taxonomy_list)

    LOGGER.info(
        f"Wrote {records_written} categories from {project.name} taxonomy (project id"
        f"={project.resource_id}) to {export_file_path}"
    )
    return records_written
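
A similar sketch for from_taxonomy, again illustrative rather than definitive: the project
name and output path are placeholders, and the project must be a classification project with
a taxonomy attached.

from tamr_toolbox.utils import client
from tamr_toolbox.data_io import csv as tbox_csv

# Placeholder connection details and project name; substitute your own.
tamr = client.create(username="my_user", password="my_password", host="localhost")
project = tamr.projects.by_name("my_classification_project")

# Each row in the output file is one category's full path within the taxonomy.
categories_written = tbox_csv.from_taxonomy(project, "/tmp/my_taxonomy.csv", overwrite=True)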