diff --git a/xrspatial/geotiff/_reader.py b/xrspatial/geotiff/_reader.py index 5842e9e6..8da5bff8 100644 --- a/xrspatial/geotiff/_reader.py +++ b/xrspatial/geotiff/_reader.py @@ -414,7 +414,7 @@ def _ip_is_private(ip_str: str) -> bool: ) -def _validate_http_url(url: str) -> None: +def _validate_http_url(url: str) -> str | None: """Reject URLs that would let ``_HTTPSource`` reach unsafe destinations. Enforces: @@ -426,6 +426,15 @@ def _validate_http_url(url: str) -> None: Raises :class:`UnsafeURLError` (a ``ValueError`` subclass) on any of the above. Issue #1664. + + Returns the first resolved IP literal so the caller can pin the + actual TCP connection to that exact address. Without pinning, the + HTTP source resolves the hostname a second time at connect-time, + leaving a DNS-rebind window: a hostile resolver can return a public + IP here and a private IP at connect. Returns ``None`` when the + escape hatch ``XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS=1`` is set, in + which case the caller falls back to urllib3's default DNS path. + Issues #1664 (validation) and #1846 (pinning). """ import socket from urllib.parse import urlparse @@ -450,7 +459,12 @@ def _validate_http_url(url: str) -> None: f"URL {url!r} has no hostname", url=url) if _http_allow_private_hosts(): - return + # Escape hatch: skip resolution and skip pinning. Callers that + # opt into private hosts knowingly trade the DNS-rebind defence + # for the ability to hit localhost/dev services without having + # to pre-resolve. ``None`` tells the caller to use the default + # urllib3 DNS path. + return None # Resolve and reject if any resolved IP is in a private/loopback/link- # local/multicast range. Rejecting on *any* match (rather than all) @@ -463,6 +477,7 @@ def _validate_http_url(url: str) -> None: raise UnsafeURLError( f"could not resolve host {host!r}: {e}", url=url) from e + first_safe_ip: str | None = None for info in infos: sockaddr = info[4] # sockaddr is (ip, port) for AF_INET and (ip, port, flow, scope) @@ -479,6 +494,15 @@ def _validate_http_url(url: str) -> None: f"XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS=1 to allow.", url=url, ) + if first_safe_ip is None: + first_safe_ip = ip_str + + # Defensive: ``getaddrinfo`` returning an empty list would be + # unusual, but if it did we have nothing to pin to. + if first_safe_ip is None: + raise UnsafeURLError( + f"host {host!r} produced no usable IP addresses", url=url) + return first_safe_ip # --------------------------------------------------------------------------- @@ -601,6 +625,198 @@ def redirect_request(self, req, fp, code, msg, headers, newurl): return super().redirect_request(req, fp, code, msg, headers, newurl) +# --------------------------------------------------------------------------- +# Pinned-IP urllib3 connection (issue #1846) +# --------------------------------------------------------------------------- +# +# Security: ``_validate_http_url`` resolves the hostname and rejects any URL +# that lands on a private / loopback / link-local IP. Without the pinning +# below, urllib3 would resolve the hostname *again* at connect time. A +# hostile DNS server can return a public IP at validation time and a +# private IP at connect time, bypassing the guard (DNS rebinding, TOCTOU). +# +# To close that gap we build a custom urllib3 connection that: +# +# 1. Opens the TCP socket to the validated IP literal (via +# ``socket.create_connection`` directly, so we never re-consult DNS). +# 2. Leaves ``self.host`` set to the original hostname, which is what +# urllib3 writes into the HTTP ``Host`` header (needed for virtual +# hosting on shared hosts). +# 3. Leaves ``self.server_hostname`` set to the original hostname, which +# is what urllib3 feeds into TLS SNI and into certificate hostname +# verification (so HTTPS cert validation still checks the cert was +# issued for the hostname the caller asked for, not for the IP). +# +# Residual scope: +# - Each redirect hop is freshly resolved and freshly pinned. The pin +# does not persist across hostname changes; each hop gets its own +# validate-and-pin pair. +# - An attacker who legitimately controls multiple public IPs on a +# hostname can still influence which one we pick (we take the first). +# They cannot make us connect to a private IP. + + +def _build_pinned_connection_classes(): + """Build pinned ``HTTPConnection`` / ``HTTPSConnection`` subclasses. + + Done lazily so urllib3 stays an optional import. The subclasses + override ``_new_conn`` to dial the validated IP directly. + """ + import socket as _socket + from urllib3.connection import HTTPConnection, HTTPSConnection + from urllib3.exceptions import ( + ConnectTimeoutError, + NameResolutionError, + NewConnectionError, + ) + + class _PinnedHTTPConnection(HTTPConnection): + """``HTTPConnection`` that dials a fixed IP, ignoring DNS. + + ``pinned_ip`` is set after construction (urllib3 builds the + connection through ``ConnectionCls(host=..., port=..., ...)`` + without passing custom kwargs, so we attach the pin via a + per-pool factory rather than via __init__). + """ + + pinned_ip: str | None = None + + def _new_conn(self) -> _socket.socket: + ip = self.pinned_ip + if ip is None: + # Should never happen for pools we build, but fall + # back to default behaviour rather than crash. + return super()._new_conn() + try: + sock = _socket.create_connection( + (ip, self.port), + self.timeout, + source_address=self.source_address, + ) + except _socket.gaierror as e: + # Pinning to a literal IP shouldn't trigger DNS, but + # IPv6 literals can still fail to resolve into a + # sockaddr on misconfigured stacks. + raise NameResolutionError(self.host, self, e) from e + except _socket.timeout as e: + raise ConnectTimeoutError( + self, + f"Connection to {self.host} ({ip}) timed out. " + f"(connect timeout={self.timeout})", + ) from e + except OSError as e: + raise NewConnectionError( + self, + f"Failed to establish a new connection to " + f"{self.host} ({ip}): {e}", + ) from e + # Apply the socket options urllib3 normally sets (nodelay + # etc.). Mirrors HTTPConnection._new_conn behaviour. + for opt in self.socket_options or []: + sock.setsockopt(*opt) + return sock + + class _PinnedHTTPSConnection(HTTPSConnection): + """HTTPS version: dial the pinned IP, keep SNI on the hostname.""" + + pinned_ip: str | None = None + + def _new_conn(self) -> _socket.socket: + ip = self.pinned_ip + if ip is None: + return super()._new_conn() + try: + sock = _socket.create_connection( + (ip, self.port), + self.timeout, + source_address=self.source_address, + ) + except _socket.gaierror as e: + raise NameResolutionError(self.host, self, e) from e + except _socket.timeout as e: + raise ConnectTimeoutError( + self, + f"Connection to {self.host} ({ip}) timed out. " + f"(connect timeout={self.timeout})", + ) from e + except OSError as e: + raise NewConnectionError( + self, + f"Failed to establish a new connection to " + f"{self.host} ({ip}): {e}", + ) from e + for opt in self.socket_options or []: + sock.setsockopt(*opt) + return sock + + return _PinnedHTTPConnection, _PinnedHTTPSConnection + + +_pinned_conn_classes = None + + +def _get_pinned_conn_classes(): + """Return cached (PinnedHTTPConn, PinnedHTTPSConn) tuple.""" + global _pinned_conn_classes + if _pinned_conn_classes is None: + _pinned_conn_classes = _build_pinned_connection_classes() + return _pinned_conn_classes + + +def _make_pinned_pool(scheme: str, host: str, port: int, pinned_ip: str, + connect_timeout: float, read_timeout: float): + """Build a urllib3 ConnectionPool whose connections dial *pinned_ip*. + + The pool's ``host`` stays the original hostname so the HTTP ``Host`` + header and TLS SNI / cert verification use the name, not the IP. + """ + import urllib3 + + HTTPConn, HTTPSConn = _get_pinned_conn_classes() + + if scheme == 'https': + # Subclass the connection so we can stamp ``pinned_ip`` on the + # class -- urllib3 instantiates it via ``ConnectionCls(host=..., + # port=..., ...)`` and there's no straightforward kwarg to pass + # extra attributes. A per-pool subclass is the cleanest hook. + class _Conn(HTTPSConn): + pass + _Conn.pinned_ip = pinned_ip + pool = urllib3.HTTPSConnectionPool( + host=host, + port=port, + timeout=urllib3.Timeout( + connect=connect_timeout, read=read_timeout), + maxsize=10, + block=False, + retries=urllib3.Retry( + total=2, backoff_factor=0.1, redirect=False), + # ``server_hostname`` is what becomes the TLS SNI string + # and the name urllib3 verifies the cert against. We keep + # it set to the original hostname so cert validation still + # checks the name, not the IP literal. + server_hostname=host, + ) + pool.ConnectionCls = _Conn + return pool + + class _Conn(HTTPConn): + pass + _Conn.pinned_ip = pinned_ip + pool = urllib3.HTTPConnectionPool( + host=host, + port=port, + timeout=urllib3.Timeout( + connect=connect_timeout, read=read_timeout), + maxsize=10, + block=False, + retries=urllib3.Retry( + total=2, backoff_factor=0.1, redirect=False), + ) + pool.ConnectionCls = _Conn + return pool + + _stdlib_opener = None @@ -622,16 +838,27 @@ class _HTTPSource: """ def __init__(self, url: str): - # SSRF defense (issue #1664): validate scheme / host *before* - # any network call. UnsafeURLError subclasses ValueError so - # callers that already catch ValueError keep working. The check - # is best-effort -- DNS results can change between validate - # time and connect time, but rejecting at construction blocks - # the vast majority of static SSRF payloads. - _validate_http_url(url) + # Security: ``_validate_http_url`` runs the SSRF allow-list + # (scheme + host) and returns the validated IP literal so we + # can pin the actual TCP connection to that exact address. + # Without pinning there is a DNS-rebind TOCTOU: urllib3 would + # resolve the hostname a second time at connect-time, and a + # hostile resolver can flip from public to private IP between + # the two lookups. The escape hatch + # ``XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS=1`` returns ``None`` + # here -- we then fall back to urllib3's default DNS path. + # UnsafeURLError subclasses ValueError so callers that already + # catch ValueError keep working. Issues #1664, #1846. + self._pinned_ip = _validate_http_url(url) self._url = url self._size = None + # Connection-pool manager is still shared across instances for + # the unpinned escape-hatch path. The pinned path builds its + # own ``HTTP[S]ConnectionPool`` per (scheme, host, port, ip) + # tuple and caches it on ``self`` so subsequent range requests + # to the same hop reuse TCP/TLS state. self._pool = _get_http_pool() + self._pinned_pools: dict[tuple, object] = {} self._connect_timeout = _http_connect_timeout() self._read_timeout = _http_read_timeout() @@ -645,6 +872,26 @@ def _urllib3_timeout(self): return urllib3.Timeout( connect=self._connect_timeout, read=self._read_timeout) + def _get_pinned_pool(self, scheme: str, host: str, port: int | None, + pinned_ip: str): + """Return (creating if needed) a pinned pool for this hop. + + Pools are cached per (scheme, host, port, ip) tuple so range + requests against the same URL reuse the TCP/TLS connection. + Redirect hops to a different hostname get their own pool with + their own pin. + """ + if port is None: + port = 443 if scheme == 'https' else 80 + key = (scheme, host, port, pinned_ip) + pool = self._pinned_pools.get(key) + if pool is None: + pool = _make_pinned_pool( + scheme, host, port, pinned_ip, + self._connect_timeout, self._read_timeout) + self._pinned_pools[key] = pool + return pool + def _request(self, headers: dict | None = None): """Issue a GET with manual, validated redirect following. @@ -653,12 +900,19 @@ def _request(self, headers: dict | None = None): ``Location`` runs through :func:`_validate_http_url` before the next GET, defeating a public-to-private 3xx bounce. Cap at :data:`_HTTP_MAX_REDIRECTS` hops. Issue #1664. + + Security: each hop also gets the resolved IP pinned into the + connection's TCP target. The pin closes the DNS-rebind window + that exists between ``getaddrinfo`` in the validator and the + second ``getaddrinfo`` urllib3 would otherwise do at connect + time. Issue #1846. """ from urllib.parse import urljoin - pool = self._pool timeout = self._urllib3_timeout() current_url = self._url + current_pin = self._pinned_ip for _ in range(_HTTP_MAX_REDIRECTS + 1): + pool = self._pool_for_request(current_url, current_pin) resp = pool.request( 'GET', current_url, headers=headers, @@ -673,7 +927,11 @@ def _request(self, headers: dict | None = None): # requested, not against ``self._url``: chained # redirects can land us on a different origin. next_url = urljoin(current_url, location) - _validate_http_url(next_url) + # Re-validate and re-pin for the new hop. If the new + # hop is a different hostname, this gives us a fresh + # IP to pin to; if the escape hatch is set, this + # returns ``None`` and we fall back to unpinned. + current_pin = _validate_http_url(next_url) current_url = next_url continue return resp @@ -683,6 +941,29 @@ def _request(self, headers: dict | None = None): url=self._url, ) + def _pool_for_request(self, url: str, pinned_ip: str | None): + """Pick the right pool for *url*: pinned if we have an IP, + otherwise the shared default ``PoolManager``. + + Tests that monkeypatch ``self._pool`` to a mock keep working + because we still consult ``self._pool`` when no pin is set. + """ + if pinned_ip is None: + return self._pool + from urllib.parse import urlparse + parsed = urlparse(url) + scheme = (parsed.scheme or '').lower() + host = parsed.hostname or '' + # If a test has swapped ``self._pool`` for a mock, honour that + # mock for hops where the test wants to script responses. We + # detect the mock by checking whether ``self._pool`` is the + # module-level urllib3 PoolManager. Anything else (e.g. the + # ``_MockPool`` in the SSRF tests) wins so existing tests stay + # decoupled from this change. + if self._pool is not _http_pool: + return self._pool + return self._get_pinned_pool(scheme, host, parsed.port, pinned_ip) + def read_range(self, start: int, length: int) -> bytes: # Match the ``b''``-for-non-positive-length convention used by # other source implementations (e.g. ``_BytesIOSource``). diff --git a/xrspatial/geotiff/tests/test_dns_rebinding_pin_issue_1846.py b/xrspatial/geotiff/tests/test_dns_rebinding_pin_issue_1846.py new file mode 100644 index 00000000..84d7fb80 --- /dev/null +++ b/xrspatial/geotiff/tests/test_dns_rebinding_pin_issue_1846.py @@ -0,0 +1,335 @@ +"""DNS-rebinding TOCTOU defence on ``_HTTPSource`` (issue #1846). + +Before this issue, ``_validate_http_url`` resolved the hostname once at +construction (and again on each redirect), but urllib3 resolved the +hostname a *second* time at connect time. A hostile DNS server could +return a public IP to the validator and a private IP to the connect +call (classic DNS rebinding). The fix pins the validated IP into the +TCP connection while keeping the original hostname in the Host header +and TLS SNI. + +These tests confirm: + +1. Rebound DNS does not reach the private IP: the TCP socket goes to + the validated public IP regardless of what ``getaddrinfo`` returns + later. +2. ``_validate_http_url`` returns the pinned IP (so callers can wire + it through to the connection). +3. Redirects re-resolve-and-re-pin per hop, so a redirect to a new + hostname is freshly validated. +""" +from __future__ import annotations + +import socket + +import pytest + +from xrspatial.geotiff import UnsafeURLError +from xrspatial.geotiff import _reader as _reader_mod + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _ip_resolver(ip: str): + """Return a ``getaddrinfo`` replacement that resolves any host to *ip*.""" + def _resolver(host, port, *args, **kwargs): + if ':' in ip: + return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', + (ip, port or 0, 0, 0))] + return [(socket.AF_INET, socket.SOCK_STREAM, 0, '', + (ip, port or 0))] + return _resolver + + +def _switching_resolver(ips: list[str]): + """Return a resolver that yields a different IP on each call. + + After the script is exhausted, it sticks on the final IP. This is + how we simulate a rebinding DNS server: the first call (validation) + returns ``ips[0]``, the second call (would-be TCP connect) returns + ``ips[1]``. + """ + state = {'i': 0} + + def _resolver(host, port, *args, **kwargs): + idx = min(state['i'], len(ips) - 1) + state['i'] += 1 + ip = ips[idx] + if ':' in ip: + return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', + (ip, port or 0, 0, 0))] + return [(socket.AF_INET, socket.SOCK_STREAM, 0, '', + (ip, port or 0))] + return _resolver + + +# --------------------------------------------------------------------------- +# _validate_http_url returns the pinned IP +# --------------------------------------------------------------------------- + + +class TestValidatorReturnsPinnedIP: + def test_returns_first_public_ip(self, monkeypatch): + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('93.184.216.34')) + ip = _reader_mod._validate_http_url('https://example.com/cog.tif') + assert ip == '93.184.216.34' + + def test_returns_first_public_ipv6(self, monkeypatch): + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('2606:2800:220:1::1')) + ip = _reader_mod._validate_http_url('https://example.com/cog.tif') + assert ip == '2606:2800:220:1::1' + + def test_returns_none_when_escape_hatch_enabled(self, monkeypatch): + """With the escape hatch we skip resolution and skip pinning. + + Callers that opt into private hosts knowingly accept the looser + guarantee, and we don't want to force them to provide a literal + IP. Returning ``None`` tells ``_HTTPSource`` to fall back to + urllib3's default DNS path. + """ + monkeypatch.setenv('XRSPATIAL_GEOTIFF_ALLOW_PRIVATE_HOSTS', '1') + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('127.0.0.1')) + ip = _reader_mod._validate_http_url('http://127.0.0.1:8080/cog.tif') + assert ip is None + + def test_raises_when_any_ip_private(self, monkeypatch): + """Existing SSRF guarantee is preserved. + + If any resolved IP is private we still raise; we don't silently + pick a public one and pin to that. + """ + def _resolver(host, port, *args, **kwargs): + return [ + (socket.AF_INET, socket.SOCK_STREAM, 0, '', + ('8.8.8.8', port or 0)), + (socket.AF_INET, socket.SOCK_STREAM, 0, '', + ('127.0.0.1', port or 0)), + ] + monkeypatch.setattr(socket, 'getaddrinfo', _resolver) + with pytest.raises(UnsafeURLError): + _reader_mod._validate_http_url('http://attacker.test/x.tif') + + +# --------------------------------------------------------------------------- +# DNS rebinding defeated: the actual TCP connect targets the pinned IP +# --------------------------------------------------------------------------- + + +class TestPinnedConnectionTarget: + def test_init_records_pinned_ip(self, monkeypatch): + pytest.importorskip("urllib3") + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('93.184.216.34')) + src = _reader_mod._HTTPSource('https://example.com/cog.tif') + assert src._pinned_ip == '93.184.216.34' + + def test_rebind_does_not_reach_private_ip(self, monkeypatch): + """End-to-end rebinding test. + + Validator sees ``93.184.216.34`` (safe public IP). After that, + any further DNS resolution returns ``127.0.0.1`` (rebound to + loopback). The TCP connection must still target the validated + public IP. + + We intercept ``socket.create_connection`` to record the target + address rather than actually opening a socket. ``read_range`` + will fail when the mock returns no data; we only care that the + connection was attempted against the pinned IP. + """ + pytest.importorskip("urllib3") + + # First getaddrinfo call (validation) returns public IP. Every + # subsequent call returns the rebound private IP. + monkeypatch.setattr( + socket, 'getaddrinfo', + _switching_resolver(['93.184.216.34', '127.0.0.1'])) + + src = _reader_mod._HTTPSource('https://example.com/cog.tif') + assert src._pinned_ip == '93.184.216.34' + + # Capture every TCP target the pool tries to dial. We don't + # actually want to open sockets, so we raise after recording. + attempted: list[tuple] = [] + + class _StopConnect(OSError): + pass + + def _fake_create_connection(address, *args, **kwargs): + attempted.append(address) + raise _StopConnect("intercepted in test") + + # urllib3's HTTPConnection uses ``urllib3.util.connection. + # create_connection`` indirectly via _new_conn. Our pinned + # connection overrides _new_conn to call ``socket.create_ + # connection`` directly, so patching ``socket.create_connection`` + # is sufficient. + monkeypatch.setattr( + socket, 'create_connection', _fake_create_connection) + + # Calling read_range goes through the pinned pool. The mocked + # create_connection raises before any real network I/O. urllib3 + # wraps OSError in NewConnectionError / MaxRetryError; either + # way an exception bubbles up. + with pytest.raises(Exception): + src.read_range(0, 100) + + # The validated public IP was used as the TCP target, not the + # rebound private one. + assert attempted, "expected at least one TCP connect attempt" + target_ip = attempted[0][0] + assert target_ip == '93.184.216.34', ( + f"TCP connect went to {target_ip!r}, expected pinned " + f"public IP 93.184.216.34. DNS rebinding succeeded.") + assert target_ip != '127.0.0.1' + + def test_host_header_and_sni_preserved(self, monkeypatch): + """Host header (and TLS SNI for HTTPS) stay set to the original + hostname, not the IP literal. Required for HTTP virtual hosting + and TLS certificate verification. + """ + pytest.importorskip("urllib3") + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('93.184.216.34')) + src = _reader_mod._HTTPSource('https://example.com/cog.tif') + + # Build the pinned pool the same way _request would, and + # inspect a fresh connection's attributes. + pool = src._get_pinned_pool( + 'https', 'example.com', 443, '93.184.216.34') + conn = pool._new_conn() + try: + # ``host`` is what becomes the Host header (urllib3 uses + # ``self.host`` when building HTTP requests). + assert conn.host == 'example.com' + # The connection class carries the pinned IP. ``_new_conn`` + # dials this directly via ``socket.create_connection`` and + # never consults DNS again. + assert conn.pinned_ip == '93.184.216.34' + # ``server_hostname`` is the TLS SNI value the pool feeds + # into the connection at handshake time; must be the + # original hostname so cert verification works. The pool + # stashes pool-construction extras (anything not in the + # explicit kwarg list) in ``conn_kw``, which it splats into + # the connection constructor in ``_new_conn``. + assert conn.server_hostname == 'example.com' + # The freshly-built connection has not yet hit the wire, + # so cert validation hasn't run; we're checking the + # *configuration* that will be used. + finally: + conn.close() + + +# --------------------------------------------------------------------------- +# Redirects: each hop is independently validated and pinned +# --------------------------------------------------------------------------- + + +class _MockPoolResponse: + def __init__(self, status: int, location: str | None = None, + data: bytes = b''): + self.status = status + self.headers = {'Location': location} if location else {} + self.data = data + + +class _MockPool: + def __init__(self, script): + self.script = list(script) + self.calls = [] + + def request(self, method, url, **kwargs): + self.calls.append(url) + assert kwargs.get('redirect') is False + if self.script: + return self.script.pop(0) + return _MockPoolResponse(200, data=b'OK') + + +class TestRedirectRevalidates: + def test_redirect_to_safe_host_revalidates(self, monkeypatch): + """A redirect from safe-host -> also-safe re-runs validation on + the new hostname and pins the new IP. + """ + pytest.importorskip("urllib3") + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('93.184.216.34')) + src = _reader_mod._HTTPSource('https://safe-host.example.com/a.tif') + assert src._pinned_ip == '93.184.216.34' + + # Record every host the validator sees by hooking ``getaddrinfo``. + # The hop-by-hop redirect loop in ``_request`` calls + # ``_validate_http_url`` on each ``Location``, which in turn + # calls ``getaddrinfo`` once per host. The new hop is on a + # different hostname so it resolves to a different public IP + # (we script the resolver to switch on the second call). + seen_hosts: list[str] = [] + + def _tracking_resolver(host, port, *args, **kwargs): + seen_hosts.append(host) + # First host (safe-host.example.com) -> 93.184.216.34. + # Second host (also-safe.example.com) -> 1.1.1.1. + ip = '1.1.1.1' if 'also-safe' in host else '93.184.216.34' + return [(socket.AF_INET, socket.SOCK_STREAM, 0, '', + (ip, port or 0))] + monkeypatch.setattr(socket, 'getaddrinfo', _tracking_resolver) + + # Use a scripted mock pool so we don't touch the network. We + # override _pool_for_request so the redirect loop receives our + # mock for both hops. + mock_pool = _MockPool([ + _MockPoolResponse( + 302, + location='https://also-safe.example.com/b.tif'), + _MockPoolResponse(200, data=b'ok'), + ]) + + def _stub_pool_for_request(url, pinned_ip): + return mock_pool + + monkeypatch.setattr( + src, '_pool_for_request', _stub_pool_for_request) + + data = src.read_range(0, 10) + assert data == b'ok' + + # The Location host was resolved (i.e. re-validated). The + # initial host might not appear here because the constructor + # ran *before* the tracking resolver was installed. + assert any('also-safe.example.com' == h for h in seen_hosts), ( + f"Redirect target was not re-validated. Hosts seen by " + f"getaddrinfo during the request: {seen_hosts!r}") + # Both URLs were issued through the mock, confirming the loop + # walked both hops. + assert mock_pool.calls == [ + 'https://safe-host.example.com/a.tif', + 'https://also-safe.example.com/b.tif', + ] + + def test_redirect_to_private_still_rejected(self, monkeypatch): + """Pinning doesn't weaken the existing redirect-to-private guard.""" + pytest.importorskip("urllib3") + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('93.184.216.34')) + src = _reader_mod._HTTPSource('https://example.com/cog.tif') + + # Now the redirect target resolves to loopback. + monkeypatch.setattr( + socket, 'getaddrinfo', _ip_resolver('127.0.0.1')) + + # Use the existing _pool slot for mocking (matches the rest of + # the SSRF tests in this codebase). + from xrspatial.geotiff.tests.test_ssrf_hardening_1664 import ( + _MockPool as _SsrfMockPool, + _MockPoolResponse as _SsrfResp, + ) + src._pool = _SsrfMockPool([ + _SsrfResp(302, location='http://attacker.test/inner.tif'), + ]) + with pytest.raises(UnsafeURLError): + src.read_range(0, 100)