diff --git a/.pylintrc b/.pylintrc index cfbff33..f789fc6 100644 --- a/.pylintrc +++ b/.pylintrc @@ -11,7 +11,6 @@ disable= too-many-instance-attributes, unnecessary-pass, too-many-arguments, - too-many-positional-arguments, too-few-public-methods, [TYPECHECK] diff --git a/CHANGELOG.md b/CHANGELOG.md index 711a96f..11a746d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ nylas-python Changelog ====================== Unreleased ---------- +* Added Manage Domains (`Client.domains`, `/v3/admin/domains`): list, create, find, update, delete, `get_info`, and `verify` with models in `nylas.models.domains`; optional `ServiceAccountSigner` (`nylas.handler.service_account`) for service-account headers (`X-Nylas-Kid`, `X-Nylas-Nonce`, `X-Nylas-Timestamp`, `X-Nylas-Signature`) on each `Domains` method; new `cryptography` dependency, RSA signing, and `HttpClient` `serialized_json_body` so signed payloads match the wire body * Added Transactional Send: `Client.transactional_send.send()` for `POST /v3/domains/{domain_name}/messages/send`, with `TransactionalSendMessageRequest` and `TransactionalTemplate` models (JSON and multipart send behavior aligned with grant `messages.send`) v6.14.3 diff --git a/nylas/client.py b/nylas/client.py index 50cee02..1af8d53 100644 --- a/nylas/client.py +++ b/nylas/client.py @@ -13,6 +13,7 @@ from nylas.resources.webhooks import Webhooks from nylas.resources.contacts import Contacts from nylas.resources.drafts import Drafts +from nylas.resources.domains import Domains from nylas.resources.grants import Grants from nylas.resources.scheduler import Scheduler from nylas.resources.notetakers import Notetakers @@ -113,6 +114,16 @@ def drafts(self) -> Drafts: """ return Drafts(self.http_client) + @property + def domains(self) -> Domains: + """ + Access the Manage Domains API. + + Returns: + The Manage Domains API. + """ + return Domains(self.http_client) + @property def events(self) -> Events: """ diff --git a/nylas/handler/api_resources.py b/nylas/handler/api_resources.py index 25af6a6..6badcf4 100644 --- a/nylas/handler/api_resources.py +++ b/nylas/handler/api_resources.py @@ -49,10 +49,14 @@ def create( query_params=None, request_body=None, overrides=None, + serialized_json_body=None, ) -> Response: + kwargs = {"overrides": overrides} + if serialized_json_body is not None: + kwargs["serialized_json_body"] = serialized_json_body response_json, response_headers = self._http_client._execute( - "POST", path, headers, query_params, request_body, overrides=overrides + "POST", path, headers, query_params, request_body, **kwargs ) return Response.from_dict(response_json, response_type, response_headers) @@ -68,9 +72,13 @@ def update( request_body=None, method="PUT", overrides=None, + serialized_json_body=None, ): + kwargs = {"overrides": overrides} + if serialized_json_body is not None: + kwargs["serialized_json_body"] = serialized_json_body response_json, response_headers = self._http_client._execute( - method, path, headers, query_params, request_body, overrides=overrides + method, path, headers, query_params, request_body, **kwargs ) return Response.from_dict(response_json, response_type, response_headers) @@ -86,9 +94,13 @@ def patch( request_body=None, method="PATCH", overrides=None, + serialized_json_body=None, ): + kwargs = {"overrides": overrides} + if serialized_json_body is not None: + kwargs["serialized_json_body"] = serialized_json_body response_json, response_headers = self._http_client._execute( - method, path, headers, query_params, request_body, overrides=overrides + method, path, headers, query_params, request_body, **kwargs ) return Response.from_dict(response_json, response_type, response_headers) diff --git a/nylas/handler/http_client.py b/nylas/handler/http_client.py index 9027736..023e2d1 100644 --- a/nylas/handler/http_client.py +++ b/nylas/handler/http_client.py @@ -81,9 +81,17 @@ def _execute( request_body=None, data=None, overrides=None, + serialized_json_body=None, ) -> dict: request = self._build_request( - method, path, headers, query_params, request_body, data, overrides + method, + path, + headers, + query_params, + request_body, + data, + overrides, + serialized_json_body=serialized_json_body, ) timeout = self.timeout @@ -93,8 +101,12 @@ def _execute( # Serialize request_body to JSON with ensure_ascii=False to preserve UTF-8 characters # and allow_nan=True to support NaN/Infinity values (matching default json.dumps behavior). # Encode as UTF-8 bytes to avoid Latin-1 encoding errors with special characters. + # When serialized_json_body is set (e.g. Nylas service account signing), send those exact + # bytes so the wire body matches the payload that was signed. json_data = None - if request_body is not None and data is None: + if serialized_json_body is not None and data is None: + json_data = serialized_json_body + elif request_body is not None and data is None: json_data = json.dumps(request_body, ensure_ascii=False, allow_nan=True).encode("utf-8") try: response = requests.request( @@ -151,6 +163,7 @@ def _build_request( request_body=None, data=None, overrides=None, + serialized_json_body=None, ) -> dict: api_server = self.api_server if overrides and overrides.get("api_uri"): @@ -158,7 +171,10 @@ def _build_request( base_url = f"{api_server}{path}" url = _build_query_params(base_url, query_params) if query_params else base_url - headers = self._build_headers(headers, request_body, data, overrides) + body_for_content_type = ( + request_body if request_body is not None else serialized_json_body + ) + headers = self._build_headers(headers, body_for_content_type, data, overrides) return { "method": method, diff --git a/nylas/handler/service_account.py b/nylas/handler/service_account.py new file mode 100644 index 0000000..ce86b8d --- /dev/null +++ b/nylas/handler/service_account.py @@ -0,0 +1,136 @@ +""" +Nylas Service Account request signing for organization admin APIs. + +See https://developer.nylas.com/docs/v3/auth/nylas-service-account/ + +If you set X-Nylas-* headers manually via RequestOverrides, the HTTP request body must be +byte-identical to the canonical JSON string used when computing the signature. +""" + +from __future__ import annotations + +import base64 +import json +import secrets +import string +import time +from typing import Any, Dict, Optional, Tuple + +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +_NONCE_ALPHABET = string.ascii_letters + string.digits +_NONCE_LENGTH = 20 + + +def canonical_json(data: Dict[str, Any]) -> str: + """ + Deterministic JSON with sorted keys at each object level, matching Nylas's reference + implementation for service account signing. + """ + keys = sorted(data.keys()) + parts = [] + for k in keys: + key_json = json.dumps(k, ensure_ascii=False, allow_nan=False) + v = data[k] + if isinstance(v, dict): + val_json = canonical_json(v) + else: + val_json = json.dumps( + v, ensure_ascii=False, allow_nan=False, separators=(",", ":") + ) + parts.append(f"{key_json}:{val_json}") + return "{" + ",".join(parts) + "}" + + +def load_rsa_private_key_from_pem(pem: str) -> rsa.RSAPrivateKey: + """Load an RSA private key from a PEM string (PKCS#1 or PKCS#8).""" + key_bytes = pem.encode("utf-8") if isinstance(pem, str) else pem + loaded = serialization.load_pem_private_key(key_bytes, password=None) + if not isinstance(loaded, rsa.RSAPrivateKey): + raise ValueError("Private key must be RSA") + return loaded + + +def _signing_envelope_bytes( + path: str, + method: str, + timestamp: int, + nonce: str, + body: Optional[Dict[str, Any]], +) -> bytes: + method_l = method.lower() + envelope: Dict[str, Any] = { + "method": method_l, + "nonce": nonce, + "path": path, + "timestamp": timestamp, + } + if method_l in ("post", "put", "patch") and body is not None: + envelope["payload"] = canonical_json(body) + canonical = canonical_json(envelope) + return canonical.encode("utf-8") + + +def sign_bytes(private_key: rsa.RSAPrivateKey, message: bytes) -> str: + """RSA PKCS#1 v1.5 signature over SHA-256(message), Base64-encoded.""" + signature = private_key.sign(message, padding.PKCS1v15(), hashes.SHA256()) + return base64.b64encode(signature).decode("ascii") + + +def generate_nonce(length: int = _NONCE_LENGTH) -> str: + """Cryptographically secure nonce (alphanumeric), default length 20.""" + return "".join(secrets.choice(_NONCE_ALPHABET) for _ in range(length)) + + +class ServiceAccountSigner: + """ + Builds the four required Nylas service account headers for a single request. + + Args: + private_key_pem: RSA private key in PEM text form (from the service account JSON). + private_key_id: Value for X-Nylas-Kid (``private_key_id`` in the JSON credentials). + """ + + def __init__(self, private_key_pem: str, private_key_id: str): + self._private_key = load_rsa_private_key_from_pem(private_key_pem) + self._private_key_id = private_key_id + + def build_headers( + self, + method: str, + path: str, + body: Optional[Dict[str, Any]] = None, + *, + timestamp: Optional[int] = None, + nonce: Optional[str] = None, + ) -> Tuple[Dict[str, str], Optional[bytes]]: + """ + Produce signing headers and optional canonical JSON body bytes. + + For POST/PUT/PATCH, ``body`` must be the same dict that will be sent; returned bytes + should be passed to HttpClient as ``serialized_json_body`` so the wire body matches + the signed payload. + + Returns: + (headers, serialized_json_body) where serialized_json_body is set for + POST/PUT/PATCH when body is not None, else None. + """ + ts = int(time.time()) if timestamp is None else int(timestamp) + n = generate_nonce() if nonce is None else nonce + + serialized: Optional[bytes] = None + body_for_sign: Optional[Dict[str, Any]] = body + if method.lower() in ("post", "put", "patch") and body is not None: + serialized = canonical_json(body).encode("utf-8") + + envelope = _signing_envelope_bytes(path, method, ts, n, body_for_sign) + signature_b64 = sign_bytes(self._private_key, envelope) + + headers = { + "X-Nylas-Kid": self._private_key_id, + "X-Nylas-Nonce": n, + "X-Nylas-Timestamp": str(ts), + "X-Nylas-Signature": signature_b64, + } + return headers, serialized diff --git a/nylas/models/domains.py b/nylas/models/domains.py new file mode 100644 index 0000000..cca1b7c --- /dev/null +++ b/nylas/models/domains.py @@ -0,0 +1,100 @@ +from dataclasses import dataclass, field +from typing import Any, Literal, Optional + +from dataclasses_json import config, dataclass_json +from typing_extensions import TypedDict + +from nylas.models.list_query_params import ListQueryParams + +DomainVerificationType = Literal["ownership", "dkim", "spf", "feedback", "mx"] + + +class ListDomainsQueryParams(ListQueryParams): + """ + Query parameters for listing domains. + + Attributes: + limit: Maximum number of objects to return. + page_token: Cursor for the next page (from ``next_cursor`` on the previous response). + """ + + pass + + +class CreateDomainRequest(TypedDict): + """Request body for registering a domain.""" + + name: str + domain_address: str + + +class UpdateDomainRequest(TypedDict, total=False): + """Request body for updating a domain (currently only ``name`` is supported).""" + + name: str + + +class GetDomainInfoRequest(TypedDict): + """Request body for retrieving DNS records for a verification type.""" + + type: DomainVerificationType + + +class VerifyDomainRequest(TypedDict): + """Request body for triggering DNS verification.""" + + type: DomainVerificationType + + +@dataclass_json +@dataclass +class Domain: + """ + A domain registered for Transactional Send or Nylas Inbound. + """ + + id: str + name: str + branded: bool + domain_address: str + organization_id: str + region: str + verified_ownership: bool + verified_dkim: bool + verified_spf: bool + verified_mx: bool + verified_feedback: bool + verified_dmarc: bool + verified_arc: bool + created_at: int + updated_at: int + + +@dataclass_json +@dataclass +class DomainVerificationAttempt: + """ + DNS verification attempt or required records for a verification type. + """ + + verification_type: Optional[str] = field( + default=None, metadata=config(field_name="type") + ) + options: Optional[Any] = None + host: Optional[str] = None + value: Optional[str] = None + status: Optional[str] = None + + +@dataclass_json +@dataclass +class DomainVerificationDetails: + """ + Response data from get domain info or verify domain endpoints. + """ + + domain_id: str + attempt: Optional[DomainVerificationAttempt] = None + created_at: Optional[int] = None + expires_at: Optional[int] = None + message: Optional[str] = None diff --git a/nylas/resources/domains.py b/nylas/resources/domains.py new file mode 100644 index 0000000..bde153e --- /dev/null +++ b/nylas/resources/domains.py @@ -0,0 +1,222 @@ +from typing import Optional + +from nylas.config import RequestOverrides +from nylas.handler.api_resources import ( + CreatableApiResource, + DestroyableApiResource, + FindableApiResource, + ListableApiResource, + UpdatableApiResource, +) +from nylas.handler.service_account import ServiceAccountSigner +from nylas.models.domains import ( + CreateDomainRequest, + Domain, + DomainVerificationDetails, + GetDomainInfoRequest, + ListDomainsQueryParams, + UpdateDomainRequest, + VerifyDomainRequest, +) +from nylas.models.response import DeleteResponse, ListResponse, Response + + +def _merge_signer_headers( + overrides: Optional[RequestOverrides], signer_headers: Optional[dict] +) -> Optional[RequestOverrides]: + if not signer_headers: + return overrides + merged: RequestOverrides = dict(overrides) if overrides else {} + headers = dict(merged.get("headers") or {}) + headers.update(signer_headers) + merged["headers"] = headers + return merged + + +class Domains( + ListableApiResource, + FindableApiResource, + CreatableApiResource, + UpdatableApiResource, + DestroyableApiResource, +): + """ + Nylas Manage Domains API (``/v3/admin/domains``). + + Organization admin endpoints for registering and verifying email domains used with + Transactional Send and Nylas Inbound. Optional :class:`ServiceAccountSigner` adds the + required ``X-Nylas-*`` headers; you can also supply those headers via ``RequestOverrides``. + """ + + def list( + self, + query_params: Optional[ListDomainsQueryParams] = None, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> ListResponse[Domain]: + path = "/v3/admin/domains" + merged = overrides + if signer: + hdrs, _ = signer.build_headers("GET", path, None) + merged = _merge_signer_headers(overrides, hdrs) + return super().list( + path=path, + response_type=Domain, + query_params=query_params, + overrides=merged, + ) + + def create( + self, + request_body: CreateDomainRequest, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> Response[Domain]: + path = "/v3/admin/domains" + merged = overrides + serialized = None + body_arg = request_body + if signer: + hdrs, serialized = signer.build_headers("POST", path, dict(request_body)) + merged = _merge_signer_headers(overrides, hdrs) + if serialized is not None: + body_arg = None + return super().create( + path=path, + request_body=body_arg, + response_type=Domain, + overrides=merged, + serialized_json_body=serialized, + ) + + def find( + self, + domain_id: str, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> Response[Domain]: + path = f"/v3/admin/domains/{domain_id}" + merged = overrides + if signer: + hdrs, _ = signer.build_headers("GET", path, None) + merged = _merge_signer_headers(overrides, hdrs) + return super().find( + path=path, + response_type=Domain, + overrides=merged, + ) + + def update( + self, + domain_id: str, + request_body: UpdateDomainRequest, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> Response[Domain]: + path = f"/v3/admin/domains/{domain_id}" + merged = overrides + serialized = None + body_arg = request_body + if signer: + hdrs, serialized = signer.build_headers("PUT", path, dict(request_body)) + merged = _merge_signer_headers(overrides, hdrs) + if serialized is not None: + body_arg = None + return super().update( + path=path, + request_body=body_arg, + response_type=Domain, + overrides=merged, + serialized_json_body=serialized, + ) + + def destroy( + self, + domain_id: str, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> DeleteResponse: + path = f"/v3/admin/domains/{domain_id}" + merged = overrides + if signer: + hdrs, _ = signer.build_headers("DELETE", path, None) + merged = _merge_signer_headers(overrides, hdrs) + return super().destroy(path=path, overrides=merged) + + def get_info( + self, + domain_id: str, + request_body: GetDomainInfoRequest, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> Response[DomainVerificationDetails]: + """ + Return DNS record information and verification status for the given verification type. + + Args: + domain_id: The domain ID. + request_body: Body with ``type`` (for example ``ownership`` or ``dkim``). + signer: Optional service account signer for ``X-Nylas-*`` headers. + overrides: Request overrides (for example extra headers). + + Returns: + Verification details including required DNS records. + """ + path = f"/v3/admin/domains/{domain_id}/info" + body = dict(request_body) + merged = overrides + serialized = None + if signer: + hdrs, serialized = signer.build_headers("POST", path, body) + merged = _merge_signer_headers(overrides, hdrs) + exec_kwargs = {"overrides": merged} + if serialized is not None: + exec_kwargs["serialized_json_body"] = serialized + res, headers = self._http_client._execute( + "POST", + path, + None, + None, + None if serialized is not None else body, + **exec_kwargs, + ) + return Response.from_dict(res, DomainVerificationDetails, headers) + + def verify( + self, + domain_id: str, + request_body: VerifyDomainRequest, + signer: Optional[ServiceAccountSigner] = None, + overrides: RequestOverrides = None, + ) -> Response[DomainVerificationDetails]: + """ + Trigger a verification check for the specified DNS record type. + + Args: + domain_id: The domain ID. + request_body: Body with ``type`` of verification to run. + signer: Optional service account signer for ``X-Nylas-*`` headers. + overrides: Request overrides (for example extra headers). + + Returns: + Verification attempt details and status. + """ + path = f"/v3/admin/domains/{domain_id}/verify" + body = dict(request_body) + merged = overrides + serialized = None + if signer: + hdrs, serialized = signer.build_headers("POST", path, body) + merged = _merge_signer_headers(overrides, hdrs) + exec_kwargs = {"overrides": merged} + if serialized is not None: + exec_kwargs["serialized_json_body"] = serialized + res, headers = self._http_client._execute( + "POST", + path, + None, + None, + None if serialized is not None else body, + **exec_kwargs, + ) + return Response.from_dict(res, DomainVerificationDetails, headers) diff --git a/pyproject.toml b/pyproject.toml index c46c1af..42b4f9f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "requests-toolbelt>=1.0.0", "dataclasses-json>=0.5.9", "typing_extensions>=4.7.1", + "cryptography>=42.0.0", ] [project.optional-dependencies] diff --git a/setup.py b/setup.py index 937d397..bbe3173 100644 --- a/setup.py +++ b/setup.py @@ -20,6 +20,7 @@ "requests-toolbelt>=1.0.0", "dataclasses-json>=0.5.9", "typing_extensions>=4.7.1", + "cryptography>=42.0.0", ] TEST_DEPENDENCIES = ["pytest>=7.4.0", "pytest-cov>=4.1.0", "setuptools>=69.0.3"] @@ -112,7 +113,6 @@ def main(): packages=find_packages(), install_requires=RUN_DEPENDENCIES, dependency_links=[], - tests_require=TEST_DEPENDENCIES, extras_require={ "test": TEST_DEPENDENCIES, "docs": DOCS_DEPENDENCIES, diff --git a/tests/handler/test_http_client.py b/tests/handler/test_http_client.py index f9511d4..a0ae7d6 100644 --- a/tests/handler/test_http_client.py +++ b/tests/handler/test_http_client.py @@ -306,6 +306,49 @@ def test_execute(self, http_client, patched_version_and_sys, patched_request): timeout=30, ) + def test_execute_with_serialized_json_body( + self, http_client, patched_version_and_sys, patched_request + ): + """Pre-serialized body bytes are sent as-is (e.g. Nylas service account signing).""" + mock_response = Mock() + mock_response.json.return_value = {"ok": True} + mock_response.headers = {} + mock_response.status_code = 200 + patched_request.return_value = mock_response + + canonical = b'{"a":1,"b":2}' + response_json, response_headers = http_client._execute( + method="POST", + path="/v3/admin/domains", + request_body=None, + serialized_json_body=canonical, + ) + + assert response_json == {"ok": True} + patched_request.assert_called_once_with( + "POST", + "https://test.nylas.com/v3/admin/domains", + headers={ + "X-Nylas-API-Wrapper": "python", + "User-Agent": "Nylas Python SDK 2.0.0 - 1.2.3", + "Authorization": "Bearer test-key", + "Content-type": "application/json; charset=utf-8", + }, + data=canonical, + timeout=30, + ) + + def test_build_request_sets_content_type_for_serialized_json_body( + self, http_client, patched_version_and_sys + ): + request = http_client._build_request( + method="POST", + path="/signed", + request_body=None, + serialized_json_body=b"{}", + ) + assert request["headers"]["Content-type"] == "application/json; charset=utf-8" + def test_execute_override_timeout( self, http_client, patched_version_and_sys, patched_request ): diff --git a/tests/handler/test_service_account.py b/tests/handler/test_service_account.py new file mode 100644 index 0000000..bfcb406 --- /dev/null +++ b/tests/handler/test_service_account.py @@ -0,0 +1,177 @@ +import base64 +import string + +import pytest +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa + +from nylas.handler.service_account import ( + ServiceAccountSigner, + _signing_envelope_bytes, + canonical_json, + generate_nonce, + load_rsa_private_key_from_pem, + sign_bytes, +) + + +class TestCanonicalJson: + def test_sorted_keys_flat(self): + assert canonical_json({"b": 1, "a": 2}) == '{"a":2,"b":1}' + + def test_nested_dict_sorted(self): + assert ( + canonical_json({"z": {"b": 1, "a": 2}, "y": 0}) + == '{"y":0,"z":{"a":2,"b":1}}' + ) + + def test_string_escaping(self): + s = canonical_json({"msg": 'quote"here'}) + assert s.startswith("{") + assert '"msg":' in s + assert "quote" in s + + def test_list_and_bool_values_use_json_dumps(self): + s = canonical_json({"ok": True, "items": [3, 1, 2]}) + assert '"items":[3,1,2]' in s + assert '"ok":true' in s + + +class TestServiceAccountSigning: + @pytest.fixture + def rsa_pem(self): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + return ( + key, + key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("ascii"), + ) + + def test_load_pkcs8_pem(self, rsa_pem): + _, pem = rsa_pem + loaded = load_rsa_private_key_from_pem(pem) + assert isinstance(loaded, rsa.RSAPrivateKey) + + def test_load_pkcs1_traditional_pem(self, rsa_pem): + private_key, _ = rsa_pem + pem_pkcs1 = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + loaded = load_rsa_private_key_from_pem(pem_pkcs1) + assert isinstance(loaded, rsa.RSAPrivateKey) + + def test_load_pem_accepts_bytes(self, rsa_pem): + _, pem = rsa_pem + loaded = load_rsa_private_key_from_pem(pem.encode("ascii")) + assert isinstance(loaded, rsa.RSAPrivateKey) + + def test_load_non_rsa_raises(self): + ec_key = ec.generate_private_key(ec.SECP256R1()) + pem = ec_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("ascii") + with pytest.raises(ValueError, match="Private key must be RSA"): + load_rsa_private_key_from_pem(pem) + + def test_sign_bytes_verifies_with_public_key(self, rsa_pem): + private_key, pem = rsa_pem + message = b"hello nylas canonical envelope" + sig_b64 = sign_bytes(private_key, message) + sig = base64.b64decode(sig_b64) + public_key = private_key.public_key() + public_key.verify(sig, message, padding.PKCS1v15(), hashes.SHA256()) + + def test_golden_envelope_signature_round_trip(self, rsa_pem): + """Fixed inputs: signature must verify (independent of ServiceAccountSigner time).""" + private_key, pem = rsa_pem + path = "/v3/admin/domains" + ts = 1742932766 + nonce = "abcdefabcdefabcdefab" + body = {"type": "ownership"} + envelope = _signing_envelope_bytes(path, "POST", ts, nonce, body) + sig_b64 = sign_bytes(private_key, envelope) + sig = base64.b64decode(sig_b64) + private_key.public_key().verify(sig, envelope, padding.PKCS1v15(), hashes.SHA256()) + + def test_service_account_signer_build_headers_post(self, rsa_pem): + private_key, pem = rsa_pem + signer = ServiceAccountSigner(pem, "test-kid-uuid") + headers, body_bytes = signer.build_headers( + "POST", + "/v3/admin/domains", + {"name": "My domain", "domain_address": "mail.example.com"}, + timestamp=1700000000, + nonce="nonce123456789012345", + ) + assert headers["X-Nylas-Kid"] == "test-kid-uuid" + assert headers["X-Nylas-Nonce"] == "nonce123456789012345" + assert headers["X-Nylas-Timestamp"] == "1700000000" + assert len(headers["X-Nylas-Signature"]) > 0 + assert body_bytes == canonical_json( + {"name": "My domain", "domain_address": "mail.example.com"} + ).encode("utf-8") + + envelope = _signing_envelope_bytes( + "/v3/admin/domains", + "POST", + 1700000000, + "nonce123456789012345", + {"name": "My domain", "domain_address": "mail.example.com"}, + ) + sig = base64.b64decode(headers["X-Nylas-Signature"]) + private_key.public_key().verify(sig, envelope, padding.PKCS1v15(), hashes.SHA256()) + + def test_service_account_signer_get_no_body_bytes(self, rsa_pem): + _, pem = rsa_pem + signer = ServiceAccountSigner(pem, "kid") + headers, body_bytes = signer.build_headers( + "GET", "/v3/admin/domains", None, timestamp=1, nonce="n" * 20 + ) + assert body_bytes is None + assert "X-Nylas-Signature" in headers + + def test_signing_envelope_get_omits_payload(self, rsa_pem): + private_key, _ = rsa_pem + env = _signing_envelope_bytes("/v3/admin/domains", "GET", 1, "n" * 20, None) + assert b"payload" not in env + sig_b64 = sign_bytes(private_key, env) + private_key.public_key().verify( + base64.b64decode(sig_b64), env, padding.PKCS1v15(), hashes.SHA256() + ) + + def test_signing_envelope_put_and_patch_include_payload(self, rsa_pem): + private_key, _ = rsa_pem + for method in ("PUT", "patch"): + env = _signing_envelope_bytes( + "/v3/admin/domains/x", method, 2, "m" * 20, {"name": "n"} + ) + assert b"payload" in env + sig_b64 = sign_bytes(private_key, env) + private_key.public_key().verify( + base64.b64decode(sig_b64), env, padding.PKCS1v15(), hashes.SHA256() + ) + + def test_generate_nonce_custom_length(self): + n = generate_nonce(12) + assert len(n) == 12 + assert all(c in (string.ascii_letters + string.digits) for c in n) + + def test_build_headers_patch(self, rsa_pem): + _, pem = rsa_pem + signer = ServiceAccountSigner(pem, "kid") + headers, body_bytes = signer.build_headers( + "PATCH", + "/v3/admin/example", + {"op": "replace"}, + timestamp=9, + nonce="z" * 20, + ) + assert body_bytes == canonical_json({"op": "replace"}).encode("utf-8") + assert headers["X-Nylas-Timestamp"] == "9" diff --git a/tests/resources/test_domains.py b/tests/resources/test_domains.py new file mode 100644 index 0000000..fe9d1f9 --- /dev/null +++ b/tests/resources/test_domains.py @@ -0,0 +1,334 @@ +from unittest.mock import patch + +import pytest +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +from nylas.handler.service_account import ServiceAccountSigner +from nylas.models.domains import Domain, DomainVerificationDetails +from nylas.models.response import ListResponse, Response +from nylas.resources import domains as domains_module +from nylas.resources.domains import Domains + + +def _test_rsa_pem(): + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + return key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("ascii") + + +@pytest.fixture +def domain_data(): + return { + "id": "dom_123", + "name": "My domain", + "branded": False, + "domain_address": "mail.example.com", + "organization_id": "org_1", + "region": "us", + "verified_ownership": False, + "verified_dkim": False, + "verified_spf": False, + "verified_mx": False, + "verified_feedback": False, + "verified_dmarc": False, + "verified_arc": False, + "created_at": 1, + "updated_at": 2, + } + + +class TestMergeSignerHeaders: + def test_returns_overrides_when_no_signer_headers(self): + assert domains_module._merge_signer_headers({"timeout": 5}, None) == {"timeout": 5} + assert domains_module._merge_signer_headers(None, {}) is None + + def test_merges_headers_preserving_existing(self): + merged = domains_module._merge_signer_headers( + {"headers": {"X-Existing": "keep"}, "timeout": 30}, + {"X-Nylas-Kid": "kid", "X-Nylas-Signature": "sig"}, + ) + assert merged["timeout"] == 30 + assert merged["headers"]["X-Existing"] == "keep" + assert merged["headers"]["X-Nylas-Kid"] == "kid" + assert merged["headers"]["X-Nylas-Signature"] == "sig" + + def test_creates_overrides_when_none(self): + merged = domains_module._merge_signer_headers( + None, {"X-Nylas-Kid": "kid"} + ) + assert merged == {"headers": {"X-Nylas-Kid": "kid"}} + + +class TestDomains: + def test_domain_model_from_dict(self, domain_data): + d = Domain.from_dict(domain_data) + assert d.id == "dom_123" + assert d.domain_address == "mail.example.com" + + def test_domain_verification_details_from_dict(self): + raw = { + "domain_id": "d1", + "attempt": {"type": "dkim", "status": "pending"}, + "message": "add TXT", + } + d = DomainVerificationDetails.from_dict(raw, infer_missing=True) + assert d.domain_id == "d1" + assert d.attempt is not None + assert d.attempt.verification_type == "dkim" + assert d.message == "add TXT" + + def test_list_without_signer(self, http_client_list_response): + with patch( + "nylas.models.response.ListResponse.from_dict", + return_value=ListResponse([], "rid", None, {}), + ): + domains = Domains(http_client_list_response) + domains.list() + http_client_list_response._execute.assert_called_once_with( + "GET", + "/v3/admin/domains", + None, + None, + None, + overrides=None, + ) + + def test_list_with_query_and_signer(self, http_client_list_response): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-1") + with patch( + "nylas.models.response.ListResponse.from_dict", + return_value=ListResponse([], "rid", None, {}), + ): + domains = Domains(http_client_list_response) + domains.list(query_params={"limit": 10}, signer=signer) + args, kwargs = http_client_list_response._execute.call_args + assert args[0] == "GET" + assert "/v3/admin/domains" in args[1] + ov = kwargs.get("overrides") or {} + assert "X-Nylas-Signature" in (ov.get("headers") or {}) + + def test_create_without_signer(self, http_client_response, domain_data): + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.create( + {"name": "My domain", "domain_address": "mail.example.com"}, + ) + http_client_response._execute.assert_called_once_with( + "POST", + "/v3/admin/domains", + None, + None, + {"name": "My domain", "domain_address": "mail.example.com"}, + overrides=None, + ) + + def test_find_without_signer(self, http_client_response, domain_data): + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.find("dom_abc") + http_client_response._execute.assert_called_once_with( + "GET", + "/v3/admin/domains/dom_abc", + None, + None, + None, + overrides=None, + ) + + def test_find_with_signer(self, http_client_response, domain_data): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-x") + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.find("dom_abc", signer=signer) + ov = http_client_response._execute.call_args.kwargs.get("overrides") or {} + assert "X-Nylas-Signature" in (ov.get("headers") or {}) + + def test_update_without_signer(self, http_client_response, domain_data): + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.update("dom_123", {"name": "Renamed"}) + http_client_response._execute.assert_called_once_with( + "PUT", + "/v3/admin/domains/dom_123", + None, + None, + {"name": "Renamed"}, + overrides=None, + ) + + def test_update_with_signer(self, http_client_response, domain_data): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-1") + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.update("dom_123", {"name": "Renamed"}, signer=signer) + kwargs = http_client_response._execute.call_args.kwargs + assert "serialized_json_body" in kwargs + assert http_client_response._execute.call_args[0][4] is None + + def test_create_with_signer_sends_serialized_body(self, http_client_response, domain_data): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-1") + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(domain_data, "rid", {}), + ): + domains = Domains(http_client_response) + domains.create( + {"name": "My domain", "domain_address": "mail.example.com"}, + signer=signer, + ) + kwargs = http_client_response._execute.call_args.kwargs + assert "serialized_json_body" in kwargs + assert kwargs["serialized_json_body"].startswith(b"{") + pos = http_client_response._execute.call_args[0] + assert pos[4] is None + + def test_destroy_with_signer(self, http_client_delete_response): + from nylas.models.response import DeleteResponse + + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-del") + http_client_delete_response._execute.return_value = ( + {"request_id": "del-rid"}, + {}, + ) + domains = Domains(http_client_delete_response) + domains.destroy("dom_123", signer=signer) + ov = http_client_delete_response._execute.call_args.kwargs["overrides"] + assert "X-Nylas-Signature" in ov["headers"] + + def test_destroy(self, http_client_delete_response): + from nylas.models.response import DeleteResponse + + http_client_delete_response._execute.return_value = ( + {"request_id": "del-rid"}, + {}, + ) + domains = Domains(http_client_delete_response) + out = domains.destroy("dom_123") + assert isinstance(out, DeleteResponse) + http_client_delete_response._execute.assert_called_once_with( + "DELETE", + "/v3/admin/domains/dom_123", + None, + None, + None, + overrides=None, + ) + + def test_get_info_with_signer(self, http_client_response): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-i") + info = {"domain_id": "dom_123", "attempt": {"type": "spf"}} + http_client_response._execute.return_value = ( + {"request_id": "r1", "data": info}, + {}, + ) + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(info, "r1", {}), + ): + domains = Domains(http_client_response) + domains.get_info("dom_123", {"type": "spf"}, signer=signer) + kwargs = http_client_response._execute.call_args.kwargs + assert "serialized_json_body" in kwargs + assert http_client_response._execute.call_args[0][4] is None + + def test_verify_without_signer(self, http_client_response): + info = {"domain_id": "dom_123", "attempt": {"type": "mx"}} + http_client_response._execute.return_value = ( + {"request_id": "rv", "data": info}, + {}, + ) + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(info, "rv", {}), + ): + domains = Domains(http_client_response) + domains.verify("dom_123", {"type": "mx"}) + http_client_response._execute.assert_called_once_with( + "POST", + "/v3/admin/domains/dom_123/verify", + None, + None, + {"type": "mx"}, + overrides=None, + ) + + def test_verify_with_signer(self, http_client_response): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-v") + info = {"domain_id": "dom_123"} + http_client_response._execute.return_value = ( + {"request_id": "rv", "data": info}, + {}, + ) + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(info, "rv", {}), + ): + domains = Domains(http_client_response) + domains.verify("dom_123", {"type": "dkim"}, signer=signer) + assert "serialized_json_body" in http_client_response._execute.call_args.kwargs + + def test_get_info(self, http_client_response): + info = { + "domain_id": "dom_123", + "attempt": {"type": "ownership", "status": "pending"}, + } + http_client_response._execute.return_value = ( + {"request_id": "r1", "data": info}, + {}, + ) + with patch( + "nylas.models.response.Response.from_dict", + return_value=Response(info, "r1", {}), + ): + domains = Domains(http_client_response) + domains.get_info("dom_123", {"type": "ownership"}) + http_client_response._execute.assert_called_once_with( + "POST", + "/v3/admin/domains/dom_123/info", + None, + None, + {"type": "ownership"}, + overrides=None, + ) + + def test_merge_signer_with_existing_headers(self, http_client_list_response): + pem = _test_rsa_pem() + signer = ServiceAccountSigner(pem, "kid-1") + with patch( + "nylas.models.response.ListResponse.from_dict", + return_value=ListResponse([], "rid", None, {}), + ): + domains = Domains(http_client_list_response) + domains.list( + signer=signer, + overrides={"headers": {"X-Custom": "precedence"}}, + ) + headers = http_client_list_response._execute.call_args.kwargs["overrides"]["headers"] + assert headers["X-Custom"] == "precedence" + assert "X-Nylas-Kid" in headers diff --git a/tests/test_client.py b/tests/test_client.py index ac3d23b..44b3a11 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -6,6 +6,7 @@ from nylas.resources.connectors import Connectors from nylas.resources.contacts import Contacts from nylas.resources.drafts import Drafts +from nylas.resources.domains import Domains from nylas.resources.events import Events from nylas.resources.folders import Folders from nylas.resources.grants import Grants @@ -64,6 +65,10 @@ def test_client_drafts_property(self, client): assert client.drafts is not None assert type(client.drafts) is Drafts + def test_client_domains_property(self, client): + assert client.domains is not None + assert type(client.domains) is Domains + def test_client_events_property(self, client): assert client.events is not None assert type(client.events) is Events