Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
Expand Down Expand Up @@ -80,8 +81,8 @@ class BigQueryArrowResultSet extends BigQueryBaseResultSet {
// Decoder object will be reused to avoid re-allocation and too much garbage collection.
private VectorSchemaRoot vectorSchemaRoot;
private VectorLoader vectorLoader;
// producer thread's reference
private final Thread ownedThread;
// producer task's reference
private final Future<?> ownedTask;

private BigQueryArrowResultSet(
Schema schema,
Expand All @@ -93,7 +94,7 @@ private BigQueryArrowResultSet(
boolean isNested,
int fromIndex,
int toIndexExclusive,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery,
Job job)
throws SQLException {
Expand All @@ -105,7 +106,7 @@ private BigQueryArrowResultSet(
this.fromIndex = fromIndex;
this.toIndexExclusive = toIndexExclusive;
this.nestedRowIndex = fromIndex - 1;
this.ownedThread = ownedThread;
this.ownedTask = ownedTask;
if (!isNested && arrowSchema != null) {
try {
this.arrowDeserializer = new ArrowDeserializer(arrowSchema);
Expand All @@ -127,10 +128,10 @@ static BigQueryArrowResultSet of(
long totalRows,
BigQueryStatement statement,
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery)
throws SQLException {
return of(schema, arrowSchema, totalRows, statement, buffer, ownedThread, bigQuery, null);
return of(schema, arrowSchema, totalRows, statement, buffer, ownedTask, bigQuery, null);
}

static BigQueryArrowResultSet of(
Expand All @@ -139,7 +140,7 @@ static BigQueryArrowResultSet of(
long totalRows,
BigQueryStatement statement,
BlockingQueue<BigQueryArrowBatchWrapper> buffer,
Thread ownedThread,
Future<?> ownedTask,
BigQuery bigQuery,
Job job)
throws SQLException {
Expand All @@ -153,7 +154,7 @@ static BigQueryArrowResultSet of(
false,
-1,
-1,
ownedThread,
ownedTask,
bigQuery,
job);
}
Expand All @@ -165,7 +166,7 @@ static BigQueryArrowResultSet of(
this.currentNestedBatch = null;
this.fromIndex = 0;
this.toIndexExclusive = 0;
this.ownedThread = null;
this.ownedTask = null;
this.arrowDeserializer = null;
this.vectorSchemaRoot = null;
this.vectorLoader = null;
Expand Down Expand Up @@ -484,9 +485,9 @@ private String formatRangeElement(Object element, StandardSQLTypeName elementTyp
public void close() {
LOG.fineTrace("close", () -> String.format("Closing BigqueryArrowResultSet %s.", this));
this.isClosed = true;
if (ownedThread != null && !ownedThread.isInterrupted()) {
// interrupt the producer thread when result set is closed
ownedThread.interrupt();
if (ownedTask != null) {
// cancel the producer task when result set is closed
ownedTask.cancel(true);
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,11 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {

this.headerProvider = createHeaderProvider();
this.bigQuery = getBigQueryConnection();
// Fixed thread pool queues tasks to limit concurrent metadata calls and prevent API
// throttling.
this.metadataExecutor = BigQueryJdbcMdc.newFixedThreadPool(metadataFetchThreadCount);
// Cached pool executes queries immediately without queueing and reclaims all idle threads
// when inactive, minimizing resources.
this.queryExecutor = BigQueryJdbcMdc.newCachedThreadPool();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ public ResultSet getProcedures(

Thread fetcherThread = new Thread(procedureFetcher, "getProcedures-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getProcedures");
Expand Down Expand Up @@ -1207,7 +1207,7 @@ public ResultSet getProcedureColumns(
Thread fetcherThread =
new Thread(procedureColumnFetcher, "getProcedureColumns-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getProcedureColumns for catalog: " + catalog);
Expand Down Expand Up @@ -1878,7 +1878,7 @@ public ResultSet getTables(

Thread fetcherThread = new Thread(tableFetcher, "getTables-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getTables");
Expand Down Expand Up @@ -2018,7 +2018,8 @@ public ResultSet getCatalogs() {
populateQueue(catalogRows, queue, schemaFields);
signalEndOfData(queue, schemaFields);

return BigQueryJsonResultSet.of(catalogsSchema, catalogRows.size(), queue, null, new Thread[0]);
return BigQueryJsonResultSet.of(
catalogsSchema, catalogRows.size(), queue, null, new Future<?>[0]);
}

Schema defineGetCatalogsSchema() {
Expand Down Expand Up @@ -2050,7 +2051,7 @@ public ResultSet getTableTypes() {
signalEndOfData(queue, tableTypesSchema.getFields());

return BigQueryJsonResultSet.of(
tableTypesSchema, tableTypeRows.size(), queue, null, new Thread[0]);
tableTypesSchema, tableTypeRows.size(), queue, null, new Future<?>[0]);
}

static Schema defineGetTableTypesSchema() {
Expand Down Expand Up @@ -2204,7 +2205,7 @@ public ResultSet getColumns(

Thread fetcherThread = new Thread(columnFetcher, "getColumns-fetcher-" + effectiveCatalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getColumns");
Expand Down Expand Up @@ -2719,7 +2720,7 @@ public ResultSet getTypeInfo() {
populateQueue(typeInfoRows, queue, schemaFields);
signalEndOfData(queue, schemaFields);
return BigQueryJsonResultSet.of(
typeInfoSchema, typeInfoRows.size(), queue, null, new Thread[0]);
typeInfoSchema, typeInfoRows.size(), queue, null, new Future<?>[0]);
}

Schema defineGetTypeInfoSchema() {
Expand Down Expand Up @@ -3714,7 +3715,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) {

Thread fetcherThread = new Thread(schemaFetcher, "getSchemas-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getSchemas");
Expand Down Expand Up @@ -3833,7 +3834,7 @@ public ResultSet getClientInfoProperties() {
signalEndOfData(queue, resultSchemaFields);
}
return BigQueryJsonResultSet.of(
resultSchema, collectedResults.size(), queue, null, new Thread[0]);
resultSchema, collectedResults.size(), queue, null, new Future<?>[0]);
}

Schema defineGetClientInfoPropertiesSchema() {
Expand Down Expand Up @@ -4008,7 +4009,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct

Thread fetcherThread = new Thread(functionFetcher, "getFunctions-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getFunctions");
Expand Down Expand Up @@ -4262,7 +4263,7 @@ public ResultSet getFunctionColumns(
Thread fetcherThread =
new Thread(functionColumnFetcher, "getFunctionColumns-fetcher-" + catalog);
BigQueryJsonResultSet resultSet =
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, new Thread[] {fetcherThread});
BigQueryJsonResultSet.of(resultSchema, -1, queue, null, wrapThread(fetcherThread));

fetcherThread.start();
LOG.info("Started background thread for getFunctionColumns for catalog: " + catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ static void clear() {
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
MdcThreadPoolExecutor executor =
new MdcThreadPoolExecutor(
"Metadata Fetch Pool",
nThreads,
nThreads,
60L,
Expand All @@ -86,6 +87,7 @@ static ExecutorService newFixedThreadPool(int nThreads) {
*/
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new MdcThreadPoolExecutor(
"Query Executor Pool",
0,
Integer.MAX_VALUE,
60L,
Expand Down Expand Up @@ -125,15 +127,18 @@ public Thread newThread(Runnable r) {
}

private static class MdcThreadPoolExecutor extends ThreadPoolExecutor {
private final String poolName;

public MdcThreadPoolExecutor(
String poolName,
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.poolName = poolName;
}

private final AtomicBoolean warningLogged = new AtomicBoolean(false);
Expand All @@ -149,8 +154,8 @@ private void monitorQueueSaturation(int queueSize) {
if (queueSize >= warnThreshold) {
if (warningLogged.compareAndSet(false, true)) {
LOG.warning(
"Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the thread count property.",
maxPoolSize, getActiveCount(), queueSize);
"[%s] Thread pool is saturating. Max pool size: %d, Active threads: %d, Queued tasks: %d. Consider increasing the metadataFetchThreadCount property.",
poolName, maxPoolSize, getActiveCount(), queueSize);
}
} else if (queueSize <= recoveryThreshold) {
if (warningLogged.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
Pattern.CASE_INSENSITIVE);
static final String METADATA_FETCH_THREAD_COUNT_PROPERTY_NAME = "MetaDataFetchThreadCount";
static final int DEFAULT_METADATA_FETCH_THREAD_COUNT_VALUE = 32;

static final String RETRY_TIMEOUT_IN_SECS_PROPERTY_NAME = "Timeout";
static final long DEFAULT_RETRY_TIMEOUT_IN_SECS_VALUE = 0L;
static final String JOB_TIMEOUT_PROPERTY_NAME = "JobTimeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;

/** {@link ResultSet} Implementation for JSON datasource (Using REST APIs) */
class BigQueryJsonResultSet extends BigQueryBaseResultSet {
Expand All @@ -43,7 +44,7 @@ class BigQueryJsonResultSet extends BigQueryBaseResultSet {
private boolean afterLast = false;
private final int fromIndex;
private final int toIndexExclusive;
private final Thread[] ownedThreads;
private final Future<?>[] ownedTasks;

private BigQueryJsonResultSet(
Schema schema,
Expand All @@ -54,7 +55,7 @@ private BigQueryJsonResultSet(
BigQueryFieldValueListWrapper cursor,
int fromIndex,
int toIndexExclusive,
Thread[] ownedThreads,
Future<?>[] ownedTasks,
BigQuery bigQuery,
Job job) {
super(bigQuery, statement, schema, isNested, job);
Expand All @@ -64,7 +65,7 @@ private BigQueryJsonResultSet(
this.fromIndex = fromIndex;
this.toIndexExclusive = toIndexExclusive;
this.nestedRowIndex = fromIndex - 1;
this.ownedThreads = ownedThreads;
this.ownedTasks = ownedTasks;
}

/**
Expand All @@ -78,42 +79,42 @@ static BigQueryJsonResultSet of(
long totalRows,
BlockingQueue<BigQueryFieldValueListWrapper> buffer,
BigQueryStatement statement,
Thread[] ownedThreads,
Future<?>[] ownedTasks,
BigQuery bigQuery) {

return of(schema, totalRows, buffer, statement, ownedThreads, bigQuery, null);
return of(schema, totalRows, buffer, statement, ownedTasks, bigQuery, null);
}

static BigQueryJsonResultSet of(
Schema schema,
long totalRows,
BlockingQueue<BigQueryFieldValueListWrapper> buffer,
BigQueryStatement statement,
Thread[] ownedThreads,
Future<?>[] ownedTasks,
BigQuery bigQuery,
Job job) {

return new BigQueryJsonResultSet(
schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, bigQuery, job);
schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, bigQuery, job);
}

static BigQueryJsonResultSet of(
Schema schema,
long totalRows,
BlockingQueue<BigQueryFieldValueListWrapper> buffer,
BigQueryStatement statement,
Thread[] ownedThreads) {
Future<?>[] ownedTasks) {

return new BigQueryJsonResultSet(
schema, totalRows, buffer, statement, false, null, -1, -1, ownedThreads, null, null);
schema, totalRows, buffer, statement, false, null, -1, -1, ownedTasks, null, null);
}

BigQueryJsonResultSet() {
super(null, null, null, false);
totalRows = 0;
buffer = null;
fromIndex = 0;
ownedThreads = new Thread[0];
ownedTasks = new Future<?>[0];
toIndexExclusive = 0;
}

Expand Down Expand Up @@ -291,11 +292,9 @@ private FieldValue getObjectInternal(int columnIndex) throws SQLException {
public void close() {
LOG.fineTrace("close", () -> String.format("Closing BigqueryJsonResultSet %s.", this));
this.isClosed = true;
if (ownedThreads != null) {
for (Thread ownedThread : ownedThreads) {
if (!ownedThread.isInterrupted()) {
ownedThread.interrupt();
}
if (ownedTasks != null) {
for (Future<?> ownedTask : ownedTasks) {
ownedTask.cancel(true);
}
}
super.close();
Expand Down
Loading
Loading