From e9e34a22fe4460bbbcb7c04fa93e8b16f3b673d5 Mon Sep 17 00:00:00 2001 From: "wangjiahua.wjh" Date: Tue, 9 Jun 2026 11:38:31 +0800 Subject: [PATCH] [ISSUE #10442] Optimize message properties encode/decode to reduce allocation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Focuses on the properties encode/decode hot path (broker send + dispatch): - MessageDecoder.string2messageProperties: right-size HashMap from separator count instead of fixed capacity 128; intern well-known property keys via length-bucketed regionMatches (skip substring+hash); intern frequent values ("0","1","true","false","DefaultRegion"). - MessageDecoder.messageProperties2Bytes: direct UTF-8 byte serialization of Map entries, skipping the StringBuilder→String→getBytes round-trip. - MessageDecoder.bytes2messageProperties: parse directly from byte[] with ASCII key matching, skipping intermediate String allocation. - MessageDecoder.messageProperties2String: reuse ThreadLocal StringBuilder with capacity cap to bound per-thread memory. - MessageConst: add STRING_INTERN_MAP and STRING_INTERN_BY_LEN for canonical key interning. - MessageExtBrokerInner: add propertiesData byte[] cache field to short-circuit propertiesString→byte[] re-encoding on the write path. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../rocketmq/common/message/MessageConst.java | 21 + .../common/message/MessageDecoder.java | 371 ++++++++++++++++-- .../common/message/MessageExtBrokerInner.java | 37 +- .../common/message/MessageDecoderTest.java | 125 ++++++ 4 files changed, 522 insertions(+), 32 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java index 77ab3f2cb9f..435cdc4f6b5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java @@ -16,7 +16,11 @@ */ package org.apache.rocketmq.common.message; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; +import java.util.Map; public class MessageConst { public static final String PROPERTY_KEYS = "KEYS"; @@ -102,6 +106,8 @@ public class MessageConst { public static final HashSet STRING_HASH_SET = new HashSet<>(64); + static final String[][] STRING_INTERN_BY_LEN; + public static final String PROPERTY_TIMER_ENQUEUE_MS = "TIMER_ENQUEUE_MS"; public static final String PROPERTY_TIMER_DEQUEUE_MS = "TIMER_DEQUEUE_MS"; public static final String PROPERTY_TIMER_ROLL_TIMES = "TIMER_ROLL_TIMES"; @@ -173,5 +179,20 @@ public class MessageConst { STRING_HASH_SET.add(PROPERTY_CRC32); STRING_HASH_SET.add(PROPERTY_PRIORITY); STRING_HASH_SET.add(PROPERTY_LITE_TOPIC); + int maxLen = 0; + for (String key : STRING_HASH_SET) { + if (key.length() > maxLen) { + maxLen = key.length(); + } + } + String[][] byLen = new String[maxLen + 1][]; + Map> bucketBuilder = new HashMap<>(); + for (String key : STRING_HASH_SET) { + bucketBuilder.computeIfAbsent(key.length(), k -> new ArrayList<>()).add(key); + } + for (Map.Entry> e : bucketBuilder.entrySet()) { + byLen[e.getKey()] = e.getValue().toArray(new String[0]); + } + STRING_INTERN_BY_LEN = byLen; } } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java index 713f9405ea9..e09b5dfeed7 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageDecoder.java @@ -602,29 +602,35 @@ public static List decodes(ByteBuffer byteBuffer, final boolean read return msgExts; } + /** + * Per-thread reusable {@link StringBuilder} used by {@link #messageProperties2String} on + * the hot encode path. The retained capacity is capped by {@link #REUSABLE_SB_CAP_LIMIT} + * to avoid unbounded growth after a single oversized message. + */ + private static final ThreadLocal REUSABLE_SB = + ThreadLocal.withInitial(() -> new StringBuilder(256)); + + /** Maximum retained capacity (in chars) of {@link #REUSABLE_SB}. */ + private static final int REUSABLE_SB_CAP_LIMIT = 64 * 1024; + public static String messageProperties2String(Map properties) { if (properties == null) { return ""; } - int len = 0; - for (final Map.Entry entry : properties.entrySet()) { - final String name = entry.getKey(); - final String value = entry.getValue(); - if (value == null) { - continue; - } - if (name != null) { - len += name.length(); - } - len += value.length(); - len += 2; // separator + StringBuilder sb = REUSABLE_SB.get(); + // Trim long-lived per-thread memory if a previous message inflated the buffer. + if (sb.capacity() > REUSABLE_SB_CAP_LIMIT) { + sb = new StringBuilder(256); + REUSABLE_SB.set(sb); } - StringBuilder sb = new StringBuilder(len); + sb.setLength(0); for (final Map.Entry entry : properties.entrySet()) { final String name = entry.getKey(); final String value = entry.getValue(); - if (value == null) { + // Skip entries with null name or value to keep output well-formed and to + // match {@link #messageProperties2Bytes}. + if (name == null || value == null) { continue; } sb.append(name); @@ -635,31 +641,333 @@ public static String messageProperties2String(Map properties) { return sb.toString(); } - public static Map string2messageProperties(final String properties) { - Map map = new HashMap<>(128); - if (properties != null) { - int len = properties.length(); - int index = 0; - while (index < len) { - int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index); - if (newIndex < 0) { - newIndex = len; + /** + * UTF-8 byte serialization of properties, equivalent in content to + * {@code messageProperties2String(properties).getBytes(UTF_8)} but skipping the + * StringBuilder + String + String.getBytes() round-trip on the broker write hot path. + * Both this method and {@link #messageProperties2String} skip entries whose key or value + * is {@code null}, so the encoded bytes are identical for any input map. + *

Returns {@code null} for null/empty maps (encoders treat null and 0-length + * identically); callers that need a 0-length array should check for null. + */ + public static byte[] messageProperties2Bytes(Map properties) { + if (properties == null || properties.isEmpty()) { + return null; + } + int totalLen = 0; + for (final Map.Entry entry : properties.entrySet()) { + final String name = entry.getKey(); + final String value = entry.getValue(); + if (name == null || value == null) { + continue; + } + totalLen += utf8ByteLength(name); + totalLen += utf8ByteLength(value); + totalLen += 2; + } + if (totalLen == 0) { + return null; + } + byte[] out = new byte[totalLen]; + int idx = 0; + for (final Map.Entry entry : properties.entrySet()) { + final String name = entry.getKey(); + final String value = entry.getValue(); + if (name == null || value == null) { + continue; + } + idx = writeUtf8(name, out, idx); + out[idx++] = (byte) NAME_VALUE_SEPARATOR; + idx = writeUtf8(value, out, idx); + out[idx++] = (byte) PROPERTY_SEPARATOR; + } + return out; + } + + /** + * UTF-8 byte length of {@code s}. Matches {@link String#getBytes(java.nio.charset.Charset) + * String.getBytes(StandardCharsets.UTF_8)} semantics: unpaired surrogate code units are + * each replaced by a single {@code '?'} byte (the JDK's hard-coded substitution for + * malformed UTF-8 in {@code java.lang.StringCoding}). + */ + static int utf8ByteLength(String s) { + int len = s.length(); + int byteLen = 0; + for (int i = 0; i < len; i++) { + char c = s.charAt(i); + if (c < 0x80) { + byteLen++; + } else if (c < 0x800) { + byteLen += 2; + } else if (Character.isHighSurrogate(c)) { + if (i + 1 < len && Character.isLowSurrogate(s.charAt(i + 1))) { + byteLen += 4; + i++; + } else { + // Unpaired high surrogate -> '?' (1 byte), matching JDK behavior. + byteLen += 1; + } + } else if (Character.isLowSurrogate(c)) { + // Unpaired low surrogate -> '?' (1 byte), matching JDK behavior. + byteLen += 1; + } else { + byteLen += 3; + } + } + return byteLen; + } + + /** + * UTF-8 encode {@code s} into {@code out} starting at {@code offset}. Matches + * {@link String#getBytes(java.nio.charset.Charset) String.getBytes(StandardCharsets.UTF_8)} + * semantics: unpaired surrogate code units are each replaced by a single {@code '?'} byte + * (the JDK's hard-coded substitution for malformed UTF-8 in {@code java.lang.StringCoding}). + */ + static int writeUtf8(String s, byte[] out, int offset) { + int len = s.length(); + for (int i = 0; i < len; i++) { + char c = s.charAt(i); + if (c < 0x80) { + out[offset++] = (byte) c; + } else if (c < 0x800) { + out[offset++] = (byte) (0xC0 | (c >>> 6)); + out[offset++] = (byte) (0x80 | (c & 0x3F)); + } else if (Character.isHighSurrogate(c)) { + if (i + 1 < len && Character.isLowSurrogate(s.charAt(i + 1))) { + int cp = Character.toCodePoint(c, s.charAt(++i)); + out[offset++] = (byte) (0xF0 | (cp >>> 18)); + out[offset++] = (byte) (0x80 | ((cp >>> 12) & 0x3F)); + out[offset++] = (byte) (0x80 | ((cp >>> 6) & 0x3F)); + out[offset++] = (byte) (0x80 | (cp & 0x3F)); + } else { + // Unpaired high surrogate -> '?', matching JDK behavior. + out[offset++] = (byte) '?'; } - if (newIndex - index >= 3) { - int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, index); - if (kvSepIndex > index && kvSepIndex < newIndex - 1) { - String k = properties.substring(index, kvSepIndex); - String v = properties.substring(kvSepIndex + 1, newIndex); - map.put(k, v); + } else if (Character.isLowSurrogate(c)) { + // Unpaired low surrogate -> '?', matching JDK behavior. + out[offset++] = (byte) '?'; + } else { + out[offset++] = (byte) (0xE0 | (c >>> 12)); + out[offset++] = (byte) (0x80 | ((c >>> 6) & 0x3F)); + out[offset++] = (byte) (0x80 | (c & 0x3F)); + } + } + return offset; + } + + public static Map string2messageProperties(final String properties) { + return string2messageProperties(properties, 0); + } + + /** + * Variant of {@link #string2messageProperties(String)} that reserves capacity for + * {@code extraEntries} additional entries the caller intends to put afterwards. Used + * on the broker send path where MSG_REGION/TRACE_SWITCH/CLUSTER/... are appended to + * the decoded Map; pre-sizing avoids a HashMap resize when those puts cross the load + * factor threshold of the as-decoded capacity. + */ + public static Map string2messageProperties(final String properties, final int extraEntries) { + if (properties == null || properties.isEmpty()) { + return new HashMap<>(Math.max(4, extraEntries)); + } + final int len = properties.length(); + int estEntries = 0; + for (int i = 0; i < len; i++) { + if (properties.charAt(i) == PROPERTY_SEPARATOR) { + estEntries++; + } + } + estEntries = Math.max(estEntries, 1); + HashMap map = new HashMap<>((estEntries + extraEntries) * 4 / 3 + 1); + int index = 0; + while (index < len) { + int newIndex = properties.indexOf(PROPERTY_SEPARATOR, index); + if (newIndex < 0) { + newIndex = len; + } + if (newIndex - index >= 3) { + int kvSepIndex = properties.indexOf(NAME_VALUE_SEPARATOR, index); + if (kvSepIndex > index && kvSepIndex < newIndex - 1) { + int klen = kvSepIndex - index; + String k = null; + if (klen < MessageConst.STRING_INTERN_BY_LEN.length) { + String[] candidates = MessageConst.STRING_INTERN_BY_LEN[klen]; + if (candidates != null) { + for (String candidate : candidates) { + if (properties.regionMatches(index, candidate, 0, klen)) { + k = candidate; + break; + } + } + } + } + if (k == null) { + k = properties.substring(index, kvSepIndex); } + int vOffset = kvSepIndex + 1; + int vLen = newIndex - vOffset; + String v = internStringValue(properties, vOffset, vLen); + if (v == null) { + v = properties.substring(vOffset, newIndex); + } + map.put(k, v); } - index = newIndex + 1; } + index = newIndex + 1; } return map; } + /** + * Variant of {@link #string2messageProperties(String)} that parses directly from a UTF-8 + * byte array, skipping the intermediate {@code new String(bytes, ...)} allocation. The + * separators {@link #NAME_VALUE_SEPARATOR} (0x01) and {@link #PROPERTY_SEPARATOR} (0x02) + * are ASCII single bytes that never appear inside multi-byte UTF-8 sequences, so byte-level + * scanning is safe. Canonical (intern) keys are ASCII and matched byte-by-byte. + *

Always returns a fresh {@link HashMap} to keep the same downstream contract as + * {@link #string2messageProperties} (mutable {@code Entry.setValue}, no aliasing across + * decodes on the same thread). + */ + public static Map bytes2messageProperties(final byte[] bytes, final int offset, + final int length) { + if (bytes == null || length <= 0) { + return new HashMap<>(4); + } + final int end = offset + length; + // Estimate entries: count PROPERTY_SEPARATOR occurrences to pre-size the HashMap and + // avoid resize churn under load. + int estEntries = 0; + for (int i = offset; i < end; i++) { + if (bytes[i] == PROPERTY_SEPARATOR) { + estEntries++; + } + } + estEntries = Math.max(estEntries, 1); + HashMap map = new HashMap<>(estEntries * 4 / 3 + 1); + int index = offset; + while (index < end) { + int sepIdx = end; + for (int i = index; i < end; i++) { + if (bytes[i] == PROPERTY_SEPARATOR) { + sepIdx = i; + break; + } + } + if (sepIdx - index >= 3) { + int kvSepIdx = -1; + for (int i = index; i < sepIdx; i++) { + if (bytes[i] == NAME_VALUE_SEPARATOR) { + kvSepIdx = i; + break; + } + } + if (kvSepIdx > index && kvSepIdx < sepIdx - 1) { + int klen = kvSepIdx - index; + String k = null; + if (klen < MessageConst.STRING_INTERN_BY_LEN.length) { + String[] candidates = MessageConst.STRING_INTERN_BY_LEN[klen]; + if (candidates != null) { + for (String candidate : candidates) { + if (asciiBytesEqual(bytes, index, candidate, klen)) { + k = candidate; + break; + } + } + } + } + if (k == null) { + k = new String(bytes, index, klen, CHARSET_UTF8); + } + int vOffset = kvSepIdx + 1; + int vLen = sepIdx - kvSepIdx - 1; + String v = internValue(bytes, vOffset, vLen); + if (v == null) { + v = new String(bytes, vOffset, vLen, CHARSET_UTF8); + } + map.put(k, v); + } + } + index = sepIdx + 1; + } + return map; + } + + private static final String[][] VALUE_INTERN_BY_LEN; + static { + String[] frequentValues = {"0", "1", "true", "false", "DefaultRegion"}; + int maxLen = 0; + for (String s : frequentValues) { + maxLen = Math.max(maxLen, s.length()); + } + VALUE_INTERN_BY_LEN = new String[maxLen + 1][]; + for (String s : frequentValues) { + int len = s.length(); + if (VALUE_INTERN_BY_LEN[len] == null) { + VALUE_INTERN_BY_LEN[len] = new String[]{s}; + } else { + String[] old = VALUE_INTERN_BY_LEN[len]; + String[] arr = new String[old.length + 1]; + System.arraycopy(old, 0, arr, 0, old.length); + arr[old.length] = s; + VALUE_INTERN_BY_LEN[len] = arr; + } + } + } + + private static String internStringValue(String s, int offset, int len) { + if (len >= VALUE_INTERN_BY_LEN.length) { + return null; + } + String[] candidates = VALUE_INTERN_BY_LEN[len]; + if (candidates == null) { + return null; + } + for (String candidate : candidates) { + if (s.regionMatches(offset, candidate, 0, len)) { + return candidate; + } + } + return null; + } + + private static String internValue(byte[] bytes, int offset, int len) { + if (len >= VALUE_INTERN_BY_LEN.length) { + return null; + } + String[] candidates = VALUE_INTERN_BY_LEN[len]; + if (candidates == null) { + return null; + } + for (String candidate : candidates) { + if (asciiBytesEqual(bytes, offset, candidate, len)) { + return candidate; + } + } + return null; + } + + public static boolean asciiBytesMatchString(byte[] bytes, int offset, String s, int len) { + if (s.length() != len) { + return false; + } + for (int i = 0; i < len; i++) { + if (bytes[offset + i] != (byte) s.charAt(i)) { + return false; + } + } + return true; + } + + private static boolean asciiBytesEqual(byte[] bytes, int offset, String asciiCandidate, int len) { + for (int i = 0; i < len; i++) { + if (bytes[offset + i] != (byte) asciiCandidate.charAt(i)) { + return false; + } + } + return true; + } + public static byte[] encodeMessage(Message message) { //only need flag, body, properties byte[] body = message.getBody(); @@ -725,7 +1033,8 @@ public static Message decodeMessage(ByteBuffer byteBuffer) throws Exception { short propertiesLen = byteBuffer.getShort(); byte[] propertiesBytes = new byte[propertiesLen]; byteBuffer.get(propertiesBytes); - message.setProperties(string2messageProperties(new String(propertiesBytes, CHARSET_UTF8))); + // opt16: parse directly from bytes; skip the intermediate String allocation. + message.setProperties(bytes2messageProperties(propertiesBytes, 0, propertiesLen)); return message; } diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java index e4f02e2c9b4..84b2b6d3ed5 100644 --- a/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java +++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageExtBrokerInner.java @@ -27,6 +27,10 @@ public class MessageExtBrokerInner extends MessageExt { private static final long serialVersionUID = 7256001576878700634L; private String propertiesString; + /** Pre-encoded UTF-8 bytes for {@link #propertiesString}. Either set directly via + * {@link #setPropertiesData} on the broker write hot path (skipping the + * String round-trip), or lazily computed from {@link #propertiesString}. */ + private transient byte[] propertiesData; private long tagsCode; private ByteBuffer encodedBuff; @@ -59,13 +63,44 @@ public String getPropertiesString() { public void setPropertiesString(String propertiesString) { this.propertiesString = propertiesString; + this.propertiesData = null; + } + + public byte[] getPropertiesData() { + // Defensive copy: callers must not be able to mutate the cached encoded bytes, + // which are reused as-is by the encoder. The encode hot path should call + // {@link #getEffectivePropertiesData()} (package-private) to avoid this copy. + return propertiesData == null ? null : propertiesData.clone(); + } + + public void setPropertiesData(byte[] propertiesData) { + this.propertiesData = propertiesData; + } + + /** Encoder-side accessor: returns cached {@link #propertiesData} when set, + * otherwise lazily encodes {@link #propertiesString} and caches the result. + * Returns null if neither is set. + *

Package-private and intended for the broker encode hot path only. The returned + * array is the internal buffer (no defensive copy) and must not be mutated by callers. */ + byte[] getEffectivePropertiesData() { + if (propertiesData != null) { + return propertiesData; + } + if (propertiesString != null) { + propertiesData = propertiesString.getBytes(MessageDecoder.CHARSET_UTF8); + return propertiesData; + } + return null; } public void deleteProperty(String name) { super.clearProperty(name); if (propertiesString != null) { - this.setPropertiesString(MessageUtils.deleteProperty(propertiesString, name)); + this.propertiesString = MessageUtils.deleteProperty(propertiesString, name); + } + if (propertiesData != null) { + this.propertiesData = MessageDecoder.messageProperties2Bytes(getProperties()); } } diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java index 39bfbf5fb3f..cae12db29d4 100644 --- a/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java +++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageDecoderTest.java @@ -428,4 +428,129 @@ private void verifyMessageId(MessageExt msgExt) throws UnknownHostException { assertThat(messageId.getAddress()).isEqualTo(msgExt.getStoreHost()); assertThat(messageId.getOffset()).isEqualTo(msgExt.getCommitLogOffset()); } + + /** + * messageProperties2Bytes must produce the exact same bytes as + * messageProperties2String(...).getBytes(UTF_8) for any well-formed input map (i.e. + * for any map without null keys or values, which is the contract enforced by both + * methods after the fix). Covers ASCII, multi-byte UTF-8, and a paired surrogate. + */ + @Test + public void testMessageProperties2BytesMatchesString() { + java.util.Map props = new java.util.LinkedHashMap<>(); + props.put("KEYS", "abc"); // ASCII + props.put("UNIQ_KEY", "value-123"); // ASCII + props.put("\u4E2D\u6587\u952E", "\u4E2D\u6587\u503C"); // multi-byte UTF-8 (CJK) + props.put("emoji", "a\uD83D\uDE00b"); // paired surrogate (U+1F600) + + String s = MessageDecoder.messageProperties2String(props); + byte[] viaBytes = MessageDecoder.messageProperties2Bytes(props); + + assertThat(viaBytes).isNotNull(); + assertThat(viaBytes).isEqualTo(s.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + } + + /** + * Both messageProperties2String and messageProperties2Bytes must skip entries with + * a null key or null value, keeping the two encoding paths byte-for-byte identical. + */ + @Test + public void testMessageProperties2StringAndBytesSkipNullKeyAndValue() { + java.util.Map props = new java.util.LinkedHashMap<>(); + props.put("a", "1"); + props.put(null, "skip-null-key"); + props.put("b", null); + props.put("c", "2"); + + String s = MessageDecoder.messageProperties2String(props); + byte[] bytes = MessageDecoder.messageProperties2Bytes(props); + + // Only "a=1" and "c=2" should survive; "null" literal must not appear. + assertThat(s).doesNotContain("null"); + assertThat(bytes).isEqualTo(s.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + + java.util.Map roundTrip = MessageDecoder.string2messageProperties(s); + assertThat(roundTrip).hasSize(2); + assertThat(roundTrip.get("a")).isEqualTo("1"); + assertThat(roundTrip.get("c")).isEqualTo("2"); + } + + /** + * Custom UTF-8 encoder must match JDK String.getBytes(UTF_8) for unpaired surrogates, + * which JDK replaces with the U+FFFD replacement character (3 bytes EF BF BD). + */ + @Test + public void testMessageProperties2BytesUnpairedSurrogateMatchesJdk() { + // Lone high surrogate, lone low surrogate, and a high-then-non-low sequence. + String[] malformed = new String[] { + "\uD83D", // unpaired high + "\uDE00", // unpaired low + "x\uD83Dy", // high followed by non-low + "x\uDE00y", // low followed by non-low + "\uD83D\uD83D" // two highs in a row + }; + + for (String value : malformed) { + java.util.Map props = new java.util.LinkedHashMap<>(); + props.put("k", value); + + byte[] expected = MessageDecoder.messageProperties2String(props) + .getBytes(java.nio.charset.StandardCharsets.UTF_8); + byte[] actual = MessageDecoder.messageProperties2Bytes(props); + + assertThat(actual).as("input = %s", java.util.Arrays.toString(value.toCharArray())) + .isEqualTo(expected); + } + } + + /** + * bytes2messageProperties must return an independent (not ThreadLocal-shared) map so + * that consecutive decodes on the same thread don't corrupt each other. Also verifies + * the standard HashMap contract is preserved (mutable Entry.setValue). + */ + @Test + public void testBytes2messagePropertiesReturnsIndependentMap() { + java.util.Map first = new java.util.LinkedHashMap<>(); + first.put("k1", "v1"); + first.put("k2", "v2"); + + java.util.Map second = new java.util.LinkedHashMap<>(); + second.put("k3", "v3"); + + byte[] firstBytes = MessageDecoder.messageProperties2Bytes(first); + byte[] secondBytes = MessageDecoder.messageProperties2Bytes(second); + + java.util.Map firstDecoded = + MessageDecoder.bytes2messageProperties(firstBytes, 0, firstBytes.length); + java.util.Map secondDecoded = + MessageDecoder.bytes2messageProperties(secondBytes, 0, secondBytes.length); + + // Decoding the second message must not mutate the first decoded map. + assertThat(firstDecoded).hasSize(2); + assertThat(firstDecoded.get("k1")).isEqualTo("v1"); + assertThat(firstDecoded.get("k2")).isEqualTo("v2"); + assertThat(secondDecoded).hasSize(1); + assertThat(secondDecoded.get("k3")).isEqualTo("v3"); + + // Standard HashMap contract: Entry.setValue must work. + for (Map.Entry e : firstDecoded.entrySet()) { + e.setValue("mutated"); + } + assertThat(firstDecoded.get("k1")).isEqualTo("mutated"); + } + + /** + * messageProperties2Bytes returns null for null/empty input; callers (broker encoders) + * treat null and a 0-length byte[] identically. The Javadoc must reflect this. + */ + @Test + public void testMessageProperties2BytesNullAndEmpty() { + assertThat(MessageDecoder.messageProperties2Bytes(null)).isNull(); + assertThat(MessageDecoder.messageProperties2Bytes(new java.util.HashMap<>())).isNull(); + + // A map containing only null-valued entries also produces no bytes. + java.util.Map nullOnly = new java.util.HashMap<>(); + nullOnly.put("k", null); + assertThat(MessageDecoder.messageProperties2Bytes(nullOnly)).isNull(); + } } \ No newline at end of file