"""Tasks related to testing code"""
import logging
import json
import os
import re
from inspect import getfile
from pathlib import Path
from time import sleep
from typing import Callable, List, Optional, Union, Dict
from unittest import mock
from functools import wraps
from requests import Response, ConnectionError
from tamr_unify_client.operation import Operation
from tamr_toolbox import utils
LOGGER = logging.getLogger(__name__)
WINDOWS_RESERVED_CHARACTER_MAP = {
    "<": "lt",
    ">": "gt",
    ":": "colon",
    '"': "dquote",
    "/": "fslash",
    "\\": "bslash",
    "|": "pipe",
    "?": "qmark",
    "*": "asterisk",
}
def _response_to_json(resp: Response, ip_dict: Dict[str, int]) -> str:
    """Converts a Response object into json string readable by the responses mocking library
    Args:
        resp: Response from a Tamr API call
        ip_dict: Mapping of previously encountered IP addresses to their anonymization number
    Returns:
        The response represented as a json string
    """
    if resp.encoding is None:
        resp.encoding = "utf-8"
    resp_log = {
        "method": resp.request.method,
        "url": _anonymize_url(resp.request.url, ip_dict),
        "status": resp.status_code,
        "content_type": resp.headers.get("Content-Type"),
        "body": resp.text,
    }
    return json.dumps(resp_log, ensure_ascii=False)
def _anonymize_url(url: str, ip_dict: Dict[str, int]) -> str:
    """Returns a anonymized url. Updates the dictionary inplace if a new ip is encountered
    Args:
        url: A URL
        ip_dict: Previously encountered IP addresses and an assigned numeric value
    Returns:
        URL with the IP address anonymized
    """
    regex_match = re.match(r"(?i)(^https?://)(.*?)([/:].*$)", url)
    ip = regex_match.group(2)
    try:
        num = ip_dict[ip]
    except KeyError:
        ip_dict[ip] = len(ip_dict.values()) + 1
        num = ip_dict[ip]
    return f"{regex_match.group(1)}ip-{num:05d}{regex_match.group(3)}"
def _collect_operation_calls(
    *, response: Response, poll_interval_seconds: int = 3
) -> List[Response]:
    """If the provided response is an Operation, wait for the operation to complete and
    return responses related to that operation.
    Args:
        response: A previous Response generated from the same Tamr client
        poll_interval_seconds: Time interval (in seconds) between subsequent polls
    Returns:
        Responses related to polling the operation
    """
    client = utils.client._from_response(response)
    op = Operation.from_response(client, response)
    LOGGER.info(f"Waiting for operation to complete: {op}")
    request_while_pending = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
    while op.state == "PENDING":
        op = op.poll()
        sleep(poll_interval_seconds)
    request_while_running = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
    op.wait()
    request_when_complete = client.get(endpoint=f"/api/versioned/v1/operations/{op.resource_id}")
    return [request_while_pending, request_while_running, request_when_complete]
def _log_response(*, log_path: Path, ip_dict: Dict[str, int], response: Response) -> None:
    """Appends a response to a file. If the response returned is
     a Tamr Operation, poll the operation until complete and log those responses as well
    Args:
        log_path: File to write the response to
        ip_dict: Mapping of previously encountered IP addresses to their anonymization number
        response: The response to log
    """
    LOGGER.info(f"logged request: {response.url}")
    with log_path.open(mode="a", encoding="utf-8") as f:
        all_responses = [response]
        # Poll and wait for operations, if applicable
        is_operation_request = bool(
            re.match(re.compile(".*/api/versioned/v1/operations/.*"), response.url)
        )
        is_get_request = response.request.method == "GET"
        if is_get_request and is_operation_request:
            wait_resp = _collect_operation_calls(response=response)
            all_responses.extend(wait_resp)
        all_json = [_response_to_json(r, ip_dict) for r in all_responses]
        f.writelines([f"{j}\n" for j in all_json])
def _build_response_log_path(
    *, test_func: Callable, response_logs_dir: Optional[Union[str, Path]], **kwargs,
) -> Path:
    """Returns a file path for API response logs for a given test and test parameters
    Args:
        test_func: The test function
        **kwargs: Arguments to the test function
    Returns:
        File path for the API response logs
    """
    # Convert test arguments and their values to a string, skipping ignored arguments
    test_params = "_".join([f"{k}={v}" for k, v in {**kwargs}.items()])
    # Remove reserved characters from ndjson name
    for char in WINDOWS_RESERVED_CHARACTER_MAP:
        test_params = test_params.replace(char, WINDOWS_RESERVED_CHARACTER_MAP[char])
    if len(test_params) > 0:
        test_params = "__" + test_params
    if response_logs_dir is None:
        # If no directory is provided, create a directory with the name of the test file
        # in a directory called "response_logs" located in the same directory as the test file
        dir_matcher = re.match(r"(.*)(?:\\|/)(.*).py", str(Path(getfile(test_func))))
        response_logs_dir = f"{dir_matcher.group(1)}/response_logs/{dir_matcher.group(2)}"
    return Path(f"{response_logs_dir}/{test_func.__name__}{test_params}.ndjson")
