Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static com.google.api.gax.rpc.testing.FakeBatchableApi.callLabeledIntSquarer;
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth.assertWithMessage;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -599,15 +600,14 @@ void testPushCurrentBatchRunnable() throws Exception {
// Batcher present inside runnable should be GCed after following loop.
batcher.close();
batcher = null;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
isExecutorCancelled = pushBatchRunnable.isCancelled();
if (isExecutorCancelled) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
await()
.atMost(Duration.ofSeconds(5))
.until(
() -> {
System.gc();
System.runFinalization();
return pushBatchRunnable.isCancelled();
});
// ScheduledFuture should be isCancelled now.
assertThat(pushBatchRunnable.isCancelled()).isTrue();
}
Expand Down Expand Up @@ -733,18 +733,14 @@ public void splitResponse(
*/
@Test
void testUnclosedBatchersAreLogged() throws Exception {
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);
await()
.atMost(Duration.ofSeconds(5))
.until(
() -> {
System.gc();
System.runFinalization();
return BatcherReference.cleanQueue() == 0;
});
underTest = createDefaultBatcherImpl(batchingSettings, null);
Batcher<Integer, Integer> extraBatcher = createDefaultBatcherImpl(batchingSettings, null);

Expand All @@ -771,20 +767,16 @@ public boolean isLoggable(LogRecord record) {

underTest = null;
// That *should* have been the last reference. Try to reclaim it.
boolean success = false;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
int orphans = BatcherReference.cleanQueue();
if (orphans == 1) {
success = true;
break;
}
// Validates that there are no other batcher instance present while GC cleanup.
assertWithMessage("unexpected extra orphans").that(orphans).isEqualTo(0);
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertWithMessage("Batcher was not garbage collected").that(success).isTrue();
await()
.atMost(Duration.ofSeconds(5))
.until(
() -> {
System.gc();
System.runFinalization();
int orphans = BatcherReference.cleanQueue();
assertWithMessage("unexpected extra orphans").that(orphans).isAtMost(1);
return orphans == 1;
});

LogRecord lr;
synchronized (records) {
Expand All @@ -807,18 +799,14 @@ public boolean isLoggable(LogRecord record) {
@Test
void testClosedBatchersAreNotLogged() throws Exception {
// Clean out the existing instances
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);
await()
.atMost(Duration.ofSeconds(5))
.until(
() -> {
System.gc();
System.runFinalization();
return BatcherReference.cleanQueue() == 0;
});

// Capture logs
final List<LogRecord> records = new ArrayList<>(1);
Expand Down Expand Up @@ -849,16 +837,19 @@ public boolean isLoggable(LogRecord record) {
}
}
// Run GC a few times to give the batchers a chance to be collected
for (int retry = 0; retry < 100; retry++) {
System.gc();
System.runFinalization();
BatcherReference.cleanQueue();
Thread.sleep(10);
}

synchronized (records) {
assertThat(records).isEmpty();
}
await()
.pollInterval(Duration.ofMillis(10))
.during(Duration.ofSeconds(1))
.atMost(Duration.ofSeconds(5))
.until(
() -> {
System.gc();
System.runFinalization();
BatcherReference.cleanQueue();
synchronized (records) {
return records.isEmpty();
}
});
} finally {
// reset logging
batcherLogger.setFilter(oldFilter);
Expand Down Expand Up @@ -990,10 +981,12 @@ void testThrottlingBlocking() throws Exception {
// resulting in a shorter total_throttled_time at the verification of throttledTime
// at the end of the test.
// https://github.com/googleapis/sdk-platform-java/issues/1193
do {
Thread.sleep(10);
} while (batcherAddThreadHolder.isEmpty()
|| batcherAddThreadHolder.get(0).getState() != Thread.State.WAITING);
await()
.atMost(Duration.ofSeconds(5))
.until(
() ->
!batcherAddThreadHolder.isEmpty()
&& batcherAddThreadHolder.get(0).getState() == Thread.State.WAITING);

long beforeGetCall = System.currentTimeMillis();
executor.submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@
*/
package com.google.api.gax.batching;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -68,8 +70,7 @@ void testBlocking() throws InterruptedException {
Thread t = new Thread(() -> semaphore.acquire(1));
t.start();

Thread.sleep(50);
assertTrue(t.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);

semaphore.release(1);
t.join();
Expand All @@ -95,8 +96,7 @@ void testReducePermitLimitBlocking() throws InterruptedException {
Thread t = new Thread(() -> semaphore.acquire(1));
t.start();

Thread.sleep(50);
assertTrue(t.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);

semaphore.release(1);
t.join();
Expand Down Expand Up @@ -124,17 +124,15 @@ void testAcquirePartialBlocking() throws Exception {
Thread t1 = new Thread(() -> semaphore.acquire(1));
t1.start();
// wait for thread to start
Thread.sleep(100);
assertTrue(t1.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t1.getState() == Thread.State.WAITING);
semaphore.release(6);
t1.join();

// now there should be 4 permits available, acquiring 6 should block
Thread t2 = new Thread(() -> semaphore.acquirePartial(6));
t2.start();
// wait fo thread to start
Thread.sleep(100);
assertTrue(t2.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t2.getState() == Thread.State.WAITING);
// limit should still be 5 and get limit should not block
assertEquals(5, semaphore.getPermitLimit());
}
Expand All @@ -158,8 +156,7 @@ void testIncreasePermitLimitBlocking() throws Exception {
Thread t = new Thread(() -> semaphore.acquire(1));
t.start();

Thread.sleep(50);
assertTrue(t.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);

semaphore.increasePermitLimit(1);
t.join();
Expand Down Expand Up @@ -208,8 +205,7 @@ void testReleaseWontOverflowBlocking() throws Exception {
semaphore.release(10);
Thread t = new Thread(() -> semaphore.acquire(11));
t.start();
Thread.sleep(100);
assertTrue(t.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
}

@Test
Expand Down Expand Up @@ -239,7 +235,6 @@ void testPermitLimitUnderflowBlocking() throws Exception {
semaphore.release(10);
Thread t = new Thread(() -> semaphore.acquire(11));
t.start();
Thread.sleep(100);
assertTrue(t.isAlive());
await().atMost(Duration.ofSeconds(5)).until(() -> t.getState() == Thread.State.WAITING);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
package com.google.api.gax.retrying;

import static com.google.api.gax.retrying.FailingCallable.FAST_RETRY_SETTINGS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -41,12 +42,15 @@
import com.google.api.core.NanoClock;
import com.google.api.gax.retrying.FailingCallable.CustomException;
import com.google.api.gax.rpc.testing.FakeCallContext;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -99,26 +103,32 @@ void testSuccessWithFailuresPeekAttempt() throws Exception {

future.setAttemptFuture(executor.submit(future));

int failedAttempts = 0;
while (!future.isDone()) {
ApiFuture<String> attemptResult = future.peekAttemptResult();
if (attemptResult != null) {
assertTrue(attemptResult.isDone());
assertFalse(attemptResult.isCancelled());
try {
attemptResult.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof CustomException) {
failedAttempts++;
}
}
}
Thread.sleep(0L, 100);
}
final AtomicInteger failedAttempts = new AtomicInteger(0);
final AtomicReference<ApiFuture<String>> lastSeenAttempt = new AtomicReference<>();
await()
.pollInterval(Duration.ofMillis(2))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to poll more frequently for this? Seems like it would try 2/3 times at most?

.atMost(Duration.ofSeconds(5))
.until(
() -> {
ApiFuture<String> attemptResult = future.peekAttemptResult();
if (attemptResult != null && attemptResult != lastSeenAttempt.get()) {
lastSeenAttempt.set(attemptResult);
assertTrue(attemptResult.isDone());
assertFalse(attemptResult.isCancelled());
try {
attemptResult.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof CustomException) {
failedAttempts.incrementAndGet();
}
}
}
return future.isDone();
});

assertFutureSuccess(future);
assertEquals(15, future.getAttemptSettings().getAttemptCount());
assertTrue(failedAttempts > 0);
assertTrue(failedAttempts.get() > 0);
}
}

Expand Down Expand Up @@ -260,9 +270,12 @@ void testCancelOuterFutureAfterStart() throws Exception {
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

// The test sleeps a duration long enough to ensure that the future has been submitted for
// execution
Thread.sleep(150L);
await()
.atMost(Duration.ofSeconds(5))
.until(
() ->
future.getAttemptSettings() != null
&& future.getAttemptSettings().getAttemptCount() > 0);

boolean res = future.cancel(false);
assertTrue(res);
Expand Down Expand Up @@ -302,7 +315,9 @@ void testCancelProxiedFutureAfterStart() throws Exception {
callable.setExternalFuture(future);
future.setAttemptFuture(executor.submit(future));

Thread.sleep(50L);
await()
.atMost(Duration.ofSeconds(5))
.until(() -> callable.getFirstAttemptFinishedLatch().getCount() == 0);

// Note that shutdownNow() will not cancel internal FutureTasks automatically, which
// may potentially cause another thread handing on RetryingFuture#get() call forever.
Expand Down
Loading