Source code for tamr_toolbox.utils.upstream

"""Functions related to projects upstream of a specified project"""
from typing import List, Optional
import logging

from tamr_unify_client.project.resource import Project
from tamr_unify_client.dataset.resource import Dataset

LOGGER = logging.getLogger(__name__)


def _direct_upstream_project_ids(client, project_id: str, name_to_id: dict) -> List[str]:
    """Find the projects that produce the input datasets of one project

    Args:
        client: Tamr client used to query the instance
        project_id: resource id of the project whose input datasets are inspected
        name_to_id: mapping of project name to project resource id
    Returns:
        Resource ids of the producing projects, without duplicates, in the order found
    Raises:
        KeyError: if a producing project's name is missing from ``name_to_id``
    """
    input_datasets = client.get(
        f"/api/versioned/v1/projects/{project_id}/inputDatasets"
    ).json()

    upstream_ids = []
    for input_dataset in input_datasets:
        dataset_details = client.datasets.by_relative_id(input_dataset["relativeId"])
        # A dataset that is the output of project steps ties this project to
        # the projects owning those steps
        producing_steps = dataset_details.usage().usage.output_from_project_steps
        for step in producing_steps or []:
            upstream_id = name_to_id[step.project_name]
            if upstream_id not in upstream_ids:
                upstream_ids.append(upstream_id)
    return upstream_ids


def _collect_upstream_first(
    client, project_id: str, name_to_id: dict, visited: set, ordered: List[str]
) -> None:
    """Depth-first walk that records a project only after all of its upstream projects

    Args:
        client: Tamr client used to query the instance
        project_id: resource id of the project to walk from
        name_to_id: mapping of project name to project resource id
        visited: resource ids already handled; updated in place
        ordered: resource ids in upstream-first order; appended to in place
    """
    if project_id in visited:
        return
    # Mark before descending so that a circular dependency cannot recurse forever
    visited.add(project_id)
    for upstream_id in _direct_upstream_project_ids(client, project_id, name_to_id):
        _collect_upstream_first(client, upstream_id, name_to_id, visited, ordered)
    ordered.append(project_id)


def _find_projects(
    project: Project,
    all_projects: Optional[List[Project]] = None,
    project_list: Optional[List[str]] = None,
    upstream_projects: Optional[List[str]] = None,
) -> List[Project]:
    """Find the chain of projects upstream of a project, most upstream first

    Every project is recorded only after the projects feeding its input datasets,
    so the returned list can be processed front to back. The starting project is
    the last entry. The lists passed in are not modified.

    Args:
        project: a tamr project
        all_projects: all projects that exist within Tamr, fetched from the project's
            client when omitted. A mapping of project name to resource id is also
            accepted.
        project_list: resource ids of the projects to start the search from.
            Defaults to the resource id of `project`.
        upstream_projects: resource ids of projects that were already found, ordered
            downstream first. They are not searched again and are placed after the
            newly found projects in the result.
    Returns:
        List of tamr projects in a chained workflow
    Raises:
        KeyError: if an upstream project's name is not among `all_projects`
    """
    client = project.client

    # Build the name -> resource id lookup used to resolve the project names
    # reported by dataset usage
    if all_projects is None:
        all_projects = client.projects
    if isinstance(all_projects, dict):
        name_to_id = dict(all_projects)
    else:
        name_to_id = {existing.name: existing.resource_id for existing in all_projects}

    start_ids = [project.resource_id] if project_list is None else list(project_list)
    already_found = [] if upstream_projects is None else list(upstream_projects)

    visited = set(already_found)
    ordered = []
    for start_id in start_ids:
        _collect_upstream_first(client, start_id, name_to_id, visited, ordered)

    # Previously found ids arrive downstream first, so they are reversed and
    # placed after the newly discovered (further upstream) projects
    ordered.extend(reversed(already_found))

    return [client.projects.by_resource_id(found_id) for found_id in ordered]


def _find_datasets(dataset: Dataset) -> List[Dataset]:
    """Collects every dataset found upstream of a dataset.

    The search is breadth-first: each dataset is queried for its upstream
    datasets once per distinct name, and whatever it reports is queued for the
    same treatment.

    Args:
        dataset: a Tamr Dataset Object
    Returns:
        List of upstream datasets, one entry per distinct dataset name

    """
    pending = [dataset]
    seen_names = set()
    collected = []

    while pending:
        current = pending.pop(0)
        # A dataset reachable through several paths is only queried once
        if current.name in seen_names:
            continue
        parents = _request_upstream_datasets(current)
        seen_names.add(current.name)
        if parents:
            collected.extend(parents)
            pending.extend(parents)

    # Keying by name drops duplicates: positions follow the first occurrence of
    # a name, while the stored object is the last one collected under that name
    unique_by_name = {found.name: found for found in collected}
    return list(unique_by_name.values())


def _request_upstream_datasets(dataset: Dataset) -> List[Dataset]:
    """Returns the upstream datasets reported for a dataset

    Args:
        dataset: a Tamr Dataset Object
    Returns:
        The upstream datasets as Dataset objects, an empty list if there are none
    """
    # upstream_datasets() yields DatasetURI references, so each one is resolved
    # to a full Dataset through the client
    upstream_uris = dataset.upstream_datasets()
    return [dataset.client.datasets.by_resource_id(uri.resource_id) for uri in upstream_uris]


def datasets(dataset: Dataset) -> List[Dataset]:
    """Check for upstream datasets associated with a specified dataset

    Args:
        dataset: the Tamr dataset for which associated upstream datasets are retrieved

    Returns:
        List of Tamr datasets upstream of the target dataset
    """
    return _find_datasets(dataset)


def projects(project: Project) -> List[Project]:
    """Check for upstream projects associated with a specified project

    Args:
        project: the tamr project for which associated upstream projects are retrieved

    Returns:
        List of tamr projects upstream of the target project; the target project
        itself is the last entry
    """
    # Retrieve upstream projects (if any) from a specified Project
    return _find_projects(project)