Skip to content

Commit b0a3738

Browse files
authored
fix: websocket reconnect null point error (#296)
Closes: TD-38690
1 parent 9c473bc commit b0a3738

19 files changed

+2500
-130
lines changed

CHANGELOG.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,33 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77

88
## [unreleased]
99

10-
**Full Changelog**: [3.7.6...](https://github.com/taosdata/taos-connector-jdbc/compare/3.7.6...)
10+
**Full Changelog**: [3.7.8...](https://github.com/taosdata/taos-connector-jdbc/compare/3.7.8...)
11+
12+
### Features
13+
14+
15+
- Optimize the load balancing logic to achieve rebalancing after node failure recovery. (#291) ([9c473bc](https://github.com/taosdata/taos-connector-jdbc/commit/9c473bccc699ddba33316efa313da2230143ac34))
16+
17+
18+
## [3.7.8] - 2025-11-20
19+
20+
**Full Changelog**: [3.7.7...3.7.8](https://github.com/taosdata/taos-connector-jdbc/compare/3.7.7...3.7.8)
21+
22+
### Bug Fixes
23+
24+
25+
- Fix getTables need identifier quote string bug. (#292) ([ce5651d](https://github.com/taosdata/taos-connector-jdbc/commit/ce5651d31c87174c06ba567b9a1596c54a6c288b))
26+
27+
28+
## [3.7.7] - 2025-11-13
29+
30+
**Full Changelog**: [3.7.6...3.7.7](https://github.com/taosdata/taos-connector-jdbc/compare/3.7.6...3.7.7)
1131

1232
### Bug Fixes
1333

1434

1535
- Load properties issue in windows (#289) ([b6841d7](https://github.com/taosdata/taos-connector-jdbc/commit/b6841d726f9520556ddf069a5e0aea95af3be876))
36+
- WebSocket statement timeout setting resets transport's timeout (#290) ([c9792c0](https://github.com/taosdata/taos-connector-jdbc/commit/c9792c0eea77e9978ff29bf64c55b823d0c0755f))
1637

1738

1839
## [3.7.6] - 2025-10-17

deploy-pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.taosdata.jdbc</groupId>
77
<artifactId>taos-jdbcdriver</artifactId>
8-
<version>3.7.8</version>
8+
<version>3.7.9</version>
99

1010
<packaging>jar</packaging>
1111

@@ -48,6 +48,7 @@
4848
<netty-all.version>4.1.127.Final</netty-all.version>
4949
<junit.version>4.13.2</junit.version>
5050
<mockito.version>4.11.0</mockito.version>
51+
<byte-buddy.version>1.12.22</byte-buddy.version>
5152
<slf4j.version>1.7.36</slf4j.version>
5253
<logback.version>1.3.16</logback.version>
5354
</properties>
@@ -71,6 +72,12 @@
7172
<version>${mockito.version}</version>
7273
<scope>test</scope>
7374
</dependency>
75+
<dependency>
76+
<groupId>net.bytebuddy</groupId>
77+
<artifactId>byte-buddy</artifactId>
78+
<version>${byte-buddy.version}</version>
79+
<scope>test</scope>
80+
</dependency>
7481
<dependency>
7582
<groupId>ch.qos.logback</groupId>
7683
<artifactId>logback-classic</artifactId>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.taosdata.jdbc</groupId>
55
<artifactId>taos-jdbcdriver</artifactId>
6-
<version>3.7.8</version>
6+
<version>3.7.9</version>
77

88
<packaging>jar</packaging>
99
<name>JDBCDriver</name>

src/main/java/com/taosdata/jdbc/ws/Transport.java

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -117,13 +117,16 @@ public Response send(Request request, boolean reSend, long timeout) throws SQLEx
117117
try {
118118
connectionManager.getCurrentClient().send(reqString);
119119
} catch (WebsocketNotConnectedException e) {
120-
connectionManager.handleConnectionException(false, this);
121120
try {
121+
connectionManager.handleConnectionException(this);
122122
if (!reSend) {
123123
inFlightRequest.remove(request.getAction(), request.id());
124-
throw new SQLException("reconnect, need to resend " + request.getAction() + " msg");
124+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
125125
}
126126
connectionManager.getCurrentClient().send(reqString);
127+
} catch (SQLException ex) {
128+
inFlightRequest.remove(request.getAction(), request.id());
129+
throw ex;
127130
} catch (Exception ex) {
128131
inFlightRequest.remove(request.getAction(), request.id());
129132
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESTFul_Client_IOException, e.getMessage());
@@ -135,7 +138,11 @@ public Response send(Request request, boolean reSend, long timeout) throws SQLEx
135138
try {
136139
response = responseFuture.get();
137140
handleTaosdError(response);
138-
} catch (InterruptedException | ExecutionException e) {
141+
} catch (InterruptedException e) {
142+
Thread.currentThread().interrupt();
143+
inFlightRequest.remove(request.getAction(), request.id());
144+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
145+
} catch (ExecutionException e) {
139146
inFlightRequest.remove(request.getAction(), request.id());
140147
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
141148
}
@@ -193,8 +200,8 @@ public Response send(String action, long reqId, long resultId, long type, byte[]
193200
Utils.retainByteBuf(buffer);
194201
connectionManager.getCurrentClient().send(buffer);
195202
} catch (WebsocketNotConnectedException e) {
196-
connectionManager.handleConnectionException(false, this);
197203
try {
204+
connectionManager.handleConnectionException(this);
198205
Utils.retainByteBuf(buffer);
199206
connectionManager.getCurrentClient().send(buffer);
200207
} catch (Exception ex) {
@@ -210,7 +217,11 @@ public Response send(String action, long reqId, long resultId, long type, byte[]
210217
try {
211218
response = responseFuture.get();
212219
handleTaosdError(response);
213-
} catch (InterruptedException | ExecutionException e) {
220+
} catch (InterruptedException e) {
221+
Thread.currentThread().interrupt();
222+
inFlightRequest.remove(action, reqId);
223+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
224+
} catch (ExecutionException e) {
214225
inFlightRequest.remove(action, reqId);
215226
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
216227
}
@@ -243,7 +254,7 @@ public void sendFetchBlockAsync(long reqId, long resultId) throws SQLException {
243254
Utils.retainByteBuf(buffer);
244255
connectionManager.getCurrentClient().send(buffer);
245256
} catch (WebsocketNotConnectedException e) {
246-
connectionManager.handleConnectionException(false, this);
257+
connectionManager.handleConnectionException(this);
247258
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED,
248259
"Websocket reconnected, but the result set is closed");
249260
} finally {
@@ -276,13 +287,20 @@ public Response send(String action, long reqId, ByteBuf buffer, boolean resend,
276287
Utils.retainByteBuf(buffer);
277288
connectionManager.getCurrentClient().send(buffer);
278289
} catch (WebsocketNotConnectedException e) {
279-
connectionManager.handleConnectionException(false, this);
290+
try {
291+
connectionManager.handleConnectionException(this);
292+
} catch (SQLException ex) {
293+
inFlightRequest.remove(action, reqId);
294+
throw ex;
295+
}
296+
297+
if (!resend) {
298+
inFlightRequest.remove(action, reqId);
299+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_CONNECTION_CLOSED);
300+
}
301+
280302
try {
281303
Utils.retainByteBuf(buffer);
282-
if (!resend) {
283-
inFlightRequest.remove(action, reqId);
284-
throw new SQLException("reconnect, need to resend " + action + " msg");
285-
}
286304
connectionManager.getCurrentClient().send(buffer);
287305
} catch (Exception ex) {
288306
inFlightRequest.remove(action, reqId);
@@ -299,9 +317,13 @@ public Response send(String action, long reqId, ByteBuf buffer, boolean resend,
299317
try {
300318
response = responseFuture.get();
301319
handleTaosdError(response);
302-
} catch (InterruptedException | ExecutionException e) {
320+
} catch (InterruptedException e) {
321+
Thread.currentThread().interrupt();
303322
inFlightRequest.remove(action, reqId);
304323
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
324+
} catch (ExecutionException e) {
325+
inFlightRequest.remove(action, reqId);
326+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
305327
}
306328
return response;
307329
}
@@ -353,9 +375,13 @@ public Response sendWithoutRetry(Request request, long timeout) throws SQLExcept
353375
completableFuture, timeout, TimeUnit.MILLISECONDS, reqString);
354376
try {
355377
response = responseFuture.get();
356-
} catch (InterruptedException | ExecutionException e) {
357-
inFlightRequest.remove(request.getAction(), request.id());
378+
} catch (InterruptedException e) {
379+
Thread.currentThread().interrupt();
380+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
381+
} catch (ExecutionException e) {
358382
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_QUERY_TIMEOUT, e.getMessage());
383+
} finally {
384+
inFlightRequest.remove(request.getAction(), request.id());
359385
}
360386
return response;
361387
}
@@ -374,7 +400,7 @@ public void sendWithoutResponse(Request request) throws SQLException {
374400
try {
375401
connectionManager.getCurrentClient().send(request.toString());
376402
} catch (WebsocketNotConnectedException e) {
377-
connectionManager.handleConnectionException(false, this);
403+
connectionManager.handleConnectionException(this);
378404
try {
379405
connectionManager.getCurrentClient().send(request.toString());
380406
} catch (Exception ex) {

src/main/java/com/taosdata/jdbc/ws/WSClient.java

Lines changed: 48 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,19 @@
1010
import io.netty.bootstrap.Bootstrap;
1111
import io.netty.buffer.ByteBuf;
1212
import io.netty.buffer.PooledByteBufAllocator;
13-
import io.netty.channel.*;
14-
import io.netty.channel.socket.SocketChannel;
13+
import io.netty.channel.Channel;
14+
import io.netty.channel.ChannelFuture;
15+
import io.netty.channel.ChannelOption;
16+
import io.netty.channel.ConnectTimeoutException;
1517
import io.netty.channel.socket.nio.NioSocketChannel;
16-
import io.netty.handler.codec.http.DefaultHttpHeaders;
17-
import io.netty.handler.codec.http.HttpClientCodec;
18-
import io.netty.handler.codec.http.HttpObjectAggregator;
19-
import io.netty.handler.codec.http.websocketx.*;
20-
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
21-
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandshaker;
22-
import io.netty.handler.codec.http.websocketx.extensions.compression.DeflateFrameClientExtensionHandshaker;
23-
import io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateClientExtensionHandshaker;
24-
import io.netty.handler.ssl.SslContext;
25-
import io.netty.handler.ssl.SslContextBuilder;
26-
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
18+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
19+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
20+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
2721
import io.netty.handler.timeout.TimeoutException;
22+
import io.netty.util.concurrent.Promise;
2823
import org.slf4j.Logger;
2924
import org.slf4j.LoggerFactory;
3025

31-
import javax.net.ssl.SSLException;
3226
import java.net.URI;
3327
import java.net.URISyntaxException;
3428
import java.sql.SQLException;
@@ -80,66 +74,7 @@ private Bootstrap createBootstrap() {
8074
.channel(NioSocketChannel.class)
8175
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionParam.getConnectTimeout())
8276
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
83-
.handler(new ChannelInitializer<SocketChannel>() {
84-
@Override
85-
protected void initChannel(SocketChannel ch) throws SSLException {
86-
ChannelPipeline p = ch.pipeline();
87-
88-
if (connectionParam.isUseSsl()) {
89-
if (connectionParam.isDisableSslCertValidation()){
90-
SslContext sslCtx = SslContextBuilder.forClient()
91-
.trustManager(InsecureTrustManagerFactory.INSTANCE)
92-
.build();
93-
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
94-
} else {
95-
SslContext sslCtx = SslContextBuilder.forClient().build();
96-
p.addLast(sslCtx.newHandler(ch.alloc(), host, port));
97-
}
98-
}
99-
100-
// for debug
101-
//p.addLast(new LoggingHandler(LogLevel.DEBUG));
102-
103-
p.addLast(new HttpClientCodec());
104-
p.addLast(new HttpObjectAggregator(8192));
105-
106-
// use custom websocket client handshaker to avoid mask encode
107-
WebSocketClientHandshaker handshaker = new CustomWebSocketClientHandshaker(serverUri,
108-
WebSocketVersion.V13,
109-
null,
110-
true,
111-
new DefaultHttpHeaders(),
112-
100 * 1024 * 1024,
113-
true,
114-
false,
115-
-1L);
116-
p.addLast(new WebSocketHandshakeHandler(handshaker));
117-
118-
if (connectionParam.isEnableCompression()) {
119-
WebSocketClientExtensionHandshaker deflateHandshaker = new PerMessageDeflateClientExtensionHandshaker(
120-
6, false,
121-
15, // clientMaxWindowSize (2^15 = 32KB)
122-
true, // clientNoContextTakeover
123-
true // serverNoContextTakeover
124-
125-
);
126-
127-
WebSocketClientExtensionHandler extensionHandler = new WebSocketClientExtensionHandler(deflateHandshaker, new DeflateFrameClientExtensionHandshaker(false),
128-
new DeflateFrameClientExtensionHandshaker(true));
129-
p.addLast(extensionHandler);
130-
}
131-
132-
p.addLast(new WebSocketFrameAggregator(100 * 1024 * 1024)); // max 100MB
133-
134-
// Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
135-
// If you change it to V00, ping is not supported and remember to change
136-
// HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
137-
final WebSocketClientHandler handler =
138-
new WebSocketClientHandler(connectionParam.getTextMessageHandler(),
139-
connectionParam.getBinaryMessageHandler());
140-
p.addLast(handler);
141-
}
142-
});
77+
.handler(new WebSocketChannelInitializer(connectionParam, host, port, serverUri));
14378
return b;
14479
}
14580

@@ -228,8 +163,32 @@ private Channel getChannel() throws SQLException {
228163
}
229164

230165
tmpChn = connectFuture.channel();
166+
Promise<WebSocketHandshakeHandler> wsHandlerPromise = tmpChn.eventLoop().newPromise();
167+
168+
final Channel chn = tmpChn;
169+
tmpChn.eventLoop().execute(() -> {
170+
try {
171+
WebSocketHandshakeHandler wsHandler = chn.pipeline().get(WebSocketHandshakeHandler.class);
172+
if (wsHandler == null) {
173+
wsHandlerPromise.setFailure(new IllegalStateException("WebSocketHandshakeHandler not initialized,initChannel execute failed"));
174+
} else {
175+
wsHandlerPromise.setSuccess(wsHandler);
176+
}
177+
} catch (Exception e) {
178+
wsHandlerPromise.setFailure(e);
179+
}
180+
});
231181

232-
WebSocketHandshakeHandler wsHandler = tmpChn.pipeline().get(WebSocketHandshakeHandler.class);
182+
// wait for the Promise to complete with timeout
183+
wsHandlerPromise.awaitUninterruptibly(connectionParam.getConnectTimeout());
184+
if (!wsHandlerPromise.isSuccess()) {
185+
tmpChn.close().syncUninterruptibly();
186+
log.error("get wsHandler failed", wsHandlerPromise.cause());
187+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "get wsHandler failed: " + wsHandlerPromise.cause().getMessage());
188+
}
189+
190+
// get result from the Promise
191+
WebSocketHandshakeHandler wsHandler = wsHandlerPromise.getNow();
233192
ChannelFuture handshakeFuture = wsHandler.handshakeFuture();
234193

235194
if (!handshakeFuture.awaitUninterruptibly(connectionParam.getConnectTimeout())) {
@@ -339,23 +298,23 @@ public CompletableFuture<Void> closeAsync() {
339298

340299
return resultFuture;
341300
}
342-
public boolean reconnectBlockingWithoutRetry() {
343-
return connectBlocking();
344-
}
345301

346302
public boolean connectBlocking(){
347-
if (channel != null && channel.isActive()){
348-
return true;
349-
}
350-
if (channel != null){
351-
channel.close().syncUninterruptibly();
352-
}
303+
synchronized (this) {
304+
if (channel != null && channel.isActive()) {
305+
return true;
306+
}
307+
if (channel != null) {
308+
channel.close().syncUninterruptibly();
309+
}
353310

354-
try {
355-
channel = getChannel();
356-
return true;
357-
} catch (SQLException e){
358-
return false;
311+
try {
312+
channel = getChannel();
313+
return true;
314+
} catch (SQLException e) {
315+
log.error("WebSocket connect failed: ", e);
316+
return false;
317+
}
359318
}
360319
}
361320

0 commit comments

Comments
 (0)