Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,15 @@ public CompletableFuture<Void> closeAsync(boolean waitForWebServiceToStop) {
// Close protocol handler before unloading namespace bundles because protocol handlers might maintain
// Pulsar clients that could send lookup requests that affect unloading.
if (protocolHandlers != null) {
try {
List<CompletableFuture<Void>> channelCloseFutures =
brokerService.closeProtocolHandlerChannels();
// Wait for all protocol handler channels to close before closing protocol handlers
FutureUtil.waitForAll(channelCloseFutures).get();
LOG.info("Protocol handler channels closed successfully");
} catch (Exception e) {
LOG.warn("Failed to close protocol handler channels", e);
}
protocolHandlers.close();
protocolHandlers = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,8 @@ public class BrokerService implements Closeable {
private volatile DelayedDeliveryTrackerFactory fallbackDelayedDeliveryTrackerFactory;
private final ServerBootstrap defaultServerBootstrap;
private final List<Pair<String, EventLoopGroup>> protocolHandlersWorkerGroups = new ArrayList<>();

// Protocol handler channels for explicit lifecycle management
private final List<Channel> protocolHandlerChannels = new ArrayList<>(2);
@Getter
private final BundlesQuotas bundlesQuotas;

Expand Down Expand Up @@ -571,7 +572,8 @@ private void startProtocolHandler(String protocol,
}
bootstrap.childHandler(initializer);
try {
bootstrap.bind(address).sync();
Channel ch = bootstrap.bind(address).sync().channel();
protocolHandlerChannels.add(ch);
} catch (Exception e) {
throw new IOException("Failed to bind protocol `" + protocol + "` on " + address, e);
}
Expand Down Expand Up @@ -3954,4 +3956,24 @@ public void setCurrentClusterAllowedIfNoClusterIsAllowed(NamespaceName nsName, P
nsPolicies.allowed_clusters.add(pulsar.getConfig().getClusterName());
}
}

/**
* Close protocol handler channels explicitly with proper shutdown sequence.
* This method should be called from PulsarService before protocolHandlers.close()
* to ensure proper resource cleanup timing.
*
* Shutdown sequence: EventLoopGroups → Listen Channels → Protocol Handler Channels
*
* @return List of CompletableFuture for tracking the close operations
*/
public List<CompletableFuture<Void>> closeProtocolHandlerChannels() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
protocolHandlerChannels.forEach(ch -> {
if (ch.isOpen()) {
closeFutures.add(closeChannel(ch));
}
});
protocolHandlerChannels.clear(); // Clear the list after closing
return closeFutures;
}
}
Loading