From c2b822c2e28c143f23b089d99863908e137195da Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Mon, 16 Mar 2026 12:22:48 -0700 Subject: [PATCH 1/2] Add ARP fallback and streaming to SiLA device discovery SiLA 1 discovery relied solely on NetBIOS broadcast, which many devices don't respond to. Add an ARP table fallback (macOS, Linux, Windows) that runs in parallel with NetBIOS so devices are found even when they only have an ARP cache entry. Also fix several issues in the discovery pipeline: - Use `arp -an` instead of `arp -a` to avoid 35s DNS lookup stalls - Use deadline-based timeouts in GetDeviceIdentification instead of per-operation timeouts that could compound - Catch OSError from NetBIOS broadcast sendto - Filter ARP entries by interface to prevent cross-interface leaks - Guard all IP/interface matching against substring false positives Add `discover_iter` async generator that yields devices as they are found instead of waiting for all probes to complete. Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/io/sila/__init__.py | 2 +- pylabrobot/io/sila/discovery.py | 346 +++++++++++++++++++++++--- pylabrobot/io/sila/discovery_tests.py | 110 +++++++- 3 files changed, 416 insertions(+), 42 deletions(-) diff --git a/pylabrobot/io/sila/__init__.py b/pylabrobot/io/sila/__init__.py index f7a62876d22..18de249f6f5 100644 --- a/pylabrobot/io/sila/__init__.py +++ b/pylabrobot/io/sila/__init__.py @@ -1,4 +1,4 @@ -from pylabrobot.io.sila.discovery import SiLADevice, discover +from pylabrobot.io.sila.discovery import SiLADevice, discover, discover_iter from pylabrobot.io.sila.grpc import ( WIRE_32BIT, WIRE_64BIT, diff --git a/pylabrobot/io/sila/discovery.py b/pylabrobot/io/sila/discovery.py index 3670cd7fd1d..be6eccb665a 100644 --- a/pylabrobot/io/sila/discovery.py +++ b/pylabrobot/io/sila/discovery.py @@ -14,10 +14,14 @@ import asyncio import dataclasses import logging +import os +import re import socket import struct +import subprocess +import sys import xml.etree.ElementTree as ET -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING, AsyncGenerator, Optional try: from zeroconf import ServiceBrowser, ServiceListener, Zeroconf @@ -66,6 +70,51 @@ def _get_link_local_interfaces() -> list[str]: return result +def _interface_name_for_ip_sync(ip: str) -> Optional[str]: + """Return the OS interface name (e.g. 'en13', 'eth0') bound to the given IP, or None. + + Uses ``ifconfig`` on macOS/BSD and ``ip`` on Linux. + """ + # Build a pattern that matches the IP as a whole token (not a substring). + # Escaped so 169.254.1.1 won't match 169.254.1.10. + ip_pattern = re.compile(r"(?= 2: + return parts[1].rstrip(":") + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + else: + # macOS/BSD: parse ifconfig + try: + out = subprocess.check_output( + ["ifconfig"], + stderr=subprocess.DEVNULL, + timeout=2, + ).decode(errors="replace") + current_iface: Optional[str] = None + for line in out.splitlines(): + if not line.startswith(("\t", " ")): + # Interface header, e.g. "en13: flags=..." + current_iface = line.split(":")[0] + elif ip_pattern.search(line) and current_iface is not None: + return current_iface + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + + return None + + # --------------------------------------------------------------------------- # SiLA 1 – NetBIOS name query + GetDeviceIdentification on port 8080 # --------------------------------------------------------------------------- @@ -143,7 +192,12 @@ async def _netbios_scan(interface: str, timeout: float = 3.0) -> dict[str, str]: # Use run_in_executor for sendto/recvfrom since the async loop equivalents # (loop.sock_sendto / loop.sock_recvfrom) require Python 3.11+. - await loop.run_in_executor(None, lambda: sock.sendto(_NBNS_WILDCARD_QUERY, (broadcast, 137))) + try: + await loop.run_in_executor(None, lambda: sock.sendto(_NBNS_WILDCARD_QUERY, (broadcast, 137))) + except OSError: + logger.debug("NetBIOS broadcast failed on %s", interface) + sock.close() + return results deadline = loop.time() + timeout while loop.time() < deadline: @@ -166,6 +220,166 @@ async def _netbios_scan(interface: str, timeout: float = 3.0) -> dict[str, str]: return results +async def _arp_scan(interface: str) -> dict[str, str]: + """Read the system ARP table for link-local hosts reachable via the same interface. + + This is a fallback for devices that don't respond to NetBIOS but are present + in the OS ARP cache (e.g. because they responded to a prior ARP request). + Returns a dict mapping IP -> hostname (or empty string if unknown). + + Works on macOS (``arp -an``), Linux (``/proc/net/arp``), and Windows (``arp -a``). + """ + if sys.platform == "linux": + return await _arp_scan_linux(interface) + elif sys.platform == "win32": + return await _arp_scan_windows(interface) + else: + return await _arp_scan_bsd(interface) + + +async def _arp_scan_bsd(interface: str) -> dict[str, str]: + """Parse ``arp -an`` output (macOS / BSD), filtering to entries on the correct interface. + + Example line:: + + ? (169.254.245.237) at 0:5:51:e:e5:7e on en13 [ethernet] + """ + # Resolve our IP to an interface name (e.g. "en13") so we can filter ARP entries. + loop = asyncio.get_running_loop() + iface_name = await loop.run_in_executor(None, _interface_name_for_ip_sync, interface) + if not iface_name: + logger.debug("could not resolve interface name for %s, skipping ARP scan", interface) + return {} + + try: + proc = await asyncio.create_subprocess_exec( + "arp", + "-an", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await proc.communicate() + except FileNotFoundError: + return {} + + results: dict[str, str] = {} + for line in stdout.decode(errors="replace").splitlines(): + if "incomplete" in line: + continue + if f"on {iface_name} " not in line: + continue + m = re.search(r"\((\d+\.\d+\.\d+\.\d+)\)", line) + if not m: + continue + ip = m.group(1) + if not ip.startswith("169.254."): + continue + if ip == interface: + continue + results[ip] = "" + + return results + + +async def _arp_scan_linux(interface: str) -> dict[str, str]: + """Parse ``/proc/net/arp`` (Linux). + + Example content:: + + IP address HW type Flags HW address Mask Device + 169.254.245.237 0x1 0x2 00:05:51:0e:e5:7e * eth0 + """ + if not os.path.exists("/proc/net/arp"): + # Fall back to arp -an on non-procfs Linux systems. + return await _arp_scan_bsd(interface) + + loop = asyncio.get_running_loop() + try: + + def _read(): + with open("/proc/net/arp") as f: + return f.read() + + text = await loop.run_in_executor(None, _read) + except OSError: + return {} + + # Determine the OS-level interface name for our IP so we can filter entries. + iface_name = await loop.run_in_executor(None, _interface_name_for_ip_sync, interface) + if not iface_name: + logger.debug("could not resolve interface name for %s, skipping ARP scan", interface) + return {} + + results: dict[str, str] = {} + for line in text.splitlines()[1:]: # skip header + parts = line.split() + if len(parts) < 6: + continue + ip, flags, device = parts[0], parts[2], parts[5] + if flags == "0x0": # incomplete entry + continue + if not ip.startswith("169.254."): + continue + if ip == interface: + continue + if device != iface_name: + continue + results[ip] = "" + + return results + + +async def _arp_scan_windows(interface: str) -> dict[str, str]: + """Parse ``arp -a`` output on Windows. + + Example output:: + + Interface: 169.254.229.18 --- 0x5 + Internet Address Physical Address Type + 169.254.245.237 00-05-51-0e-e5-7e dynamic + + Windows groups entries by interface, so we find the section matching our IP. + """ + try: + proc = await asyncio.create_subprocess_exec( + "arp", + "-a", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await proc.communicate() + except FileNotFoundError: + return {} + + results: dict[str, str] = {} + in_our_interface = False + for line in stdout.decode(errors="replace").splitlines(): + line = line.strip() + if not line: + in_our_interface = False + continue + # Detect interface header: "Interface: 169.254.229.18 --- 0x5" + if line.startswith("Interface:"): + in_our_interface = f" {interface} " in line + continue + if not in_our_interface: + continue + # Skip the column header line + if "Internet Address" in line or "Physical Address" in line: + continue + parts = line.split() + if len(parts) < 3: + continue + ip_addr = parts[0] + if not ip_addr.startswith("169.254."): + continue + if ip_addr == interface: + continue + results[ip_addr] = "" + + return results + + def _parse_device_identification(host: str, port: int, xml_bytes: bytes) -> Optional[SiLADevice]: """Parse a GetDeviceIdentification SOAP response.""" try: @@ -204,7 +418,10 @@ async def _get_device_identification( interface: Optional[str] = None, timeout: float = 3.0, ) -> Optional[SiLADevice]: - """Query a single host for SiLA 1 GetDeviceIdentification.""" + """Query a single host for SiLA 1 GetDeviceIdentification. + + The entire operation (connect + send + recv) is bounded by a single ``timeout`` deadline. + """ body = _SILA1_ID_SOAP.encode("utf-8") request = ( f"POST / HTTP/1.1\r\n" @@ -218,19 +435,23 @@ async def _get_device_identification( sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: - sock.settimeout(timeout) if interface: sock.bind((interface, 0)) sock.setblocking(False) loop = asyncio.get_running_loop() - await asyncio.wait_for(loop.sock_connect(sock, (host, port)), timeout=timeout) - await asyncio.wait_for(loop.sock_sendall(sock, request), timeout=timeout) + deadline = loop.time() + timeout + + def _remaining() -> float: + return max(0.01, deadline - loop.time()) + + await asyncio.wait_for(loop.sock_connect(sock, (host, port)), timeout=_remaining()) + await asyncio.wait_for(loop.sock_sendall(sock, request), timeout=_remaining()) resp = b"" while True: try: - chunk = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=timeout) + chunk = await asyncio.wait_for(loop.sock_recv(sock, 4096), timeout=_remaining()) if not chunk: break resp += chunk @@ -260,21 +481,37 @@ async def _discover_sila1( interface: Optional[str] = None, port: int = 8080, ) -> list[SiLADevice]: - """Discover SiLA 1 devices using NetBIOS broadcast + GetDeviceIdentification. + """Discover SiLA 1 devices using NetBIOS broadcast + ARP fallback + GetDeviceIdentification. - 1. Send a broadcast NetBIOS NBSTAT query to find live hosts on the link-local network. - 2. For each responder, query port 8080 with GetDeviceIdentification. + 1. Run NetBIOS scan and ARP table lookup in parallel to find live hosts. + 2. For each discovered host, query port 8080 with GetDeviceIdentification. """ if not interface: logger.debug("no interface provided for SiLA 1 discovery, skipping") return [] - hosts = await _netbios_scan(interface, timeout=min(timeout, 3.0)) + # Run host discovery methods in parallel. + # Cap NetBIOS at 3s — any device that responds will do so within a second or two. + netbios_task = asyncio.ensure_future(_netbios_scan(interface, timeout=min(timeout, 3.0))) + arp_task = asyncio.ensure_future(_arp_scan(interface)) + scan_results = await asyncio.gather(netbios_task, arp_task, return_exceptions=True) + + hosts: dict[str, str] = {} + if isinstance(scan_results[0], dict): + hosts.update(scan_results[0]) + if isinstance(scan_results[1], dict): + for ip, name in scan_results[1].items(): + if ip not in hosts: + logger.debug("found %s via ARP (not NetBIOS)", ip) + hosts[ip] = name + if not hosts: return [] devices: list[SiLADevice] = [] - coros = [_get_device_identification(ip, port, interface=interface, timeout=3.0) for ip in hosts] + coros = [ + _get_device_identification(ip, port, interface=interface, timeout=timeout) for ip in hosts + ] results = await asyncio.gather(*coros, return_exceptions=True) for r in results: if isinstance(r, SiLADevice): @@ -327,26 +564,23 @@ def update_service(self, zc: Zeroconf, type_: str, name: str) -> None: # --------------------------------------------------------------------------- -async def discover( +async def discover_iter( timeout: float = 5.0, interface: Optional[str] = None, -) -> list[SiLADevice]: - """Discover SiLA devices on the local network. - - Runs SiLA 1 (NetBIOS + GetDeviceIdentification) and SiLA 2 (mDNS) probes in parallel. +) -> AsyncGenerator[SiLADevice, None]: + """Async generator that yields :class:`SiLADevice` instances as they are found. - For SiLA 1, the ``interface`` parameter specifies which local IP to send NetBIOS broadcasts - from. If not provided, all link-local (169.254.x.x) interfaces are scanned automatically. + Runs SiLA 1 and SiLA 2 probes concurrently and yields each device immediately + upon discovery, without waiting for all probes to finish. Args: timeout: How long to listen for responses, in seconds. interface: Local IP address of the interface to use for SiLA 1 discovery. If None, auto-detects all link-local interfaces. - Returns: - List of discovered devices. + Yields: + SiLADevice instances as they are discovered. """ - if interface: interfaces = [interface] else: @@ -354,21 +588,50 @@ async def discover( if not interfaces: logger.debug("no link-local interfaces found, SiLA 1 discovery will be skipped") - coros: list = [_discover_sila1(timeout=timeout, interface=iface) for iface in interfaces] - coros.append(_discover_sila2(timeout)) - - results = await asyncio.gather(*coros, return_exceptions=True) + tasks = [ + asyncio.ensure_future(c) + for c in [_discover_sila1(timeout=timeout, interface=iface) for iface in interfaces] + + [_discover_sila2(timeout)] + ] seen: set[tuple[str, int]] = set() - devices: list[SiLADevice] = [] - for r in results: - if isinstance(r, list): - for d in r: - key = (d.host, d.port) - if key not in seen: - seen.add(key) - devices.append(d) - return devices + pending = set(tasks) + try: + while pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + for t in done: + try: + result = t.result() + except Exception: + continue + if isinstance(result, list): + for d in result: + key = (d.host, d.port) + if key not in seen: + seen.add(key) + yield d + finally: + for t in pending: + t.cancel() + + +async def discover( + timeout: float = 5.0, + interface: Optional[str] = None, +) -> list[SiLADevice]: + """Discover SiLA devices on the local network. + + Convenience wrapper around :func:`discover_iter` that collects all results into a list. + + Args: + timeout: How long to listen for responses, in seconds. + interface: Local IP address of the interface to use for SiLA 1 discovery. + If None, auto-detects all link-local interfaces. + + Returns: + List of discovered devices. + """ + return [d async for d in discover_iter(timeout=timeout, interface=interface)] if __name__ == "__main__": @@ -390,12 +653,15 @@ async def discover( ) args = parser.parse_args() - found = asyncio.run(discover(args.timeout, interface=args.interface)) - if not found: - print("No SiLA devices found.") - else: - for d in found: + async def main(): + found = False + async for d in discover_iter(args.timeout, interface=args.interface): + found = True parts = [d.host, str(d.port), d.name, f"SiLA {d.sila_version}"] if d.serial_number: parts.append(d.serial_number) print("\t".join(parts)) + if not found: + print("No SiLA devices found.") + + asyncio.run(main()) diff --git a/pylabrobot/io/sila/discovery_tests.py b/pylabrobot/io/sila/discovery_tests.py index f6f8a7707f3..71cd6667365 100644 --- a/pylabrobot/io/sila/discovery_tests.py +++ b/pylabrobot/io/sila/discovery_tests.py @@ -2,10 +2,13 @@ import socket import struct import unittest -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch from pylabrobot.io.sila.discovery import ( SiLADevice, + _arp_scan_bsd, + _arp_scan_linux, + _arp_scan_windows, _decode_nbns_name, _discover_sila2, _parse_device_identification, @@ -131,6 +134,111 @@ def test_invalid_xml(self): self.assertIsNone(_parse_device_identification("10.0.0.1", 8080, b"not xml")) +class TestArpScanBsd(unittest.TestCase): + ARP_OUTPUT = ( + "? (169.254.245.237) at 0:5:51:e:e5:7e on en13 [ethernet]\n" + "? (192.168.0.1) at aa:bb:cc:dd:ee:ff on en0 ifscope [ethernet]\n" + "? (169.254.10.20) at (incomplete) on en13 [ethernet]\n" + "? (169.254.99.1) at 11:22:33:44:55:66 on en13 [ethernet]\n" + "? (169.254.50.50) at 22:33:44:55:66:77 on en7 [ethernet]\n" + ) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="en13") + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_parses_link_local_entries(self, mock_exec, _mock_iface): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (self.ARP_OUTPUT.encode(), b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertIn("169.254.245.237", results) + self.assertIn("169.254.99.1", results) + # Non-link-local should be excluded + self.assertNotIn("192.168.0.1", results) + # Incomplete entries should be excluded + self.assertNotIn("169.254.10.20", results) + # Our own interface IP should be excluded + self.assertNotIn("169.254.229.18", results) + # Entry on a different interface should be excluded + self.assertNotIn("169.254.50.50", results) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="en13") + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_empty_output(self, mock_exec, _mock_iface): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"", b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertEqual(results, {}) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value=None) + def test_returns_empty_when_interface_unknown(self, _mock_iface): + """If we can't resolve the interface name, return empty rather than all entries.""" + results = asyncio.run(_arp_scan_bsd("169.254.229.18")) + self.assertEqual(results, {}) + + +class TestArpScanLinux(unittest.TestCase): + PROC_NET_ARP = ( + "IP address HW type Flags HW address Mask Device\n" + "169.254.245.237 0x1 0x2 00:05:51:0e:e5:7e * eth0\n" + "192.168.1.1 0x1 0x2 aa:bb:cc:dd:ee:ff * eth1\n" + "169.254.10.20 0x1 0x0 00:00:00:00:00:00 * eth0\n" + ) + + @patch("pylabrobot.io.sila.discovery._interface_name_for_ip_sync", return_value="eth0") + @patch("os.path.exists", return_value=True) + def test_parses_proc_net_arp(self, _mock_exists, _mock_iface): + with patch( + "builtins.open", + MagicMock( + return_value=MagicMock( + __enter__=lambda s: s, + __exit__=MagicMock(return_value=False), + read=MagicMock(return_value=self.PROC_NET_ARP), + ) + ), + ): + results = asyncio.run(_arp_scan_linux("169.254.229.18")) + + self.assertIn("169.254.245.237", results) + # Non-link-local excluded + self.assertNotIn("192.168.1.1", results) + # Incomplete (flags=0x0) excluded + self.assertNotIn("169.254.10.20", results) + + +class TestArpScanWindows(unittest.TestCase): + ARP_OUTPUT = ( + "\r\n" + "Interface: 169.254.229.18 --- 0x5\r\n" + " Internet Address Physical Address Type\r\n" + " 169.254.245.237 00-05-51-0e-e5-7e dynamic\r\n" + " 169.254.10.20 00-aa-bb-cc-dd-ee dynamic\r\n" + "\r\n" + "Interface: 192.168.0.100 --- 0x3\r\n" + " Internet Address Physical Address Type\r\n" + " 192.168.0.1 aa-bb-cc-dd-ee-ff dynamic\r\n" + " 169.254.99.1 11-22-33-44-55-66 dynamic\r\n" + ) + + @patch("asyncio.create_subprocess_exec", new_callable=AsyncMock) + def test_parses_correct_interface_section(self, mock_exec): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (self.ARP_OUTPUT.encode(), b"") + mock_exec.return_value = mock_proc + + results = asyncio.run(_arp_scan_windows("169.254.229.18")) + self.assertIn("169.254.245.237", results) + self.assertIn("169.254.10.20", results) + # This is under a different interface section + self.assertNotIn("169.254.99.1", results) + self.assertNotIn("192.168.0.1", results) + # Our own IP should be excluded + self.assertNotIn("169.254.229.18", results) + + class TestDiscoverSila2(unittest.TestCase): @patch("pylabrobot.io.sila.discovery.HAS_ZEROCONF", False) def test_no_zeroconf_returns_empty(self): From 57558035eb4685ee1a613db9936e8a12d111a641 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Mon, 16 Mar 2026 12:37:41 -0700 Subject: [PATCH 2/2] Fix robustness issues in SiLA discovery - Catch subprocess.CalledProcessError in _interface_name_for_ip_sync - Add timeout to proc.communicate() in ARP scan functions - Handle BSD ARP lines with no trailing token after interface name - Use deadline-based timeout in _discover_sila1 to avoid double-counting - Await cancelled tasks in discover_iter for clean shutdown Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/io/sila/discovery.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/pylabrobot/io/sila/discovery.py b/pylabrobot/io/sila/discovery.py index be6eccb665a..29c0fcb01b0 100644 --- a/pylabrobot/io/sila/discovery.py +++ b/pylabrobot/io/sila/discovery.py @@ -92,7 +92,7 @@ def _interface_name_for_ip_sync(ip: str) -> Optional[str]: parts = line.split() if len(parts) >= 2: return parts[1].rstrip(":") - except (FileNotFoundError, subprocess.TimeoutExpired): + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.CalledProcessError): pass else: # macOS/BSD: parse ifconfig @@ -109,7 +109,7 @@ def _interface_name_for_ip_sync(ip: str) -> Optional[str]: current_iface = line.split(":")[0] elif ip_pattern.search(line) and current_iface is not None: return current_iface - except (FileNotFoundError, subprocess.TimeoutExpired): + except (FileNotFoundError, subprocess.TimeoutExpired, subprocess.CalledProcessError): pass return None @@ -258,15 +258,15 @@ async def _arp_scan_bsd(interface: str) -> dict[str, str]: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) - stdout, _ = await proc.communicate() - except FileNotFoundError: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + except (FileNotFoundError, asyncio.TimeoutError): return {} results: dict[str, str] = {} for line in stdout.decode(errors="replace").splitlines(): if "incomplete" in line: continue - if f"on {iface_name} " not in line: + if f"on {iface_name} " not in line and not line.endswith(f"on {iface_name}"): continue m = re.search(r"\((\d+\.\d+\.\d+\.\d+)\)", line) if not m: @@ -347,8 +347,8 @@ async def _arp_scan_windows(interface: str) -> dict[str, str]: stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.DEVNULL, ) - stdout, _ = await proc.communicate() - except FileNotFoundError: + stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=5) + except (FileNotFoundError, asyncio.TimeoutError): return {} results: dict[str, str] = {} @@ -490,6 +490,9 @@ async def _discover_sila1( logger.debug("no interface provided for SiLA 1 discovery, skipping") return [] + loop = asyncio.get_running_loop() + deadline = loop.time() + timeout + # Run host discovery methods in parallel. # Cap NetBIOS at 3s — any device that responds will do so within a second or two. netbios_task = asyncio.ensure_future(_netbios_scan(interface, timeout=min(timeout, 3.0))) @@ -508,9 +511,10 @@ async def _discover_sila1( if not hosts: return [] + remaining = max(0.01, deadline - loop.time()) devices: list[SiLADevice] = [] coros = [ - _get_device_identification(ip, port, interface=interface, timeout=timeout) for ip in hosts + _get_device_identification(ip, port, interface=interface, timeout=remaining) for ip in hosts ] results = await asyncio.gather(*coros, return_exceptions=True) for r in results: @@ -613,6 +617,8 @@ async def discover_iter( finally: for t in pending: t.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) async def discover(