From 460bd48acfce41bae45ccee1ce25b75eae4bd054 Mon Sep 17 00:00:00 2001 From: Dan Rusei Date: Sat, 21 Feb 2026 21:02:06 +0200 Subject: [PATCH] correct the integration tests and tests on pull requests --- .github/workflows/integration-tests.yml | 57 ++++ .../client/it/FanoutExclusiveIT.java | 4 +- .../client/it/PartitionedBasicIT.java | 24 +- .../client/it/ReliableDispatchIT.java | 19 +- .../client/it/SchemaVersionIT.java | 244 +++++++++--------- .../client/it/SubscriptionBasicIT.java | 19 +- 6 files changed, 214 insertions(+), 153 deletions(-) create mode 100644 .github/workflows/integration-tests.yml diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml new file mode 100644 index 0000000..8185494 --- /dev/null +++ b/.github/workflows/integration-tests.yml @@ -0,0 +1,57 @@ +name: Integration Tests + +on: + push: + branches: [main] + pull_request: + +jobs: + integration-tests: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Java + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: '21' + cache: maven + + - name: Start test cluster + working-directory: docker + run: docker compose up -d + + - name: Wait for broker to be healthy + working-directory: docker + run: | + echo "Waiting for broker to become healthy..." + for i in $(seq 1 30); do + status=$(docker inspect --format='{{.State.Health.Status}}' danube-test-broker 2>/dev/null || echo "missing") + if [ "$status" = "healthy" ]; then + echo "Broker is healthy." + break + fi + if [ "$i" -eq 30 ]; then + echo "Broker failed to become healthy (status: $status)" + docker compose logs broker + exit 1 + fi + echo " attempt $i/30 — status: $status" + sleep 5 + done + + - name: Run integration tests + run: mvn -pl danube-client -am verify -P integration-tests + + - name: Print broker logs on failure + if: failure() + working-directory: docker + run: docker compose logs broker + + - name: Stop test cluster + if: always() + working-directory: docker + run: docker compose down -v diff --git a/danube-client/src/test/java/com/danubemessaging/client/it/FanoutExclusiveIT.java b/danube-client/src/test/java/com/danubemessaging/client/it/FanoutExclusiveIT.java index 954f159..17bfb2d 100644 --- a/danube-client/src/test/java/com/danubemessaging/client/it/FanoutExclusiveIT.java +++ b/danube-client/src/test/java/com/danubemessaging/client/it/FanoutExclusiveIT.java @@ -7,7 +7,6 @@ import com.danubemessaging.client.model.StreamMessage; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,7 +37,8 @@ void fanoutExclusive() throws Exception { .build(); producer.create(); - record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) {} + record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) { + } List consumers = new CopyOnWriteArrayList<>(); for (int i = 0; i < 3; i++) { diff --git a/danube-client/src/test/java/com/danubemessaging/client/it/PartitionedBasicIT.java b/danube-client/src/test/java/com/danubemessaging/client/it/PartitionedBasicIT.java index 1739ceb..c5e3c16 100644 --- a/danube-client/src/test/java/com/danubemessaging/client/it/PartitionedBasicIT.java +++ b/danube-client/src/test/java/com/danubemessaging/client/it/PartitionedBasicIT.java @@ -7,11 +7,10 @@ import com.danubemessaging.client.model.StreamMessage; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import static com.danubemessaging.client.it.TestHelpers.*; import static org.junit.jupiter.api.Assertions.*; @@ -22,7 +21,7 @@ */ class PartitionedBasicIT { - private void runPartitionedBasic(String topicPrefix, SubType subType) { + private void runPartitionedBasic(String topicPrefix, SubType subType) throws Exception { DanubeClient client = newClient(); String topic = uniqueTopic(topicPrefix); int partitions = 3; @@ -43,21 +42,25 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) { consumer.subscribe(); try { - var publisher = consumer.receive(); + String[] expected = { "Hello Danube 1", "Hello Danube 2", "Hello Danube 3" }; + + // Attach collector BEFORE sending + var collector = new TestHelpers.MessageCollector(expected.length); + consumer.receive().subscribe(collector); Thread.sleep(300); - String[] expected = {"Hello Danube 1", "Hello Danube 2", "Hello Danube 3"}; for (String body : expected) { producer.send(body.getBytes(), Map.of()); } - List messages = receiveMessages(publisher, expected.length, Duration.ofSeconds(10)); + assertTrue(collector.latch.await(10, TimeUnit.SECONDS), + "Timeout: received " + collector.messages.size() + "/" + expected.length); // Verify all payloads received Set received = new HashSet<>(); Set partsSeen = new HashSet<>(); - for (StreamMessage msg : messages) { + for (StreamMessage msg : collector.messages) { received.add(new String(msg.payload())); partsSeen.add(msg.messageId().topicName()); consumer.ack(msg); @@ -72,9 +75,6 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) { String partName = topic + "-part-" + i; assertTrue(partsSeen.contains(partName), "missing partition: " + partName); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted"); } finally { consumer.close(); client.close(); @@ -82,12 +82,12 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) { } @Test - void partitionedBasicExclusive() { + void partitionedBasicExclusive() throws Exception { runPartitionedBasic("/default/part_basic_excl", SubType.EXCLUSIVE); } @Test - void partitionedBasicShared() { + void partitionedBasicShared() throws Exception { runPartitionedBasic("/default/part_basic_shared", SubType.SHARED); } } diff --git a/danube-client/src/test/java/com/danubemessaging/client/it/ReliableDispatchIT.java b/danube-client/src/test/java/com/danubemessaging/client/it/ReliableDispatchIT.java index c4ec3e5..6e0677c 100644 --- a/danube-client/src/test/java/com/danubemessaging/client/it/ReliableDispatchIT.java +++ b/danube-client/src/test/java/com/danubemessaging/client/it/ReliableDispatchIT.java @@ -8,7 +8,6 @@ import com.danubemessaging.client.model.StreamMessage; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.Arrays; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -51,20 +50,12 @@ private void runReliableBasic(String topicPrefix, SubType subType) throws Except Arrays.fill(blobData, (byte) 'D'); int messageCount = 20; - var publisher = consumer.receive(); - - Thread.sleep(400); - - for (int i = 0; i < messageCount; i++) { - producer.send(blobData, Map.of()); - } - - // Receive and ack inline + // Attach subscriber BEFORE sending so the receive stream is ready AtomicInteger count = new AtomicInteger(); AtomicReference error = new AtomicReference<>(); CountDownLatch done = new CountDownLatch(1); - publisher.subscribe(new TestHelpers.MessageCollector(messageCount) { + consumer.receive().subscribe(new TestHelpers.MessageCollector(messageCount) { @Override public void onNext(StreamMessage item) { try { @@ -81,6 +72,12 @@ public void onNext(StreamMessage item) { } }); + Thread.sleep(400); + + for (int i = 0; i < messageCount; i++) { + producer.send(blobData, Map.of()); + } + assertTrue(done.await(15, TimeUnit.SECONDS), "Timeout: received " + count.get() + "/" + messageCount); diff --git a/danube-client/src/test/java/com/danubemessaging/client/it/SchemaVersionIT.java b/danube-client/src/test/java/com/danubemessaging/client/it/SchemaVersionIT.java index 6863c47..1b0afe2 100644 --- a/danube-client/src/test/java/com/danubemessaging/client/it/SchemaVersionIT.java +++ b/danube-client/src/test/java/com/danubemessaging/client/it/SchemaVersionIT.java @@ -9,8 +9,8 @@ import com.danubemessaging.client.schema.SchemaType; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.danubemessaging.client.it.TestHelpers.*; import static org.junit.jupiter.api.Assertions.*; @@ -20,124 +20,130 @@ */ class SchemaVersionIT { - @Test - void producerPinToVersion() throws Exception { - DanubeClient client = newClient(); - String topic = uniqueTopic("/default/pin_version"); - SchemaRegistryClient schemaClient = client.newSchemaRegistry(); - - String subject = uniqueTopic("version-pin-java"); - - // Register V1 - String schemaV1 = """ - {"type": "object", "properties": {"id": {"type": "integer"}}, "required": ["id"]}"""; - schemaClient.registerSchema( - schemaClient.newRegistration() - .withSubject(subject) - .withSchemaType(SchemaType.JSON_SCHEMA) - .withSchemaDefinition(schemaV1.getBytes())); - - // Register V2 - String schemaV2 = """ - {"type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}, "required": ["id"]}"""; - schemaClient.registerSchema( - schemaClient.newRegistration() - .withSubject(subject) - .withSchemaType(SchemaType.JSON_SCHEMA) - .withSchemaDefinition(schemaV2.getBytes())); - - // Producer pinned to V1 - Producer producer = client.newProducer() - .withTopic(topic) - .withName("producer_v1_pinned") - .withSchemaPinnedVersion(subject, 1) - .build(); - producer.create(); - - // Consumer - Consumer consumer = client.newConsumer() - .withTopic(topic) - .withConsumerName("consumer_version") - .withSubscription("sub_version") - .withSubscriptionType(SubType.EXCLUSIVE) - .build(); - consumer.subscribe(); - - try { - var publisher = consumer.receive(); - Thread.sleep(200); - - producer.send("{\"id\": 123}".getBytes(), Map.of()); - - StreamMessage msg = receiveOne(publisher, Duration.ofSeconds(5)); - - assertNotNull(msg.schemaVersion(), "expected schema_version to be set"); - assertEquals(1, msg.schemaVersion(), "expected schema_version=1"); - - consumer.ack(msg); - } finally { - consumer.close(); - client.close(); + @Test + void producerPinToVersion() throws Exception { + DanubeClient client = newClient(); + String topic = uniqueTopic("/default/pin_version"); + SchemaRegistryClient schemaClient = client.newSchemaRegistry(); + + String subject = uniqueTopic("version-pin-java"); + + // Register V1 + String schemaV1 = """ + {"type": "object", "properties": {"id": {"type": "integer"}}, "required": ["id"]}"""; + schemaClient.registerSchema( + schemaClient.newRegistration() + .withSubject(subject) + .withSchemaType(SchemaType.JSON_SCHEMA) + .withSchemaDefinition(schemaV1.getBytes())); + + // Register V2 + String schemaV2 = """ + {"type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}, "required": ["id"]}"""; + schemaClient.registerSchema( + schemaClient.newRegistration() + .withSubject(subject) + .withSchemaType(SchemaType.JSON_SCHEMA) + .withSchemaDefinition(schemaV2.getBytes())); + + // Producer pinned to V1 + Producer producer = client.newProducer() + .withTopic(topic) + .withName("producer_v1_pinned") + .withSchemaPinnedVersion(subject, 1) + .build(); + producer.create(); + + // Consumer + Consumer consumer = client.newConsumer() + .withTopic(topic) + .withConsumerName("consumer_version") + .withSubscription("sub_version") + .withSubscriptionType(SubType.EXCLUSIVE) + .build(); + consumer.subscribe(); + + try { + // Attach collector BEFORE sending + var collector = new TestHelpers.MessageCollector(1); + consumer.receive().subscribe(collector); + Thread.sleep(200); + + producer.send("{\"id\": 123}".getBytes(), Map.of()); + + assertTrue(collector.latch.await(5, TimeUnit.SECONDS), "Timeout waiting for message"); + StreamMessage msg = collector.messages.getFirst(); + + assertNotNull(msg.schemaVersion(), "expected schema_version to be set"); + assertEquals(1, msg.schemaVersion(), "expected schema_version=1"); + + consumer.ack(msg); + } finally { + consumer.close(); + client.close(); + } } - } - - @Test - void producerLatestVersion() throws Exception { - DanubeClient client = newClient(); - String topic = uniqueTopic("/default/latest_version"); - SchemaRegistryClient schemaClient = client.newSchemaRegistry(); - - String subject = uniqueTopic("latest-version-java"); - - // Register V1 - String schemaV1 = """ - {"type": "object", "properties": {"a": {"type": "string"}}}"""; - schemaClient.registerSchema( - schemaClient.newRegistration() - .withSubject(subject) - .withSchemaType(SchemaType.JSON_SCHEMA) - .withSchemaDefinition(schemaV1.getBytes())); - - // Register V2 - String schemaV2 = """ - {"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "integer"}}}"""; - schemaClient.registerSchema( - schemaClient.newRegistration() - .withSubject(subject) - .withSchemaType(SchemaType.JSON_SCHEMA) - .withSchemaDefinition(schemaV2.getBytes())); - - // Producer without version pin (should use latest V2) - Producer producer = client.newProducer() - .withTopic(topic) - .withName("producer_latest") - .withSchemaLatest(subject) - .build(); - producer.create(); - - // Consumer - Consumer consumer = client.newConsumer() - .withTopic(topic) - .withConsumerName("consumer_latest") - .withSubscription("sub_latest") - .build(); - consumer.subscribe(); - - try { - var publisher = consumer.receive(); - Thread.sleep(200); - - producer.send("{\"a\": \"test\"}".getBytes(), Map.of()); - - StreamMessage msg = receiveOne(publisher, Duration.ofSeconds(5)); - - assertNotNull(msg.schemaVersion(), "expected schema_version to be set"); - assertEquals(2, msg.schemaVersion(), "expected schema_version=2 (latest)"); - - consumer.ack(msg); - } finally { - consumer.close(); - client.close(); + + @Test + void producerLatestVersion() throws Exception { + DanubeClient client = newClient(); + String topic = uniqueTopic("/default/latest_version"); + SchemaRegistryClient schemaClient = client.newSchemaRegistry(); + + String subject = uniqueTopic("latest-version-java"); + + // Register V1 + String schemaV1 = """ + {"type": "object", "properties": {"a": {"type": "string"}}}"""; + schemaClient.registerSchema( + schemaClient.newRegistration() + .withSubject(subject) + .withSchemaType(SchemaType.JSON_SCHEMA) + .withSchemaDefinition(schemaV1.getBytes())); + + // Register V2 + String schemaV2 = """ + {"type": "object", "properties": {"a": {"type": "string"}, "b": {"type": "integer"}}}"""; + schemaClient.registerSchema( + schemaClient.newRegistration() + .withSubject(subject) + .withSchemaType(SchemaType.JSON_SCHEMA) + .withSchemaDefinition(schemaV2.getBytes())); + + // Producer without version pin (should use latest V2) + Producer producer = client.newProducer() + .withTopic(topic) + .withName("producer_latest") + .withSchemaLatest(subject) + .build(); + producer.create(); + + // Consumer + Consumer consumer = client.newConsumer() + .withTopic(topic) + .withConsumerName("consumer_latest") + .withSubscription("sub_latest") + .build(); + consumer.subscribe(); + + try { + // Attach collector BEFORE sending + var collector = new TestHelpers.MessageCollector(1); + consumer.receive().subscribe(collector); + Thread.sleep(200); + + producer.send("{\"a\": \"test\"}".getBytes(), Map.of()); + + assertTrue(collector.latch.await(5, TimeUnit.SECONDS), "Timeout waiting for message"); + StreamMessage msg = collector.messages.getFirst(); + + assertNotNull(msg.schemaVersion(), "expected schema_version to be set"); + assertEquals(2, msg.schemaVersion(), "expected schema_version=2 (latest)"); + + consumer.ack(msg); + } finally { + consumer.close(); + client.close(); + } } - } } diff --git a/danube-client/src/test/java/com/danubemessaging/client/it/SubscriptionBasicIT.java b/danube-client/src/test/java/com/danubemessaging/client/it/SubscriptionBasicIT.java index 70e19b0..75a6a82 100644 --- a/danube-client/src/test/java/com/danubemessaging/client/it/SubscriptionBasicIT.java +++ b/danube-client/src/test/java/com/danubemessaging/client/it/SubscriptionBasicIT.java @@ -7,8 +7,8 @@ import com.danubemessaging.client.model.StreamMessage; import org.junit.jupiter.api.Test; -import java.time.Duration; import java.util.Map; +import java.util.concurrent.TimeUnit; import static com.danubemessaging.client.it.TestHelpers.*; import static org.junit.jupiter.api.Assertions.*; @@ -19,7 +19,7 @@ */ class SubscriptionBasicIT { - private void runBasicSubscription(String topicPrefix, SubType subType) { + private void runBasicSubscription(String topicPrefix, SubType subType) throws Exception { DanubeClient client = newClient(); String topic = uniqueTopic(topicPrefix); @@ -38,21 +38,22 @@ private void runBasicSubscription(String topicPrefix, SubType subType) { consumer.subscribe(); try { - var publisher = consumer.receive(); + // Attach subscriber BEFORE sending so the receive stream is ready + var collector = new TestHelpers.MessageCollector(1); + consumer.receive().subscribe(collector); Thread.sleep(300); byte[] payload = "Hello Danube".getBytes(); producer.send(payload, Map.of()); - StreamMessage msg = receiveOne(publisher, Duration.ofSeconds(10)); + assertTrue(collector.latch.await(10, TimeUnit.SECONDS), + "Timeout waiting for message"); + StreamMessage msg = collector.messages.getFirst(); assertEquals("Hello Danube", new String(msg.payload())); consumer.ack(msg); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - fail("Interrupted"); } finally { consumer.close(); client.close(); @@ -60,12 +61,12 @@ private void runBasicSubscription(String topicPrefix, SubType subType) { } @Test - void basicSubscriptionShared() { + void basicSubscriptionShared() throws Exception { runBasicSubscription("/default/sub_basic_shared", SubType.SHARED); } @Test - void basicSubscriptionExclusive() { + void basicSubscriptionExclusive() throws Exception { runBasicSubscription("/default/sub_basic_exclusive", SubType.EXCLUSIVE); } }