Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
name: Integration Tests

on:
push:
branches: [main]
pull_request:

jobs:
integration-tests:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Java
uses: actions/setup-java@v4
with:
distribution: temurin
java-version: '21'
cache: maven

- name: Start test cluster
working-directory: docker
run: docker compose up -d

- name: Wait for broker to be healthy
working-directory: docker
run: |
echo "Waiting for broker to become healthy..."
for i in $(seq 1 30); do
status=$(docker inspect --format='{{.State.Health.Status}}' danube-test-broker 2>/dev/null || echo "missing")
if [ "$status" = "healthy" ]; then
echo "Broker is healthy."
break
fi
if [ "$i" -eq 30 ]; then
echo "Broker failed to become healthy (status: $status)"
docker compose logs broker
exit 1
fi
echo " attempt $i/30 — status: $status"
sleep 5
done

- name: Run integration tests
run: mvn -pl danube-client -am verify -P integration-tests

- name: Print broker logs on failure
if: failure()
working-directory: docker
run: docker compose logs broker

- name: Stop test cluster
if: always()
working-directory: docker
run: docker compose down -v
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.danubemessaging.client.model.StreamMessage;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -38,7 +37,8 @@ void fanoutExclusive() throws Exception {
.build();
producer.create();

record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) {}
record ConsumerEntry(String name, Consumer consumer, TestHelpers.MessageCollector collector) {
}

List<ConsumerEntry> consumers = new CopyOnWriteArrayList<>();
for (int i = 0; i < 3; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@
import com.danubemessaging.client.model.StreamMessage;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.danubemessaging.client.it.TestHelpers.*;
import static org.junit.jupiter.api.Assertions.*;
Expand All @@ -22,7 +21,7 @@
*/
class PartitionedBasicIT {

private void runPartitionedBasic(String topicPrefix, SubType subType) {
private void runPartitionedBasic(String topicPrefix, SubType subType) throws Exception {
DanubeClient client = newClient();
String topic = uniqueTopic(topicPrefix);
int partitions = 3;
Expand All @@ -43,21 +42,25 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
consumer.subscribe();

try {
var publisher = consumer.receive();
String[] expected = { "Hello Danube 1", "Hello Danube 2", "Hello Danube 3" };

// Attach collector BEFORE sending
var collector = new TestHelpers.MessageCollector(expected.length);
consumer.receive().subscribe(collector);

Thread.sleep(300);

String[] expected = {"Hello Danube 1", "Hello Danube 2", "Hello Danube 3"};
for (String body : expected) {
producer.send(body.getBytes(), Map.of());
}

List<StreamMessage> messages = receiveMessages(publisher, expected.length, Duration.ofSeconds(10));
assertTrue(collector.latch.await(10, TimeUnit.SECONDS),
"Timeout: received " + collector.messages.size() + "/" + expected.length);

// Verify all payloads received
Set<String> received = new HashSet<>();
Set<String> partsSeen = new HashSet<>();
for (StreamMessage msg : messages) {
for (StreamMessage msg : collector.messages) {
received.add(new String(msg.payload()));
partsSeen.add(msg.messageId().topicName());
consumer.ack(msg);
Expand All @@ -72,22 +75,19 @@ private void runPartitionedBasic(String topicPrefix, SubType subType) {
String partName = topic + "-part-" + i;
assertTrue(partsSeen.contains(partName), "missing partition: " + partName);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
fail("Interrupted");
} finally {
consumer.close();
client.close();
}
}

@Test
void partitionedBasicExclusive() {
void partitionedBasicExclusive() throws Exception {
runPartitionedBasic("/default/part_basic_excl", SubType.EXCLUSIVE);
}

@Test
void partitionedBasicShared() {
void partitionedBasicShared() throws Exception {
runPartitionedBasic("/default/part_basic_shared", SubType.SHARED);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.danubemessaging.client.model.StreamMessage;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -51,20 +50,12 @@ private void runReliableBasic(String topicPrefix, SubType subType) throws Except
Arrays.fill(blobData, (byte) 'D');
int messageCount = 20;

var publisher = consumer.receive();

Thread.sleep(400);

for (int i = 0; i < messageCount; i++) {
producer.send(blobData, Map.of());
}

// Receive and ack inline
// Attach subscriber BEFORE sending so the receive stream is ready
AtomicInteger count = new AtomicInteger();
AtomicReference<Throwable> error = new AtomicReference<>();
CountDownLatch done = new CountDownLatch(1);

publisher.subscribe(new TestHelpers.MessageCollector(messageCount) {
consumer.receive().subscribe(new TestHelpers.MessageCollector(messageCount) {
@Override
public void onNext(StreamMessage item) {
try {
Expand All @@ -81,6 +72,12 @@ public void onNext(StreamMessage item) {
}
});

Thread.sleep(400);

for (int i = 0; i < messageCount; i++) {
producer.send(blobData, Map.of());
}

assertTrue(done.await(15, TimeUnit.SECONDS),
"Timeout: received " + count.get() + "/" + messageCount);

Expand Down
Loading