Concurrent

Graph

class tamr_toolbox.workflow.concurrent.Graph.Graph(edges, directed_graph)

A dataclass for holding a set of Tamr project dependencies (edges), and the generated graph from them.

tamr_toolbox.workflow.concurrent.Graph.from_edges(edges)

Builds a graph directly from a set of edges, where each edge is a (source, target) tuple describing a dependency

Parameters

edges (Set[tuple]) – Set of edges, each a (source, target) tuple

Return type

Graph

Returns

Graph object
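To make the edge format concrete, here is a minimal standard-library sketch that turns (source, target) tuples into an adjacency map. The helper name build_adjacency and the project names are hypothetical, and this is not meant to reflect how tamr_toolbox stores its graph internally.

```python
from typing import Dict, Set, Tuple


def build_adjacency(edges: Set[Tuple[str, str]]) -> Dict[str, Set[str]]:
    """Map every node to the set of nodes it points to (hypothetical helper)."""
    adjacency: Dict[str, Set[str]] = {}
    for source, target in edges:
        adjacency.setdefault(source, set()).add(target)
        # Targets without outgoing edges should still appear as nodes.
        adjacency.setdefault(target, set())
    return adjacency


# Hypothetical project names; each tuple reads "target depends on source".
edges = {
    ("SM_project_1", "Mastering_project"),
    ("Mastering_project", "Golden_records_project"),
}
adjacency = build_adjacency(edges)
```

In this sketch the last project in the chain ends up with an empty successor set, which is what marks it as an end node.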

tamr_toolbox.workflow.concurrent.Graph.from_project_list(projects, client)

Creates a graph from a list of projects

Parameters
  • projects (List[Project]) – list of Tamr project objects

  • client (Client) – tamr client

Return type

Graph

Returns

A Graph object built from the dependencies of the projects passed

tamr_toolbox.workflow.concurrent.Graph.get_source_nodes(graph)

Gives all source nodes in a graph

Parameters

graph (Graph) – Graph for which to find source nodes

Return type

List[str]

Returns

List of node names

tamr_toolbox.workflow.concurrent.Graph.get_end_nodes(graph)

Returns all end nodes in a directed graph

Parameters

graph (Graph) – Graph for which to find end nodes

Returns

List of names of all end nodes
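The two lookups above can be sketched with plain set arithmetic. This assumes a source node is one with no incoming edges and an end node is one with no outgoing edges; the reference does not spell out either definition. The function name and node names are hypothetical.

```python
from typing import List, Set, Tuple


def source_and_end_nodes(edges: Set[Tuple[str, str]]) -> Tuple[List[str], List[str]]:
    """Return (source nodes, end nodes) for a set of (source, target) edges."""
    sources = {source for source, _ in edges}
    targets = {target for _, target in edges}
    # Nodes that never appear as a target have no incoming edges, and
    # nodes that never appear as a source have no outgoing edges.
    return sorted(sources - targets), sorted(targets - sources)


edges = {("A", "C"), ("B", "C"), ("C", "D")}
source_nodes, end_nodes = source_and_end_nodes(edges)
```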

tamr_toolbox.workflow.concurrent.Graph.get_projects_by_tier(graph)

Find the different projects at each tier

Parameters

graph (Graph) – the Graph for which to generate the tiers

Returns

A JSON dict whose structure is {'tier': [paths_at_that_tier], …}, e.g. {1: ['SM_project_1', 'Classification_project_1'], 2: ['Mastering_project'], 3: ['Golden_records_project']}
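One way to arrive at a tier mapping of this shape is sketched below. It assumes a project's tier is one plus the length of the longest path from any source node, which is an assumption about the tiering rule rather than something stated here. The edges are hypothetical and chosen only to reproduce the shape of the example dict.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def projects_by_tier(edges: Set[Tuple[str, str]]) -> Dict[int, List[str]]:
    """Group nodes by tier, assuming tier = 1 + longest path from a source."""
    predecessors: Dict[str, Set[str]] = defaultdict(set)
    nodes: Set[str] = set()
    for source, target in edges:
        predecessors[target].add(source)
        nodes.update((source, target))

    tier_of: Dict[str, int] = {}

    def tier(node: str) -> int:
        # Memoised recursion; assumes the dependency graph is acyclic.
        if node not in tier_of:
            parents = predecessors.get(node, set())
            tier_of[node] = 1 + max((tier(p) for p in parents), default=0)
        return tier_of[node]

    tiers: Dict[int, List[str]] = defaultdict(list)
    for node in sorted(nodes):
        tiers[tier(node)].append(node)
    return dict(sorted(tiers.items()))


edges = {
    ("SM_project_1", "Mastering_project"),
    ("Mastering_project", "Golden_records_project"),
    ("Classification_project_1", "Golden_records_project"),
}
tiers = projects_by_tier(edges)
```

Under this rule the golden records project lands in tier 3 because its longest dependency chain passes through the mastering project, even though it also has a direct tier 1 dependency.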

tamr_toolbox.workflow.concurrent.Graph.get_all_downstream_nodes(graph, node)

Get all nodes downstream of this one (i.e. they have a path from this node to them)

Parameters
  • graph (Graph) – the graph to use

  • node (str) – the node to check

Return type

Set[str]

Returns

A set of downstream node names
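"Downstream" here means reachable by following edges forward. A minimal breadth-first sketch of that idea over raw edge tuples follows; the function name is hypothetical and the toolbox may compute this differently.

```python
from collections import deque
from typing import Dict, Set, Tuple


def all_downstream(edges: Set[Tuple[str, str]], node: str) -> Set[str]:
    """Collect every node reachable from `node` by following edges forward."""
    successors: Dict[str, Set[str]] = {}
    for source, target in edges:
        successors.setdefault(source, set()).add(target)

    seen: Set[str] = set()
    queue = deque([node])
    while queue:
        current = queue.popleft()
        for nxt in successors.get(current, set()):
            if nxt not in seen:
                seen.add(nxt)
                queue.append(nxt)
    return seen


edges = {("A", "B"), ("B", "C"), ("A", "D"), ("E", "C")}
downstream_of_a = all_downstream(edges, "A")
downstream_of_c = all_downstream(edges, "C")
```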

tamr_toolbox.workflow.concurrent.Graph.get_successors(graph, node)

Get all successor nodes to the current node

Parameters
  • graph (Graph) – the graph to use

  • node (str) – the node to check

Return type

Set[str]

Returns

A set of nodes that are successors to the current node

tamr_toolbox.workflow.concurrent.Graph.get_predecessors(graph, node)

Get all predecessor nodes to the current node

Parameters
  • graph (Graph) – the graph to use

  • node (str) – the node to check

Return type

Set[str]

Returns

A set of nodes that are predecessors to the current node
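For contrast with the downstream lookup, the sketch below treats successors and predecessors as direct neighbours only, as is conventional in graph libraries. Whether the toolbox functions follow that convention is an assumption; the helper names are hypothetical.

```python
from typing import Set, Tuple


def direct_successors(edges: Set[Tuple[str, str]], node: str) -> Set[str]:
    """Nodes that `node` points to directly."""
    return {target for source, target in edges if source == node}


def direct_predecessors(edges: Set[Tuple[str, str]], node: str) -> Set[str]:
    """Nodes that point directly to `node`."""
    return {source for source, target in edges if target == node}


edges = {("A", "B"), ("B", "C"), ("A", "D")}
successors_of_a = direct_successors(edges, "A")
predecessors_of_c = direct_predecessors(edges, "C")
```

Note that C is downstream of A but is not a direct successor of A in this example.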

tamr_toolbox.workflow.concurrent.Graph.add_edges(graph, edges)

Takes an existing graph and creates a new one that includes the new edges

Parameters
  • graph (Graph) – the graph to start with

  • edges (Set[tuple]) – the edges to add

Return type

Graph

Returns

A copy of the initial graph with the new edges added
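The copy-rather-than-mutate behaviour described here can be illustrated with a frozen dataclass. EdgeGraph and with_edges are hypothetical stand-ins that store only edges; they are not the toolbox's Graph or add_edges.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable, Tuple

Edge = Tuple[str, str]


@dataclass(frozen=True)
class EdgeGraph:
    """Hypothetical stand-in for a graph that only stores its edges."""

    edges: FrozenSet[Edge]


def with_edges(graph: EdgeGraph, new_edges: Iterable[Edge]) -> EdgeGraph:
    """Return a new graph containing the old and new edges; input is untouched."""
    return EdgeGraph(edges=graph.edges | frozenset(new_edges))


original = EdgeGraph(frozenset({("A", "B")}))
extended = with_edges(original, {("B", "C")})
```

Returning a new object means any caller still holding the original graph sees it unchanged.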

Planner

class tamr_toolbox.workflow.concurrent.Planner.Planner(plan, starting_tier, graph)

A dataclass to hold the plan, the starting tier, and the graph. The plan is a JSON dict where each key is a project name and the value is a PlanNode object.

The starting tier is the tier at which to start execution. All jobs at lower tiers are marked as skippable.

The graph is the graph that contains the backing project dependencies.

tamr_toolbox.workflow.concurrent.Planner.from_graph(graph, *, tamr_client, starting_tier=0, train=False)

Creates a Planner instance from a Graph. The plan object is a JSON dict specifying how the plan can be executed and its status.

Parameters
  • graph (Graph) – the dataset dependency graph to use to create the planner

  • tamr_client (Client) – the tamr client object associated with the instance for which to create the plan

  • starting_tier (int) – the tier at which to start executing the plan; every job at a lower tier is marked as skippable and is not run

  • train (bool) – global config for whether or not to ‘apply feedback’/train the model in the workflows

Return type

Planner

Returns

Planner instance
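The effect of starting_tier can be sketched without a Tamr instance. The snippet below takes a tier mapping and marks everything below the starting tier as skippable. The status strings "skippable" and "planned" and the helper name are illustrative assumptions, not the toolbox's actual values.

```python
from typing import Dict, List


def initial_statuses(tiers: Dict[int, List[str]], starting_tier: int = 0) -> Dict[str, str]:
    """Assign a starting status to each project based on its tier."""
    statuses: Dict[str, str] = {}
    for tier, projects in tiers.items():
        for project in projects:
            # Projects below the starting tier are not run.
            statuses[project] = "skippable" if tier < starting_tier else "planned"
    return statuses


# Hypothetical tier mapping.
tiers = {1: ["schema_mapping"], 2: ["mastering"], 3: ["golden_records"]}
from_tier_two = initial_statuses(tiers, starting_tier=2)
from_start = initial_statuses(tiers)
```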

tamr_toolbox.workflow.concurrent.Planner.update_plan(planner, *, plan_node)

Creates a new planner object with updated status from an updated PlanNode object

Parameters
  • planner (Planner) – the original planner

  • plan_node (PlanNode) – the updated plan node

Return type

Planner

Returns

a copy of the original planner object with an updated status

tamr_toolbox.workflow.concurrent.Planner.to_json(planner)

Convert planner to a json dict

Parameters

planner (Planner) – the planner to convert

Return type

List[Dict]

Returns

representation of a planner in json format

tamr_toolbox.workflow.concurrent.Planner.execute(planner, tamr, *, concurrency_level=2, save_state=False, polling_interval=30)

Executes the plan

Parameters
  • planner (Planner) – The planner object whose plan will be executed

  • tamr (Client) – the tamr client to use

  • concurrency_level (int) – the number of concurrent jobs to run at once

  • save_state (bool) – whether or not to save the plan state to json after each update

  • polling_interval (int) – the amount of time in seconds to wait between polling

Return type

Planner

Returns

the planner object after execution
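To give a feel for how concurrency_level interacts with project dependencies, here is a simplified scheduling sketch. It works in lockstep batches, whereas a real executor would more likely start a new job as soon as a slot frees up, so treat it as an illustration of the constraint and not of the toolbox's algorithm. The function name and edges are hypothetical.

```python
from typing import Dict, List, Set, Tuple


def plan_batches(edges: Set[Tuple[str, str]], concurrency_level: int = 2) -> List[List[str]]:
    """Return an order in which projects could run, in lockstep batches."""
    if concurrency_level < 1:
        raise ValueError("concurrency_level must be at least 1")

    nodes = {node for edge in edges for node in edge}
    predecessors: Dict[str, Set[str]] = {node: set() for node in nodes}
    for source, target in edges:
        predecessors[target].add(source)

    done: Set[str] = set()
    batches: List[List[str]] = []
    while len(done) < len(nodes):
        # A project is ready once everything it depends on has finished.
        ready = sorted(n for n in nodes - done if predecessors[n] <= done)
        if not ready:
            raise ValueError("dependency cycle detected")
        batch = ready[:concurrency_level]
        batches.append(batch)
        done.update(batch)
    return batches


edges = {("A", "C"), ("B", "C"), ("C", "D"), ("E", "D")}
batches = plan_batches(edges, concurrency_level=2)
```

With two slots, E has to wait for the second batch even though it has no dependencies, which is the kind of trade-off the concurrency level controls.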

PlanNode

class tamr_toolbox.workflow.concurrent.PlanNode.PlanNode(name, operations, project, priority, current_op, status=PlanNodeStatus.PLANNED, train=False, current_step=None, steps_to_run=None)

Dataclass for the node of a Planner object - contains method for setting list of operations based on project type

tamr_toolbox.workflow.concurrent.PlanNode.poll(plan_node)

Polls the status of the current_op object and returns a new PlanNode with updated status

Parameters

plan_node (PlanNode) – the PlanNode object to poll

Return type

PlanNode

Returns

Copy of the original plan node with status updated based on the status of the current_op

tamr_toolbox.workflow.concurrent.PlanNode.run_next_step(plan_node)

Takes a plan node and runs the project

Parameters

plan_node (PlanNode) – the node to run

Return type

PlanNode

Returns

updated plan node

tamr_toolbox.workflow.concurrent.PlanNode.monitor(nodes, *, timeout=12, polling_interval=30)

Monitors the status of a list of PlanNodes. When one on that list changes to failed, succeeded, or cancelled, it returns the list.

Parameters
  • nodes (List[PlanNode]) – list of nodes to monitor

  • timeout (int) – number of hours to poll for a change in job status before timing out

  • polling_interval (int) – the amount of time in seconds to wait between polling

Return type

List[PlanNode]

Returns

the same list of PlanNodes with updated statuses
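The poll-then-wait loop described above might look roughly like the following. Node, monitor_sketch, the status strings, and the injectable poll and sleep callables are all hypothetical, introduced so the loop can run without a Tamr instance. The sketch also counts waited seconds instead of reading a wall clock, which is a simplification.

```python
import time
from collections import Counter
from dataclasses import dataclass, replace
from typing import Callable, List

TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for a plan node."""

    name: str
    status: str = "running"


def monitor_sketch(
    nodes: List[Node],
    poll: Callable[[Node], Node],
    *,
    timeout: int = 12,
    polling_interval: int = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Node]:
    """Poll all nodes until one reaches a terminal status or the timeout passes."""
    waited = 0
    while True:
        updated = [poll(node) for node in nodes]
        if any(node.status in TERMINAL_STATUSES for node in updated):
            return updated
        if waited >= timeout * 3600:  # timeout is given in hours
            raise TimeoutError("no job changed status before the timeout")
        sleep(polling_interval)
        waited += polling_interval


polls = Counter()
waits: List[float] = []


def fake_poll(node: Node) -> Node:
    """Hypothetical poller: project_b succeeds on its third poll."""
    polls[node.name] += 1
    if node.name == "project_b" and polls[node.name] >= 3:
        return replace(node, status="succeeded")
    return node


result = monitor_sketch(
    [Node("project_a"), Node("project_b")], fake_poll, sleep=waits.append
)
```

Passing sleep=waits.append records the waits instead of actually sleeping, which keeps the example fast; leaving the default would pause for the polling interval between rounds.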

PlanNodeStatus

Enum representing job status

class tamr_toolbox.workflow.concurrent.PlanNodeStatus.PlanNodeStatus(value)

A class representing job status for executing a concurrent pipeline

tamr_toolbox.workflow.concurrent.PlanNodeStatus.from_plan_node(plan_node)

Returns a PlanNodeStatus from a PlanNode object.

Parameters

plan_node (PlanNode) – the plan node object to get the status of

Return type

PlanNodeStatus

Returns

Latest view of the status of the PlanNode passed in

PlanStatus

Enum representing plan status

class tamr_toolbox.workflow.concurrent.PlanStatus.PlanStatus(value)

A class representing the status of a tamr_toolbox.workflow.concurrent.Planner plan

tamr_toolbox.workflow.concurrent.PlanStatus.from_planner(planner)

Create a PlanStatus object from a planner object

Parameters

planner – the planner to pass in

Return type

PlanStatus

Returns

the plan’s status as a PlanStatus object
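Deriving one plan-level status from many node-level statuses could be sketched as below. The enum members and the aggregation rules are purely illustrative assumptions; this reference does not list the actual members of PlanNodeStatus or PlanStatus.

```python
from enum import Enum
from typing import Iterable


class NodeState(Enum):
    """Hypothetical node-level states."""

    PLANNED = "planned"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    SKIPPABLE = "skippable"


class OverallState(Enum):
    """Hypothetical plan-level states."""

    PLANNED = "planned"
    RUNNING = "running"
    FINISHED = "finished"
    FAILED = "failed"


def overall_state(node_states: Iterable[NodeState]) -> OverallState:
    """Collapse node states into one plan state (illustrative rules only)."""
    states = list(node_states)
    if NodeState.FAILED in states:
        return OverallState.FAILED
    if all(s in (NodeState.SUCCEEDED, NodeState.SKIPPABLE) for s in states):
        return OverallState.FINISHED
    if all(s in (NodeState.PLANNED, NodeState.SKIPPABLE) for s in states):
        return OverallState.PLANNED
    return OverallState.RUNNING


example = overall_state([NodeState.SUCCEEDED, NodeState.RUNNING])
```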