diff --git a/pylabrobot/io/sila/__init__.py b/pylabrobot/io/sila/__init__.py index f7a62876d22..18de249f6f5 100644 --- a/pylabrobot/io/sila/__init__.py +++ b/pylabrobot/io/sila/__init__.py @@ -1,4 +1,4 @@ -from pylabrobot.io.sila.discovery import SiLADevice, discover +from pylabrobot.io.sila.discovery import SiLADevice, discover, discover_iter from pylabrobot.io.sila.grpc import ( WIRE_32BIT, WIRE_64BIT, diff --git a/pylabrobot/io/sila/discovery.py b/pylabrobot/io/sila/discovery.py index 3670cd7fd1d..29c0fcb01b0 100644 --- a/pylabrobot/io/sila/discovery.py +++ b/pylabrobot/io/sila/discovery.py @@ -14,10 +14,14 @@ import asyncio import dataclasses import logging +import os +import re import socket import struct +import subprocess +import sys import xml.etree.ElementTree as ET -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, AsyncGenerator, Optional try: from zeroconf import ServiceBrowser, ServiceListener, Zeroconf @@ -66,6 +70,51 @@ def _get_link_local_interfaces() -> list[str]: return result +def _interface_name_for_ip_sync(ip: str) -> Optional[str]: + """Return the OS interface name (e.g. 'en13', 'eth0') bound to the given IP, or None. + + Uses ``ifconfig`` on macOS/BSD and ``ip`` on Linux. + """ + # Build a pattern that matches the IP as a whole token (not a substring). + # Escaped so 169.254.1.1 won't match 169.254.1.10. + ip_pattern = re.compile(r"(?= 2: + return parts[1].rstrip(":") + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.CalledProcessError): + pass + else: + # macOS/BSD: parse ifconfig + try: + out = subprocess.check_output( + ["ifconfig"], + stderr=subprocess.DEVNULL, + timeout=2, + ).decode(errors="replace") + current_iface: Optional[str] = None + for line in out.splitlines(): + if not line.startswith(("\t", " ")): + # Interface header, e.g. "en13: flags=..." + current_iface = line.split(":")[0] + elif ip_pattern.search(line) and current_iface is not None: + return current_iface + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.CalledProcessError): + pass + + return None + + # --------------------------------------------------------------------------- # SiLA 1 – NetBIOS name query + GetDeviceIdentification on port 8080 # --------------------------------------------------------------------------- @@ -143,7 +192,12 @@ async def _netbios_scan(interface: str, timeout: float = 3.0) -> dict[str, str]: # Use run_in_executor for sendto/recvfrom since the async loop equivalents # (loop.sock_sendto / loop.sock_recvfrom) require Python 3.11+. - await loop.run_in_executor(None, lambda: sock.sendto(_NBNS_WILDCARD_QUERY, (broadcast, 137))) + try: + await loop.run_in_executor(None, lambda: sock.sendto(_NBNS_WILDCARD_QUERY, (broadcast, 137))) + except OSError: + logger.debug("NetBIOS broadcast failed on %s", interface) + sock.close() + return results deadline = loop.time() + timeout while loop.time() < deadline: @@ -166,6 +220,166 @@ async def _netbios_scan(interface: str, timeout: float = 3.0) -> dict[str, str]: return results +async def _arp_scan(interface: str) -> dict[str, str]: + """Read the system ARP table for link-local hosts reachable via the same interface. + + This is a fallback for devices that don't respond to NetBIOS but are present + in the OS ARP cache (e.g. because they responded to a prior ARP request). + Returns a dict mapping IP -> hostname (or empty string if unknown). + + Works on macOS (``arp -an``), Linux (``/proc/net/arp``), and Windows (``arp -a``). + """ + if sys.platform == "linux": + return await _arp_scan_linux(interface) + elif sys.platform == "win32": + return await _arp_scan_windows(interface) + else: + return await _arp_scan_bsd(interface) + + +async def _arp_scan_bsd(interface: str) -> dict[str, str]: + """Parse ``arp -an`` output (macOS / BSD), filtering to entries on the correct interface. + + Example line:: + + ? (169.254.245.237) at 0:5:51:e:e5:7e on en13 [ethernet] + """ + # Resolve our IP to an interface name (e.g. "en13") so we can filter ARP entries. + loop = asyncio.get_running_loop() + iface_name = await loop.run_in_executor(None, _interface_name_for_ip_sync, interface) + if not iface_name: + logger.debug("could not resolve interface name for %s, skipping ARP scan", interface) + return {} + + try: + proc = await asyncio.create_subprocess_exec( + "arp", + "-an", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + except (FileNotFoundError, asyncio.TimeoutError): + return {} + + results: dict[str, str] = {} + for line in stdout.decode(errors="replace").splitlines(): + if "incomplete" in line: + continue + if f"on {iface_name} " not in line and not line.endswith(f"on {iface_name}"): + continue + m = re.search(r"\((\d+\.\d+\.\d+\.\d+)\)", line) + if not m: + continue + ip = m.group(1) + if not ip.startswith("169.254."): + continue + if ip == interface: + continue + results[ip] = "" + + return results + + +async def _arp_scan_linux(interface: str) -> dict[str, str]: + """Parse ``/proc/net/arp`` (Linux). + + Example content:: + + IP address HW type Flags HW address Mask Device + 169.254.245.237 0x1 0x2 00:05:51:0e:e5:7e * eth0 + """ + if not os.path.exists("/proc/net/arp"): + # Fall back to arp -an on non-procfs Linux systems. + return await _arp_scan_bsd(interface) + + loop = asyncio.get_running_loop() + try: + + def _read(): + with open("/proc/net/arp") as f: + return f.read() + + text = await loop.run_in_executor(None, _read) + except OSError: + return {} + + # Determine the OS-level interface name for our IP so we can filter entries. + iface_name = await loop.run_in_executor(None, _interface_name_for_ip_sync, interface) + if not iface_name: + logger.debug("could not resolve interface name for %s, skipping ARP scan", interface) + return {} + + results: dict[str, str] = {} + for line in text.splitlines()[1:]: # skip header + parts = line.split() + if len(parts) < 6: + continue + ip, flags, device = parts[0], parts[2], parts[5] + if flags == "0x0": # incomplete entry + continue + if not ip.startswith("169.254."): + continue + if ip == interface: + continue + if device != iface_name: + continue + results[ip] = "" + + return results + + +async def _arp_scan_windows(interface: str) -> dict[str, str]: + """Parse ``arp -a`` output on Windows. + + Example output:: + + Interface: 169.254.229.18 --- 0x5 + Internet Address Physical Address Type + 169.254.245.237 00-05-51-0e-e5-7e dynamic + + Windows groups entries by interface, so we find the section matching our IP. + """ + try: + proc = await asyncio.create_subprocess_exec( + "arp", + "-a", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + except (FileNotFoundError, asyncio.TimeoutError): + return {} + + results: dict[str, str] = {} + in_our_interface = False + for line in stdout.decode(errors="replace").splitlines(): + line = line.strip() + if not line: + in_our_interface = False + continue + # Detect interface header: "Interface: 169.254.229.18 --- 0x5" + if line.startswith("Interface:"): + in_our_interface = f" {interface} " in line + continue + if not in_our_interface: + continue + # Skip the column header line + if "Internet Address" in line or "Physical Address" in line: + continue + parts = line.split() + if len(parts) < 3: + continue + ip_addr = parts[0] + if not ip_addr.startswith("169.254."): + continue + if ip_addr == interface: + continue + results[ip_addr] = "" + + return results + + def _parse_device_identification(host: str, port: int, xml_bytes: bytes) -> Optional[SiLADevice]: """Parse a GetDeviceIdentification SOAP response.""" try: @@ -204,7 +418,10 @@ async def _get_device_identification( interface: Optional[str] = None, timeout: float = 3.0, ) -> Optional[SiLADevice]: - """Query a single host for SiLA 1 GetDeviceIdentification.""" + """Query a single host for SiLA 1 GetDeviceIdentification. + + The entire operation (connect + send + recv) is bounded by a single ``timeout`` deadline. + """ body = _SILA1_ID_SOAP.encode("utf-8") request = ( f"POST / HTTP/1.1\r\n" @@ -218,19 +435,23 @@ async def _get_device_identification( sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - sock.settimeout(timeout) if interface: sock.bind((interface, 0)) sock.setblocking(False) loop = asyncio.get_running_loop() - await asyncio.wait_for(loop.sock_connect(sock, (host, port)), timeout=timeout) - await asyncio.wait_for(loop.sock_sendall(sock, request), timeout=timeout) + deadline = loop.time() + timeout + + def _remaining() -> float: + return max(0.01, deadline - loop.time()) + + await asyncio.wait_for(loop.sock_connect(sock, (host, port)), timeout=_remaining()) + await asyncio.wait_for(loop.sock_sendall(sock, request), timeout=_remaining()) resp = b"" while True: try: - chunk = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout) + chunk = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=_remaining()) if not chunk: break resp += chunk @@ -260,21 +481,41 @@ async def _discover_sila1( interface: Optional[str] = None, port: int = 8080, ) -> list[SiLADevice]: - """Discover SiLA 1 devices using NetBIOS broadcast + GetDeviceIdentification. + """Discover SiLA 1 devices using NetBIOS broadcast + ARP fallback + GetDeviceIdentification. - 1. Send a broadcast NetBIOS NBSTAT query to find live hosts on the link-local network. - 2. For each responder, query port 8080 with GetDeviceIdentification. + 1. Run NetBIOS scan and ARP table lookup in parallel to find live hosts. + 2. For each discovered host, query port 8080 with GetDeviceIdentification. """ if not interface: logger.debug("no interface provided for SiLA 1 discovery, skipping") return [] - hosts = await _netbios_scan(interface, timeout=min(timeout, 3.0)) + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + + # Run host discovery methods in parallel. + # Cap NetBIOS at 3s — any device that responds will do so within a second or two. + netbios_task = asyncio.ensure_future(_netbios_scan(interface, timeout=min(timeout, 3.0))) + arp_task = asyncio.ensure_future(_arp_scan(interface)) + scan_results = await asyncio.gather(netbios_task, arp_task, return_exceptions=True) + + hosts: dict[str, str] = {} + if isinstance(scan_results[0], dict): + hosts.update(scan_results[0]) + if isinstance(scan_results[1], dict): + for ip, name in scan_results[1].items(): + if ip not in hosts: + logger.debug("found %s via ARP (not NetBIOS)", ip) + hosts[ip] = name + if not hosts: return [] + remaining = max(0.01, deadline - loop.time()) devices: list[SiLADevice] = [] - coros = [_get_device_identification(ip, port, interface=interface, timeout=3.0) for ip in hosts] + coros = [ + _get_device_identification(ip, port, interface=interface, timeout=remaining) for ip in hosts + ] results = await asyncio.gather(*coros, return_exceptions=True) for r in results: if isinstance(r, SiLADevice): @@ -327,26 +568,23 @@ def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: # --------------------------------------------------------------------------- -async def discover( +async def discover_iter( timeout: float = 5.0, interface: Optional[str] = None, -) -> list[SiLADevice]: - """Discover SiLA devices on the local network. - - Runs SiLA 1 (NetBIOS + GetDeviceIdentification) and SiLA 2 (mDNS) probes in parallel. +) -> AsyncGenerator[SiLADevice, None]: + """Async generator that yields :class:`SiLADevice` instances as they are found. - For SiLA 1, the ``interface`` parameter specifies which local IP to send NetBIOS broadcasts - from. If not provided, all link-local (169.254.x.x) interfaces are scanned automatically. + Runs SiLA 1 and SiLA 2 probes concurrently and yields each device immediately + upon discovery, without waiting for all probes to finish. Args: timeout: How long to listen for responses, in seconds. interface: Local IP address of the interface to use for SiLA 1 discovery. If None, auto-detects all link-local interfaces. - Returns: - List of discovered devices. + Yields: + SiLADevice instances as they are discovered. """ - if interface: interfaces = [interface] else: @@ -354,21 +592,52 @@ async def discover( if not interfaces: logger.debug("no link-local interfaces found, SiLA 1 discovery will be skipped") - coros: list = [_discover_sila1(timeout=timeout, interface=iface) for iface in interfaces] - coros.append(_discover_sila2(timeout)) - - results = await asyncio.gather(*coros, return_exceptions=True) + tasks = [ + asyncio.ensure_future(c) + for c in [_discover_sila1(timeout=timeout, interface=iface) for iface in interfaces] + + [_discover_sila2(timeout)] + ] seen: set[tuple[str, int]] = set() - devices: list[SiLADevice] = [] - for r in results: - if isinstance(r, list): - for d in r: - key = (d.host, d.port) - if key not in seen: - seen.add(key) - devices.append(d) - return devices + pending = set(tasks) + try: + while pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for t in done: + try: + result = t.result() + except Exception: + continue + if isinstance(result, list): + for d in result: + key = (d.host, d.port) + if key not in seen: + seen.add(key) + yield d + finally: + for t in pending: + t.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + + +async def discover( + timeout: float = 5.0, + interface: Optional[str] = None, +) -> list[SiLADevice]: + """Discover SiLA devices on the local network. + + Convenience wrapper around :func:`discover_iter` that collects all results into a list. + + Args: + timeout: How long to listen for responses, in seconds. + interface: Local IP address of the interface to use for SiLA 1 discovery. + If None, auto-detects all link-local interfaces. + + Returns: + List of discovered devices. + """ + return [d async for d in discover_iter(timeout=timeout, interface=interface)] if __name__ == "__main__": @@ -390,12 +659,15 @@ async def discover( ) args = parser.parse_args() - found = asyncio.run(discover(args.timeout, interface=args.interface)) - if not found: - print("No SiLA devices found.") - else: - for d in found: + async def main(): + found = False + async for d in discover_iter(args.timeout, interface=args.interface): + found = True parts = [d.host, str(d.port), d.name, f"SiLA {d.sila_version}"] if d.serial_number: parts.append(d.serial_number) print("\t".join(parts)) + if not found: + print("No SiLA devices found.") + + asyncio.run(main()) diff --git a/pylabrobot/io/sila/discovery_tests.py b/pylabrobot/io/sila/discovery_tests.py index 568f330727e..5f96552b8bc 100644 --- a/pylabrobot/io/sila/discovery_tests.py +++ b/pylabrobot/io/sila/discovery_tests.py @@ -2,11 +2,14 @@ import socket import struct import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch from pylabrobot.io.sila.discovery import ( HAS_ZEROCONF, SiLADevice, + _arp_scan_bsd, + _arp_scan_linux, + _arp_scan_windows, _decode_nbns_name, _discover_sila2, _parse_device_identification, @@ -132,6 +135,111 @@ def test_invalid_xml(self): self.assertIsNone(_parse_device_identification("10.0.0.1", 8080, b"not xml")) +class TestArpScanBsd(unittest.TestCase): + ARP_OUTPUT = ( + "? (169.254.245.237) at 0:5:51:e:e5:7e on en13 [ethernet]\n" + "? (192.168.0.1) at aa:bb:cc:dd:ee:ff on en0 ifscope [ethernet]\n" + "? (169.254.10.20) at (incomplete) on en13 [ethernet]\n" + "? (169.254.99.1) at 11:22:33:44:55:66 on en13 [ethernet]\n" + "? (169.254.50.50) at 22:33:44:55:66:77 on en7 [ethernet]\n" + ) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="en13") + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_parses_link_local_entries(self, mock_exec, _mock_iface): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (self.ARP_OUTPUT.encode(), b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertIn("169.254.245.237", results) + self.assertIn("169.254.99.1", results) + # Non-link-local should be excluded + self.assertNotIn("192.168.0.1", results) + # Incomplete entries should be excluded + self.assertNotIn("169.254.10.20", results) + # Our own interface IP should be excluded + self.assertNotIn("169.254.229.18", results) + # Entry on a different interface should be excluded + self.assertNotIn("169.254.50.50", results) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="en13") + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_empty_output(self, mock_exec, _mock_iface): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"", b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertEqual(results, {}) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value=None) + def test_returns_empty_when_interface_unknown(self, _mock_iface): + """If we can't resolve the interface name, return empty rather than all entries.""" + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertEqual(results, {}) + + +class TestArpScanLinux(unittest.TestCase): + PROC_NET_ARP = ( + "IP address HW type Flags HW address Mask Device\n" + "169.254.245.237 0x1 0x2 00:05:51:0e:e5:7e * eth0\n" + "192.168.1.1 0x1 0x2 aa:bb:cc:dd:ee:ff * eth1\n" + "169.254.10.20 0x1 0x0 00:00:00:00:00:00 * eth0\n" + ) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="eth0") + @patch("os.path.exists", return_value=True) + def test_parses_proc_net_arp(self, _mock_exists, _mock_iface): + with patch( + "builtins.open", + MagicMock( + return_value=MagicMock( + __enter__=lambda s: s, + __exit__=MagicMock(return_value=False), + read=MagicMock(return_value=self.PROC_NET_ARP), + ) + ), + ): + results = asyncio.run(_arp_scan_linux("169.254.229.18")) + + self.assertIn("169.254.245.237", results) + # Non-link-local excluded + self.assertNotIn("192.168.1.1", results) + # Incomplete (flags=0x0) excluded + self.assertNotIn("169.254.10.20", results) + + +class TestArpScanWindows(unittest.TestCase): + ARP_OUTPUT = ( + "\r\n" + "Interface: 169.254.229.18 --- 0x5\r\n" + " Internet Address Physical Address Type\r\n" + " 169.254.245.237 00-05-51-0e-e5-7e dynamic\r\n" + " 169.254.10.20 00-aa-bb-cc-dd-ee dynamic\r\n" + "\r\n" + "Interface: 192.168.0.100 --- 0x3\r\n" + " Internet Address Physical Address Type\r\n" + " 192.168.0.1 aa-bb-cc-dd-ee-ff dynamic\r\n" + " 169.254.99.1 11-22-33-44-55-66 dynamic\r\n" + ) + + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_parses_correct_interface_section(self, mock_exec): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (self.ARP_OUTPUT.encode(), b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_windows("169.254.229.18")) + self.assertIn("169.254.245.237", results) + self.assertIn("169.254.10.20", results) + # This is under a different interface section + self.assertNotIn("169.254.99.1", results) + self.assertNotIn("192.168.0.1", results) + # Our own IP should be excluded + self.assertNotIn("169.254.229.18", results) + + class TestDiscoverSila2(unittest.TestCase): @patch("pylabrobot.io.sila.discovery.HAS_ZEROCONF", False) def test_no_zeroconf_returns_empty(self):