Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ This example shows:
:language: python
:caption: examples/tag_only/tag_only_examples.py

Resolver Utility Examples
--------------------------

The `examples/resolver_utils/` directory demonstrates the utility functions ported from go-multiaddr-dns for working with DNS-based multiaddr resolution.

This example shows:
- Checking for DNS components with ``matches()``
- FQDN detection and normalization with ``is_fqdn()`` and ``fqdn()``
- Counting protocol components with ``addr_len()``
- Removing leading components with ``offset_addr()``

.. literalinclude:: ../examples/resolver_utils/resolver_utils_example.py
:language: python
:caption: examples/resolver_utils/resolver_utils_example.py

Running the Examples
--------------------

Expand Down Expand Up @@ -145,4 +160,7 @@ All examples can be run directly with Python:
# Tag-only protocol examples
python examples/tag_only/tag_only_examples.py

# Resolver utility examples
python examples/resolver_utils/resolver_utils_example.py

Note: Some examples require network connectivity and may take a few seconds to complete due to DNS resolution.
90 changes: 90 additions & 0 deletions examples/resolver_utils/resolver_utils_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/usr/bin/env python3
"""
Resolver utility function examples for py-multiaddr.

This script demonstrates the utility functions available in
``multiaddr.resolvers`` that were ported from go-multiaddr-dns.

## Overview

1. **matches()**: Check if a multiaddr contains DNS components
2. **is_fqdn() / fqdn()**: FQDN detection and normalization
3. **addr_len()**: Count protocol components in a multiaddr
4. **offset_addr()**: Remove leading protocol components
5. **resolve_all()**: Iteratively resolve all DNS components

## Expected Output

When you run this script, you should see output similar to:

```
Resolver Utility Examples
==================================================

=== matches() ===
/dns4/example.com/tcp/80 -> True (contains dns4)
/ip4/127.0.0.1/tcp/80 -> False (no DNS component)
/dnsaddr/bootstrap.libp2p.io -> True (contains dnsaddr)

=== is_fqdn() / fqdn() ===
is_fqdn("example.com") -> False
is_fqdn("example.com.") -> True
fqdn("example.com") -> example.com.
fqdn("example.com.") -> example.com.

=== addr_len() ===
/ip4/127.0.0.1 -> 1 component(s)
/ip4/127.0.0.1/tcp/80 -> 2 component(s)
/ip4/1.2.3.4/udp/9/quic -> 3 component(s)

=== offset_addr() ===
offset_addr(/ip4/127.0.0.1/tcp/80/http, 0) -> /ip4/127.0.0.1/tcp/80/http
offset_addr(/ip4/127.0.0.1/tcp/80/http, 1) -> /tcp/80/http
offset_addr(/ip4/127.0.0.1/tcp/80/http, 2) -> /http
offset_addr(/ip4/127.0.0.1/tcp/80/http, 3) -> /
```
"""

from multiaddr import Multiaddr
from multiaddr.resolvers import addr_len, fqdn, is_fqdn, matches, offset_addr


def main():
print("Resolver Utility Examples")
print("=" * 50)

# --- matches() ---
print("\n=== matches() ===")
examples = [
("/dns4/example.com/tcp/80", "contains dns4"),
("/ip4/127.0.0.1/tcp/80", "no DNS component"),
("/dnsaddr/bootstrap.libp2p.io", "contains dnsaddr"),
]
for addr_str, desc in examples:
ma = Multiaddr(addr_str)
result = matches(ma)
print(f" {addr_str:<35} -> {result!s:<6} ({desc})")

# --- is_fqdn() / fqdn() ---
print("\n=== is_fqdn() / fqdn() ===")
for domain in ("example.com", "example.com."):
print(f' is_fqdn("{domain}") -> {is_fqdn(domain)}')
for domain in ("example.com", "example.com."):
print(f' fqdn("{domain}") -> {fqdn(domain)}')

# --- addr_len() ---
print("\n=== addr_len() ===")
for addr_str in ("/ip4/127.0.0.1", "/ip4/127.0.0.1/tcp/80", "/ip4/1.2.3.4/udp/9/quic"):
ma = Multiaddr(addr_str)
print(f" {addr_str:<30} -> {addr_len(ma)} component(s)")

