From aaac3f5a8ad7358eeb99fad60e380b82a4802465 Mon Sep 17 00:00:00 2001 From: zdeng Date: Tue, 9 Dec 2025 16:14:39 +0800 Subject: [PATCH] HIVE-29363: Separate out thread pools from the housekeeping tasks --- .../MetastoreHousekeepingLeaderTestBase.java | 9 ++- .../hive/ql/lockmgr/TestDbTxnManager.java | 2 - .../hadoop/hive/metastore/HiveMetaStore.java | 1 - .../hadoop/hive/metastore/ThreadPool.java | 67 ------------------- .../metastore/leader/HouseKeepingTasks.java | 51 +++++++------- .../metastore/leader/LeaseLeaderElection.java | 2 +- 6 files changed, 34 insertions(+), 98 deletions(-) delete mode 100644 standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ThreadPool.java diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java index 61c81c29af7f..328226d7638d 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars; +import org.apache.hadoop.hive.metastore.leader.HouseKeepingTasks; import org.apache.hadoop.hive.metastore.leader.LeaderElection; import org.apache.hadoop.hive.metastore.leader.LeaderElectionContext; import org.apache.hadoop.hive.metastore.leader.LeaderElectionFactory; @@ -42,6 +43,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -290,9 +292,10 @@ public void setName(String name) { protected void notifyListener() { ScheduledExecutorService service = null; if (!isLeader) { - try { - service = ThreadPool.getPool(); - } catch (Exception ignored) { + Optional houseKeepingTasks = + listeners.stream().filter(s -> s instanceof HouseKeepingTasks).findFirst(); + if (houseKeepingTasks.isPresent()) { + service = ((HouseKeepingTasks) houseKeepingTasks.get()).getExecutorService(); } } super.notifyListener(); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java index 77daeb5cb6b0..8b511f07dddd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java @@ -20,7 +20,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConfForTest; -import org.apache.hadoop.hive.metastore.ThreadPool; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse; import org.apache.hadoop.hive.metastore.api.ShowLocksResponse; @@ -549,7 +548,6 @@ public void setUp() throws Exception { public void tearDown() throws Exception { if (txnMgr != null) txnMgr.closeTxnManager(); TestTxnDbUtil.cleanDb(conf); - ThreadPool.shutdown(); } private static class MockQueryPlan extends QueryPlan { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 1d9fbf3e3ae1..00135cddfb40 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -302,7 +302,6 @@ public static void main(String[] args) throws Throwable { } catch (Exception e) { LOG.error("Error removing znode for this metastore instance from ZooKeeper.", e); } - ThreadPool.shutdown(); }, 10); //Start Metrics for Standalone (Remote) Mode diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ThreadPool.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ThreadPool.java deleted file mode 100644 index 5dca2b3cff3f..000000000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ThreadPool.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.metastore; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; - -/** - * Utility singleton class to manage all the threads. - */ -public class ThreadPool { - - static final private Logger LOG = LoggerFactory.getLogger(ThreadPool.class); - private static ThreadPool self; - private static ScheduledExecutorService pool; - - public static synchronized ThreadPool initialize(Configuration conf) { - if (self == null) { - self = new ThreadPool(conf); - LOG.debug("ThreadPool initialized"); - } - return self; - } - - private ThreadPool(Configuration conf) { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat("Metastore Scheduled Worker %d").build(); - pool = Executors.newScheduledThreadPool(MetastoreConf.getIntVar(conf, - MetastoreConf.ConfVars.THREAD_POOL_SIZE), threadFactory); - } - - public static ScheduledExecutorService getPool() { - if (self == null) { - throw new RuntimeException("ThreadPool accessed before initialized"); - } - return pool; - } - - public static synchronized void shutdown() { - if (self != null) { - pool.shutdown(); - self = null; - } - } -} diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java index 3220b56f7d38..af3bd2b0ac06 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/HouseKeepingTasks.java @@ -18,10 +18,12 @@ package org.apache.hadoop.hive.metastore.leader; +import com.cronutils.utils.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.HiveMetaStore; import org.apache.hadoop.hive.metastore.MetastoreTaskThread; -import org.apache.hadoop.hive.metastore.ThreadPool; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService; import org.apache.hadoop.hive.metastore.utils.JavaUtils; @@ -29,6 +31,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import static java.util.Objects.requireNonNull; @@ -38,7 +43,7 @@ public class HouseKeepingTasks implements LeaderElection.LeadershipStateListener private final Configuration configuration; // shut down pool when new leader is selected - private ThreadPool metastoreTaskThreadPool; + private ScheduledExecutorService metastoreTaskThreadPool; private boolean runOnlyRemoteTasks; @@ -94,30 +99,24 @@ public void takeLeadership(LeaderElection election) throws Exception { throw new IllegalStateException("There should be no running tasks before taking the leadership!"); } runningTasks = new ArrayList<>(); - metastoreTaskThreadPool = ThreadPool.initialize(configuration); + ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Metastore Scheduled Worker(" + election.getName() + ") %d").build(); + final List tasks; if (!runOnlyRemoteTasks) { - List alwaysTasks = new ArrayList<>(getAlwaysTasks()); - for (MetastoreTaskThread task : alwaysTasks) { - task.setConf(configuration); - task.enforceMutex(election.enforceMutex()); - long freq = task.runFrequency(TimeUnit.MILLISECONDS); - // For backwards compatibility, since some threads used to be hard coded but only run if - // frequency was > 0 - if (freq > 0) { - runningTasks.add(task); - metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); - } - } + tasks = new ArrayList<>(getAlwaysTasks()); } else { - List remoteOnlyTasks = new ArrayList<>(getRemoteOnlyTasks()); - for (MetastoreTaskThread task : remoteOnlyTasks) { - task.setConf(configuration); - task.enforceMutex(election.enforceMutex()); - long freq = task.runFrequency(TimeUnit.MILLISECONDS); - if (freq > 0) { - runningTasks.add(task); - metastoreTaskThreadPool.getPool().scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); - } + tasks = new ArrayList<>(getRemoteOnlyTasks()); + } + int poolSize = Math.min(MetastoreConf.getIntVar(configuration, + MetastoreConf.ConfVars.THREAD_POOL_SIZE), tasks.size()); + metastoreTaskThreadPool = Executors.newScheduledThreadPool(poolSize, threadFactory); + for (MetastoreTaskThread task : tasks) { + task.setConf(configuration); + task.enforceMutex(election.enforceMutex()); + long freq = task.runFrequency(TimeUnit.MILLISECONDS); + if (freq > 0) { + runningTasks.add(task); + metastoreTaskThreadPool.scheduleAtFixedRate(task, freq, freq, TimeUnit.MILLISECONDS); } } @@ -141,4 +140,8 @@ public void lossLeadership(LeaderElection election) throws Exception { } } + @VisibleForTesting + public ScheduledExecutorService getExecutorService() { + return metastoreTaskThreadPool; + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java index ee635a6f9f5f..fb9548d065ea 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/LeaseLeaderElection.java @@ -89,7 +89,7 @@ public class LeaseLeaderElection implements LeaderElection { private volatile boolean stopped = false; // Leadership change listeners - private final List listeners = new ArrayList<>(); + protected final List listeners = new ArrayList<>(); protected String name; private final String userName;