8000 Python Wrapper Release 2.7.0 by jbabac · Pull Request #187 · DomainTools/python_api · GitHub
[go: up one dir, main page]

Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-build-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- uses: actions/checkout@v2
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.6.1
2.7.0
2 changes: 1 addition & 1 deletion domaintools/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@

"""

current = "2.6.1"
current = "2.7.0"
80 changes: 49 additions & 31 deletions domaintools/api.py
< 5C94 td id="diff-fa559069a3ea3213b49086e776971306cac9c6a115f063686b9d335c8e9552fdR121" data-line-number="121" class="blob-num blob-num-addition js-linkable-line-number js-blob-rnum">
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@

import re
import ssl
import yaml

from domaintools.constants import (
Endpoint,
OutputFormat,
ENDPOINT_TO_SOURCE_MAP,
RTTF_PRODUCTS_LIST,
RTTF_PRODUCTS_CMD_MAPPING,
SPECS_MAPPING,
)
from domaintools._version import current as version
from domaintools.results import (
Expand All @@ -22,6 +24,7 @@
Results,
FeedsResults,
)
from domaintools.decorators import api_endpoint, auto_patch_docstrings
from domaintools.filters import (
filter_by_riskscore,
filter_by_expire_date,
Expand All @@ -40,6 +43,7 @@ def delimited(items, character="|"):
return character.join(items) if type(items) in (list, tuple, set) else items


@auto_patch_docstrings
class API(object):
"""Enables interacting with the DomainTools API via Python:

