Concurrent
Graph
- class tamr_toolbox.workflow.concurrent.Graph.Graph(edges, directed_graph)
A dataclass for holding a set of Tamr project dependencies (edges) and the directed graph generated from them.
- tamr_toolbox.workflow.concurrent.Graph.from_edges(edges)
Builds a graph directly from a list of edges, where each edge is a (source, target) dependency tuple.
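To make the edge format concrete, the following is a minimal stand-alone sketch that turns (source, target) tuples into an adjacency mapping. It uses only the standard library and is not the toolbox's implementation; `build_adjacency` is a hypothetical helper, and the dependencies between the example project names are assumed for illustration.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def build_adjacency(edges: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Map each node to the list of nodes that directly depend on it."""
    adjacency: Dict[str, List[str]] = defaultdict(list)
    for source, target in edges:
        adjacency[source].append(target)
        # Make sure nodes with no outgoing edges still appear as keys.
        adjacency.setdefault(target, [])
    return dict(adjacency)


# Hypothetical dependencies, for illustration only.
edges = [
    ("SM_project_1", "Mastering_project"),
    ("Classification_project_1", "Mastering_project"),
    ("Mastering_project", "Golden_records_project"),
]
adjacency = build_adjacency(edges)
```

Each tuple reads as "target depends on source", so `Mastering_project` appears as a dependent of both upstream projects.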
- tamr_toolbox.workflow.concurrent.Graph.from_project_list(projects, client)
Creates a graph from a list of projects
- Parameters
projects (List[Project]) – list of Tamr project objects
client (Client) – Tamr client
- Return type
Graph
- Returns
A Graph object built from the dependencies of the projects passed
- tamr_toolbox.workflow.concurrent.Graph.get_source_nodes(graph)
Returns all source nodes in a graph
- tamr_toolbox.workflow.concurrent.Graph.get_end_nodes(graph)
Returns all end nodes in a directed graph
- Parameters
graph (Graph) – Graph for which to find end nodes
- Returns
List of names of all end nodes
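As an illustration of source and end nodes, the sketch below finds both directly from an edge list. It assumes the usual definitions (a source node has no incoming edges, an end node has no outgoing edges), which may differ in detail from what the toolbox does; `source_and_end_nodes` and the example edges are hypothetical.

```python
from typing import List, Set, Tuple


def source_and_end_nodes(edges: List[Tuple[str, str]]) -> Tuple[Set[str], Set[str]]:
    """Return (source nodes, end nodes) for a list of (source, target) edges."""
    sources = {source for source, _ in edges}
    targets = {target for _, target in edges}
    # Source nodes never appear as a target; end nodes never appear as a source.
    return sources - targets, targets - sources


# Hypothetical dependencies, for illustration only.
edges = [
    ("SM_project_1", "Mastering_project"),
    ("Classification_project_1", "Mastering_project"),
    ("Mastering_project", "Golden_records_project"),
]
source_nodes, end_nodes = source_and_end_nodes(edges)
```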
- tamr_toolbox.workflow.concurrent.Graph.get_projects_by_tier(graph)
Finds the projects at each tier
- Parameters
graph (Graph) – the Graph for which to generate the tiers
- Returns
A JSON dict whose structure is {'tier': [paths_at_that_tier], …}, e.g. {1: ['SM_project_1', 'Classification_project_1'], 2: ['Mastering_project'], 3: ['Golden_records_project']}
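One way to arrive at a tier dict of this shape is sketched below. It assumes a tier is 1 for nodes with no upstream dependencies and otherwise one more than the highest tier among a node's direct predecessors; that definition, the helper name `tiers_from_edges`, and the example edges are assumptions rather than the toolbox's documented behaviour.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def tiers_from_edges(edges: List[Tuple[str, str]]) -> Dict[int, List[str]]:
    """Group nodes by tier: 1 for source nodes, else 1 + max tier of predecessors."""
    predecessors: Dict[str, List[str]] = defaultdict(list)
    nodes: Set[str] = set()
    for source, target in edges:
        predecessors[target].append(source)
        nodes.update((source, target))

    tier_of: Dict[str, int] = {}

    def tier(node: str) -> int:
        # Memoised recursion; assumes the dependency graph has no cycles.
        if node not in tier_of:
            tier_of[node] = 1 + max((tier(p) for p in predecessors[node]), default=0)
        return tier_of[node]

    result: Dict[int, List[str]] = defaultdict(list)
    for node in sorted(nodes):
        result[tier(node)].append(node)
    return dict(sorted(result.items()))


# Hypothetical dependencies, for illustration only.
edges = [
    ("SM_project_1", "Mastering_project"),
    ("Classification_project_1", "Mastering_project"),
    ("Mastering_project", "Golden_records_project"),
]
tiers = tiers_from_edges(edges)
```

With these edges the two upstream projects land in tier 1, the mastering project in tier 2, and the golden records project in tier 3.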
- tamr_toolbox.workflow.concurrent.Graph.get_all_downstream_nodes(graph, node)
Gets all nodes downstream of this one (i.e. all nodes reachable by a path from this node)
- tamr_toolbox.workflow.concurrent.Graph.get_successors(graph, node)
Gets all successor nodes of the current node
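The difference between successors and downstream nodes can be sketched with a plain adjacency mapping: successors are reached over a single edge, while downstream nodes are everything reachable over any number of edges. The helpers `direct_successors` and `all_downstream` and the example data are hypothetical and independent of the toolbox.

```python
from collections import deque
from typing import Dict, List, Set


def direct_successors(adjacency: Dict[str, List[str]], node: str) -> List[str]:
    """Nodes reachable from `node` over exactly one edge."""
    return list(adjacency.get(node, []))


def all_downstream(adjacency: Dict[str, List[str]], node: str) -> Set[str]:
    """Nodes reachable from `node` over one or more edges (breadth-first search)."""
    seen: Set[str] = set()
    queue = deque(adjacency.get(node, []))
    while queue:
        current = queue.popleft()
        if current in seen:
            continue
        seen.add(current)
        queue.extend(adjacency.get(current, []))
    return seen


# Hypothetical dependencies, for illustration only.
adjacency = {
    "SM_project_1": ["Mastering_project"],
    "Classification_project_1": ["Mastering_project"],
    "Mastering_project": ["Golden_records_project"],
    "Golden_records_project": [],
}
successors_of_sm = direct_successors(adjacency, "SM_project_1")
downstream_of_sm = all_downstream(adjacency, "SM_project_1")
```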
Planner
- class tamr_toolbox.workflow.concurrent.Planner.Planner(plan, starting_tier, graph)
A dataclass to hold the plan, the starting tier, and the graph. The plan is a JSON dict where each key is a project name and the value is a PlanNode object.
The starting tier is the tier at which to start execution. All jobs at lower tiers are marked as skippable.
The graph is the graph that contains the backing project dependencies.
- tamr_toolbox.workflow.concurrent.Planner.from_graph(graph, *, tamr_client, starting_tier=0, train=False)
Creates a Planner instance from a Graph. The plan object is a JSON dict specifying how the plan can be executed and its status.
- Parameters
graph (Graph) – the dataset dependency graph to use to create the planner
tamr_client (Client) – the Tamr client object associated with the instance for which to create the plan
starting_tier (int) – the tier at which to start executing the plan; every job at lower tiers is skipped and marked as skippable
train – global config for whether or not to ‘apply feedback’/train the model in the workflows
- Return type
Planner
- Returns
Planner instance
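The effect of starting_tier can be sketched as follows: every project in a tier below the starting tier is marked skippable, and everything else stays planned. This is a stand-alone illustration, not the toolbox's code; `NodeStatus` is a hypothetical stand-in for PlanNodeStatus (only PLANNED is confirmed by the PlanNode signature, and the SKIPPABLE member name is assumed), and `initial_statuses` is a hypothetical helper.

```python
from enum import Enum
from typing import Dict, List


class NodeStatus(Enum):
    """Hypothetical stand-in for PlanNodeStatus; member names are assumed."""

    PLANNED = "planned"
    SKIPPABLE = "skippable"


def initial_statuses(
    tiers: Dict[int, List[str]], starting_tier: int = 0
) -> Dict[str, NodeStatus]:
    """Mark projects below `starting_tier` as skippable, all others as planned."""
    statuses: Dict[str, NodeStatus] = {}
    for tier, projects in tiers.items():
        for name in projects:
            below_start = tier < starting_tier
            statuses[name] = NodeStatus.SKIPPABLE if below_start else NodeStatus.PLANNED
    return statuses


tiers = {
    1: ["SM_project_1", "Classification_project_1"],
    2: ["Mastering_project"],
    3: ["Golden_records_project"],
}
statuses_from_tier_2 = initial_statuses(tiers, starting_tier=2)
statuses_default = initial_statuses(tiers)
```

With the default starting tier of 0 nothing is skipped, since no tier is lower than 0.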
- tamr_toolbox.workflow.concurrent.Planner.update_plan(planner, *, plan_node)
Creates a new planner object with updated status from an updated PlanNode object
- Parameters
planner (Planner) – the original planner
plan_node (PlanNode) – the updated plan node
- Return type
Planner
- Returns
a copy of the original planner object with an updated status
- tamr_toolbox.workflow.concurrent.Planner.to_json(planner)
Converts a planner to a JSON dict
- Parameters
planner (Planner) – the planner to convert
- tamr_toolbox.workflow.concurrent.Planner.execute(planner, tamr, *, concurrency_level=2, save_state=False, polling_interval=30)
Executes the plan
- Parameters
planner (Planner) – the planner object whose plan will be executed
tamr (Client) – the Tamr client to use
concurrency_level (int) – the number of jobs to run concurrently
save_state (bool) – whether or not to save the plan state to JSON after each update
polling_interval (int) – the amount of time in seconds to wait between polling
- Return type
Planner
- Returns
the planner object after execution
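The role of concurrency_level can be illustrated with a simplified scheduler that groups jobs into batches: a job is eligible once all of its dependencies have finished, and no batch holds more jobs than the concurrency level. This is only a sketch under those assumptions. It does not poll Tamr, does not start new jobs as individual slots free up, and is not how the toolbox's executor is implemented; `schedule_batches` and the example dependencies are hypothetical.

```python
from typing import Dict, List, Set


def schedule_batches(
    dependencies: Dict[str, Set[str]], concurrency_level: int = 2
) -> List[List[str]]:
    """Return successive batches of at most `concurrency_level` ready jobs."""
    if concurrency_level < 1:
        raise ValueError("concurrency_level must be at least 1")
    done: Set[str] = set()
    batches: List[List[str]] = []
    while len(done) < len(dependencies):
        # A job is ready when it has not run yet and all its dependencies are done.
        ready = sorted(
            job
            for job, deps in dependencies.items()
            if job not in done and deps <= done
        )
        if not ready:
            raise ValueError("dependencies contain a cycle or an unknown job")
        batch = ready[:concurrency_level]
        batches.append(batch)
        done.update(batch)
    return batches


# Hypothetical dependencies: each job maps to the set of jobs it waits for.
dependencies = {
    "SM_project_1": set(),
    "Classification_project_1": set(),
    "Mastering_project": {"SM_project_1", "Classification_project_1"},
    "Golden_records_project": {"Mastering_project"},
}
batches_two = schedule_batches(dependencies, concurrency_level=2)
batches_one = schedule_batches(dependencies, concurrency_level=1)
```

With a concurrency level of 2 the two independent projects share the first batch; with a level of 1 they run one after the other before the mastering project becomes eligible.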
PlanNode
- class tamr_toolbox.workflow.concurrent.PlanNode.PlanNode(name, operations, project, priority, current_op, status=PlanNodeStatus.PLANNED, train=False, current_step=None, steps_to_run=None)
Dataclass for a node of a Planner object. Contains a method for setting the list of operations based on project type.
- tamr_toolbox.workflow.concurrent.PlanNode.poll(plan_node)
Polls the status of the current_op object and returns a new PlanNode with updated status
- tamr_toolbox.workflow.concurrent.PlanNode.run_next_step(plan_node)
Takes a plan node and runs the project
- tamr_toolbox.workflow.concurrent.PlanNode.monitor(nodes, *, timeout=12, polling_interval=30)
Monitors the status of a list of PlanNodes. When one on that list changes to failed, succeeded, or cancelled, it returns the list.
- Parameters
- Return type
- Returns
the same list of PlanNodes with updated statuses
PlanNodeStatus
Enum representing job status
- class tamr_toolbox.workflow.concurrent.PlanNodeStatus.PlanNodeStatus(value)
A class representing job status for executing a concurrent pipeline
PlanStatus
Enum representing plan status
- class tamr_toolbox.workflow.concurrent.PlanStatus.PlanStatus(value)
A class representing the status of a tamr_toolbox.workflow.concurrent.Planner plan