"""Tasks related to testing code"""
import logging
import json
import os
import re
from inspect import getfile
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional, Union, Dict
from unittest import mock
from functools import wraps
from requests import Response, ConnectionError
from tamr_unify_client.operation import Operation
from tamr_toolbox import utils
LOGGER = logging.getLogger(__name__)
"<": "lt",
">": "gt",
":": "colon",
'"': "dquote",
"/": "fslash",
"\\": "bslash",
"|": "pipe",
"?": "qmark",
"*": "asterisk",
def _response_to_json(resp: Response, ip_dict: Dict[str, int]) -> str:
"""Converts a Response object into json string readable by the responses mocking library
resp: Response from a Tamr API call
ip_dict: Mapping of previously encountered IP addresses to their anonymization number
The response represented as a json string
if resp.encoding is None:
resp.encoding = "utf-8"
resp_log = {
"method": resp.request.method,
"url": _anonymize_url(resp.request.url, ip_dict),
"status": resp.status_code,
"content_type": resp.headers.get("Content-Type"),
"body": resp.text,
return json.dumps(resp_log, ensure_ascii=False)
def _anonymize_url(url: str, ip_dict: Dict[str, int]) -> str:
"""Returns a anonymized url. Updates the dictionary inplace if a new ip is encountered
url: A URL
ip_dict: Previously encountered IP addresses and an assigned numeric value
URL with the IP address anonymized
regex_match = re.match(r"(?i)(^https?://)(.*?)([/:].*$)", url)
ip = regex_match.group(2)
num = ip_dict[ip]
except KeyError:
ip_dict[ip] = len(ip_dict.values()) + 1
num = ip_dict[ip]
return f"{regex_match.group(1)}ip-{num:05d}{regex_match.group(3)}"
def _collect_operation_calls(
*, response: Response, poll_interval_seconds: int = 3
) -> List[Response]:
"""If the provided response is an Operation, wait for the operation to complete and
return responses related to that operation for cases where we are running synchronously.
response: A previous Response generated from the same Tamr client
poll_interval_seconds: Time interval (in seconds) between subsequent polls
Responses related to polling the operation
client = utils.client._from_response(response)
op = Operation.from_response(client, response)
LOGGER.info(f"Waiting for operation to complete: {op}")
request_while_pending = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
while op.state == "PENDING":
op = op.poll()
request_while_running = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
request_when_complete = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
return [request_while_pending, request_while_running, request_when_complete]
def _log_response(
*, log_path: Path, ip_dict: Dict[str, int], response: Response, asynchronous: bool = False
) -> None:
"""Appends a response to a file. If the response returned is
a Tamr Operation, poll the operation until complete and log those responses as well
unless asynchronous is set to True.
log_path: File to write the response to
ip_dict: Mapping of previously encountered IP addresses to their anonymization number
response: The response to log
asynchoronous: Whether or not to `wait` for Operations called during the running of tests
LOGGER.info(f"logged request: {response.url}")
with log_path.open(mode="a", encoding="utf-8") as f:
all_responses = [response]
# Poll and wait for operations, if applicable
is_operation_request = bool(
re.match(re.compile(".*/api/versioned/v1/operations/.*"), response.url)
is_get_request = response.request.method == "GET"
is_not_error = response.ok
if is_get_request and is_operation_request and is_not_error and not asynchronous:
wait_resp = _collect_operation_calls(response=response)
all_json = [_response_to_json(r, ip_dict) for r in all_responses]
f.writelines([f"{j}\n" for j in all_json])
def _build_response_log_path(
*, test_func: Callable, response_logs_dir: Optional[Union[str, Path]], **kwargs
) -> Path:
"""Returns a file path for API response logs for a given test and test parameters
test_func: The test function
**kwargs: Arguments to the test function
File path for the API response logs
# Convert test arguments and their values to a string, skipping ignored arguments
test_params = "_".join([f"{k}={v}" for k, v in {**kwargs}.items()])
# Remove reserved characters from ndjson name
test_params = test_params.replace(char, WINDOWS_RESERVED_CHARACTER_MAP[char])
if len(test_params) > 0:
test_params = "__" + test_params
if response_logs_dir is None:
# If no directory is provided, create a directory with the name of the test file
# in a directory called "response_logs" located in the same directory as the test file
dir_matcher = re.match(r"(.*)(?:\\|/)(.*).py", str(Path(getfile(test_func))))
response_logs_dir = f"{dir_matcher.group(1)}/response_logs/{dir_matcher.group(2)}"
return Path(f"{response_logs_dir}/{test_func.__name__}{test_params}.ndjson")
[docs]def mock_api(
response_logs_dir: Optional[Union[str, Path]] = None,
enforce_online_test: bool = False,
asynchronous: bool = False,
) -> Callable:
"""Decorator for `pytest` tests that mocks API requests by reading a file of
pre-generated responses. Will generate responses file based on a real connection
if pre-generated responses are not found.
response_logs_dir: Directory to read/write response logs
enforce_online_test: Whether an online test should be run, even if a response log
already exists
asynchronous: Whether or not to `wait` for Operations called during the running of tests
Decorated function
def wrap(test_function: Callable):
def wrapped(**kwargs):
response_log_path = _build_response_log_path(
test_func=test_function, response_logs_dir=response_logs_dir, **kwargs
if response_log_path.exists() and enforce_online_test:
# Delete the file to enforce an online test
if response_log_path.exists():
LOGGER.info(f"Running offline test based on file at {response_log_path}")
response_log_path=response_log_path, test_function=test_function, **kwargs
except ConnectionError as e:
msg = (
f"A required API call was missing from response logs file for this "
f"offline test ({response_log_path}). The response log file must be "
f"regenerated. Delete the existing file to automatically regenerate a "
f"new one. Caused by: {e}"
raise ConnectionError(e)
return wrapped
return wrap
# Handle ModuleNotFoundError to allow tamr_toolbox to be used when the optional dependency
# `responses` is not installed
import responses
# Stores the original _real_send function of requests
_BASE_FIND_MATCH = responses.RequestsMock._find_match
# Stores the original _real_send function of responses
_BASE_SEND_REAL = responses._real_send
@mock.patch.object(Operation.wait, "__defaults__", (0, None)) # sets operation wait time to 0
def _run_offline_test(response_log_path: Path, test_function: Callable, **kwargs) -> None:
"""Runs a test function against saved API responses located in a file
response_log_path: Location of saved API responses
test_function: The function to test
**kwargs: Keyword arguments for the test function
with response_log_path.open(encoding="utf-8") as f:
for line in f:
response = json.loads(line)
ip_lookup = {}
def _find_anonymized_match(self, request):
"""Allows responses library to match requests for an ip address to match to an
anonymized ip address
request.url = _anonymize_url(request.url, ip_lookup)
return _BASE_FIND_MATCH(self, request)
with mock.patch("responses.RequestsMock._find_match", new=_find_anonymized_match):
def _run_online_test(
response_log_path: Path, test_function: Callable, asynchronous: bool = False, **kwargs
) -> None:
"""Runs a test function against a Tamr instance and saves the API responses to a file
response_log_path: Location to save API responses
test_function: The function to test
asynchoronous: Whether or not to run asynchronously
(i.e. don't `wait` for operations to finish)
**kwargs: Keyword arguments for the test function
f"Online test running against Tamr instance. "
f"Creating new file at {response_log_path}. This may take a while ..."
os.makedirs(response_log_path.parent, exist_ok=True)
# Each time an API call is made, allow it to pass through responses and make a real call
# Each time a real call is made, log the response in the response file
ip_lookup = {}
def _send_real_with_log(*args, **kwargs) -> Response:
"""Logs the response from BASE_SEND_REAL
*args: The positional arguments for BASE_SEND_REAL
**kwargs: The keyword arguments for BASE_SEND_REAL
The response from the call
response = _BASE_SEND_REAL(*args, **kwargs)
# Prevent recursion
with mock.patch("responses._real_send", new=_BASE_SEND_REAL):
return response
with mock.patch("responses._real_send", new=_send_real_with_log):
# Setting the passthru above permanently changes state for online testing
# Reset passthru to default
responses.mock.passthru_prefixes = ()
responses._default_mock.passthru_prefixes = ()
except ModuleNotFoundError as err:
# Ensure exception is due to responses package being missing
if err.msg != "No module named 'responses'":
raise err
def _run_offline_test(*args, **kwargs):
"""Dummy function to raise the appropriate exception if the function is called without the
necessary package installed
import responses # noqa: F401
def _run_online_test(*args, **kwargs):
"""Dummy function to raise the appropriate exception if the function is called without the
necessary package installed
import responses # noqa: F401