Expand Down Expand Up @@ -94,8 +98,10 @@ def __init__(
self.key_sign_hash = key_sign_hash
self.default_parameters["app_name"] = app_name
self.default_parameters["app_version"] = app_version
self.specs = {}

self._build_api_url(api_url, api_port)
self._initialize_specs()

if not https:
raise Exception(
Expand All @@ -104,8 +110,25 @@ def __init__(
if proxy_url and not isinstance(proxy_url, str):
raise Exception("Proxy URL must be a string. For example: '127.0.0.1:8888'")

def _initialize_specs(self):
for spec_name, file_path in SPECS_MAPPING.items():
try:
with open(file_path, "r", encoding="utf-8") as f:
spec_content = yaml.safe_load(f)
if not spec_content:
raise ValueError("Spec file is empty or invalid.")

self.specs[spec_name] = spec_content

except Exception as e:
print(f"Error loading {file_path}: {e}")

def _get_ssl_default_context(self, verify_ssl: Union[str, bool]):
return ssl.create_default_context(cafile=verify_ssl) if isinstance(verify_ssl, str) else verify_ssl
return (
ssl.create_default_context(cafile=verify_ssl)
if isinstance(verify_ssl, str)
else verify_ssl
)

def _build_api_url(self, api_url=None, api_port=None):
"""Build the API url based on the given url and port. Defaults to `https://api.domaintools.com`"""
Expand Down Expand Up @@ -133,11 +156,18 @@ def _rate_limit(self, product):
hours = limit_hours and 3600 / float(limit_hours)
minutes = limit_minutes and 60 / float(limit_minutes)

self.limits[product["id"]] = {"interval": timedelta(seconds=minutes or hours or default)}
self.limits[product["id"]] = {
"interval": timedelta(seconds=minutes or hours or default)
}

def _results(self, product, path, cls=Results, **kwargs):
"""Returns _results for the specified API path with the specified **kwargs parameters"""
if product != "account-information" and self.rate_limit and not self.limits_set and not self.limits:
if (
product != "account-information"
and self.rate_limit
and not self.limits_set
and not self.limits
):
always_sign_api_key_previous_value = self.always_sign_api_key
header_authentication_previous_value = self.header_authentication
self._rate_limit(product)
Expand Down Expand Up @@ -181,7 +211,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):
else:
raise ValueError(
"Invalid value '{0}' for 'key_sign_hash'. "
"Values available are {1}".format(self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES))
"Values available are {1}".format(
self.key_sign_hash, ",".join(AVAILABLE_KEY_SIGN_HASHES)
)
)

parameters["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
Expand All @@ -193,7 +225,9 @@ def handle_api_key(self, is_rttf_product, path, parameters):

def account_information(self, **kwargs):
"""Provides a snapshot of your accounts current API usage"""
return self._results("account-information", "/v1/account", items_path=("products",), **kwargs)
return self._results(
"account-information", "/v1/account", items_path=("products",), **kwargs
)

def available_api_calls(self):
"""Provides a list of api calls that you can use based on your account information."""
Expand Down Expand Up @@ -396,7 +430,9 @@ def reputation(self, query, include_reasons=False, **kwargs):

def reverse_ip(self, domain=None, limit=None, **kwargs):
"""Pass in a domain name."""
return self._results("reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs)
return self._results(
"reverse-ip", "/v1/{0}/reverse-ip".format(domain), limit=limit, **kwargs
)

def host_domains(self, ip=None, limit=None, **kwargs):
"""Pass in an IP address."""
Expand Down Expand Up @@ -570,8 +606,12 @@ def iris_enrich(self, *domains, **kwargs):
younger_than_date = kwargs.pop("younger_than_date", {}) or None
older_than_date = kwargs.pop("older_than_date", {}) or None
updated_after = kwargs.pop("updated_after", {}) or None
include_domains_with_missing_field = kwargs.pop("include_domains_with_missing_field", {}) or None
exclude_domains_with_missing_field = kwargs.pop("exclude_domains_with_missing_field", {}) or None
include_domains_with_missing_field = (
kwargs.pop("include_domains_with_missing_field", {}) or None
)
exclude_domains_with_missing_field = (
kwargs.pop("exclude_domains_with_missing_field", {}) or None
)

filtered_results = DTResultFilter(result_set=results).by(
[
Expand Down Expand Up @@ -624,6 +664,7 @@ def iris_enrich_cli(self, domains=None, **kwargs):
**kwargs,
)

@api_endpoint(spec_name="iris", path="/v1/iris-investigate/", methods="post")
def iris_investigate(
self,
domains=None,
Expand All @@ -641,29 +682,6 @@ def iris_investigate(
**kwargs,
):
"""Returns back a list of domains based on the provided filters.
The following filters are available beyond what is parameterized as kwargs:

- ip: Search for domains having this IP.
- email: Search for domains with this email in their data.
- email_domain: Search for domains where the email address uses this domain.
- nameserver_host: Search for domains with this nameserver.
- nameserver_domain: Search for domains with a nameserver that has this domain.
- nameserver_ip: Search for domains with a nameserver on this IP.
- registrar: Search for domains with this registrar.
- registrant: Search for domains with this registrant name.
- registrant_org: Search for domains with this registrant organization.
- mailserver_host: Search for domains with this mailserver.
- mailserver_domain: Search for domains with a mailserver that has this domain.
- mailserver_ip: Search for domains with a mailserver on this IP.
- redirect_domain: Search for domains which redirect to this domain.
- ssl_hash: Search for domains which have an SSL certificate with this hash.
- ssl_subject: Search for domains which have an SSL certificate with this subject string.
- ssl_email: Search for domains which have an SSL certificate with this email in it.
- ssl_org: Search for domains which have an SSL certificate with this organization in it.
- google_analytics: Search for domains which have this Google Analytics code.
- adsense: Search for domains which have this AdSense code.
- tld: Filter by TLD. Must be combined with another parameter.
- search_hash: Use search hash from Iris to bring back domains.

You can loop over results of your investigation as if it was a native Python list:

Expand Down
7 changes: 4 additions & 3 deletions domaintools/base_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ def _get_session_params_and_headers(self):
headers["accept"] = HEADER_ACCEPT_KEY_CSV_FORMAT

if self.api.header_authentication:
header_key_for_api_key = "X-Api-Key" if is_rttf_product else "X-API-Key"
headers[header_key_for_api_key] = self.api.key
headers["X-Api-Key"] = self.api.key

session_param_and_headers = {"parameters": parameters, "headers": headers}
return session_param_and_headers
Expand Down Expand Up @@ -342,7 +341,9 @@ def html(self):
)

def as_list(self):
return "\n".join([json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()])
return "\n".join(
[json.dumps(item, indent=4, separators=(",", ": ")) for item in self._items()]
)

def __str__(self):
return str(
Expand Down
5 changes: 5 additions & 0 deletions domaintools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,8 @@ class OutputFormat(Enum):
"real-time-domain-discovery-feed-(api)": "domaindiscovery",
"real-time-domain-discovery-feed-(s3)": "domaindiscovery",
}

SPECS_MAPPING = {
"iris": "domaintools/specs/iris-openapi.yaml",
# "rttf": "domaintools/specs/feeds-openapi.yaml",
}
105 changes: 105 additions & 0 deletions domaintools/decorators.py
6B28
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import functools
import inspect

from typing import List, Union

from domaintools.docstring_patcher import DocstringPatcher
from domaintools.request_validator import RequestValidator


def api_endpoint(spec_name: str, path: str, methods: Union[str, List[str]]):
"""
Decorator to tag a method as an API endpoint AND validate inputs.

Args:
spec_name: The key for the spec in api_instance.specs
path: The API path (e.g., "/users")
methods: A single method ("get") or list of methods (["get", "post"])
"""

def decorator(func):
func._api_spec_name = spec_name
func._api_path = path

# Normalize methods to a list
normalized_methods = [methods] if isinstance(methods, str) else methods
func._api_methods = normalized_methods

# Get the signature of the original function ONCE
sig = inspect.signature(func)

@functools.wraps(func)
def wrapper(self, *args, **kwargs):

try:
bound_args = sig.bind(*args, **kwargs)
except TypeError:
# If arguments don't match signature, let the actual func raise the error
return func(*args, **kwargs)

arguments = bound_args.arguments

# Robustly find 'self' (it's usually the first argument in bound_args)
# We look for the first value in arguments, or try to get 'self' explicitly.
instance = arguments.pop("self", None)
if not instance and args:
instance = args[0]

# Retrieve the Spec from the instance
# We assume 'self' has a .specs attribute (like DocstringPatcher expects)
spec = getattr(self, "specs", {}).get(spec_name)

if "domains" in arguments.keys():
domains = arguments.pop("domains")
arguments["domain"] = (
",".join(domains) if isinstance(domains, (list, tuple)) else domains
)

if spec:
# Determine which HTTP method is currently being executed.
# If the function allows dynamic methods (e.g. method="POST"), use that.
# Otherwise, default to the first method defined in the decorator.
current_method = kwargs.get("method", normalized_methods[0])

# Run Validation
# This will raise a ValueError and stop execution if validation fails.
try:
RequestValidator.validate(
spec=spec,
path=path,
method=current_method,
parameters=arguments,
)
except ValueError as e:
print(f"[Validation Error] {e}")
raise e

# Proceed with the original function call
return func(*args, **kwargs)

# Copy tags to wrapper for the DocstringPatcher to find
wrapper._api_spec_name = func._api_spec_name
wrapper._api_path = func._api_path
wrapper._api_methods = func._api_methods

return wrapper

return decorator


def auto_patch_docstrings(cls):
original_init = cls.__init__

@functools.wraps(original_init)
def new_init(self, *args, **kwargs):
original_init(self, *args, **kwargs)
try:
# We instantiate our patcher and run it
patcher = DocstringPatcher()
patcher.patch(self)
except Exception as e:
print(f"Auto-patching failed: {e}")

cls.__init__ = new_init

return cls
Loading
0