# Source code for tamr_toolbox.realtime.matching

import json
import logging
import time
from collections import defaultdict
from typing import Dict, List, Optional, Union

import requests
from tamr_unify_client import Client
from tamr_unify_client.mastering.project import MasteringProject
from tamr_unify_client.operation import Operation

from tamr_toolbox.models.data_type import JsonDict
from tamr_toolbox.utils.operation import from_resource_id
from tamr_toolbox.utils.version import requires_tamr_version

LOGGER = logging.getLogger(__name__)


def update_realtime_match_data(
    *,
    project: MasteringProject,
    do_update_clusters: bool = True,
    do_use_manual_clustering: bool = False,
    **options,
) -> Operation:
    """Updates data for RealTime match queries if needed, based on latest published clusters.

    Args:
        project: project to be updated
        do_update_clusters: whether to update clusters, default True
        do_use_manual_clustering: whether to use externally managed clustering, default False
        options: Options passed to underlying :class:`~tamr_unify_client.operation.Operation`

    Returns:
        an operation object describing the update operation

    Raises:
        RuntimeError: if update API call fails
    """
    # Make sure we have the original name of the project to use with the match endpoint
    project_name = _get_internal_project_name(project)
    url = (
        f"projects/{project_name}:updateLLM?updateClusters={do_update_clusters}"
        f"&useManualClustering={do_use_manual_clustering}"
    )
    try:
        response = project.client.post(url).successful()
    except requests.exceptions.HTTPError as e:
        message = f"Match data update for {project_name} failed at submission time: {e}"
        LOGGER.error(message)
        raise RuntimeError(message)
    operation_id = response.content.decode("latin1")
    operation = from_resource_id(project.client, job_id=operation_id)
    return operation.apply_options(**options)
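# --- Illustrative usage (a sketch, not part of the original module) ---
# Submitting a match-data update for a project. The host, credentials, and
# project name below are hypothetical placeholders.
def _example_update_realtime_match_data() -> None:
    from tamr_unify_client import Client
    from tamr_unify_client.auth import UsernamePasswordAuth

    tamr = Client(UsernamePasswordAuth("user", "password"), host="localhost")
    project = tamr.projects.by_name("my_mastering_project").as_mastering()
    op = update_realtime_match_data(project=project)
    LOGGER.info(f"Update operation state: {op.state}")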
def poll_realtime_match_status(
    *, project: MasteringProject, match_client: Client, num_tries: int = 10, wait_sec: int = 1
) -> bool:
    """Check if match service is queryable.

    Try up to `num_tries` times at 1 sec (or user-specified) interval.

    Args:
        project: the mastering project whose status to check
        match_client: a Tamr client set to use the port of the Match API
        num_tries: max number of times to poll endpoint, default 10
        wait_sec: number of seconds to wait between tries, default 1

    Returns:
        bool indicating whether project is queryable
    """
    project_name = _get_internal_project_name(project)
    url = f"/api/v1/projects/{project_name}:isQueryable"

    queryable = False
    counter = 0
    # Poll endpoint until project is queryable or num_tries is reached
    while counter < num_tries:
        response = match_client.get(url)
        queryable = response.content == b"true"
        if queryable:
            break
        counter += 1
        time.sleep(wait_sec)  # call api at wait_sec interval if project isn't yet queryable
    return queryable
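# --- Illustrative usage (a sketch, not part of the original module) ---
# Polling until the project is queryable before issuing match queries. The
# host, credentials, project name, and Match API port (9170) are hypothetical
# placeholders; use the port your Match API is actually served on.
def _example_poll_realtime_match_status() -> None:
    from tamr_unify_client import Client
    from tamr_unify_client.auth import UsernamePasswordAuth

    auth = UsernamePasswordAuth("user", "password")
    tamr = Client(auth, host="localhost")
    project = tamr.projects.by_name("my_mastering_project").as_mastering()
    match_client = Client(auth, host="localhost", port=9170)  # hypothetical port
    if not poll_realtime_match_status(
        project=project, match_client=match_client, num_tries=30, wait_sec=2
    ):
        raise RuntimeError("Project is not queryable after 60 seconds.")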
def match_query(
    *,
    project: MasteringProject,
    match_client: Client,
    records: List[JsonDict],
    type: str,
    primary_key: Optional[str] = None,
    batch_size: Optional[int] = None,
    min_match_prob: Optional[float] = None,
    max_num_matches: Optional[int] = None,
) -> Dict[Union[int, str], List[JsonDict]]:
    """Find the best matching clusters or records for each supplied record.

    Returns a dictionary where each key corresponds to an input record and the value is a list
    of the RealTime match results for that record. An empty result list indicates a null
    response from matching (or no responses above the `min_match_prob`, if that parameter was
    supplied).

    Args:
        project: the mastering project to query for matches
        match_client: a Tamr client set to use the port of the Match API
        records: list of records to match
        type: one of "records" or "clusters" -- whether to pull record or cluster matches
        primary_key: a primary key for the data; if supplied, this must be a field in input
            records
        batch_size: split input into this batch size for match query calls (e.g. to prevent
            network timeouts), default None sends a single query with all records
        min_match_prob: if set, only matches with probability above minimum will be returned,
            default None
        max_num_matches: if set, at most max_num_matches will be returned for each input
            record in records, default None

    Returns:
        Dict keyed by integers (indices of inputs), or by `primary_key` if `primary_key` is
        supplied, with value a list containing matched data

    Raises:
        ValueError: if match `type` is not "records" or "clusters", or if `batch_size` is
            invalid
        RuntimeError: if query fails
    """
    result_dict = defaultdict(list)  # dict which defaults to empty list to hold results
    project_name = _get_internal_project_name(project)
    url = f"/api/v1/projects/{project_name}:match?type={type}"

    # Set up keys to read results
    if type == "records":
        record_key = "queryRecordId"
        prob_key = "matchProbability"
    elif type == "clusters":
        record_key = "entityId"
        prob_key = "avgMatchProb"
    else:
        raise ValueError(f"Unsupported match type {type}.")

    # Check batch size and set if not supplied
    if batch_size is None:
        batch_size = len(records)
    if batch_size == 0:
        LOGGER.warning("No input supplied to match_query -- returning empty result.")
        return result_dict
    elif batch_size <= 0:
        raise ValueError(f"Batch size must be non-negative: received {batch_size}")

    # Split into batches and convert to match query format
    for j in range(0, len(records), batch_size):
        json_recs = _prepare_json(records[j : j + batch_size], primary_key=primary_key, offset=j)
        try:
            response = match_client.post(url, json=json_recs).successful()
        except requests.exceptions.HTTPError as e:
            message = f"RealTime match query failed: {e}"
            LOGGER.error(message)
            raise RuntimeError(message)

        # Process responses
        if response.content == b"":  # handle null response
            continue

        # If data was found, decode, identify source record, and add match to corresponding
        # list of results in the result dictionary
        for resp_block in response.content.decode("utf-8").split("\n"):
            if resp_block:
                result = json.loads(resp_block)
                index = int(result[record_key]) if primary_key is None else result[record_key]
                if max_num_matches and len(result_dict[index]) >= max_num_matches:
                    continue
                if min_match_prob and result[prob_key] < min_match_prob:
                    continue
                result_dict[index].append(result)
    return result_dict
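# --- Illustrative usage (a sketch, not part of the original module) ---
# Querying record matches for two records keyed by a hypothetical "ssn" field,
# in batches of 1, keeping at most 5 matches above probability 0.8 per record.
# Because primary_key is set, results are keyed by each record's "ssn" value.
def _example_match_query(project: MasteringProject, match_client: Client) -> None:
    records = [
        {"ssn": "123-45-6789", "name": "Jane Doe"},
        {"ssn": "987-65-4321", "name": "John Smith"},
    ]
    results = match_query(
        project=project,
        match_client=match_client,
        records=records,
        type="records",
        primary_key="ssn",
        batch_size=1,
        min_match_prob=0.8,
        max_num_matches=5,
    )
    for key, matches in results.items():
        LOGGER.info(f"Record {key}: {len(matches)} match(es)")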
@requires_tamr_version(min_version="2022.009.0")
def transform_and_match_query(
    *,
    project: MasteringProject,
    match_client: Client,
    records: List[JsonDict],
    type: str,
    primary_key: Optional[str] = None,
    batch_size: Optional[int] = None,
    min_match_prob: Optional[float] = None,
    max_num_matches: Optional[int] = None,
    default_source_name: Optional[str] = None,
) -> Dict[Union[int, str], List[JsonDict]]:
    """Find the best matching clusters or records for each supplied record.

    Returns a dictionary where each key corresponds to an input record and the value is a list
    of the RealTime match results for that record. An empty result list indicates a null
    response from matching (or no responses above the `min_match_prob`, if that parameter was
    supplied).

    Will run schema mapping and transformations prior to RealTime match. If LLT is not
    enabled, will just run the default LLM with no transformation or schema mapping.

    Args:
        project: the mastering project to query for matches
        match_client: a Tamr client set to use the port of the Match API
        records: list of records to match
        type: one of "records" or "clusters" -- whether to pull record or cluster matches
        primary_key: a primary key for the data; if supplied, this must be a field in input
            records
        batch_size: split input into this batch size for match query calls (e.g. to prevent
            network timeouts), default None sends a single query with all records
        min_match_prob: if set, only matches with probability above minimum will be returned,
            default None
        max_num_matches: if set, at most max_num_matches will be returned for each input
            record in records, default None
        default_source_name: the default source name used for schema mapping in LLT,
            default None

    Returns:
        Dict keyed by integers (indices of inputs), or by `primary_key` if `primary_key` is
        supplied, with value a list containing matched data

    Raises:
        ValueError: if match `type` is not "records" or "clusters", or if `batch_size` is
            invalid
        RuntimeError: if query fails
    """
    result_dict = defaultdict(list)  # dict which defaults to empty list to hold results
    project_name = _get_internal_project_name(project)

    # Set up keys to read results
    if type == "records":
        url = f"/api/v1/projects/{project_name}:matchRecords?transform=true"
        record_key = "queryRecordId"
        prob_key = "matchProbability"
    elif type == "clusters":
        url = f"/api/v1/projects/{project_name}:matchClusters?transform=true"
        record_key = "entityId"
        prob_key = "avgMatchProb"
    else:
        raise ValueError(f"Unsupported match type {type}.")

    if default_source_name:
        url = url + f"&defaultSourceName={default_source_name}"

    # Check batch size and set if not supplied
    if batch_size is None:
        batch_size = len(records)
    if batch_size == 0:
        LOGGER.warning(
            "No input supplied to transform_and_match_query -- returning empty result."
        )
        return result_dict
    elif batch_size <= 0:
        raise ValueError(f"Batch size must be non-negative: received {batch_size}")

    # Split into batches and convert to match query format
    for j in range(0, len(records), batch_size):
        json_recs = _prepare_json(records[j : j + batch_size], primary_key=primary_key, offset=j)
        try:
            response = match_client.post(url, json=json_recs).successful()
        except requests.exceptions.HTTPError as e:
            message = f"RealTime match query failed: {e}"
            LOGGER.error(message)
            raise RuntimeError(message)

        # Process responses
        if response.content == b"":  # handle null response
            continue

        # If data was found, decode, identify source record, and add match to corresponding
        # list of results in the result dictionary
        for resp_block in response.content.decode("utf-8").split("\n"):
            if resp_block:
                result = json.loads(resp_block)
                index = int(result[record_key]) if primary_key is None else result[record_key]
                if max_num_matches and len(result_dict[index]) >= max_num_matches:
                    continue
                if min_match_prob and result[prob_key] < min_match_prob:
                    continue
                result_dict[index].append(result)
    return result_dict
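# --- Illustrative usage (a sketch, not part of the original module) ---
# Same query shape as match_query, but records are schema-mapped and
# transformed server-side first (requires Tamr >= 2022.009.0 with LLT enabled).
# "raw_source" is a hypothetical source name used for schema mapping.
def _example_transform_and_match_query(project: MasteringProject, match_client: Client) -> None:
    results = transform_and_match_query(
        project=project,
        match_client=match_client,
        records=[{"full_name": "Jane Doe", "phone": "555-0100"}],
        type="clusters",
        default_source_name="raw_source",
    )
    # With no primary_key, results are keyed by input index: results[0] holds
    # the cluster matches (if any) for the single record above.
    LOGGER.info(f"Found {len(results[0])} cluster match(es)")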
def _prepare_json(
    records: List[JsonDict], *, primary_key: Union[str, None], offset: int
) -> List[JsonDict]:
    """Put records into JSON format expected by RealTime match endpoint

    Args:
        records: list of records to match
        primary_key: a primary key for the data; if not None, must be a field in the input
            records
        offset: offset to apply to generated integer `recordId` -- this is necessary for
            batching

    Returns:
        List of formatted records

    Raises:
        ValueError: if primary_key is supplied but some supplied record(s) do not have the
            primary_key field
    """
    if primary_key is not None:
        try:
            json_records = [{"recordId": rec.pop(primary_key), "record": rec} for rec in records]
        except KeyError:
            raise ValueError(f"Not all input records had a primary key field {primary_key}.")
    else:  # use integers as recordId
        json_records = [
            {"recordId": str(offset + k), "record": rec} for k, rec in enumerate(records)
        ]
    return json_records


def _get_internal_project_name(project: MasteringProject) -> str:
    """Get project's internal name (as opposed to the `displayName`, which can be changed
    by users)

    Args:
        project: the mastering project of which to retrieve the internal name

    Returns:
        the project's internal name
    """
    # Get usage data for unified dataset of input project
    usage_resp = project.unified_dataset().usage()
    # Get project internal name from first output_from_project_step of unified dataset usage
    name = usage_resp.usage.output_from_project_steps[0].project_name
    return name
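# --- Illustrative note (a sketch, not part of the original module) ---
# What _prepare_json produces: with primary_key="ssn" (hypothetical field), the
# key is popped out of the record and used as the recordId; without a primary
# key, stringified integer indices shifted by `offset` are generated instead.
def _example_prepare_json() -> None:
    with_pk = _prepare_json(
        [{"ssn": "123-45-6789", "name": "Jane Doe"}], primary_key="ssn", offset=0
    )
    # -> [{"recordId": "123-45-6789", "record": {"name": "Jane Doe"}}]
    without_pk = _prepare_json([{"name": "Jane Doe"}], primary_key=None, offset=10)
    # -> [{"recordId": "10", "record": {"name": "Jane Doe"}}]
    assert with_pk[0]["recordId"] == "123-45-6789"
    assert without_pk[0]["recordId"] == "10"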