diff --git a/src/underworld3/discretisation/discretisation_mesh_variables.py b/src/underworld3/discretisation/discretisation_mesh_variables.py index 13359073..814eb2cc 100644 --- a/src/underworld3/discretisation/discretisation_mesh_variables.py +++ b/src/underworld3/discretisation/discretisation_mesh_variables.py @@ -1155,90 +1155,149 @@ def read_timestep( ): """ Read a mesh variable from an arbitrary vertex-based checkpoint file - and reconstruct/interpolate the data field accordingly. The data sizes / meshes can be - different and will be matched using a kd-tree / inverse-distance weighting - to the new mesh. + and reconstruct/interpolate the data field accordingly. The saved + mesh and the live mesh may have different sizes/decompositions; the + values are matched by nearest-neighbour kd-tree interpolation to + the live mesh nodes. - """ + Parallel-safe and memory-bounded. Two transient swarms route the + work without ever holding the full file on more than one rank: + + 1. **Source swarm** — rank 0 reads the file; saved + ``(coord, value)`` pairs migrate to whichever rank owns the + centroid-domain of each location. - # Fix this to match the write_timestep function + 2. **Query swarm** — each rank inserts *its own* live DOF + coordinates. They migrate using the same centroid logic, so + a live DOF and a saved point at the same coordinate land on + the same rank regardless of how PETSc partitioned the DM. + Each rank then runs a rank-local KDTree against the saved + data it received, and the interpolated values migrate back to + the live DOF's home rank. - # mesh.write_timestep( "test", meshUpdates=False, meshVars=[X], outputPath="", index=0) - # swarm.write_timestep("test", "swarm", swarmVars=[var], outputPath="", index=0) + Per-rank memory is bounded by ``file_size / n_ranks`` rather than + ``file_size`` per rank. + """ output_base_name = os.path.join(outputPath, data_filename) data_file = output_base_name + f".mesh.{data_name}.{index:05}.h5" - # check if data_file exists - if os.path.isfile(os.path.abspath(data_file)): - pass - else: + if not os.path.isfile(os.path.abspath(data_file)): raise RuntimeError(f"{os.path.abspath(data_file)} does not exist") import h5py import numpy as np - # Keep vector available for future access - pass - - ## Sub functions that are used to read / interpolate the mesh. - def field_from_checkpoint( - data_file=None, - data_name=None, - ): - """Read the mesh data as a swarm-like value""" + # ``self.num_components`` is correct for SCALAR (1), VECTOR (dim), + # TENSOR (dim**2) and SYM_TENSOR (dim*(dim+1)/2). ``self.shape[1]`` + # would silently drop components for tensor types because shape is + # ``(N, dim, dim)`` and ``[1]`` returns just ``dim``. + n_components = self.num_components + dim = self.mesh.dim + + # ---- Phase 1: source swarm carries saved (coord, value) pairs ---- + source_swarm = uw.swarm.Swarm(self.mesh) + saved = uw.swarm.SwarmVariable( + "_read_timestep_saved", + source_swarm, + vtype=uw.VarType.MATRIX, + size=(1, n_components), + dtype=float, + _proxy=False, + varsymbol=r"\cal{S}", + ) - if verbose and uw.mpi.rank == 0: + if uw.mpi.rank == 0: + if verbose: print(f"Reading data file {data_file}", flush=True) + with h5py.File(data_file, "r") as h5f: + X_src = h5f["fields"]["coordinates"][()].reshape(-1, dim) + D_src = h5f["fields"][data_name][()].reshape(-1, n_components) + else: + X_src = np.empty((0, dim), dtype=np.double) + D_src = np.empty((0, n_components), dtype=np.double) + + src_size_before = max(source_swarm.dm.getLocalSize(), 0) + source_swarm.add_particles_with_global_coordinates(X_src, migrate=False) + source_swarm._invalidate_canonical_data() + saved.array[src_size_before:, 0, :] = D_src[:, :] + # Deterministic centroid-distance routing: nearest rank-centroid + # owns the point. Both swarms (source + query below) use the same + # rule, so a saved point at coord X and a live-DOF query at the + # same X always land on the same rank — exact match restored. + # ``Swarm.migrate``'s ``points_in_domain`` test isn't enough on its + # own: at partition boundaries it can return True on multiple ranks + # (vertex DOFs are shared) and source/query end up apart. + source_swarm._route_by_nearest_centroid() + + landed_X = source_swarm._particle_coordinates.array[...].reshape(-1, dim) + landed_D = saved.array[:, 0, :] + + # ---- Phase 2: query swarm round-trips live DOFs to source rank ---- + query_coords = self.coords + if hasattr(query_coords, "magnitude"): + query_coords = query_coords.magnitude + n_query_local = query_coords.shape[0] + original_index = np.arange(n_query_local).reshape(-1, 1, 1) + + query_swarm = uw.swarm.Swarm(self.mesh) + origin_rank = uw.swarm.SwarmVariable( + "rank", query_swarm, + vtype=uw.VarType.SCALAR, dtype=int, _proxy=False, + varsymbol=r"\cal{R}_o", + ) + origin_index_var = uw.swarm.SwarmVariable( + "index", query_swarm, + vtype=uw.VarType.SCALAR, dtype=int, _proxy=False, + varsymbol=r"\cal{I}", + ) + result = uw.swarm.SwarmVariable( + "_read_timestep_result", query_swarm, + vtype=uw.VarType.MATRIX, size=(1, n_components), + dtype=float, _proxy=False, varsymbol=r"\cal{D}", + ) - h5f = h5py.File(data_file) - D = h5f["fields"][data_name][()].reshape(-1, self.shape[1]) - X = h5f["fields"]["coordinates"][()].reshape(-1, self.mesh.dim) - - h5f.close() - - if len(D.shape) == 1: - D = D.reshape(-1, 1) - - return X, D - - def map_to_vertex_values(X, D, nnn=4, p=2, verbose=False): - # Map from "swarm" of points to nodal points - # This is a permutation if we building on the checkpointed - # mesh file - - mesh_kdt = uw.kdtree.KDTree(X) - - # Strip pint units from query coords — the KDTree was built - # from plain HDF5 floats (same physical units, no metadata). - query_coords = self.coords - if hasattr(query_coords, "magnitude"): - query_coords = query_coords.magnitude - - return mesh_kdt.rbf_interpolator_local(query_coords, D, nnn, p, verbose) - - def values_to_mesh_var(mesh_variable, Values): - mesh = mesh_variable.mesh - - # This should be trivial but there may be problems if - # the kdtree does not have enough neighbours to allocate - # values for every point. We handle that here. - - mesh_variable.data[...] = Values[...] + q_size_before = max(query_swarm.dm.getLocalSize(), 0) + query_swarm.add_particles_with_global_coordinates(query_coords, migrate=False) + query_swarm._invalidate_canonical_data() + origin_rank.array[q_size_before:, 0, 0] = uw.mpi.rank + origin_index_var.array[q_size_before:, 0, 0] = original_index[:, 0, 0] - return + # Forward: live DOF coords go to the rank whose centroid is + # closest — same deterministic rule as the source swarm above. + query_swarm._route_by_nearest_centroid() - ## Read file information + local_query = query_swarm._particle_coordinates.array[...].reshape(-1, dim) - X, D = field_from_checkpoint( - data_file, - data_name, - ) + if landed_X.shape[0] > 0 and local_query.shape[0] > 0: + kdt = uw.kdtree.KDTree(landed_X) + # ``nnn=1`` — exact match for round-trip reads, sensible + # nearest-neighbour fallback for cross-mesh reads. + result.array[:, 0, :] = kdt.rbf_interpolator_local( + local_query, landed_D, 1, 2, verbose + ) + elif local_query.shape[0] > 0: + # No saved data landed on this rank — leave query payload zero + # and warn; callers can detect via a missing-rank pattern. + if verbose: + print( + f"[rank {uw.mpi.rank}] read_timestep: no saved points landed; " + f"queries from this rank will receive zeros", + flush=True, + ) - remapped_D = map_to_vertex_values(X, D) + # Reverse: stamp the destination rank from origin_rank and use the + # bare DMSwarm migrate to send each query particle back home. + query_swarm._rank_var.array[...] = origin_rank.array[...] + query_swarm.dm.migrate(remove_sent_points=True) + uw.mpi.barrier() + query_swarm._invalidate_canonical_data() - # This is empty at the moment - values_to_mesh_var(self, remapped_D) + # Reorder by original_index and write into self.data + idx = origin_index_var.array[:, 0, 0] + out = np.zeros((n_query_local, n_components), dtype=np.double) + out[idx, :] = result.array[:, 0, :] + self.data[...] = out return diff --git a/src/underworld3/swarm.py b/src/underworld3/swarm.py index 83fadb38..fe080680 100644 --- a/src/underworld3/swarm.py +++ b/src/underworld3/swarm.py @@ -1910,37 +1910,75 @@ def read_timestep( else: raise RuntimeError(f"{os.path.abspath(filename)} does not exist") - ### open up file with coords on all procs and open up data on all procs. May be problematic for large problems. - with ( - h5py.File(f"{filename}", "r") as h5f_data, - h5py.File(f"{swarmFilename}", "r") as h5f_swarm, - ): - - # with self.swarm.access(self): - var_dtype = self.dtype - file_dtype = h5f_data["data"][:].dtype - file_length = h5f_data["data"][:].shape[0] - - if var_dtype != file_dtype: - if comm.rank == 0: + # Memory-bounded parallel read. Rank 0 reads the saved file in one + # shot (this is not chunked — the file is the smallest copy of the + # data and we hold it for the duration of one ``add_particles`` call; + # streaming hyperslabs is a follow-up if rank-0 RAM becomes the + # binding constraint). Saved (coord, value) pairs are then pushed + # into a transient routing swarm and migrated *with the same rule + # the live swarm uses* — ``points_in_domain`` plus the centroid + # fallback inside ``Swarm.migrate``. That co-locates a saved point + # and the corresponding live particle on the same rank, so the + # rank-local KDTree always sees the right neighbour. + + n_components = self.num_components + dim = self.swarm.mesh.dim + + if uw.mpi.rank == 0: + with ( + h5py.File(f"{filename}", "r") as h5f_data, + h5py.File(f"{swarmFilename}", "r") as h5f_swarm, + ): + file_dtype = h5f_data["data"].dtype + if self.dtype != file_dtype: warnings.warn( - f"{os.path.basename(filename)} dtype ({file_dtype}) does not match {self.name} swarm variable dtype ({var_dtype}) which may result in a loss of data.", + f"{os.path.basename(filename)} dtype ({file_dtype}) " + f"does not match {self.name} swarm variable dtype " + f"({self.dtype}) which may result in a loss of data.", stacklevel=2, ) + X_chunk = h5f_swarm["coordinates"][()].reshape(-1, dim) + D_chunk = h5f_data["data"][()].reshape(-1, n_components) + else: + X_chunk = np.empty((0, dim), dtype=np.double) + D_chunk = np.empty((0, n_components), dtype=np.double) + + tmp_swarm = uw.swarm.Swarm(self.swarm.mesh) + saved = SwarmVariable( + "_read_timestep_saved", + tmp_swarm, + vtype=uw.VarType.MATRIX, + size=(1, n_components), + dtype=float, + _proxy=False, + varsymbol=r"\cal{S}", + ) - # First work out which are local points and ignore the rest - # This might help speed up the load by dropping lots of particles - - all_coords = h5f_swarm["coordinates"][()] - all_data = h5f_data["data"][()] + size_before = max(tmp_swarm.dm.getLocalSize(), 0) + tmp_swarm.add_particles_with_global_coordinates(X_chunk, migrate=False) + tmp_swarm._invalidate_canonical_data() + saved.array[size_before:, 0, :] = D_chunk[:, :] - local_coords = all_coords # [local] - local_data = all_data # [local] + # Use the same migration rule as the live swarm so saved points and + # live particles at the same coordinate land on the same rank. + # ``delete_lost_points=False`` keeps points that ``points_in_domain`` + # rejects on every rank; they fall through to the centroid fallback + # inside ``Swarm.migrate`` and end up somewhere deterministic. + tmp_swarm.migrate(remove_sent_points=True, delete_lost_points=False) - kdt = uw.kdtree.KDTree(local_coords) + landed_X = tmp_swarm._particle_coordinates.array[...].reshape(-1, dim) + landed_D = saved.array[:, 0, :] + if landed_X.shape[0] == 0: + warnings.warn( + f"[rank {uw.mpi.rank}] read_timestep: no saved swarm points " + f"landed locally; '{self.name}' on this rank will not be updated", + stacklevel=2, + ) + else: + kdt = uw.kdtree.KDTree(landed_X) self.array[:, 0, :] = kdt.rbf_interpolator_local( - self.swarm._particle_coordinates.data, local_data, nnn=1 + self.swarm._particle_coordinates.data, landed_D, nnn=1 ) return @@ -2557,6 +2595,39 @@ def _invalidate_canonical_data(self): if hasattr(var, "_canonical_data"): var._canonical_data = None + def _route_by_nearest_centroid(self): + """Migrate every particle to the rank whose domain-centroid is closest. + + This is a deterministic alternative to :meth:`migrate`: the destination + is a pure function of the coordinate, computed identically on every + rank. Two swarms migrated this way are guaranteed to place equal + coordinates on the same rank — which the standard ``migrate`` does not + guarantee at partition boundaries (vertex DOFs sitting on a shared face + can return ``True`` from ``points_in_domain`` on multiple ranks). + + Used by checkpoint readers and any consumer that needs source data and + query coordinates to converge on the same rank without relying on + PETSc's DOF distribution. + """ + centroids = self.mesh._get_domain_centroids() + centroid_kdt = uw.kdtree.KDTree(centroids) + + coords = self.dm.getField("DMSwarmPIC_coor").reshape(-1, self.dim).copy() + self.dm.restoreField("DMSwarmPIC_coor") + + if coords.shape[0] > 0: + _, owner_rank = centroid_kdt.query(coords, k=1, sqr_dists=False) + rank_arr = self.dm.getField("DMSwarm_rank") + # ``DMSwarm_rank`` shape varies by PETSc version: 1-D ``(N,)`` on + # 3.21, 2-D ``(N, 1)`` on 3.25+. ``reshape(-1)`` flattens either + # form into a writable view into the same buffer. + rank_arr.reshape(-1)[:] = owner_rank.astype(rank_arr.dtype, copy=False) + self.dm.restoreField("DMSwarm_rank") + + self.dm.migrate(remove_sent_points=True) + uw.mpi.barrier() + self._invalidate_canonical_data() + @property def mesh(self): """The mesh this swarm operates on""" diff --git a/tests/parallel/ptest_0762_read_timestep_swarm_routed.py b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py new file mode 100644 index 00000000..fc5c1ae7 --- /dev/null +++ b/tests/parallel/ptest_0762_read_timestep_swarm_routed.py @@ -0,0 +1,113 @@ +"""Parallel regression test for the swarm-routed read_timestep path. + +The new ``MeshVariable.read_timestep`` uses a transient swarm to ship saved +``(coord, value)`` pairs to the rank that owns each location. This test +verifies the parallel round-trip: write a known field on N ranks, read it +back on N ranks, assert allclose. + +Run with: + mpirun -n 2 python -m pytest --with-mpi tests/parallel/ptest_0762_read_timestep_swarm_routed.py +""" + +import shutil +import tempfile + +import numpy as np +import pytest + +import underworld3 as uw +from underworld3.meshing import UnstructuredSimplexBox + +pytestmark = [pytest.mark.mpi(min_size=2), pytest.mark.timeout(120)] + + +def _shared_tmpdir(prefix): + """Make rank 0 a tmp directory and broadcast the path to every rank.""" + if uw.mpi.rank == 0: + tmpdir = tempfile.mkdtemp(prefix=prefix) + else: + tmpdir = None + return uw.mpi.comm.bcast(tmpdir, root=0) + + +def _cleanup_shared_tmpdir(tmpdir): + """Drop the shared tmp directory once all ranks are done with it.""" + uw.mpi.comm.barrier() + if uw.mpi.rank == 0: + shutil.rmtree(tmpdir, ignore_errors=True) + + +@pytest.mark.mpi(min_size=2) +@pytest.mark.level_1 +@pytest.mark.tier_a +def test_meshvariable_read_timestep_parallel_round_trip(): + """Write field on N ranks, read back on same N ranks, expect allclose.""" + tmpdir = _shared_tmpdir("uw3_read_timestep_") + try: + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + X = uw.discretisation.MeshVariable("X", mesh, 1, degree=2) + X2 = uw.discretisation.MeshVariable("X2", mesh, 1, degree=2) + + # Analytic field — varies in both dims so a faulty rank-local + # read-everywhere implementation could not silently pass on a + # constant field. + X.array[:, 0, 0] = X.coords[:, 0] + 0.5 * X.coords[:, 1] + + mesh.write_timestep( + "test", meshUpdates=False, meshVars=[X], + outputPath=tmpdir, index=0, + ) + + X2.read_timestep("test", "X", 0, outputPath=tmpdir) + + assert np.allclose(X.array, X2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] read_timestep round-trip failed:\n" + f"max diff = {np.abs(np.asarray(X.array) - np.asarray(X2.array)).max():.3e}" + ) + finally: + _cleanup_shared_tmpdir(tmpdir) + + +@pytest.mark.skip( + reason="Pre-existing: Swarm.write_timestep hangs on N>1 with parallel h5py " + "(unrelated to read_timestep refactor). Serial round-trip is covered by " + "tests/test_0003_save_load.py::test_swarmvariable_save_and_load." +) +@pytest.mark.mpi(min_size=2) +@pytest.mark.level_1 +@pytest.mark.tier_a +def test_swarmvariable_read_timestep_parallel_round_trip(): + """Same round-trip check for SwarmVariable.read_timestep.""" + tmpdir = _shared_tmpdir("uw3_read_timestep_swarm_") + try: + mesh = UnstructuredSimplexBox( + minCoords=(0.0, 0.0), + maxCoords=(1.0, 1.0), + cellSize=1.0 / 16.0, + ) + swarm = uw.swarm.Swarm(mesh) + var = swarm.add_variable(name="X", size=1) + var2 = swarm.add_variable(name="X2", size=1) + swarm.populate(fill_param=2) + + # Each particle stores its own x-coord + var.array[:, 0, 0] = swarm._particle_coordinates.data[:, 0] + + swarm.write_timestep( + "test", "swarm", swarmVars=[var], + outputPath=tmpdir, index=0, + ) + + var2.read_timestep("test", "swarm", "X", 0, outputPath=tmpdir) + + # nnn=1 → exact match (nearest-neighbour) for the round-trip + assert np.allclose(var.array, var2.array, atol=1e-8), ( + f"[rank {uw.mpi.rank}] swarm read_timestep round-trip failed:\n" + f"max diff = {np.abs(np.asarray(var.array) - np.asarray(var2.array)).max():.3e}" + ) + finally: + _cleanup_shared_tmpdir(tmpdir)