# --- offset_addr() ---
print("\n=== offset_addr() ===")
ma = Multiaddr("/ip4/127.0.0.1/tcp/80/http")
for n in range(4):
result = offset_addr(ma, n)
print(f" offset_addr({ma}, {n}) -> {result}")


if __name__ == "__main__":
main()
4 changes: 2 additions & 2 deletions multiaddr/codecs/certhash.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any
from typing import Any, cast

import multibase
import multihash
Expand Down Expand Up @@ -54,7 +54,7 @@ def to_bytes(self, proto: Any, string: str) -> bytes:
"""
try:
# Decode the multibase string to get the raw multihash bytes.
decoded_bytes = multibase.decode(string)
decoded_bytes = cast(bytes, multibase.decode(string))
except Exception as e:
raise ValueError(f"Failed to decode multibase string: {string}") from e

Expand Down
15 changes: 13 additions & 2 deletions multiaddr/resolvers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
"""DNS resolution support for multiaddr."""

from .base import Resolver
from .dns import DNSResolver
from .dns import DNSADDR_TXT_PREFIX, DNSResolver
from .util import addr_len, fqdn, is_fqdn, matches, offset_addr, resolve_all

__all__ = ["DNSResolver", "Resolver"]
__all__ = [
"DNSADDR_TXT_PREFIX",
"DNSResolver",
"Resolver",
"addr_len",
"fqdn",
"is_fqdn",
"matches",
"offset_addr",
"resolve_all",
]
9 changes: 6 additions & 3 deletions multiaddr/resolvers/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
from ..protocols import P_DNS, P_DNS4, P_DNS6, P_DNSADDR, Protocol
from .base import Resolver

DNSADDR_TXT_PREFIX = "dnsaddr="
"""Prefix for dnsaddr TXT records (e.g. ``dnsaddr=/ip4/...``)."""


class DNSResolver(Resolver):
"""
Expand Down Expand Up @@ -226,8 +229,8 @@ async def _query_dnsaddr_txt_records(
else:
txt_data = str(txt_data_raw)
logging.debug(f"{indent} TXT: {txt_data}")
if txt_data.startswith("dnsaddr="):
multiaddr_str = txt_data[8:]
if txt_data.startswith(DNSADDR_TXT_PREFIX):
multiaddr_str = txt_data[len(DNSADDR_TXT_PREFIX) :]
multiaddr_str = self._clean_quotes(multiaddr_str).strip()
logging.debug(f"{indent} Parsed multiaddr: {multiaddr_str}")
if not multiaddr_str:
Expand Down Expand Up @@ -405,4 +408,4 @@ async def _resolve_dns_with_stack(
return results


__all__ = ["DNSResolver"]
__all__ = ["DNSADDR_TXT_PREFIX", "DNSResolver"]
173 changes: 173 additions & 0 deletions multiaddr/resolvers/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""Utility functions for multiaddr resolution."""

from __future__ import annotations

from typing import TYPE_CHECKING

from ..exceptions import RecursionLimitError
from ..multiaddr import Multiaddr
from ..protocols import P_DNS, P_DNS4, P_DNS6, P_DNSADDR

if TYPE_CHECKING:
from .base import Resolver

__all__ = ["addr_len", "fqdn", "is_fqdn", "matches", "offset_addr", "resolve_all"]

_DNS_PROTOCOLS = frozenset({P_DNS, P_DNS4, P_DNS6, P_DNSADDR})


def is_fqdn(s: str) -> bool:
"""Check if string is a fully qualified domain name (ends with unescaped dot).

Args:
s: The domain name string to check.

Returns:
``True`` if *s* ends with an unescaped ``'.'``, ``False`` otherwise.
"""
if not s:
return False
# Count trailing backslashes before the final character
if s[-1] != ".":
return False
# Check if the trailing dot is escaped
num_backslashes = 0
for i in range(len(s) - 2, -1, -1):
if s[i] == "\\":
num_backslashes += 1
else:
break
# Odd number of backslashes means the dot is escaped
return num_backslashes % 2 == 0


def fqdn(s: str) -> str:
"""Append a trailing dot to *s* if it is not already a FQDN.

Args:
s: The domain name string.

Returns:
The domain name with a trailing ``'.'`` appended if needed.

Example::

>>> fqdn("example.com")
'example.com.'
>>> fqdn("example.com.")
'example.com.'
"""
if is_fqdn(s):
return s
return s + "."


def addr_len(maddr: Multiaddr) -> int:
"""Count the number of protocol components in a multiaddr.

Args:
maddr: The multiaddr to measure.

Returns:
The number of protocol components.
"""
return len(list(maddr.protocols()))


def offset_addr(maddr: Multiaddr, n: int) -> Multiaddr:
"""Return a new multiaddr with the first *n* protocol components removed.

Args:
maddr: The source multiaddr.
n: Number of leading components to skip. Must be >= 0.

Returns:
A new :class:`~multiaddr.Multiaddr` without the first *n* components,
or ``Multiaddr("/")`` if *n* >= the total number of components.

Raises:
ValueError: If *n* is negative.
"""
if n < 0:
raise ValueError(f"offset must be non-negative, got {n}")
parts = maddr.split(n)
if len(parts) <= n:
return Multiaddr("/")
return parts[n]


def matches(maddr: Multiaddr) -> bool:
"""Check if a multiaddr contains any DNS protocol component.

Args:
maddr: The multiaddr to inspect.

Returns:
``True`` if *maddr* contains a ``dns``, ``dns4``, ``dns6``, or
``dnsaddr`` component.

Example::

>>> matches(Multiaddr("/dns4/example.com/tcp/80"))
True
>>> matches(Multiaddr("/ip4/127.0.0.1/tcp/80"))
False
"""
return any(p.code in _DNS_PROTOCOLS for p in maddr.protocols())


async def resolve_all(
resolver: Resolver,
maddr: Multiaddr,
*,
max_iterations: int = 32,
) -> list[Multiaddr]:
"""Resolve all DNS components in a multiaddr iteratively.

Calls ``resolver.resolve()`` in a loop until every returned address is
free of DNS components.

Args:
resolver: A resolver instance implementing an async ``resolve()`` method.
maddr: The multiaddr to resolve.
max_iterations: Safety limit on resolution rounds to prevent infinite
loops (default ``32``).

Returns:
A list of fully-resolved :class:`~multiaddr.Multiaddr` instances.

Raises:
RecursionLimitError: If DNS components remain after *max_iterations*
rounds.

Example::

>>> import trio
>>> result = trio.run(resolve_all, my_resolver, Multiaddr("/dns4/example.com/tcp/80"))
"""
queue = [maddr]
resolved: list[Multiaddr] = []

for _ in range(max_iterations):
if not queue:
break
next_queue: list[Multiaddr] = []
for addr in queue:
if not matches(addr):
resolved.append(addr)
continue
results = await resolver.resolve(addr)
for r in results:
if matches(r):
next_queue.append(r)
else:
resolved.append(r)
queue = next_queue

if queue:
raise RecursionLimitError(
f"resolve_all exceeded {max_iterations} iterations; "
f"{len(queue)} addresses still contain DNS components"
)

return resolved
1 change: 1 addition & 0 deletions newsfragments/101.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add resolver utility functions ported from go-multiaddr-dns: ``matches()``, ``resolve_all()``, ``is_fqdn()``, ``fqdn()``, ``addr_len()``, and ``offset_addr()``. These utilities support DNS-based multiaddr resolution workflows including iterative resolution and FQDN handling.
5 changes: 3 additions & 2 deletions tests/test_multiaddr.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzu:80",
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzuq:-1",
"/garlic32/566niximlxdzpanmn4qouucvua3k7neniwss47li5r6ugoertzu@",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmpy/certhash/zQmbWTwYGcmdyK9CYfNBcfs9nhZs17a6FQ4Y8oea278xx41",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/b2uaraocy6yrdblb4sfptaddgimjmmp",
"/udp/1234/sctp",
"/udp/1234/udt/1234",
"/udp/1234/utp/1234",
Expand Down Expand Up @@ -142,6 +141,8 @@ def test_invalid(addr_str):
"/ip4/127.0.0.1/tcp/127/webrtc",
"/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g"
"/ip4/127.0.0.1/udp/9090/webrtc-direct/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw",
"/ip4/127.0.0.1/udp/1234/quic-v1/webtransport/certhash/u1QEQOFj2IjCsPJFfMAxmQxLGPw/certhash/uEiDDq4_xNyDorZBH3TlGazyJdOWSwvo4PUo5YHFMrvDE8g",
],
) # nopep8
def test_valid(addr_str):
Expand Down
Loading
Loading