[docs]def mock_api(
    *, response_logs_dir: Optional[Union[str, Path]] = None, enforce_online_test=False
) -> Callable:
    """Decorator for `pytest` tests that mocks API requests by reading a file of
    pre-generated responses. Will generate responses file based on a real connection
    if pre-generated responses are not found.
    Args:
        response_logs_dir: Directory to read/write response logs
        enforce_online_test: Whether an online test should be run, even if a response log
            already exists
    Returns:
        Decorated function
    """
    def wrap(test_function: Callable):
        @wraps(test_function)
        def wrapped(**kwargs):
            response_log_path = _build_response_log_path(
                test_func=test_function, response_logs_dir=response_logs_dir, **kwargs,
            )
            if response_log_path.exists() and enforce_online_test:
                # Delete the file to enforce an online test
                response_log_path.unlink()
            if response_log_path.exists():
                try:
                    LOGGER.info(f"Running offline test based on file at {response_log_path}")
                    _run_offline_test(
                        response_log_path=response_log_path, test_function=test_function, **kwargs,
                    )
                except ConnectionError as e:
                    msg = (
                        f"A required API call was missing from response logs file for this "
                        f"offline test ({response_log_path}). The response log file must be "
                        f"regenerated. Delete the existing file to automatically regenerate a "
                        f"new one. Caused by: {e}"
                    )
                    LOGGER.error(msg)
                    raise ConnectionError(e)
            else:
                _run_online_test(
                    response_log_path=response_log_path, test_function=test_function, **kwargs
                )
        return wrapped
    return wrap 
# Handle ModuleNotFoundError to allow tamr_toolbox to be used when the optional dependency
# `responses` is not installed
try:
    import responses
    # Stores the original _real_send function of requests
    _BASE_FIND_MATCH = responses.RequestsMock._find_match
    # Stores the original _real_send function of responses
    _BASE_SEND_REAL = responses._real_send
    @responses.activate
    @mock.patch.object(Operation.wait, "__defaults__", (0, None))  # sets operation wait time to 0
    def _run_offline_test(response_log_path: Path, test_function: Callable, **kwargs) -> None:
        """Runs a test function against saved API responses located in a file
        Args:
            response_log_path: Location of saved API responses
            test_function: The function to test
            **kwargs: Keyword arguments for the test function
        """
        with response_log_path.open(encoding="utf-8") as f:
            for line in f:
                response = json.loads(line)
                responses.add(**response)
        ip_lookup = {}
        def _find_anonymized_match(self, request):
            """Allows responses library to match requests for an ip address to match to an
            anonymized ip address
            """
            request.url = _anonymize_url(request.url, ip_lookup)
            return _BASE_FIND_MATCH(self, request)
        with mock.patch("responses.RequestsMock._find_match", new=_find_anonymized_match):
            test_function(**kwargs)
    @responses.activate
    def _run_online_test(response_log_path: Path, test_function: Callable, **kwargs) -> None:
        """Runs a test function against a Tamr instance and saves the API responses to a file
        Args:
            response_log_path: Location to save API responses
            test_function: The function to test
            **kwargs: Keyword arguments for the test function
        """
        LOGGER.info(
            f"Online test running against Tamr instance. "
            f"Creating new file at {response_log_path}. This may take a while ..."
        )
        os.makedirs(response_log_path.parent, exist_ok=True)
        response_log_path.touch()
        # Each time an API call is made, allow it to pass through responses and make a real call
        # Each time a real call is made, log the response in the response file
        responses.add_passthru(re.compile(".*"))
        ip_lookup = {}
        def _send_real_with_log(*args, **kwargs) -> Response:
            """Logs the response from BASE_SEND_REAL
            Args:
                *args: The positional arguments for BASE_SEND_REAL
                **kwargs: The keyword arguments for BASE_SEND_REAL
            Returns:
                The response from the call
            """
            response = _BASE_SEND_REAL(*args, **kwargs)
            # Prevent recursion
            with mock.patch("responses._real_send", new=_BASE_SEND_REAL):
                _log_response(log_path=response_log_path, response=response, ip_dict=ip_lookup)
            return response
        with mock.patch("responses._real_send", new=_send_real_with_log):
            test_function(**kwargs)
        # Setting the passthru above permanently changes state for online testing
        # Reset passthru to default
        responses.mock.passthru_prefixes = ()
        responses._default_mock.passthru_prefixes = ()
except ModuleNotFoundError as err:
    # Ensure exception is due to responses package being missing
    if err.msg != "No module named 'responses'":
        raise err
    def _run_offline_test(*args, **kwargs):
        """Dummy function to raise the appropriate exception if the function is called without the
        necessary package installed
        """
        import responses  # noqa: F401
    def _run_online_test(*args, **kwargs):
        """Dummy function to raise the appropriate exception if the function is called without the
        necessary package installed
        """
        import responses  # noqa: F401