diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/scp/SendMCDataRequest.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/scp/SendMCDataRequest.java new file mode 100644 index 0000000000..8279601023 --- /dev/null +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/scp/SendMCDataRequest.java @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019 The University of Manchester + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.manchester.spinnaker.messages.scp; + +import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_EXPECTED; +import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.COPY_DATA_IN_PORT; +import static uk.ac.manchester.spinnaker.messages.scp.SCPCommand.CMD_WRITE; +import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE; + + +import java.nio.ByteBuffer; + +import uk.ac.manchester.spinnaker.machine.HasCoreLocation; +import uk.ac.manchester.spinnaker.machine.MemoryLocation; +import uk.ac.manchester.spinnaker.messages.model.UnexpectedResponseCodeException; +import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader; +import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation; + +/** + * A command message to the Data In port to write multicast data. + */ +public class SendMCDataRequest extends SCPRequest { + /** Shift of x in the coordinates for arg2. 
*/ + private static final int X_SHIFT = 16; + + /** + * @param core + * Where to send the request. + * @param targetCore + * The target core of the write. + * @param baseAddress + * The address to write to on the target core. + * @param data + * The data to write. + */ + public SendMCDataRequest(HasCoreLocation core, HasCoreLocation targetCore, + MemoryLocation baseAddress, ByteBuffer data) { + super(header(core), CMD_WRITE, baseAddress.address, + (targetCore.getX() << X_SHIFT) | targetCore.getY(), + data.remaining() / WORD_SIZE, data); + } + + /** + * Make a variant of SDP header that talks to the data in loader. It + * always wants a reply and always talks to a particular SDP port + * (the copy data in port). + * + * @param core + * The SpiNNaker core that we want to talk to. Should be running + * the extra monitor core (not checked). + * @return The SDP header. + */ + private static SDPHeader header(HasCoreLocation core) { + return new SDPHeader(REPLY_EXPECTED, new SDPLocation(core), + COPY_DATA_IN_PORT); + } + + @Override + public EmptyResponse getSCPResponse(ByteBuffer buffer) + throws UnexpectedResponseCodeException { + return new EmptyResponse("Copy Data In", CMD_WRITE, buffer); + } +} diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/sdp/SDPPort.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/sdp/SDPPort.java index fb8bb52016..aabfd12a4c 100644 --- a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/sdp/SDPPort.java +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/messages/sdp/SDPPort.java @@ -30,7 +30,9 @@ public enum SDPPort { /** Extra monitor core data transfer functionality. */ EXTRA_MONITOR_CORE_DATA_SPEED_UP(5), /** Messages directed at the packet gatherer for the speed up protocols. */ - GATHERER_DATA_SPEED_UP(6); + GATHERER_DATA_SPEED_UP(6), + /** Messages directed at the data in loader to simply load data. */ + COPY_DATA_IN_PORT(7); /** The port ID. 
*/ public final int value; diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/Transceiver.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/Transceiver.java index eee1d9b7c1..edfed1d6eb 100644 --- a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/Transceiver.java +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/Transceiver.java @@ -2086,6 +2086,16 @@ public void writeMemoryFlood(MemoryLocation baseAddress, ByteBuffer data) } } + @Override + @ParallelSafe + public void writeMemoryMulticast(HasCoreLocation core, + HasCoreLocation targetCore, MemoryLocation baseAddress, + ByteBuffer data) + throws IOException, ProcessException, InterruptedException { + new WriteMemoryByMulticastProcess(scpSelector, this).writeMemory( + core, targetCore, baseAddress, data); + } + @Override @CheckReturnValue @ParallelSafe diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TransceiverInterface.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TransceiverInterface.java index 1aceb5d99d..232587757a 100644 --- a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TransceiverInterface.java +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/TransceiverInterface.java @@ -1866,6 +1866,34 @@ void writeMemory(@Valid HasCoreLocation core, @NotNull MemoryLocation baseAddress, @NotNull ByteBuffer data) throws IOException, ProcessException, InterruptedException; + /** + * Write to the board via multicast on the Ethernet chip. + * + * @param core + * The coordinates of the Ethernet core containing the advanced + * monitor support + * @param targetCore + * The coordinates of the core where the memory is that is to be + * written to + * @param baseAddress + * The address in SDRAM where the region of memory is to be + * written + * @param data + * The data that is to be written. 
The data should be from the + * position to the limit. + * @throws IOException + * If anything goes wrong with networking. + * @throws ProcessException + * If SpiNNaker rejects a message. + * @throws InterruptedException + * If the communications were interrupted. + */ + @ParallelSafe + void writeMemoryMulticast(@Valid HasCoreLocation core, + @Valid HasCoreLocation targetCore, + @NotNull MemoryLocation baseAddress, @NotNull ByteBuffer data) + throws IOException, ProcessException, InterruptedException; + /** * Write to the user0 register of a core. * diff --git a/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/WriteMemoryByMulticastProcess.java b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/WriteMemoryByMulticastProcess.java new file mode 100644 index 0000000000..c805627364 --- /dev/null +++ b/SpiNNaker-comms/src/main/java/uk/ac/manchester/spinnaker/transceiver/WriteMemoryByMulticastProcess.java @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2018 The University of Manchester + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package uk.ac.manchester.spinnaker.transceiver; + +import static java.lang.Math.max; +import static java.nio.ByteBuffer.allocate; +import static uk.ac.manchester.spinnaker.messages.Constants.UDP_MESSAGE_MAX_SIZE; +import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.read; +import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.sliceUp; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import uk.ac.manchester.spinnaker.connections.ConnectionSelector; +import uk.ac.manchester.spinnaker.connections.SCPConnection; +import uk.ac.manchester.spinnaker.machine.HasCoreLocation; +import uk.ac.manchester.spinnaker.machine.MemoryLocation; +import uk.ac.manchester.spinnaker.messages.scp.SendMCDataRequest; + +/** + * Write to memory on SpiNNaker via multicast (data in only). + */ +class WriteMemoryByMulticastProcess extends TxrxProcess { + + /** Timeout for a write request; longer as the write can take some time. */ + private static final int TIMEOUT = 10000; + + /** The number of simultaneous messages that can be in progress. */ + private static final int N_CHANNELS = 8; + + /** + * @param connectionSelector + * How to select how to communicate. + * @param retryTracker + * Object used to track how many retries were used in an + * operation. May be {@code null} if no such tracking is + * required. + */ + WriteMemoryByMulticastProcess( + ConnectionSelector connectionSelector, + RetryTracker retryTracker) { + super(connectionSelector, SCP_RETRIES, TIMEOUT, N_CHANNELS, + N_CHANNELS - 1, retryTracker); + } + + /** + * @param connectionSelector + * How to select how to communicate. + * @param numChannels + * The number of parallel communications to support. + * @param retryTracker + * Object used to track how many retries were used in an + * operation. May be {@code null} if no such tracking is + * required. 
+ */ + WriteMemoryByMulticastProcess( + ConnectionSelector connectionSelector, + int numChannels, RetryTracker retryTracker) { + super(connectionSelector, SCP_RETRIES, TIMEOUT, numChannels, + max(numChannels / 2, 1), retryTracker); + } + + /** + * Write to memory. + * + * @param core + * The location to send the message to. + * @param targetCore + * The target to write the data to. + * @param baseAddress + * The base address to write. + * @param data + * The overall block of memory to write + * @throws IOException + * If anything goes wrong with networking. + * @throws ProcessException + * If SpiNNaker rejects a message. + * @throws InterruptedException + * If the communications were interrupted. + */ + public void writeMemory( + HasCoreLocation core, HasCoreLocation targetCore, + MemoryLocation baseAddress, ByteBuffer data) + throws IOException, ProcessException, InterruptedException { + var writePosition = baseAddress; + for (var bb : sliceUp(data, UDP_MESSAGE_MAX_SIZE)) { + sendRequest(new SendMCDataRequest(core, targetCore, writePosition, + bb)); + writePosition = writePosition.add(bb.remaining()); + } + finishBatch(); + } + + /** + * Write to memory. + * + * @param core + * The location to send the message to. + * @param targetCore + * The target to write the data to. + * @param baseAddress + * The base address to write. + * @param data + * The stream of data to write. + * @throws IOException + * If anything goes wrong with networking or the input stream. + * @throws ProcessException + * If SpiNNaker rejects a message. + * @throws InterruptedException + * If the communications were interrupted. 
+ */ + public void writeMemory( + HasCoreLocation core, HasCoreLocation targetCore, + MemoryLocation baseAddress, InputStream data) + throws IOException, ProcessException, InterruptedException { + var writePosition = baseAddress; + while (true) { + // One buffer per message; lifetime extends until batch end + var tmp = read(data, allocate(UDP_MESSAGE_MAX_SIZE), + UDP_MESSAGE_MAX_SIZE); + if (tmp == null) { + break; + } + sendRequest(new SendMCDataRequest(core, targetCore, writePosition, + tmp)); + writePosition = writePosition.add(tmp.remaining()); + } + finishBatch(); + } +} diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/CommandLineInterface.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/CommandLineInterface.java index 01f5602380..7ac0d01ae1 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/CommandLineInterface.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/CommandLineInterface.java @@ -23,8 +23,7 @@ import static uk.ac.manchester.spinnaker.alloc.client.SpallocClientFactory.getJobFromProxyInfo; import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DOWNLOAD_DESC; import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_APP_DESC; -import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_DESC; -import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_MON_DESC; +import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_MON_DESC_MC; import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.DSE_SYS_DESC; import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.GATHER_DESC; import static uk.ac.manchester.spinnaker.front_end.CommandDescriptions.IOBUF_DESC; @@ -71,7 +70,7 @@ import uk.ac.manchester.spinnaker.front_end.download.RecordingRegionDataGatherer; import uk.ac.manchester.spinnaker.front_end.download.request.Gather; 
import uk.ac.manchester.spinnaker.front_end.download.request.Placement; -import uk.ac.manchester.spinnaker.front_end.dse.FastExecuteDataSpecification; +import uk.ac.manchester.spinnaker.front_end.dse.FastMCExecuteDataSpecification; import uk.ac.manchester.spinnaker.front_end.dse.HostExecuteDataSpecification; import uk.ac.manchester.spinnaker.front_end.iobuf.IobufRequest; import uk.ac.manchester.spinnaker.front_end.iobuf.IobufRetriever; @@ -195,8 +194,8 @@ HostExecuteDataSpecification create(TransceiverInterface txrx, static HostDSEFactory hostFactory = HostExecuteDataSpecification::new; @FunctionalInterface - interface FastDSEFactory { - FastExecuteDataSpecification create(TransceiverInterface txrx, + interface FastMCDSEFactory { + FastMCExecuteDataSpecification create(TransceiverInterface txrx, Machine machine, List gatherers, File reportDir, DSEDatabaseEngine db) throws IOException, SpinnmanException, StorageException, @@ -204,10 +203,10 @@ FastExecuteDataSpecification create(TransceiverInterface txrx, } /** - * Makes {@link FastExecuteDataSpecification} instances. Allows for + * Makes {@link FastMCExecuteDataSpecification} instances. Allows for * injection of debugging tooling. */ - static FastDSEFactory fastFactory = FastExecuteDataSpecification::new; + static FastMCDSEFactory fastMCFactory = FastMCExecuteDataSpecification::new; /** * Run the data specifications in parallel. @@ -257,7 +256,7 @@ public void runDSEUploadingViaClassicTransfer(Machine machine, } /** - * Run the data specifications in parallel. + * Run the data specifications in parallel using monitors and multicast. * * @param gatherers * List of descriptions of gatherers. @@ -283,8 +282,8 @@ public void runDSEUploadingViaClassicTransfer(Machine machine, * @throws URISyntaxException * If a proxy URI is provided but invalid. 
*/ - @Command(name = "dse_app_mon", description = DSE_MON_DESC) - public void runDSEForAppCoresUploadingViaMonitorStreaming( + @Command(name = "dse_app_mon_mc", description = DSE_MON_DESC_MC) + public void runDSEForAppCoresUploadingViaMulticast( @Mixin GatherersParam gatherers, @Mixin MachineParam machine, @Mixin DsFileParam dsFile, @@ -298,7 +297,7 @@ public void runDSEForAppCoresUploadingViaMonitorStreaming( var job = getJob(db); try (var txrx = getTransceiver(machine.get(), job); - var dseExec = fastFactory.create(txrx, machine.get(), + var dseExec = fastMCFactory.create(txrx, machine.get(), gatherers.get(), reportFolder.orElse(null), db)) { dseExec.loadCores(); } @@ -770,6 +769,14 @@ interface CommandDescriptions { + "Requires system cores to be fully configured, so " + "can't be used to set up system cores."; + /** Description of {@code dse_app_mon} command. */ + String DSE_MON_DESC_MC = + "Evaluate data specifications for application cores " + + "and upload the results to SpiNNaker using the fast data " + + "streaming protocol directly with multicast. " + + "Requires system cores to be fully configured, so " + + "can't be used to set up system cores."; + /** Description of {@code dse_sys} command. */ String DSE_SYS_DESC = "Evaluate data specifications for system cores and " + "upload the results to SpiNNaker (always uses the classic " diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInCommandID.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInCommandID.java deleted file mode 100644 index 6232432c4c..0000000000 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInCommandID.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright (c) 2019 The University of Manchester - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package uk.ac.manchester.spinnaker.front_end.dse; - -import static uk.ac.manchester.spinnaker.utils.CollectionUtils.makeEnumBackingMap; - -import java.util.Map; - -/** - * Command IDs for the SDP packets for data in. - * - * @author Donal Fellows - */ -public enum FastDataInCommandID { - /** Host to Gatherer: start accepting data bound for location. */ - SEND_DATA_TO_LOCATION(200), - /** Host to Gatherer: more data with sequence number. */ - SEND_SEQ_DATA(2000), - /** Host to Gatherer: all data transmitted. */ - SEND_TELL_DATA_IN(2001), - /** Gatherer to host: there are missing sequence numbers. */ - RECEIVE_MISSING_SEQ_DATA_IN(2002), - /** Gatherer to host: all present and correct. */ - RECEIVE_FINISHED_DATA_IN(2003); - - private static final Map MAP = - makeEnumBackingMap(values(), v -> v.value); - - /** The protocol ID of this constant. */ - public final int value; - - FastDataInCommandID(int value) { - this.value = value; - } - - /** - * Get a constant by its protocol ID. - * - * @param value - * The protocol ID - * @return The matching constant. - * @throws IllegalArgumentException - * if the value isn't one of the ones accepted by this class. 
- */ - public static FastDataInCommandID forValue(int value) { - var id = MAP.get(value); - if (id == null) { - throw new IllegalArgumentException( - "unexpected command code: " + value); - } - return id; - } -} diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInProtocol.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInProtocol.java deleted file mode 100644 index 0b7b4fca4d..0000000000 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastDataInProtocol.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Copyright (c) 2019 The University of Manchester - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package uk.ac.manchester.spinnaker.front_end.dse; - -import static java.lang.Integer.toUnsignedLong; -import static java.lang.Math.min; -import static java.lang.String.format; -import static java.nio.ByteBuffer.allocate; -import static java.nio.ByteOrder.LITTLE_ENDIAN; -import static uk.ac.manchester.spinnaker.front_end.dse.FastDataInCommandID.SEND_DATA_TO_LOCATION; -import static uk.ac.manchester.spinnaker.front_end.dse.FastDataInCommandID.SEND_SEQ_DATA; -import static uk.ac.manchester.spinnaker.front_end.dse.FastDataInCommandID.SEND_TELL_DATA_IN; -import static uk.ac.manchester.spinnaker.messages.Constants.SDP_PAYLOAD_WORDS; -import static uk.ac.manchester.spinnaker.messages.Constants.WORD_SIZE; -import static uk.ac.manchester.spinnaker.messages.sdp.SDPHeader.Flag.REPLY_NOT_EXPECTED; -import static uk.ac.manchester.spinnaker.messages.sdp.SDPPort.GATHERER_DATA_SPEED_UP; -import static uk.ac.manchester.spinnaker.utils.ByteBufferUtils.slice; -import static uk.ac.manchester.spinnaker.utils.MathUtils.ceildiv; - -import java.nio.ByteBuffer; - -import uk.ac.manchester.spinnaker.machine.ChipLocation; -import uk.ac.manchester.spinnaker.machine.HasChipLocation; -import uk.ac.manchester.spinnaker.machine.HasCoreLocation; -import uk.ac.manchester.spinnaker.machine.Machine; -import uk.ac.manchester.spinnaker.machine.MemoryLocation; -import uk.ac.manchester.spinnaker.messages.sdp.SDPHeader; -import uk.ac.manchester.spinnaker.messages.sdp.SDPLocation; -import uk.ac.manchester.spinnaker.messages.sdp.SDPMessage; - -/** - * Manufactures Fast Data In protocol messages. - * - * @author Donal Fellows - * @author Alan Stokes - */ -class FastDataInProtocol { - /** Items of data a SDP packet can hold when SCP header removed. */ - static final int BYTES_PER_FULL_PACKET = SDP_PAYLOAD_WORDS * WORD_SIZE; - - // 272 bytes as removed SCP header - - /** - * size of the location data packet (command, transaction id, start sdram - * address, x and y, and max packet number. 
- */ - static final int BYTES_FOR_LOCATION_PACKET = 5 * WORD_SIZE; - - /** - * Offset where data in starts on first command (command, transaction id, - * seq_number), in bytes. - */ - static final int OFFSET_AFTER_COMMAND_AND_KEY = 3 * WORD_SIZE; - - /** Size for data to store when packet with command and key. */ - static final int DATA_IN_FULL_PACKET_WITH_KEY = - BYTES_PER_FULL_PACKET - OFFSET_AFTER_COMMAND_AND_KEY; - - /** - * size for data to store when sending tell packet (command id, transaction - * id). - */ - static final int BYTES_FOR_TELL_PACKET = 2 * WORD_SIZE; - - private final HasCoreLocation gathererCore; - - private final HasChipLocation boardLocalDestination; - - /** - * Create an instance of the protocol for talking to a particular extra - * monitor core on a particular board. - * - * @param machine - * The machine containing the board. - * @param gathererCore - * The gatherer core on the board that messages will be routed - * via. - * @param monitorChip - * The extra monitor core on the board that is the destination - * for the messages. - */ - FastDataInProtocol(Machine machine, HasCoreLocation gathererCore, - HasChipLocation monitorChip) { - this.gathererCore = gathererCore; - - int boardLocalX = monitorChip.getX() - gathererCore.getX(); - if (boardLocalX < 0) { - boardLocalX += machine.maxChipX() + 1; - } - int boardLocalY = monitorChip.getY() - gathererCore.getY(); - if (boardLocalY < 0) { - boardLocalY += machine.maxChipY() + 1; - } - this.boardLocalDestination = new ChipLocation(boardLocalX, boardLocalY); - } - - private SDPHeader header() { - return new SDPHeader(REPLY_NOT_EXPECTED, new SDPLocation(gathererCore), - GATHERER_DATA_SPEED_UP.value); - } - - /** - * @param baseAddress - * Where the data is to be written. - * @param numPackets - * How many SDP packets will be sent. - * @param transactionId - * The transaction id of this stream. - * @return The message indicating the start of the data. 
- */ - SDPMessage dataToLocation(MemoryLocation baseAddress, int numPackets, - int transactionId) { - var payload = allocate(BYTES_FOR_LOCATION_PACKET).order(LITTLE_ENDIAN); - payload.putInt(SEND_DATA_TO_LOCATION.value); - payload.putInt(transactionId); - payload.putInt(baseAddress.address); - payload.putShort((short) boardLocalDestination.getY()); - payload.putShort((short) boardLocalDestination.getX()); - payload.putInt(numPackets - 1); - payload.flip(); - return new SDPMessage(header(), payload); - } - - /** - * @param data - * The overall data to be transmitted. - * @param seqNum - * The sequence number of this chunk. - * - * @param transactionId - * The transaction id for this stream. - * @return The message containing a chunk of the data. - * @throws RuntimeException - * If the sequence number is nonsense. - */ - SDPMessage seqData(ByteBuffer data, int seqNum, int transactionId) { - var payload = allocate(BYTES_PER_FULL_PACKET).order(LITTLE_ENDIAN); - int position = calculatePositionFromSequenceNumber(seqNum); - if (position >= data.limit()) { - throw new RuntimeException(format( - "attempt to write off end of buffer due to " - + "over-large sequence number (%d) given " - + "that only %d bytes are to be sent", - seqNum, toUnsignedLong(data.limit()))); - } - payload.putInt(SEND_SEQ_DATA.value); - payload.putInt(transactionId); - payload.putInt(seqNum); - putBuffer(data, position, payload); - return new SDPMessage(header(), payload); - } - - private int putBuffer(ByteBuffer data, int position, ByteBuffer payload) { - var slice = slice(data, position, - min(data.remaining() - position, payload.remaining())); - payload.put(slice).flip(); - return slice.position(); - } - - private int calculatePositionFromSequenceNumber(int seqNum) { - return DATA_IN_FULL_PACKET_WITH_KEY * seqNum; - } - - /** - * generates the tell message. - * - * @param transactionId - * The transaction id for this stream. - * @return The message indicating the end of the data. 
- */ - SDPMessage tellDataIn(int transactionId) { - var payload = allocate(BYTES_FOR_TELL_PACKET).order(LITTLE_ENDIAN); - payload.putInt(SEND_TELL_DATA_IN.value); - payload.putInt(transactionId); - payload.flip(); - return new SDPMessage(header(), payload); - } - - /** - * Computes the number of packets required to send the given data. - * - * @param data - * The data being sent. (This operation only reads.) - * @return The number of packets (i.e. 1 more than the max sequence number). - */ - static int computeNumPackets(ByteBuffer data) { - return ceildiv(data.remaining(), DATA_IN_FULL_PACKET_WITH_KEY); - } -} diff --git a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastExecuteDataSpecification.java b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastMCExecuteDataSpecification.java similarity index 60% rename from SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastExecuteDataSpecification.java rename to SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastMCExecuteDataSpecification.java index cb167c3377..7ae67d565d 100644 --- a/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastExecuteDataSpecification.java +++ b/SpiNNaker-front-end/src/main/java/uk/ac/manchester/spinnaker/front_end/dse/FastMCExecuteDataSpecification.java @@ -20,12 +20,9 @@ import static java.lang.String.format; import static java.lang.System.getProperty; import static java.lang.System.nanoTime; -import static java.nio.ByteOrder.LITTLE_ENDIAN; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.stream.Collectors.toList; -import static java.util.stream.IntStream.range; import static org.slf4j.LoggerFactory.getLogger; -import static uk.ac.manchester.spinnaker.front_end.dse.FastDataInProtocol.computeNumPackets; import static uk.ac.manchester.spinnaker.messages.Constants.NBBY; import static 
uk.ac.manchester.spinnaker.utils.ByteBufferUtils.sliceUp; import static uk.ac.manchester.spinnaker.utils.UnitConstants.MEGABYTE; @@ -37,19 +34,14 @@ import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; -import java.net.SocketTimeoutException; import java.net.URISyntaxException; import java.nio.ByteBuffer; -import java.nio.IntBuffer; -import java.util.ArrayDeque; -import java.util.BitSet; import java.util.HashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; -import com.google.errorprone.annotations.CheckReturnValue; import com.google.errorprone.annotations.MustBeClosed; import difflib.ChangeDelta; @@ -60,6 +52,7 @@ import uk.ac.manchester.spinnaker.front_end.download.request.Gather; import uk.ac.manchester.spinnaker.front_end.download.request.Monitor; import uk.ac.manchester.spinnaker.machine.ChipLocation; +import uk.ac.manchester.spinnaker.machine.CoreLocation; import uk.ac.manchester.spinnaker.machine.CoreSubsets; import uk.ac.manchester.spinnaker.machine.HasChipLocation; import uk.ac.manchester.spinnaker.machine.HasCoreLocation; @@ -80,23 +73,15 @@ * @author Donal Fellows * @author Alan Stokes */ -public class FastExecuteDataSpecification extends ExecuteDataSpecification { +public class FastMCExecuteDataSpecification extends ExecuteDataSpecification { private static final Logger log = - getLogger(FastExecuteDataSpecification.class); + getLogger(FastMCExecuteDataSpecification.class); private static final String SPINNAKER_COMPARE_UPLOAD = getProperty("spinnaker.compare.upload"); private static final String IN_REPORT_NAME = - "speeds_gained_in_speed_up_process.tsv"; - - private static final int TIMEOUT_RETRY_LIMIT = 100; - - /** flag for saying missing all SEQ numbers. */ - private static final int FLAG_FOR_MISSING_ALL_SEQUENCES = 0xFFFFFFFE; - - /** Sequence number that marks the end of a sequence number stream. 
*/ - private static final int MISSING_SEQS_END = -1; + "speeds_gained_in_speed_up_process.rpt"; private final Map gathererForChip; @@ -137,7 +122,7 @@ public class FastExecuteDataSpecification extends ExecuteDataSpecification { * this constructor should not be doing that! */ @MustBeClosed - public FastExecuteDataSpecification(TransceiverInterface txrx, + public FastMCExecuteDataSpecification(TransceiverInterface txrx, Machine machine, List gatherers, File reportDir, DSEDatabaseEngine db) throws IOException, ProcessException, InterruptedException, StorageException, URISyntaxException { @@ -228,21 +213,19 @@ private void loadBoard(Ethernet board, DSEStorage storage) } log.info("loading data onto {} cores on board", cores.size()); var gather = gathererForChip.get(board.location); - try (var worker = new FastBoardWorker( - txrx, board, storage, gather)) { + var worker = new FastBoardWorker(txrx, board, storage, gather); + for (var xyp : cores) { + worker.mallocCore(xyp); + } + try (var routers = worker.systemRouterTables(); + var context = worker.dontDropPackets(gather)) { for (var xyp : cores) { - worker.mallocCore(xyp); - } - try (var routers = worker.systemRouterTables(); - var context = worker.dontDropPackets(gather)) { - for (var xyp : cores) { - worker.loadCore(xyp); - } - log.info("finished sending data in for this board"); - } catch (Exception e) { - log.warn("failure in core loading", e); - throw e; + worker.loadCore(xyp); } + log.info("finished sending data in for this board"); + } catch (Exception e) { + log.warn("failure in core loading", e); + throw e; } } @@ -358,33 +341,15 @@ private static List describeChunk(Chunk chunk) { * @author Donal Fellows * @author Alan Stokes */ - private class FastBoardWorker extends BoardWorker implements AutoCloseable { - private ThrottledConnection connection; - - private MissingRecorder missingSequenceNumbers; - - private BoardLocal logContext; - - private Gather gather; + private class FastBoardWorker extends BoardWorker 
{ + private HasCoreLocation ethernet; - @MustBeClosed - @SuppressWarnings("MustBeClosed") FastBoardWorker(TransceiverInterface txrx, Ethernet board, DSEStorage storage, Gather gather) throws IOException, ProcessException, InterruptedException, StorageException { super(txrx, board, storage); - this.logContext = new BoardLocal(board.location); - this.connection = new ThrottledConnection(txrx, board, - gather.getIptag()); - this.gather = gather; - } - - @Override - public void close() throws IOException, ProcessException, - InterruptedException { - logContext.close(); - connection.close(); + this.ethernet = new CoreLocation(board.location, gather.getP()); } /** @@ -393,30 +358,8 @@ public void close() throws IOException, ProcessException, * * @author Donal Fellows */ - @SuppressWarnings("serial") - private class MissingRecorder extends ArrayDeque - implements AutoCloseable { - MissingRecorder() { - missingSequenceNumbers = this; - } + private final class MissingRecorder { - @Override - public void close() { - missingSequenceNumbers = null; - } - - /** - * Give me a new bitfield, recorded in this class. - * - * @param expectedMax - * How big should the bitfield be? - * @return The bitfield. - */ - BitSet issueNew(int expectedMax) { - var s = new BitSet(expectedMax); - addLast(s); - return s; - } /** * Issue the report based on what we recorded, if appropriate. 
@@ -466,13 +409,11 @@ protected int writeRegion(HasCoreLocation core, ByteBuffer content, MemoryLocation baseAddress) throws IOException, ProcessException, InterruptedException { int written = content.remaining(); - try (var recorder = new MissingRecorder()) { - long start = nanoTime(); - fastWrite(core, baseAddress, content); - long end = nanoTime(); - recorder.report( - core, end - start, content.limit(), baseAddress); - } + var recorder = new MissingRecorder(); + long start = nanoTime(); + fastWrite(core, baseAddress, content); + long end = nanoTime(); + recorder.report(core, end - start, content.limit(), baseAddress); if (SPINNAKER_COMPARE_UPLOAD != null) { var readBack = txrx.readMemory( core, baseAddress, content.remaining()); @@ -539,255 +480,22 @@ SystemRouterTableContext systemRouterTables() private void fastWrite(HasCoreLocation core, MemoryLocation baseAddress, ByteBuffer data) throws IOException, InterruptedException { - int timeoutCount = 0; - int numPackets = computeNumPackets(data); - var protocol = new GathererProtocol(core); - int transactionId = gather.getNextTransactionId(); - - outerLoop: while (true) { - // Do the initial blast of data - sendInitialPackets(baseAddress, data, protocol, transactionId, - numPackets); - /* - * Don't create a missing buffer until at least one packet has - * come back. 
- */ - BitSet missing = null; - - // Wait for confirmation and do required retransmits - innerLoop: while (true) { - try { - var buf = connection.receive(); - var received = buf.order(LITTLE_ENDIAN).asIntBuffer(); - timeoutCount = 0; // Reset the timeout counter - int command = received.get(); - try { - // read transaction id - var commandCode = - FastDataInCommandID.forValue(command); - int thisTransactionId = received.get(); - - // if wrong transaction id, ignore packet - if (thisTransactionId != transactionId) { - continue innerLoop; - } - - // Decide what to do with the packet - switch (commandCode) { - case RECEIVE_FINISHED_DATA_IN: - // We're done! - break outerLoop; - - case RECEIVE_MISSING_SEQ_DATA_IN: - if (!received.hasRemaining()) { - throw new BadDataInMessageException( - received.get(0), received); - } - log.debug( - "another packet (#{}) of missing " - + "sequence numbers;", - received.get(1)); - break; - default: - throw new BadDataInMessageException( - received.get(0), received); - } - - /* - * The currently received packet has missing - * sequence numbers. Accumulate and dispatch - * transactionId when we've got them all. - */ - if (missing == null) { - missing = missingSequenceNumbers.issueNew( - numPackets); - } - var flags = addMissedSeqNums( - received, missing, numPackets); - - /* - * Check that you've seen something that implies - * ready to retransmit. 
- */ - if (flags.seenAll || flags.seenEnd) { - retransmitMissingPackets(protocol, data, - missing, transactionId); - missing.clear(); - } - } catch (IllegalArgumentException e) { - log.error("Unexpected command code " + command - + " received from " - + connection.getLocation()); - } - } catch (SocketTimeoutException e) { - if (timeoutCount++ > TIMEOUT_RETRY_LIMIT) { - log.error( - "ran out of attempts on transaction {}" - + " due to timeouts.", - transactionId); - throw e; - } - /* - * If we never received a packet, we will never have - * created the buffer, so send everything again - */ - if (missing == null) { - log.debug("full timeout; resending initial " - + "packets for stream with transaction " - + "id {}", transactionId); - continue outerLoop; - } - log.info( - "timeout {} on transaction {} sending to {}" - + " via {}", - timeoutCount, transactionId, core, - gather.asCoreLocation()); - retransmitMissingPackets(protocol, data, missing, - transactionId); - missing.clear(); - } + try { + int boardLocalX = core.getX() - ethernet.getX(); + if (boardLocalX < 0) { + boardLocalX += machine.maxChipX() + 1; } - } - } - - @CheckReturnValue - private SeenFlags addMissedSeqNums(IntBuffer received, BitSet seqNums, - int expectedMax) { - var flags = new SeenFlags(); - var addedEnd = ""; - var addedAll = ""; - int actuallyAdded = 0; - while (received.hasRemaining()) { - int num = received.get(); - - if (num == MISSING_SEQS_END) { - addedEnd = "and saw END marker"; - flags.seenEnd = true; - break; - } - if (num == FLAG_FOR_MISSING_ALL_SEQUENCES) { - addedAll = "by finding ALL missing marker"; - flags.seenAll = true; - for (int seqNum = 0; seqNum < expectedMax; seqNum++) { - seqNums.set(seqNum); - actuallyAdded++; - } - break; + int boardLocalY = core.getY() - ethernet.getY(); + if (boardLocalY < 0) { + boardLocalY += machine.maxChipY() + 1; } - - seqNums.set(num); - actuallyAdded++; - if (num < 0 || num > expectedMax) { - throw new CrazySequenceNumberException(num, 
received); - } - } - log.debug("added {} missed packets, {}{}", actuallyAdded, addedEnd, - addedAll); - return flags; - } - - private int sendInitialPackets(MemoryLocation baseAddress, - ByteBuffer data, GathererProtocol protocol, int transactionId, - int numPackets) throws IOException { - log.info("streaming {} bytes in {} packets using transaction {}", - data.remaining(), numPackets, transactionId); - log.debug("sending packet #{}", 0); - connection.send(protocol.dataToLocation(baseAddress, numPackets, - transactionId)); - for (int seqNum = 0; seqNum < numPackets; seqNum++) { - log.debug("sending packet #{}", seqNum); - connection.send(protocol.seqData(data, seqNum, transactionId)); + var boardLocal = new CoreLocation(boardLocalX, boardLocalY, + core.getP()); + txrx.writeMemoryMulticast(ethernet, boardLocal, baseAddress, + data); + } catch (ProcessException e) { + throw new IOException(e); } - log.debug("sending terminating packet"); - connection.send(protocol.tellDataIn(transactionId)); - return numPackets; - } - - private void retransmitMissingPackets(GathererProtocol protocol, - ByteBuffer dataToSend, BitSet missingSeqNums, int transactionId) - throws IOException { - log.info("retransmitting {} packets", missingSeqNums.cardinality()); - - missingSeqNums.stream().forEach(seqNum -> { - log.debug("resending packet #{}", seqNum); - try { - connection.send(protocol.seqData(dataToSend, seqNum, - transactionId)); - } catch (IOException e) { - log.error( - "missing sequence packet with id {}-{} " - + "failed to transmit", - seqNum, transactionId, e); - } - }); - log.debug("sending terminating packet"); - connection.send(protocol.tellDataIn(transactionId)); - } - } - - /** - * Manufactures Fast Data In protocol messages. 
- * - * @author Donal Fellows - */ - private class GathererProtocol extends FastDataInProtocol { - private GathererProtocol(ChipLocation chip, boolean ignored) { - super(machine, gathererForChip.get(chip), monitorForChip.get(chip)); - } - - /** - * Create an instance of this for pushing data to a given chip's SDRAM. - * - * @param chip - * The chip where the data is to be pushed. What extra - * monitor and data gatherer to route it via are determined - * from the board context. - */ - GathererProtocol(HasChipLocation chip) { - this(chip.asChipLocation(), false); - } - } - - /** - * Contains flags for seen missing sequence numbers. - * - * @author Alan Stokes - */ - private static final class SeenFlags { - boolean seenEnd; - - boolean seenAll; - } - - /** - * Exception thrown when something mad comes back off SpiNNaker. - * - * @author Donal Fellows - */ - static class BadDataInMessageException extends RuntimeException { - private static final long serialVersionUID = 1L; - - BadDataInMessageException(int code, IntBuffer message) { - super("unexpected response code: " + toUnsignedLong(code)); - log.warn("bad message payload: {}", range(0, message.limit()) - .map(i -> message.get(i)).boxed().collect(toList())); - } - } - - /** - * Exception thrown when something mad comes back off SpiNNaker. 
- * - * @author Donal Fellows - * @author Alan Stokes - */ - static class CrazySequenceNumberException extends RuntimeException { - private static final long serialVersionUID = 1L; - - CrazySequenceNumberException(int remaining, IntBuffer message) { - super("crazy number of missing packets: " - + toUnsignedLong(remaining)); - log.warn("bad message payload: {}", range(0, message.limit()) - .map(i -> message.get(i)).boxed().collect(toList())); } } } diff --git a/SpiNNaker-front-end/src/test/java/uk/ac/manchester/spinnaker/front_end/TestFrontEnd.java b/SpiNNaker-front-end/src/test/java/uk/ac/manchester/spinnaker/front_end/TestFrontEnd.java index 9e534359a6..ab07c39825 100644 --- a/SpiNNaker-front-end/src/test/java/uk/ac/manchester/spinnaker/front_end/TestFrontEnd.java +++ b/SpiNNaker-front-end/src/test/java/uk/ac/manchester/spinnaker/front_end/TestFrontEnd.java @@ -31,7 +31,7 @@ import org.opentest4j.AssertionFailedError; import uk.ac.manchester.spinnaker.front_end.download.request.Gather; -import uk.ac.manchester.spinnaker.front_end.dse.FastExecuteDataSpecification; +import uk.ac.manchester.spinnaker.front_end.dse.FastMCExecuteDataSpecification; import uk.ac.manchester.spinnaker.front_end.dse.HostExecuteDataSpecification; import uk.ac.manchester.spinnaker.utils.ValueHolder; @@ -98,7 +98,7 @@ void testHelp() throws Exception { var msg = tapSystemOutNormalized(() -> { runMainExpecting(0, "help"); }); - var requiredSubcommands = List.of("dse_app_mon", "gather"); + var requiredSubcommands = List.of("dse_app_mon_mc", "gather"); var requiredArgs = List.of("", ""); for (var cmd: requiredSubcommands) { assertContainsInOrder(msg, cmd); @@ -175,11 +175,11 @@ void testAdvancedDSE() throws Exception { var runFolder = "target/test/AdvancedDSE"; new File(runFolder).mkdirs(); - var saved = CommandLineInterface.fastFactory; + var saved = CommandLineInterface.fastMCFactory; var called = new ValueHolder<>("none"); try { - CommandLineInterface.fastFactory = (t, m, g, r, - db) -> new 
FastExecuteDataSpecification(t, m, g, r, null) { + CommandLineInterface.fastMCFactory = (t, m, g, r, + db) -> new FastMCExecuteDataSpecification(t, m, g, r, null) { @Override public void loadCores() { called.setValue("mon"); @@ -192,17 +192,17 @@ protected void buildMaps(List gatherers) { }; var msg = tapSystemErrNormalized(() -> { - runMainExpecting(2, "dse_app_mon"); + runMainExpecting(2, "dse_app_mon_mc"); }); assertContainsInOrder(msg, "", "", "", "[]"); assertEquals("none", called.getValue()); - runMainExpecting(0, "dse_app_mon", gatherFile, machineFile, dsFile, - runFolder); + runMainExpecting(0, "dse_app_mon_mc", gatherFile, machineFile, + dsFile, runFolder); assertEquals("mon", called.getValue()); } finally { - CommandLineInterface.fastFactory = saved; + CommandLineInterface.fastMCFactory = saved; } }