11package com .taosdata .jdbc .ws ;
22
3+ import com .taosdata .jdbc .TSDBConstants ;
34import com .taosdata .jdbc .TSDBError ;
45import com .taosdata .jdbc .TSDBErrorNumbers ;
56import com .taosdata .jdbc .enums .WSFunction ;
2728public class Transport implements AutoCloseable {
2829 private static final Logger log = LoggerFactory .getLogger (Transport .class );
2930 private static final boolean isTest = "test" .equalsIgnoreCase (System .getProperty ("ENV_TAOS_JDBC_TEST" ));
30-
31- public static final int DEFAULT_MESSAGE_WAIT_TIMEOUT = 60_000 ;
32-
3331 public static final int TSDB_CODE_RPC_NETWORK_UNAVAIL = 0x0B ;
3432 public static final int TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED = 0x20 ;
35-
3633 private final AtomicInteger reconnectCount = new AtomicInteger (0 );
3734
3835 private final ArrayList <WSClient > clientArr = new ArrayList <>();
3936 private final InFlightRequest inFlightRequest ;
40- private long timeout ;
37+ private final long defaultTimeout ;
4138 private volatile boolean closed = false ;
4239
4340 private final ConnectionParam connectionParam ;
@@ -50,10 +47,13 @@ protected Transport() {
5047 this .inFlightRequest = null ;
5148 this .connectionParam = null ;
5249 this .wsFunction = null ;
50+ this .defaultTimeout = TSDBConstants .DEFAULT_MESSAGE_WAIT_TIMEOUT ;
5351 }
5452 public Transport (WSFunction function ,
5553 ConnectionParam param ,
5654 InFlightRequest inFlightRequest ) throws SQLException {
55+ this .defaultTimeout = param .getRequestTimeout ();
56+
5757 // master slave mode
5858 WSClient slave = WSClient .getSlaveInstance (param , function , this );
5959 if (slave != null ){
@@ -75,18 +75,7 @@ public Transport(WSFunction function,
7575 this .inFlightRequest = inFlightRequest ;
7676 this .connectionParam = param ;
7777 this .wsFunction = function ;
78-
79- setTimeout (param .getRequestTimeout ());
8078 }
81- public void setTimeout (long timeout ) {
82- if (timeout < 0 ){
83- timeout = DEFAULT_MESSAGE_WAIT_TIMEOUT ;
84- } else if (timeout == 0 ){
85- timeout = Integer .MAX_VALUE ;
86- }
87- this .timeout = timeout ;
88- }
89-
9079 private void reconnect (boolean isTmq ) throws SQLException {
9180 synchronized (this ) {
9281 if (isConnected ()){
@@ -118,10 +107,14 @@ private void tmqRethrowConnectionCloseException() throws SQLException {
118107 }
119108 }
120109 @ SuppressWarnings ("all" )
110+
121111 public Response send (Request request ) throws SQLException {
122- return send (request , true );
112+ return send (request , true , defaultTimeout );
113+ }
114+ public Response send (Request request , long timeout ) throws SQLException {
115+ return send (request , true , timeout );
123116 }
124- public Response send (Request request , boolean reSend ) throws SQLException {
117+ public Response send (Request request , boolean reSend , long timeout ) throws SQLException {
125118 if (isClosed ()){
126119 throw TSDBError .createSQLException (TSDBErrorNumbers .ERROR_CONNECTION_CLOSED , "Websocket Not Connected Exception" );
127120 }
@@ -164,11 +157,11 @@ public Response send(Request request, boolean reSend) throws SQLException {
164157 }
165158 return response ;
166159 }
167- public Response send (String action , long reqId , long resultId , long type , byte [] rawData ) throws SQLException {
168- return send (action , reqId , resultId , type , rawData , EMPTY_BYTE_ARRAY );
160+ public Response send (String action , long reqId , long resultId , long type , byte [] rawData , long timeout ) throws SQLException {
161+ return send (action , reqId , resultId , type , rawData , EMPTY_BYTE_ARRAY , timeout );
169162 }
170163
171- public Response send (String action , long reqId , long resultId , long type , byte [] rawData , byte [] rawData2 ) throws SQLException {
164+ public Response send (String action , long reqId , long resultId , long type , byte [] rawData , byte [] rawData2 , long timeout ) throws SQLException {
172165 if (isClosed ()){
173166 throw TSDBError .createSQLException (TSDBErrorNumbers .ERROR_CONNECTION_CLOSED , "Websocket Not Connected Exception" );
174167 }
@@ -246,7 +239,7 @@ public void sendFetchBlockAsync(long reqId,
246239 }
247240 }
248241
249- public Response send (String action , long reqId , ByteBuf buffer , boolean resend ) throws SQLException {
242+ public Response send (String action , long reqId , ByteBuf buffer , boolean resend , long timeout ) throws SQLException {
250243 if (isClosed ()){
251244 Utils .releaseByteBuf (buffer );
252245 throw TSDBError .createSQLException (TSDBErrorNumbers .ERROR_CONNECTION_CLOSED , "Websocket Not Connected Exception" );
@@ -304,7 +297,7 @@ private void handleErrInMasterSlaveMode(Response response) throws InterruptedExc
304297 }
305298 }
306299
307- public Response sendWithoutRetry (Request request ) throws SQLException {
300+ public Response sendWithoutRetry (Request request , long timeout ) throws SQLException {
308301 if (isClosed ()){
309302 throw TSDBError .createSQLException (TSDBErrorNumbers .ERROR_CONNECTION_CLOSED , "Websocket Not Connected Exception" );
310303 }
@@ -414,7 +407,7 @@ public void shutdown() {
414407 if (inFlightRequest .hasInFlightRequest ()) {
415408 CompletableFuture <Void > future = CompletableFuture .runAsync (() -> {
416409 try {
417- TimeUnit .MILLISECONDS .sleep (timeout );
410+ TimeUnit .MILLISECONDS .sleep (defaultTimeout );
418411 } catch (InterruptedException e ) {
419412 // ignore
420413 }
@@ -440,7 +433,7 @@ private boolean reconnectCurNode(boolean isTmq) throws SQLException {
440433 // send con msgs
441434 ConnectReq connectReq = new ConnectReq (connectionParam );
442435 ConnectResp auth ;
443- auth = (ConnectResp ) sendWithoutRetry (new Request (Action .CONN .getAction (), connectReq ));
436+ auth = (ConnectResp ) sendWithoutRetry (new Request (Action .CONN .getAction (), connectReq ), defaultTimeout );
444437
445438 if (Code .SUCCESS .getCode () == auth .getCode ()) {
446439 return true ;
0 commit comments