From 16c979c83d3c81b69e801801d285b15c5657648a Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Thu, 18 Jun 2026 09:43:59 -0700 Subject: [PATCH 1/4] Standardized `gremlin-go` connection options Assisted-by: Kiro: Claude Opus 4.8 --- CHANGELOG.asciidoc | 8 + docs/src/reference/gremlin-variants.asciidoc | 38 ++-- docs/src/upgrade/release-4.x.x.asciidoc | 44 ++++ gremlin-go/driver/auth/auth.go | 118 ++++++++++ gremlin-go/driver/{ => auth}/auth_test.go | 48 +++- .../driver/{auth.go => auth_deprecated.go} | 22 +- gremlin-go/driver/auth_deprecated_test.go | 132 +++++++++++ gremlin-go/driver/client.go | 100 ++++++--- gremlin-go/driver/client_behavior_test.go | 2 +- gremlin-go/driver/client_test.go | 24 +- gremlin-go/driver/connection.go | 141 +++++++++--- gremlin-go/driver/connection_test.go | 207 ++++++++++++++---- gremlin-go/driver/driverRemoteConnection.go | 88 +++++--- .../driver/driverRemoteConnection_test.go | 37 ---- gremlin-go/driver/interceptor_test.go | 83 ++++--- .../driver/performance/performanceSuite.go | 2 +- gremlin-go/driver/strategies_test.go | 2 +- gremlin-go/driver/transaction_test.go | 8 +- gremlin-go/driver/traversal_test.go | 8 +- gremlin-go/examples/connections.go | 4 +- 20 files changed, 852 insertions(+), 264 deletions(-) create mode 100644 gremlin-go/driver/auth/auth.go rename gremlin-go/driver/{ => auth}/auth_test.go (66%) rename gremlin-go/driver/{auth.go => auth_deprecated.go} (77%) create mode 100644 gremlin-go/driver/auth_deprecated_test.go delete mode 100644 gremlin-go/driver/driverRemoteConnection_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index d2e0ad55cfb..75e5178aaa7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -55,6 +55,14 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima ** Removed the `maxResponseContentLength` connection option and its `HttpObjectAggregator` cap; responses are streamed and the new `readTimeout` is the partial mitigation. ** Reconciled the `validationRequest` default: the builder default is now `g.inject(0)` to match the `Settings` default (it was previously `''`). * Added configurable CORS `allowedOrigins` setting to Gremlin Server; warns when wildcard origin is used alongside authentication. +* Standardized `gremlin-go` connection options per the TinkerPop 4.x GLV proposal: +** Moved `BasicAuth`/`SigV4Auth`/`SigV4AuthWithCredentials` out of package `gremlingo` into a new `auth` sub-package as `auth.Basic`/`auth.SigV4`/`auth.SigV4WithCredentials`; the flat `gremlingo` functions are retained as deprecated delegators (idiomatic Go `// Deprecated:` doc comments) that produce interceptors equivalent to the `auth.*` constructors, so existing code keeps compiling. New code should use the `auth` sub-package. +** Renamed `MaximumConcurrentConnections` to `MaxConnections` (default 128), `IdleConnectionTimeout` to `IdleTimeout` (default 180s), `KeepAliveInterval` to `KeepAliveTime` (default 30s), `ConnectionTimeout` to `ConnectTimeout` (default 5s), `TlsConfig` to `Ssl` (`*tls.Config`), and `RequestInterceptors` to `Interceptors`. *(breaking)* +** Renamed `EnableCompression` to `Compression`, now a typed `Compression` const (`CompressionNone`/`CompressionDeflate`) defaulting to `CompressionDeflate` (compression on by default; set `CompressionNone` to disable); `Accept-Encoding: deflate` is sent by default and the manual per-chunk deflate decode path is retained. *(breaking)* +** Added `DefaultBatchSize` (default 64), a connection-level default that fills the per-request `batchSize` when unset. +** Added `MaxResponseHeaderBytes`, exposing `http.Transport.MaxResponseHeaderBytes`. +** Added `Proxy` and set `http.Transport.Proxy` to `http.ProxyFromEnvironment` by default, fixing a regression where a custom transport silently dropped environment proxy configuration. +** Added `ReadTimeout`, an idle-read timeout reset on each read (via `SetReadDeadline`) that is re-armed across pooled-connection reuse and does not set `http.Client.Timeout`. * Fixed `ByteBuf` leak in `GraphBinaryMessageSerializerV4` when serialization throws an `IOException`. * Changed `Tree` to no longer extend `HashMap`; it is now a final class with a tree-shaped API (`childAt`, `hasChild`, `contains`, `findSubtree`, `getOrCreateChild`, `getNodesAtDepth`, `getLeafNodes`, `nodeCount`) and is no longer a `Map`. * Changed `count(local)` on a `Tree` to return the total node count (`Tree.nodeCount()`) instead of the root-entry count. diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 3cca5fe43c1..757dab0b6bf 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -189,17 +189,17 @@ headers, or request signing. Plain text and SigV4 authentication are provided as // Plain text authentication remote, err := gremlingo.NewDriverRemoteConnection("https://localhost:8182/gremlin", func(settings *gremlingo.DriverRemoteConnectionSettings) { - settings.TlsConfig = &tls.Config{InsecureSkipVerify: true} - settings.RequestInterceptors = []gremlingo.RequestInterceptor{ - gremlingo.BasicAuth("username", "password"), + settings.Ssl = &tls.Config{InsecureSkipVerify: true} + settings.Interceptors = []gremlingo.RequestInterceptor{ + auth.Basic("username", "password"), } }) // SigV4 authentication remote, err := gremlingo.NewDriverRemoteConnection("https://localhost:8182/gremlin", func(settings *gremlingo.DriverRemoteConnectionSettings) { - settings.RequestInterceptors = []gremlingo.RequestInterceptor{ - gremlingo.SigV4Auth("service-region", "service-name"), + settings.Interceptors = []gremlingo.RequestInterceptor{ + auth.SigV4("service-region", "service-name"), } }) ---- @@ -259,19 +259,23 @@ can be passed to the `NewClient` or `NewDriverRemoteConnection` functions as con |LogVerbosity |Log verbosity.|gremlingo.INFO |Logger |Instance of logger. |log |Language |Language used for logging messages. |language.English -|TlsConfig |TLS configuration. |empty -|ConnectionTimeout | Timeout for establishing connection. |15 seconds -|MaximumConcurrentConnections | Maximum number of concurrent TCP connections to the server. |128 +|Ssl |TLS configuration. |empty +|ConnectTimeout | Timeout for establishing the connection (TCP connect plus TLS handshake). |5 seconds +|MaxConnections | Maximum number of concurrent TCP connections to the server. |128 |MaxIdleConnections | Maximum number of idle (keep-alive) connections in the pool. |8 -|IdleConnectionTimeout | How long idle connections remain in the pool before being closed. |180 seconds -|KeepAliveInterval | TCP keep-alive probe interval. |30 seconds -|EnableCompression |Flag to enable compression. |false +|IdleTimeout | How long idle connections remain in the pool before being closed. |180 seconds +|KeepAliveTime | Idle time before TCP keep-alive probes begin. |30 seconds +|ReadTimeout | Idle-read timeout reset on each read of the response body. Set to `0` to disable. |0 +|Compression |The wire compression negotiated with the server (`gremlingo.CompressionNone` or `gremlingo.CompressionDeflate`). |gremlingo.CompressionDeflate +|DefaultBatchSize |The connection-level default batch size used when a request does not specify one. |64 +|MaxResponseHeaderBytes |Limit on the number of response header bytes the client will read. Maps to `http.Transport.MaxResponseHeaderBytes`. |0 (net/http default) +|Proxy |Function returning the proxy URL to use for a request. When `nil`, uses `http.ProxyFromEnvironment`. |nil |EnableUserAgentOnConnect |Enables sending a user agent to the server during connection requests. More details can be found in provider docs link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#_graph_driver_provider_requirements[here].|true -|RequestInterceptors |Functions that modify HTTP requests before sending. Used for authentication and custom headers. |empty +|Interceptors |Functions that modify HTTP requests before sending. Used for authentication and custom headers. |empty |PDTRegistry |A `*PDTRegistry` for hydrating and dehydrating <>. |`nil` -|Auth |A single RequestInterceptor for authentication (e.g. `BasicAuth`). Always appended to the end of the interceptor list so it runs last. |nil +|Auth |A single RequestInterceptor for authentication (e.g. `auth.Basic`). Always appended to the end of the interceptor list so it runs last. |nil |========================================================= [[gremlin-go-interceptors]] @@ -284,17 +288,17 @@ servers without having to use interceptors. This is intended for cases where the A `RequestInterceptor` is a function with the signature `func(*HttpRequest) error` that mutates the `HttpRequest` in place. A slice of these is maintained and will be run sequentially for each request. When creating a -`DriverRemoteConnection` or `Client`, the `RequestInterceptors` field on the settings struct accepts an ordered +`DriverRemoteConnection` or `Client`, the `Interceptors` field on the settings struct accepts an ordered slice of interceptors. Order matters, so if one interceptor depends on another's output, ensure they are added in -the correct order. Note that authentication (e.g. `BasicAuth`, `SigV4Auth`) is also implemented using interceptors. +the correct order. Note that authentication (e.g. `auth.Basic`, `auth.SigV4`) is also implemented using interceptors. The `auth` convenience on connection settings appends the auth interceptor to the end of the list so it runs last. [source,go] ---- remote, err := gremlingo.NewDriverRemoteConnection("http://localhost:8182/gremlin", func(settings *gremlingo.DriverRemoteConnectionSettings) { - settings.RequestInterceptors = []gremlingo.RequestInterceptor{ - gremlingo.BasicAuth("username", "password"), + settings.Interceptors = []gremlingo.RequestInterceptor{ + auth.Basic("username", "password"), func(req *gremlingo.HttpRequest) error { req.Headers.Set("X-Custom-Header", "value") return nil diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index 1558bcd027d..0791fc76791 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -32,6 +32,50 @@ complete list of all the modifications that are part of this release. === Upgrading for Users +==== Standardizing Go Connection Options + +TinkerPop 4.x standardizes connection option names and defaults across the GLVs. In `gremlin-go`, several +`DriverRemoteConnectionSettings`/`ClientSettings` options introduced for the HTTP driver in 4.0.0-beta.2 have been +renamed for consistency, the authentication helpers have moved into a dedicated `auth` sub-package, and a number of +new options have been added. The notes below describe the Go changes. See <> +for the equivalent changes in the other drivers. + +Renames (breaking). The following settings fields have been renamed. Because they are struct fields, they cannot be +aliased, so existing code must be updated to the new names: + +- `MaximumConcurrentConnections` is now `MaxConnections` (default 128). +- `IdleConnectionTimeout` is now `IdleTimeout` (default 180s). +- `KeepAliveInterval` is now `KeepAliveTime` (default 30s). +- `ConnectionTimeout` is now `ConnectTimeout` (default 5s). +- `TlsConfig` (`*tls.Config`) is now `Ssl`. +- `RequestInterceptors` is now `Interceptors`. +- `EnableCompression` is now `Compression`. +- The `BasicAuth`, `SigV4Auth`, and `SigV4AuthWithCredentials` functions have moved out of package `gremlingo` into a +new `auth` sub-package (`github.com/apache/tinkerpop/gremlin-go/v4/driver/auth`) as `auth.Basic`, `auth.SigV4`, and +`auth.SigV4WithCredentials`. The flat `gremlingo` functions are retained as deprecated delegators (marked with +idiomatic Go `// Deprecated:` doc comments) so existing code keeps compiling; new code should use the `auth` +sub-package. + +Behavior changes. These change runtime behavior on upgrade, even if you do not change your configuration: + +- `Compression` is now a typed `Compression` const (`gremlingo.CompressionNone`/`gremlingo.CompressionDeflate`) +defaulting to `CompressionDeflate` (compression on by default), so the driver sends `Accept-Encoding: deflate` by +default. Set `gremlingo.CompressionNone` to disable it. The manual per-chunk deflate decode path is retained. +- `http.Transport.Proxy` now defaults to `http.ProxyFromEnvironment`, so the standard `HTTP_PROXY`/`HTTPS_PROXY`/ +`NO_PROXY` environment variables are honored. Previously the custom transport left `Proxy` unset, silently dropping +any environment proxy configuration. + +New options: + +- `ReadTimeout` (default 0, disabled): a per-read idle timeout reset on each read of the response body and re-armed +across pooled-connection reuse, so it never fires while a pooled connection is idle between requests. +- `MaxResponseHeaderBytes`: exposes `http.Transport.MaxResponseHeaderBytes` (native bytes). +- `Proxy`: an explicit `func(*http.Request) (*url.URL, error)` proxy override for the transport. +- `DefaultBatchSize` (default 64): a connection-level default that fills a request's `batchSize` when it is left +unset. + +See: link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[dev list discussion on standardizing GLV connection options]. + ==== Standardizing Python Connection Options TinkerPop 4.x standardizes connection option names and defaults across the GLVs. In `gremlin-python`, several diff --git a/gremlin-go/driver/auth/auth.go b/gremlin-go/driver/auth/auth.go new file mode 100644 index 00000000000..d24330179d4 --- /dev/null +++ b/gremlin-go/driver/auth/auth.go @@ -0,0 +1,118 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +// Package auth provides authentication interceptors for the gremlin-go driver. +// Each constructor returns a gremlingo.RequestInterceptor that can be assigned to +// the Auth field of ClientSettings or DriverRemoteConnectionSettings. +package auth + +import ( + "context" + "encoding/base64" + "fmt" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/config" + + gremlingo "github.com/apache/tinkerpop/gremlin-go/v4/driver" +) + +// Basic returns a RequestInterceptor that adds a Basic authentication header. +func Basic(username, password string) gremlingo.RequestInterceptor { + encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) + return func(req *gremlingo.HttpRequest) error { + req.Headers.Set(gremlingo.HeaderAuthorization, "Basic "+encoded) + return nil + } +} + +// SigV4 returns a RequestInterceptor that signs requests using AWS SigV4. +// It uses the default AWS credential chain (env vars, shared config, IAM role, etc.) +func SigV4(region, service string) gremlingo.RequestInterceptor { + return SigV4WithCredentials(region, service, nil) +} + +// SigV4WithCredentials returns a RequestInterceptor that signs requests using AWS SigV4 +// with the provided credentials provider. If provider is nil, uses default credential chain. +// If the request body has not been serialized yet (*RequestMessage), it is automatically +// serialized to JSON before signing via SerializeBody(). +// +// Caches the signer and credentials provider for efficiency. +func SigV4WithCredentials(region, service string, credentialsProvider aws.CredentialsProvider) gremlingo.RequestInterceptor { + // Create signer once - it's stateless and safe to reuse + signer := v4.NewSigner() + + // Cache for resolved credentials provider (lazy initialization) + var cachedProvider aws.CredentialsProvider + var providerOnce sync.Once + var providerErr error + + return func(req *gremlingo.HttpRequest) error { + // Ensure body is serialized to JSON bytes before signing. + // SerializeBody is idempotent: safe to call even if already serialized. + if _, err := req.SerializeBody(); err != nil { + return fmt.Errorf("SigV4 signing requires a serialized body: %w", err) + } + + ctx := context.Background() + + // Resolve credentials provider once if not provided + provider := credentialsProvider + if provider == nil { + providerOnce.Do(func() { + cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + providerErr = err + return + } + cachedProvider = cfg.Credentials + }) + if providerErr != nil { + return providerErr + } + provider = cachedProvider + } + + // Retrieve credentials (the provider handles caching internally) + creds, err := provider.Retrieve(ctx) + if err != nil { + return err + } + + stdReq, err := req.ToStdRequest() + if err != nil { + return err + } + stdReq.Body = nil // Body is handled separately via payload hash + + if err := signer.SignHTTP(ctx, creds, stdReq, req.PayloadHash(), service, region, time.Now()); err != nil { + return err + } + + // Copy signed headers back to HttpRequest + for k, v := range stdReq.Header { + req.Headers[k] = v + } + + return nil + } +} diff --git a/gremlin-go/driver/auth_test.go b/gremlin-go/driver/auth/auth_test.go similarity index 66% rename from gremlin-go/driver/auth_test.go rename to gremlin-go/driver/auth/auth_test.go index 7ec4079b6ea..26779d7262d 100644 --- a/gremlin-go/driver/auth_test.go +++ b/gremlin-go/driver/auth/auth_test.go @@ -17,7 +17,7 @@ specific language governing permissions and limitations under the License. */ -package gremlingo +package auth import ( "context" @@ -27,26 +27,30 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/stretchr/testify/assert" + + gremlingo "github.com/apache/tinkerpop/gremlin-go/v4/driver" ) -func createMockRequest() *HttpRequest { - req, _ := NewHttpRequest("POST", "https://test_url:8182/gremlin") +const graphBinaryMimeType = "application/vnd.graphbinary-v4.0" + +func createMockRequest() *gremlingo.HttpRequest { + req, _ := gremlingo.NewHttpRequest("POST", "https://test_url:8182/gremlin") req.Headers.Set("Content-Type", graphBinaryMimeType) req.Headers.Set("Accept", graphBinaryMimeType) req.Body = []byte(`{"gremlin":"g.V()"}`) return req } -func TestBasicAuth(t *testing.T) { +func TestBasic(t *testing.T) { t.Run("adds authorization header", func(t *testing.T) { req := createMockRequest() - assert.Empty(t, req.Headers.Get(HeaderAuthorization)) + assert.Empty(t, req.Headers.Get(gremlingo.HeaderAuthorization)) - interceptor := BasicAuth("username", "password") + interceptor := Basic("username", "password") err := interceptor(req) assert.NoError(t, err) - authHeader := req.Headers.Get(HeaderAuthorization) + authHeader := req.Headers.Get(gremlingo.HeaderAuthorization) assert.True(t, strings.HasPrefix(authHeader, "Basic ")) // Verify encoding @@ -72,7 +76,7 @@ func (m *mockCredentialsProvider) Retrieve(ctx context.Context) (aws.Credentials }, nil } -func TestSigV4Auth(t *testing.T) { +func TestSigV4(t *testing.T) { t.Run("adds signed headers", func(t *testing.T) { req := createMockRequest() assert.Empty(t, req.Headers.Get("Authorization")) @@ -82,7 +86,7 @@ func TestSigV4Auth(t *testing.T) { accessKey: "MOCK_ID", secretKey: "MOCK_KEY", } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) + interceptor := SigV4WithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) err := interceptor(req) assert.NoError(t, err) @@ -102,7 +106,7 @@ func TestSigV4Auth(t *testing.T) { secretKey: "MOCK_KEY", sessionToken: "MOCK_TOKEN", } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) + interceptor := SigV4WithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) err := interceptor(req) assert.NoError(t, err) @@ -111,4 +115,28 @@ func TestSigV4Auth(t *testing.T) { assert.True(t, strings.HasPrefix(authHeader, "AWS4-HMAC-SHA256 Credential=")) assert.Contains(t, authHeader, "gremlin-east-1/tinkerpop-sigv4/aws4_request") }) + + t.Run("auto-serializes *RequestMessage before signing", func(t *testing.T) { + provider := &mockCredentialsProvider{ + accessKey: "MOCK_ID", + secretKey: "MOCK_KEY", + } + interceptor := SigV4WithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) + + req, err := gremlingo.NewHttpRequest("POST", "https://test_url:8182/gremlin") + assert.NoError(t, err) + req.Headers.Set("Content-Type", "application/json") + req.Headers.Set("Accept", graphBinaryMimeType) + req.Body = &gremlingo.RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}} + + err = interceptor(req) + assert.NoError(t, err) + + bodyBytes, ok := req.Body.([]byte) + assert.True(t, ok, "Body should be []byte after auto-serialization") + assert.NotEmpty(t, bodyBytes) + assert.NotEmpty(t, req.Headers.Get("Authorization")) + assert.NotEmpty(t, req.Headers.Get("X-Amz-Date")) + assert.Contains(t, req.Headers.Get("Authorization"), "AWS4-HMAC-SHA256") + }) } diff --git a/gremlin-go/driver/auth.go b/gremlin-go/driver/auth_deprecated.go similarity index 77% rename from gremlin-go/driver/auth.go rename to gremlin-go/driver/auth_deprecated.go index 411458e65c3..5f4f1a8143b 100644 --- a/gremlin-go/driver/auth.go +++ b/gremlin-go/driver/auth_deprecated.go @@ -31,7 +31,17 @@ import ( "github.com/aws/aws-sdk-go-v2/config" ) -// BasicAuth returns a RequestInterceptor that adds Basic authentication header. +// This file retains the flat authentication functions that previously lived in +// package gremlingo. They are kept as deprecated wrappers so existing code keeps +// compiling, and they produce interceptors equivalent to the constructors in the +// auth sub-package. +// +// Note: these functions cannot literally call auth.Basic / auth.SigV4 because the +// auth sub-package imports this package (for RequestInterceptor, HttpRequest, etc.), +// so a back-import would create an import cycle. They are therefore re-implemented +// inline to mirror the auth sub-package behavior exactly. + +// Deprecated: As of 4.0.0, BasicAuth is replaced by auth.Basic in the auth sub-package. func BasicAuth(username, password string) RequestInterceptor { encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) return func(req *HttpRequest) error { @@ -40,18 +50,12 @@ func BasicAuth(username, password string) RequestInterceptor { } } -// SigV4Auth returns a RequestInterceptor that signs requests using AWS SigV4. -// It uses the default AWS credential chain (env vars, shared config, IAM role, etc.) +// Deprecated: As of 4.0.0, SigV4Auth is replaced by auth.SigV4 in the auth sub-package. func SigV4Auth(region, service string) RequestInterceptor { return SigV4AuthWithCredentials(region, service, nil) } -// SigV4AuthWithCredentials returns a RequestInterceptor that signs requests using AWS SigV4 -// with the provided credentials provider. If provider is nil, uses default credential chain. -// If the request body has not been serialized yet (*RequestMessage), it is automatically -// serialized to JSON before signing via SerializeBody(). -// -// Caches the signer and credentials provider for efficiency. +// Deprecated: As of 4.0.0, SigV4AuthWithCredentials is replaced by auth.SigV4WithCredentials in the auth sub-package. func SigV4AuthWithCredentials(region, service string, credentialsProvider aws.CredentialsProvider) RequestInterceptor { // Create signer once - it's stateless and safe to reuse signer := v4.NewSigner() diff --git a/gremlin-go/driver/auth_deprecated_test.go b/gremlin-go/driver/auth_deprecated_test.go new file mode 100644 index 00000000000..ac0ce7bf72a --- /dev/null +++ b/gremlin-go/driver/auth_deprecated_test.go @@ -0,0 +1,132 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package gremlingo + +import ( + "context" + "encoding/base64" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/stretchr/testify/assert" +) + +const deprecatedAuthMimeType = "application/vnd.graphbinary-v4.0" + +func newDeprecatedAuthRequest(t *testing.T) *HttpRequest { + req, err := NewHttpRequest("POST", "https://test_url:8182/gremlin") + assert.NoError(t, err) + req.Headers.Set("Content-Type", deprecatedAuthMimeType) + req.Headers.Set("Accept", deprecatedAuthMimeType) + req.Body = []byte(`{"gremlin":"g.V()"}`) + return req +} + +// mockAuthCredentialsProvider implements aws.CredentialsProvider for testing. +type mockAuthCredentialsProvider struct { + accessKey string + secretKey string + sessionToken string +} + +func (m *mockAuthCredentialsProvider) Retrieve(ctx context.Context) (aws.Credentials, error) { + return aws.Credentials{ + AccessKeyID: m.accessKey, + SecretAccessKey: m.secretKey, + SessionToken: m.sessionToken, + }, nil +} + +// TestDeprecatedBasicAuth verifies the deprecated BasicAuth delegator produces a +// working interceptor that sets the Authorization header, equivalent to auth.Basic. +func TestDeprecatedBasicAuth(t *testing.T) { + t.Run("adds authorization header", func(t *testing.T) { + req := newDeprecatedAuthRequest(t) + assert.Empty(t, req.Headers.Get(HeaderAuthorization)) + + interceptor := BasicAuth("username", "password") + err := interceptor(req) + + assert.NoError(t, err) + authHeader := req.Headers.Get(HeaderAuthorization) + assert.True(t, strings.HasPrefix(authHeader, "Basic ")) + + encoded := strings.TrimPrefix(authHeader, "Basic ") + decoded, err := base64.StdEncoding.DecodeString(encoded) + assert.NoError(t, err) + assert.Equal(t, "username:password", string(decoded)) + }) + + t.Run("matches expected encoding", func(t *testing.T) { + req := newDeprecatedAuthRequest(t) + err := BasicAuth("user", "pass")(req) + assert.NoError(t, err) + + expected := "Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")) + assert.Equal(t, expected, req.Headers.Get(HeaderAuthorization)) + }) +} + +// TestDeprecatedSigV4Auth verifies the deprecated SigV4 delegators produce working +// interceptors that sign requests, equivalent to auth.SigV4WithCredentials. +func TestDeprecatedSigV4Auth(t *testing.T) { + t.Run("adds signed headers", func(t *testing.T) { + req := newDeprecatedAuthRequest(t) + assert.Empty(t, req.Headers.Get("Authorization")) + assert.Empty(t, req.Headers.Get("X-Amz-Date")) + + provider := &mockAuthCredentialsProvider{ + accessKey: "MOCK_ID", + secretKey: "MOCK_KEY", + } + interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) + err := interceptor(req) + + assert.NoError(t, err) + assert.NotEmpty(t, req.Headers.Get("X-Amz-Date")) + authHeader := req.Headers.Get("Authorization") + assert.True(t, strings.HasPrefix(authHeader, "AWS4-HMAC-SHA256 Credential=MOCK_ID")) + assert.Contains(t, authHeader, "gremlin-east-1/tinkerpop-sigv4/aws4_request") + assert.Contains(t, authHeader, "Signature=") + }) + + t.Run("adds session token when provided", func(t *testing.T) { + req := newDeprecatedAuthRequest(t) + assert.Empty(t, req.Headers.Get("X-Amz-Security-Token")) + + provider := &mockAuthCredentialsProvider{ + accessKey: "MOCK_ID", + secretKey: "MOCK_KEY", + sessionToken: "MOCK_TOKEN", + } + interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) + err := interceptor(req) + + assert.NoError(t, err) + assert.Equal(t, "MOCK_TOKEN", req.Headers.Get("X-Amz-Security-Token")) + }) + + t.Run("SigV4Auth delegates to credential chain variant", func(t *testing.T) { + // SigV4Auth with no explicit provider should still return a usable interceptor. + interceptor := SigV4Auth("gremlin-east-1", "tinkerpop-sigv4") + assert.NotNil(t, interceptor) + }) +} diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index aa34406fcd6..72b4a86002d 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -21,6 +21,8 @@ package gremlingo import ( "crypto/tls" + "net/http" + "net/url" "reflect" "sync" "time" @@ -28,47 +30,74 @@ import ( "golang.org/x/text/language" ) -const connectionTimeoutDefault = 5 * time.Second - // ClientSettings is used to modify a Client's settings on initialization. type ClientSettings struct { - TraversalSource string - LogVerbosity LogVerbosity - Logger Logger - Language language.Tag - TlsConfig *tls.Config - ConnectionTimeout time.Duration - EnableCompression bool - - // MaximumConcurrentConnections is the maximum number of concurrent TCP connections + TraversalSource string + LogVerbosity LogVerbosity + Logger Logger + Language language.Tag + + // Ssl is the TLS configuration used for secure (wss/https) connections. + Ssl *tls.Config + + // ConnectTimeout is the TCP/transport-establishment timeout (TCP connect plus + // TLS handshake where applicable), not an HTTP request timeout. + // Default: 5 seconds. Set to 0 to use the default. + ConnectTimeout time.Duration + + // ReadTimeout is an idle-read timeout: it is reset on each read of the response + // body rather than bounding the whole request. Streaming-safe. The deadline is + // re-armed across pooled-connection reuse. + // Default: 0 (disabled). + ReadTimeout time.Duration + + // Compression selects the content-encoding negotiated with the server. + // Default: CompressionDeflate (on). Set to CompressionNone to disable compression. + Compression Compression + + // MaxConnections is the maximum number of concurrent TCP connections // to the Gremlin server. This limits how many requests can be in-flight simultaneously. // Default: 128. Set to 0 to use the default. - MaximumConcurrentConnections int + MaxConnections int // MaxIdleConnections is the maximum number of idle (keep-alive) connections to retain // in the connection pool. Idle connections are reused for subsequent requests. // Default: 8. Set to 0 to use the default. MaxIdleConnections int - // IdleConnectionTimeout is how long an idle connection remains in the pool before + // IdleTimeout is how long an idle connection remains in the pool before // being closed. Set this to match your server's idle timeout if needed. // Default: 180 seconds (3 minutes). Set to 0 to use the default. - IdleConnectionTimeout time.Duration + IdleTimeout time.Duration - // KeepAliveInterval is the interval between TCP keep-alive probes on idle connections. + // KeepAliveTime is the TCP keep-alive idle-before-probe interval on connections. // This helps detect dead connections and keeps connections alive through firewalls. // Default: 30 seconds. Set to 0 to use the default. - KeepAliveInterval time.Duration + KeepAliveTime time.Duration + + // DefaultBatchSize is the connection-level default that fills a request's batchSize + // when it is not set per-request. + // Default: 64. Set to 0 to use the default. + DefaultBatchSize int + + // MaxResponseHeaderBytes limits the number of response header bytes the client will + // read. Maps to http.Transport.MaxResponseHeaderBytes. + // Default: 0 (use net/http's default). + MaxResponseHeaderBytes int64 + + // Proxy returns the proxy URL to use for a given request. When nil, the client + // uses http.ProxyFromEnvironment (HTTP_PROXY/HTTPS_PROXY/NO_PROXY). + Proxy func(*http.Request) (*url.URL, error) EnableUserAgentOnConnect bool // PDTRegistry enables automatic hydration of ProviderDefinedType values during deserialization. PDTRegistry *PDTRegistry - // RequestInterceptors are functions that modify HTTP requests before sending. - RequestInterceptors []RequestInterceptor + // Interceptors are functions that modify HTTP requests before sending. + Interceptors []RequestInterceptor - // Auth is a RequestInterceptor for authentication (e.g. BasicAuth, SigV4Auth). + // Auth is a RequestInterceptor for authentication (e.g. auth.Basic, auth.SigV4). // As a convenience, this is always appended to the end of the interceptor list // so it runs last, after any user interceptors have modified the request. Auth RequestInterceptor @@ -92,28 +121,33 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C LogVerbosity: Info, Logger: &defaultLogger{}, Language: language.English, - TlsConfig: &tls.Config{}, - ConnectionTimeout: connectionTimeoutDefault, - EnableCompression: false, + Ssl: &tls.Config{}, + ConnectTimeout: defaultConnectTimeout, + Compression: CompressionDeflate, EnableUserAgentOnConnect: true, - MaximumConcurrentConnections: 0, // Use default (128) - MaxIdleConnections: 0, // Use default (8) - IdleConnectionTimeout: 0, // Use default (180s) - KeepAliveInterval: 0, // Use default (30s) + MaxConnections: 0, // Use default (128) + MaxIdleConnections: 0, // Use default (8) + IdleTimeout: 0, // Use default (180s) + KeepAliveTime: 0, // Use default (30s) + DefaultBatchSize: 0, // Use default (64) } for _, configuration := range configurations { configuration(settings) } connSettings := &connectionSettings{ - tlsConfig: settings.TlsConfig, - connectionTimeout: settings.ConnectionTimeout, - maxConnsPerHost: settings.MaximumConcurrentConnections, + ssl: settings.Ssl, + connectTimeout: settings.ConnectTimeout, + readTimeout: settings.ReadTimeout, + maxConnsPerHost: settings.MaxConnections, maxIdleConnsPerHost: settings.MaxIdleConnections, - idleConnTimeout: settings.IdleConnectionTimeout, - keepAliveInterval: settings.KeepAliveInterval, - enableCompression: settings.EnableCompression, + idleTimeout: settings.IdleTimeout, + keepAliveTime: settings.KeepAliveTime, + compression: settings.Compression, + maxResponseHeaderBytes: settings.MaxResponseHeaderBytes, + defaultBatchSize: settings.DefaultBatchSize, + proxy: settings.Proxy, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, pdtRegistry: settings.PDTRegistry, } @@ -123,7 +157,7 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C conn := newConnection(logHandler, url, connSettings) // Add user-provided interceptors - for _, interceptor := range settings.RequestInterceptors { + for _, interceptor := range settings.Interceptors { conn.AddInterceptor(interceptor) } diff --git a/gremlin-go/driver/client_behavior_test.go b/gremlin-go/driver/client_behavior_test.go index dee4e24d8a5..f274c0ad2c5 100644 --- a/gremlin-go/driver/client_behavior_test.go +++ b/gremlin-go/driver/client_behavior_test.go @@ -144,7 +144,7 @@ func TestShouldHandleMalformedResponse(t *testing.T) { func TestShouldHandleEmptyResponseBody(t *testing.T) { url := socketServerURL() client, err := NewClient(url, func(settings *ClientSettings) { - settings.ConnectionTimeout = 5 * time.Second + settings.ConnectTimeout = 5 * time.Second }) if err != nil { t.Skip("Socket server not available") diff --git a/gremlin-go/driver/client_test.go b/gremlin-go/driver/client_test.go index ffffcad5cb5..f2623fdb609 100644 --- a/gremlin-go/driver/client_test.go +++ b/gremlin-go/driver/client_test.go @@ -37,7 +37,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig }) assert.NoError(t, err) assert.NotNil(t, client) @@ -56,7 +56,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig }) assert.NoError(t, err) assert.NotNil(t, client) @@ -71,7 +71,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig settings.TraversalSource = testServerModernGraphAlias }) assert.NoError(t, err) @@ -93,7 +93,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig settings.TraversalSource = testServerModernGraphAlias }) assert.NoError(t, err) @@ -117,7 +117,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig settings.TraversalSource = testServerModernGraphAlias }) assert.NoError(t, err) @@ -141,7 +141,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig settings.TraversalSource = testServerModernGraphAlias }) @@ -163,7 +163,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig }) assert.NoError(t, err) assert.NotNil(t, client) @@ -193,7 +193,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig }) assert.NoError(t, err) assert.NotNil(t, client) @@ -217,7 +217,7 @@ func TestClient(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig + settings.Ssl = testNoAuthTlsConfig settings.TraversalSource = testServerCrewGraphAlias }) @@ -302,7 +302,7 @@ func TestProviderDefinedTypeIntegration(t *testing.T) { t.Run("simple Point PDT round-trip", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = testServerModernGraphAlias }) require.NoError(t, err) @@ -325,7 +325,7 @@ func TestProviderDefinedTypeIntegration(t *testing.T) { t.Run("nested PDT (Person with Address)", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = testServerModernGraphAlias }) require.NoError(t, err) @@ -357,7 +357,7 @@ func TestProviderDefinedTypeIntegration(t *testing.T) { t.Run("PDT in collection", func(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = testServerModernGraphAlias }) require.NoError(t, err) diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index 8673414d254..e92a4f080d5 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -22,26 +22,42 @@ package gremlingo import ( "bytes" "compress/zlib" + "context" "crypto/tls" "encoding/json" "fmt" "io" "net" "net/http" + "net/url" "strings" "sync" "time" ) +// Compression identifies the content-encoding negotiated with the Gremlin server. +type Compression string + +const ( + // CompressionNone disables compression. No Accept-Encoding header is sent. + CompressionNone Compression = "none" + // CompressionDeflate requests per-chunk deflate compression from the server (default). + CompressionDeflate Compression = "deflate" +) + // connectionSettings holds configuration for the connection. type connectionSettings struct { - tlsConfig *tls.Config - connectionTimeout time.Duration + ssl *tls.Config + connectTimeout time.Duration + readTimeout time.Duration maxConnsPerHost int maxIdleConnsPerHost int - idleConnTimeout time.Duration - keepAliveInterval time.Duration - enableCompression bool + idleTimeout time.Duration + keepAliveTime time.Duration + compression Compression + maxResponseHeaderBytes int64 + defaultBatchSize int + proxy func(*http.Request) (*url.URL, error) enableUserAgentOnConnect bool pdtRegistry *PDTRegistry } @@ -61,16 +77,17 @@ type connection struct { const ( defaultMaxConnsPerHost = 128 // Java: ConnectionPool.MAX_POOL_SIZE defaultMaxIdleConnsPerHost = 8 // Keep some connections warm - defaultIdleConnTimeout = 180 * time.Second // Java: CONNECTION_IDLE_TIMEOUT_MILLIS - defaultConnectionTimeout = 15 * time.Second // Java: CONNECTION_SETUP_TIMEOUT_MILLIS - defaultKeepAliveInterval = 30 * time.Second // TCP keep-alive probe interval + defaultIdleTimeout = 180 * time.Second // Java: CONNECTION_IDLE_TIMEOUT_MILLIS + defaultConnectTimeout = 5 * time.Second // TCP/transport-establishment timeout + defaultKeepAliveTime = 30 * time.Second // TCP keep-alive idle-before-probe interval + defaultBatchSizeValue = 64 // Java: resultIterationBatchSize default ) func newConnection(handler *logHandler, url string, connSettings *connectionSettings) *connection { // Apply defaults for zero values - connectionTimeout := connSettings.connectionTimeout - if connectionTimeout == 0 { - connectionTimeout = defaultConnectionTimeout + connectTimeout := connSettings.connectTimeout + if connectTimeout == 0 { + connectTimeout = defaultConnectTimeout } maxConnsPerHost := connSettings.maxConnsPerHost @@ -83,26 +100,57 @@ func newConnection(handler *logHandler, url string, connSettings *connectionSett maxIdleConnsPerHost = defaultMaxIdleConnsPerHost } - idleConnTimeout := connSettings.idleConnTimeout - if idleConnTimeout == 0 { - idleConnTimeout = defaultIdleConnTimeout + idleTimeout := connSettings.idleTimeout + if idleTimeout == 0 { + idleTimeout = defaultIdleTimeout } - keepAliveInterval := connSettings.keepAliveInterval - if keepAliveInterval == 0 { - keepAliveInterval = defaultKeepAliveInterval + keepAliveTime := connSettings.keepAliveTime + if keepAliveTime == 0 { + keepAliveTime = defaultKeepAliveTime + } + + // Default the proxy resolver to the environment (HTTP_PROXY/HTTPS_PROXY/NO_PROXY) + // unless an explicit override was provided. A custom http.Transport otherwise + // silently drops environment proxy configuration. + proxy := connSettings.proxy + if proxy == nil { + proxy = http.ProxyFromEnvironment + } + + dialer := &net.Dialer{ + Timeout: connectTimeout, + KeepAlive: keepAliveTime, + } + + readTimeout := connSettings.readTimeout + dialContext := dialer.DialContext + if readTimeout > 0 { + // Wrap each dialed connection so every Read re-arms the read deadline. + // This models an idle-read (per-read) timeout rather than a whole-request + // deadline, and resets correctly across pooled-connection reuse because the + // deadline is refreshed on every Read regardless of which request reuses it. + dialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + conn, err := dialer.DialContext(ctx, network, addr) + if err != nil { + return nil, err + } + return &readTimeoutConn{Conn: conn, timeout: readTimeout}, nil + } } transport := &http.Transport{ - DialContext: (&net.Dialer{ - Timeout: connectionTimeout, - KeepAlive: keepAliveInterval, - }).DialContext, - TLSClientConfig: connSettings.tlsConfig, - MaxConnsPerHost: maxConnsPerHost, - MaxIdleConnsPerHost: maxIdleConnsPerHost, - IdleConnTimeout: idleConnTimeout, - DisableCompression: !connSettings.enableCompression, + Proxy: proxy, + DialContext: dialContext, + TLSClientConfig: connSettings.ssl, + MaxConnsPerHost: maxConnsPerHost, + MaxIdleConnsPerHost: maxIdleConnsPerHost, + IdleConnTimeout: idleTimeout, + MaxResponseHeaderBytes: connSettings.maxResponseHeaderBytes, + // The server compresses per-GraphBinary-chunk (deflate) rather than using + // generic HTTP compression, so the manual decode path in getReader handles + // decompression. Disable net/http's transparent (gzip-only) handling. + DisableCompression: true, } return &connection{ @@ -118,12 +166,34 @@ func (c *connection) AddInterceptor(interceptor RequestInterceptor) { c.interceptors = append(c.interceptors, interceptor) } +// applyDefaultBatchSize fills the request's batchSize field with the connection-level +// default when the per-request value is unset. This is a client-side default-fill; it +// adds no wire field unless a batch size is in effect. +func (c *connection) applyDefaultBatchSize(req *RequestMessage) { + if req == nil || c.connSettings == nil { + return + } + batchSize := c.connSettings.defaultBatchSize + if batchSize == 0 { + batchSize = defaultBatchSizeValue + } + if req.Fields == nil { + req.Fields = make(map[string]interface{}) + } + if _, ok := req.Fields["batchSize"]; !ok { + req.Fields["batchSize"] = batchSize + } +} + // submit sends request and streams results directly to ResultSet. // Blocks until response headers arrive, ensuring the server has acknowledged // receipt of the request before returning. func (c *connection) submit(req *RequestMessage) (ResultSet, error) { rs := newChannelResultSet() + // Fill the connection-level default batchSize when the request did not set one. + c.applyDefaultBatchSize(req) + // Send the HTTP request synchronously — blocks until response headers arrive resp, err := c.sendRequest(req) if err != nil { @@ -273,7 +343,7 @@ func (c *connection) setHttpRequestHeaders(req *HttpRequest) { if c.connSettings.enableUserAgentOnConnect { req.Headers.Set(HeaderUserAgent, userAgent) } - if c.connSettings.enableCompression { + if c.connSettings.compression == CompressionDeflate { req.Headers.Set(HeaderAcceptEncoding, "deflate") } } @@ -381,3 +451,20 @@ func (c *connection) close() { c.wg.Wait() c.httpClient.CloseIdleConnections() } + +// readTimeoutConn wraps a net.Conn to enforce a per-read (idle-read) timeout. +// Each Read resets the read deadline to now+timeout, so the deadline measures the +// gap between reads rather than the total request duration. Because the deadline is +// re-armed on every Read, it resets correctly when a pooled connection is reused for +// a subsequent request. +type readTimeoutConn struct { + net.Conn + timeout time.Duration +} + +func (c *readTimeoutConn) Read(b []byte) (int, error) { + if err := c.Conn.SetReadDeadline(time.Now().Add(c.timeout)); err != nil { + return 0, err + } + return c.Conn.Read(b) +} diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index 9314b0616d7..d5a03a4db73 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -22,11 +22,13 @@ package gremlingo import ( "bytes" "crypto/tls" + "encoding/base64" "fmt" "io" "math/big" "net/http" "net/http/httptest" + "net/url" "os" "reflect" "sort" @@ -60,11 +62,22 @@ const basicAuthWithSsl = "https://localhost:45941/gremlin" var testNames = []string{"Lyndon", "Yang", "Simon", "Rithin", "Alexey", "Valentyn"} +// testBasicAuthInterceptor builds a Basic authentication interceptor inline, mirroring +// auth.Basic. The auth sub-package cannot be imported from package gremlingo tests +// because it imports gremlingo (one-directional dependency). +func testBasicAuthInterceptor(username, password string) RequestInterceptor { + encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) + return func(req *HttpRequest) error { + req.Headers.Set(HeaderAuthorization, "Basic "+encoded) + return nil + } +} + func newDefaultConnectionSettings() *connectionSettings { return &connectionSettings{ - tlsConfig: &tls.Config{}, - connectionTimeout: connectionTimeoutDefault, - enableCompression: false, + ssl: &tls.Config{}, + connectTimeout: defaultConnectTimeout, + compression: CompressionDeflate, enableUserAgentOnConnect: true, } } @@ -95,7 +108,7 @@ func addTestData(t *testing.T, g *GraphTraversalSource) { func getTestGraph(t *testing.T, url string, tls *tls.Config) *GraphTraversalSource { remote, err := NewDriverRemoteConnection(url, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = tls + settings.Ssl = tls settings.TraversalSource = testServerGraphAlias }) assert.Nil(t, err) @@ -280,8 +293,8 @@ func TestConnection(t *testing.T) { client, err := NewClient(testNoAuthUrl, //client, err := NewClient(noAuthSslUrl, func(settings *ClientSettings) { - settings.TlsConfig = &tlsConf - settings.EnableCompression = true + settings.Ssl = &tlsConf + settings.Compression = CompressionDeflate settings.TraversalSource = testServerModernGraphAlias }) assert.Nil(t, err) @@ -310,8 +323,8 @@ func TestConnection(t *testing.T) { client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { - settings.TlsConfig = testNoAuthTlsConfig - settings.EnableCompression = true + settings.Ssl = testNoAuthTlsConfig + settings.Compression = CompressionDeflate settings.TraversalSource = testServerModernGraphAlias }) assert.Nil(t, err) @@ -507,9 +520,9 @@ func TestConnection(t *testing.T) { skipTestsIfNotEnabled(t, basicAuthIntegrationTestSuite, testBasicAuthEnable) remote, err := NewDriverRemoteConnection(testBasicAuthUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = testBasicAuthTlsConfig - settings.RequestInterceptors = []RequestInterceptor{ - BasicAuth(testBasicAuthUsername, testBasicAuthPassword), + settings.Ssl = testBasicAuthTlsConfig + settings.Interceptors = []RequestInterceptor{ + testBasicAuthInterceptor(testBasicAuthUsername, testBasicAuthPassword), } }) assert.Nil(t, err) @@ -612,7 +625,7 @@ func TestConnection(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthWithAliasEnable) remote, err := NewDriverRemoteConnection(testNoAuthWithAliasUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = testNoAuthWithAliasTlsConfig + settings.Ssl = testNoAuthWithAliasTlsConfig settings.TraversalSource = testServerModernGraphAlias }) assert.Nil(t, err) @@ -827,7 +840,7 @@ func TestStreamingResultDelivery(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthWithAliasEnable) remote, err := NewDriverRemoteConnection(getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl), func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = "ggrateful" }) assert.Nil(t, err) @@ -898,7 +911,7 @@ func TestNewConnection(t *testing.T) { t.Run("applies TLS config", func(t *testing.T) { tlsConfig := &tls.Config{InsecureSkipVerify: true} conn := newConnection(newTestLogHandler(), "https://localhost:8182/gremlin", &connectionSettings{ - tlsConfig: tlsConfig, + ssl: tlsConfig, }) transport := conn.httpClient.Transport.(*http.Transport) @@ -929,7 +942,7 @@ func TestSetHttpRequestHeaders(t *testing.T) { t.Run("sets compression header when enabled", func(t *testing.T) { conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ - enableCompression: true, + compression: CompressionDeflate, }) req, _ := NewHttpRequest(http.MethodPost, "http://localhost/gremlin") @@ -939,6 +952,31 @@ func TestSetHttpRequestHeaders(t *testing.T) { }) } +func TestCompressionDefaults(t *testing.T) { + t.Run("NewClient defaults compression to deflate", func(t *testing.T) { + client, err := NewClient("http://localhost/gremlin") + assert.Nil(t, err) + defer client.Close() + assert.Equal(t, CompressionDeflate, client.connectionSettings.compression) + }) + + t.Run("NewDriverRemoteConnection defaults compression to deflate", func(t *testing.T) { + drc, err := NewDriverRemoteConnection("http://localhost/gremlin") + assert.Nil(t, err) + defer drc.Close() + assert.Equal(t, CompressionDeflate, drc.client.connectionSettings.compression) + }) + + t.Run("default connection sends Accept-Encoding deflate", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", newDefaultConnectionSettings()) + req, _ := NewHttpRequest(http.MethodPost, "http://localhost/gremlin") + + conn.setHttpRequestHeaders(req) + + assert.Equal(t, "deflate", req.Headers.Get(HeaderAcceptEncoding)) + }) +} + func TestGetReader(t *testing.T) { conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{}) @@ -986,7 +1024,7 @@ func TestGetReader(t *testing.T) { func TestConnectionWithMockServer(t *testing.T) { t.Run("handles connection error", func(t *testing.T) { conn := newConnection(newTestLogHandler(), "http://localhost:99999/gremlin", &connectionSettings{ - connectionTimeout: 100 * time.Millisecond, + connectTimeout: 100 * time.Millisecond, }) _, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) @@ -1003,7 +1041,7 @@ func TestConnectionWithMockServer(t *testing.T) { conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{ enableUserAgentOnConnect: true, - enableCompression: true, + compression: CompressionDeflate, }) rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) @@ -1188,9 +1226,9 @@ func TestConnectionPoolSettings(t *testing.T) { customSettings := &connectionSettings{ maxConnsPerHost: 256, maxIdleConnsPerHost: 16, - idleConnTimeout: 300 * time.Second, - keepAliveInterval: 60 * time.Second, - connectionTimeout: 30 * time.Second, + idleTimeout: 300 * time.Second, + keepAliveTime: 60 * time.Second, + connectTimeout: 30 * time.Second, } conn := newConnection(newTestLogHandler(), "http://localhost:8182/gremlin", customSettings) @@ -1221,14 +1259,107 @@ func TestConnectionPoolSettings(t *testing.T) { }) } +func TestConnectionNewOptions(t *testing.T) { + t.Run("proxy defaults to environment when unset", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost:8182/gremlin", &connectionSettings{}) + transport := conn.httpClient.Transport.(*http.Transport) + assert.NotNil(t, transport.Proxy, "Proxy should default to http.ProxyFromEnvironment") + }) + + t.Run("explicit proxy override is used", func(t *testing.T) { + proxyURL, _ := url.Parse("http://proxy.example.com:3128") + conn := newConnection(newTestLogHandler(), "http://localhost:8182/gremlin", &connectionSettings{ + proxy: func(*http.Request) (*url.URL, error) { return proxyURL, nil }, + }) + transport := conn.httpClient.Transport.(*http.Transport) + req, _ := http.NewRequest(http.MethodGet, "http://localhost:8182/gremlin", nil) + got, err := transport.Proxy(req) + assert.NoError(t, err) + assert.Equal(t, proxyURL, got) + }) + + t.Run("max response header bytes is wired to transport", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost:8182/gremlin", &connectionSettings{ + maxResponseHeaderBytes: 16384, + }) + transport := conn.httpClient.Transport.(*http.Transport) + assert.Equal(t, int64(16384), transport.MaxResponseHeaderBytes) + }) + + t.Run("compression disabled means no Accept-Encoding header", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ + compression: CompressionNone, + }) + req, _ := NewHttpRequest(http.MethodPost, "http://localhost/gremlin") + conn.setHttpRequestHeaders(req) + assert.Empty(t, req.Headers.Get(HeaderAcceptEncoding)) + }) + + t.Run("default batch size fills unset request batchSize", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{}) + msg := &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}} + conn.applyDefaultBatchSize(msg) + assert.Equal(t, defaultBatchSizeValue, msg.Fields["batchSize"]) + }) + + t.Run("custom default batch size fills unset request batchSize", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ + defaultBatchSize: 256, + }) + msg := &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}} + conn.applyDefaultBatchSize(msg) + assert.Equal(t, 256, msg.Fields["batchSize"]) + }) + + t.Run("default batch size does not override an explicit request batchSize", func(t *testing.T) { + conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ + defaultBatchSize: 256, + }) + msg := &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{"batchSize": 10}} + conn.applyDefaultBatchSize(msg) + assert.Equal(t, 10, msg.Fields["batchSize"]) + }) + + t.Run("read timeout wraps dialed connections and resets per read", func(t *testing.T) { + // A slow server that writes the response body in two chunks with a gap + // shorter than the read timeout should succeed because the deadline is + // reset on each read. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flusher, _ := w.(http.Flusher) + w.WriteHeader(http.StatusOK) + w.Write([]byte("part1")) + if flusher != nil { + flusher.Flush() + } + time.Sleep(50 * time.Millisecond) + w.Write([]byte("part2")) + })) + defer server.Close() + + conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{ + readTimeout: 500 * time.Millisecond, + }) + + // The read-timeout wrapping is applied at dial time; assert the request + // completes without the deadline firing between chunks. + httpReq, _ := http.NewRequest(http.MethodGet, server.URL, nil) + resp, err := conn.httpClient.Do(httpReq) + require.NoError(t, err) + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + assert.Equal(t, "part1part2", string(body)) + }) +} + func TestClientSettingsWiring(t *testing.T) { t.Run("ClientSettings wires connection pool settings", func(t *testing.T) { client, err := NewClient("http://localhost:8182/gremlin", func(settings *ClientSettings) { - settings.MaximumConcurrentConnections = 200 + settings.MaxConnections = 200 settings.MaxIdleConnections = 20 - settings.IdleConnectionTimeout = 240 * time.Second - settings.KeepAliveInterval = 45 * time.Second + settings.IdleTimeout = 240 * time.Second + settings.KeepAliveTime = 45 * time.Second }) require.NoError(t, err) defer client.Close() @@ -1236,8 +1367,8 @@ func TestClientSettingsWiring(t *testing.T) { // Verify settings were wired to connectionSettings assert.Equal(t, 200, client.connectionSettings.maxConnsPerHost) assert.Equal(t, 20, client.connectionSettings.maxIdleConnsPerHost) - assert.Equal(t, 240*time.Second, client.connectionSettings.idleConnTimeout) - assert.Equal(t, 45*time.Second, client.connectionSettings.keepAliveInterval) + assert.Equal(t, 240*time.Second, client.connectionSettings.idleTimeout) + assert.Equal(t, 45*time.Second, client.connectionSettings.keepAliveTime) // Verify settings were applied to http.Transport transport := client.conn.httpClient.Transport.(*http.Transport) @@ -1254,8 +1385,8 @@ func TestClientSettingsWiring(t *testing.T) { // Verify defaults are used (0 in settings means use default) assert.Equal(t, 0, client.connectionSettings.maxConnsPerHost) assert.Equal(t, 0, client.connectionSettings.maxIdleConnsPerHost) - assert.Equal(t, time.Duration(0), client.connectionSettings.idleConnTimeout) - assert.Equal(t, time.Duration(0), client.connectionSettings.keepAliveInterval) + assert.Equal(t, time.Duration(0), client.connectionSettings.idleTimeout) + assert.Equal(t, time.Duration(0), client.connectionSettings.keepAliveTime) // Verify defaults were applied to http.Transport transport := client.conn.httpClient.Transport.(*http.Transport) @@ -1269,10 +1400,10 @@ func TestDriverRemoteConnectionSettingsWiring(t *testing.T) { t.Run("DriverRemoteConnectionSettings wires connection pool settings", func(t *testing.T) { drc, err := NewDriverRemoteConnection("http://localhost:8182/gremlin", func(settings *DriverRemoteConnectionSettings) { - settings.MaximumConcurrentConnections = 150 + settings.MaxConnections = 150 settings.MaxIdleConnections = 15 - settings.IdleConnectionTimeout = 200 * time.Second - settings.KeepAliveInterval = 40 * time.Second + settings.IdleTimeout = 200 * time.Second + settings.KeepAliveTime = 40 * time.Second }) require.NoError(t, err) defer drc.Close() @@ -1280,8 +1411,8 @@ func TestDriverRemoteConnectionSettingsWiring(t *testing.T) { // Verify settings were wired to connectionSettings assert.Equal(t, 150, drc.client.connectionSettings.maxConnsPerHost) assert.Equal(t, 15, drc.client.connectionSettings.maxIdleConnsPerHost) - assert.Equal(t, 200*time.Second, drc.client.connectionSettings.idleConnTimeout) - assert.Equal(t, 40*time.Second, drc.client.connectionSettings.keepAliveInterval) + assert.Equal(t, 200*time.Second, drc.client.connectionSettings.idleTimeout) + assert.Equal(t, 40*time.Second, drc.client.connectionSettings.keepAliveTime) // Verify settings were applied to http.Transport transport := drc.client.conn.httpClient.Transport.(*http.Transport) @@ -1298,8 +1429,8 @@ func TestDriverRemoteConnectionSettingsWiring(t *testing.T) { // Verify defaults are used (0 in settings means use default) assert.Equal(t, 0, drc.client.connectionSettings.maxConnsPerHost) assert.Equal(t, 0, drc.client.connectionSettings.maxIdleConnsPerHost) - assert.Equal(t, time.Duration(0), drc.client.connectionSettings.idleConnTimeout) - assert.Equal(t, time.Duration(0), drc.client.connectionSettings.keepAliveInterval) + assert.Equal(t, time.Duration(0), drc.client.connectionSettings.idleTimeout) + assert.Equal(t, time.Duration(0), drc.client.connectionSettings.keepAliveTime) // Verify defaults were applied to http.Transport transport := drc.client.conn.httpClient.Transport.(*http.Transport) @@ -1322,7 +1453,7 @@ func TestInterceptorIntegration(t *testing.T) { client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { settings.TraversalSource = testServerModernGraphAlias - settings.RequestInterceptors = []RequestInterceptor{ + settings.Interceptors = []RequestInterceptor{ func(req *HttpRequest) error { if msg, ok := req.Body.(*RequestMessage); ok { req.Body = &RequestMessage{ @@ -1358,7 +1489,7 @@ func TestInterceptorIntegration(t *testing.T) { client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { settings.TraversalSource = testServerModernGraphAlias - settings.RequestInterceptors = []RequestInterceptor{ + settings.Interceptors = []RequestInterceptor{ func(req *HttpRequest) error { mu.Lock() callCount++ @@ -1406,7 +1537,7 @@ func TestInterceptorIntegration(t *testing.T) { client, err := NewClient(testNoAuthUrl, func(settings *ClientSettings) { settings.TraversalSource = testServerModernGraphAlias - settings.RequestInterceptors = []RequestInterceptor{ + settings.Interceptors = []RequestInterceptor{ func(req *HttpRequest) error { mu.Lock() callCount++ @@ -1462,7 +1593,7 @@ func TestConnectionWithMockServer_BasicAuth(t *testing.T) { defer server.Close() conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) - conn.AddInterceptor(BasicAuth("testuser", "testpass")) + conn.AddInterceptor(testBasicAuthInterceptor("testuser", "testpass")) rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) require.NoError(t, err) diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go index 3450b7e1792..39d7f31553a 100644 --- a/gremlin-go/driver/driverRemoteConnection.go +++ b/gremlin-go/driver/driverRemoteConnection.go @@ -21,6 +21,8 @@ package gremlingo import ( "crypto/tls" + "net/http" + "net/url" "time" "golang.org/x/text/language" @@ -32,35 +34,64 @@ type DriverRemoteConnectionSettings struct { LogVerbosity LogVerbosity Logger Logger Language language.Tag - TlsConfig *tls.Config - ConnectionTimeout time.Duration - EnableCompression bool EnableUserAgentOnConnect bool - // MaximumConcurrentConnections is the maximum number of concurrent TCP connections + // Ssl is the TLS configuration used for secure (wss/https) connections. + Ssl *tls.Config + + // ConnectTimeout is the TCP/transport-establishment timeout (TCP connect plus + // TLS handshake where applicable), not an HTTP request timeout. + // Default: 5 seconds. Set to 0 to use the default. + ConnectTimeout time.Duration + + // ReadTimeout is an idle-read timeout: it is reset on each read of the response + // body rather than bounding the whole request. Streaming-safe. The deadline is + // re-armed across pooled-connection reuse. + // Default: 0 (disabled). + ReadTimeout time.Duration + + // Compression selects the content-encoding negotiated with the server. + // Default: CompressionDeflate (on). Set to CompressionNone to disable compression. + Compression Compression + + // MaxConnections is the maximum number of concurrent TCP connections // to the Gremlin server. This limits how many requests can be in-flight simultaneously. // Default: 128. Set to 0 to use the default. - MaximumConcurrentConnections int + MaxConnections int // MaxIdleConnections is the maximum number of idle (keep-alive) connections to retain // in the connection pool. Idle connections are reused for subsequent requests. // Default: 8. Set to 0 to use the default. MaxIdleConnections int - // IdleConnectionTimeout is how long an idle connection remains in the pool before + // IdleTimeout is how long an idle connection remains in the pool before // being closed. Set this to match your server's idle timeout if needed. // Default: 180 seconds (3 minutes). Set to 0 to use the default. - IdleConnectionTimeout time.Duration + IdleTimeout time.Duration - // KeepAliveInterval is the interval between TCP keep-alive probes on idle connections. + // KeepAliveTime is the TCP keep-alive idle-before-probe interval on connections. // This helps detect dead connections and keeps connections alive through firewalls. // Default: 30 seconds. Set to 0 to use the default. - KeepAliveInterval time.Duration + KeepAliveTime time.Duration + + // DefaultBatchSize is the connection-level default that fills a request's batchSize + // when it is not set per-request. + // Default: 64. Set to 0 to use the default. + DefaultBatchSize int + + // MaxResponseHeaderBytes limits the number of response header bytes the client will + // read. Maps to http.Transport.MaxResponseHeaderBytes. + // Default: 0 (use net/http's default). + MaxResponseHeaderBytes int64 + + // Proxy returns the proxy URL to use for a given request. When nil, the connection + // uses http.ProxyFromEnvironment (HTTP_PROXY/HTTPS_PROXY/NO_PROXY). + Proxy func(*http.Request) (*url.URL, error) - // RequestInterceptors are functions that modify HTTP requests before sending. - RequestInterceptors []RequestInterceptor + // Interceptors are functions that modify HTTP requests before sending. + Interceptors []RequestInterceptor - // Auth is a RequestInterceptor for authentication (e.g. BasicAuth, SigV4Auth). + // Auth is a RequestInterceptor for authentication (e.g. auth.Basic, auth.SigV4). // As a convenience, this is always appended to the end of the interceptor list // so it runs last, after any user interceptors have modified the request. Auth RequestInterceptor @@ -88,28 +119,33 @@ func NewDriverRemoteConnection( LogVerbosity: Info, Logger: &defaultLogger{}, Language: language.English, - TlsConfig: &tls.Config{}, - ConnectionTimeout: connectionTimeoutDefault, - EnableCompression: false, + Ssl: &tls.Config{}, + ConnectTimeout: defaultConnectTimeout, + Compression: CompressionDeflate, EnableUserAgentOnConnect: true, - MaximumConcurrentConnections: 0, // Use default (128) - MaxIdleConnections: 0, // Use default (8) - IdleConnectionTimeout: 0, // Use default (180s) - KeepAliveInterval: 0, // Use default (30s) + MaxConnections: 0, // Use default (128) + MaxIdleConnections: 0, // Use default (8) + IdleTimeout: 0, // Use default (180s) + KeepAliveTime: 0, // Use default (30s) + DefaultBatchSize: 0, // Use default (64) } for _, configuration := range configurations { configuration(settings) } connSettings := &connectionSettings{ - tlsConfig: settings.TlsConfig, - connectionTimeout: settings.ConnectionTimeout, - maxConnsPerHost: settings.MaximumConcurrentConnections, + ssl: settings.Ssl, + connectTimeout: settings.ConnectTimeout, + readTimeout: settings.ReadTimeout, + maxConnsPerHost: settings.MaxConnections, maxIdleConnsPerHost: settings.MaxIdleConnections, - idleConnTimeout: settings.IdleConnectionTimeout, - keepAliveInterval: settings.KeepAliveInterval, - enableCompression: settings.EnableCompression, + idleTimeout: settings.IdleTimeout, + keepAliveTime: settings.KeepAliveTime, + compression: settings.Compression, + maxResponseHeaderBytes: settings.MaxResponseHeaderBytes, + defaultBatchSize: settings.DefaultBatchSize, + proxy: settings.Proxy, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, pdtRegistry: settings.PDTRegistry, } @@ -119,7 +155,7 @@ func NewDriverRemoteConnection( conn := newConnection(logHandler, url, connSettings) // Add user-provided interceptors - for _, interceptor := range settings.RequestInterceptors { + for _, interceptor := range settings.Interceptors { conn.AddInterceptor(interceptor) } diff --git a/gremlin-go/driver/driverRemoteConnection_test.go b/gremlin-go/driver/driverRemoteConnection_test.go deleted file mode 100644 index ab230ecb4c8..00000000000 --- a/gremlin-go/driver/driverRemoteConnection_test.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package gremlingo - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAuthentication(t *testing.T) { - - t.Run("Test BasicAuth interceptor", func(t *testing.T) { - interceptor := BasicAuth("user", "pass") - req, _ := NewHttpRequest("POST", "http://localhost:8182/gremlin") - err := interceptor(req) - assert.Nil(t, err) - assert.Contains(t, req.Headers.Get(HeaderAuthorization), "Basic ") - }) -} diff --git a/gremlin-go/driver/interceptor_test.go b/gremlin-go/driver/interceptor_test.go index 3931dff5795..2c370205c33 100644 --- a/gremlin-go/driver/interceptor_test.go +++ b/gremlin-go/driver/interceptor_test.go @@ -21,6 +21,7 @@ package gremlingo import ( "bytes" + "encoding/base64" "encoding/json" "fmt" "io" @@ -310,9 +311,10 @@ func TestFieldMutationBeforeSerialization(t *testing.T) { "interceptor field mutation should be reflected in the serialized output") } -// TestSigV4AuthWithSerializeBody verifies that SigV4Auth calls SerializeBody and signs -// the request properly. -func TestSigV4AuthWithSerializeBody(t *testing.T) { +// TestInterceptorSerializeBodyFlow verifies that an interceptor calling SerializeBody +// causes the request to be sent as serialized JSON, and that headers it sets reach the +// server. This mirrors how auth interceptors (e.g. auth.SigV4) operate. +func TestInterceptorSerializeBodyFlow(t *testing.T) { var capturedHeaders http.Header var capturedBody []byte @@ -328,25 +330,25 @@ func TestSigV4AuthWithSerializeBody(t *testing.T) { conn := newConnection(newTestLogHandler(), server.URL, &connectionSettings{}) - mockProvider := &mockCredentialsProvider{ - accessKey: "MOCK_ID", - secretKey: "MOCK_KEY", - } - - // Only SigV4Auth — no SerializeRequest() needed - conn.AddInterceptor(SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", mockProvider)) + // Interceptor serializes the body then sets an auth-like header, as auth.SigV4 does. + conn.AddInterceptor(func(req *HttpRequest) error { + if _, err := req.SerializeBody(); err != nil { + return err + } + req.Headers.Set("Authorization", "AWS4-HMAC-SHA256 Credential=MOCK_ID") + req.Headers.Set("X-Amz-Date", "20240101T000000Z") + return nil + }) rs, err := conn.submit(&RequestMessage{Gremlin: "g.V().count()", Fields: map[string]interface{}{}}) require.NoError(t, err) _, _ = rs.All() - // SigV4 should have added Authorization and X-Amz-Date headers assert.NotEmpty(t, capturedHeaders.Get("Authorization"), - "SigV4Auth should set Authorization header") + "interceptor should set Authorization header") assert.NotEmpty(t, capturedHeaders.Get("X-Amz-Date"), - "SigV4Auth should set X-Amz-Date header") - assert.Contains(t, capturedHeaders.Get("Authorization"), "AWS4-HMAC-SHA256", - "Authorization header should use AWS4-HMAC-SHA256 signing algorithm") + "interceptor should set X-Amz-Date header") + assert.Contains(t, capturedHeaders.Get("Authorization"), "AWS4-HMAC-SHA256") // Body should be valid JSON assert.NotEmpty(t, capturedBody, "body should be non-empty serialized bytes") @@ -356,14 +358,13 @@ func TestSigV4AuthWithSerializeBody(t *testing.T) { assert.Equal(t, "g.V().count()", parsed["gremlin"]) } -// TestSigV4Auth_AutoSerializesRequestMessage verifies that SigV4Auth automatically -// serializes *RequestMessage to JSON bytes before signing. -func TestSigV4Auth_AutoSerializesRequestMessage(t *testing.T) { - provider := &mockCredentialsProvider{ - accessKey: "MOCK_ID", - secretKey: "MOCK_KEY", +// TestInterceptorAutoSerializesRequestMessage verifies that an interceptor can call +// SerializeBody to turn a *RequestMessage into JSON bytes before the request is sent. +func TestInterceptorAutoSerializesRequestMessage(t *testing.T) { + interceptor := func(req *HttpRequest) error { + _, err := req.SerializeBody() + return err } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) req, err := NewHttpRequest("POST", "https://test_url:8182/gremlin") require.NoError(t, err) @@ -374,11 +375,11 @@ func TestSigV4Auth_AutoSerializesRequestMessage(t *testing.T) { req.Body = &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}} err = interceptor(req) - require.NoError(t, err, "SigV4Auth should auto-serialize *RequestMessage") + require.NoError(t, err, "SerializeBody should auto-serialize *RequestMessage") // Body should now be []byte (serialized JSON) bodyBytes, ok := req.Body.([]byte) - assert.True(t, ok, "Body should be []byte after SigV4Auth auto-serialization") + assert.True(t, ok, "Body should be []byte after auto-serialization") assert.NotEmpty(t, bodyBytes, "serialized body should be non-empty") // Verify it's valid JSON @@ -386,21 +387,15 @@ func TestSigV4Auth_AutoSerializesRequestMessage(t *testing.T) { err = json.Unmarshal(bodyBytes, &parsed) require.NoError(t, err, "body should be valid JSON after auto-serialization") assert.Equal(t, "g.V()", parsed["gremlin"]) - - // SigV4 headers should be set - assert.NotEmpty(t, req.Headers.Get("Authorization"), "Authorization header should be set") - assert.NotEmpty(t, req.Headers.Get("X-Amz-Date"), "X-Amz-Date header should be set") - assert.Contains(t, req.Headers.Get("Authorization"), "AWS4-HMAC-SHA256") } -// TestSigV4Auth_RejectsNonByteBody verifies that SigV4Auth returns an error when Body -// is not []byte and not *RequestMessage (e.g., an io.Reader). -func TestSigV4Auth_RejectsNonByteBody(t *testing.T) { - provider := &mockCredentialsProvider{ - accessKey: "MOCK_ID", - secretKey: "MOCK_KEY", +// TestInterceptorSerializeBodyRejectsNonByteBody verifies that SerializeBody returns an +// error when Body is not []byte and not *RequestMessage (e.g., an io.Reader). +func TestInterceptorSerializeBodyRejectsNonByteBody(t *testing.T) { + interceptor := func(req *HttpRequest) error { + _, err := req.SerializeBody() + return err } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) req, err := NewHttpRequest("POST", "https://test_url:8182/gremlin") require.NoError(t, err) @@ -446,8 +441,12 @@ func TestMultipleInterceptors_MutateThenAuth(t *testing.T) { return nil }) - // BasicAuth adds the Authorization header (works on any body type) - conn.AddInterceptor(BasicAuth("admin", "secret")) + // A basic-auth-style interceptor adds the Authorization header (works on any body type) + conn.AddInterceptor(func(req *HttpRequest) error { + encoded := base64.StdEncoding.EncodeToString([]byte("admin:secret")) + req.Headers.Set(HeaderAuthorization, "Basic "+encoded) + return nil + }) rs, err := conn.submit(&RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}}) require.NoError(t, err) @@ -630,12 +629,12 @@ func TestAuthInterceptorIsAlwaysLast(t *testing.T) { })) defer server.Close() - t.Run("Auth runs after RequestInterceptors on Client", func(t *testing.T) { + t.Run("Auth runs after Interceptors on Client", func(t *testing.T) { order = nil client, err := NewClient(server.URL, func(settings *ClientSettings) { settings.Auth = func(req *HttpRequest) error { order = append(order, 3); return nil } - settings.RequestInterceptors = []RequestInterceptor{ + settings.Interceptors = []RequestInterceptor{ func(req *HttpRequest) error { order = append(order, 1); return nil }, func(req *HttpRequest) error { order = append(order, 2); return nil }, } @@ -651,12 +650,12 @@ func TestAuthInterceptorIsAlwaysLast(t *testing.T) { "Auth interceptor should always run last") }) - t.Run("Auth runs after RequestInterceptors on DriverRemoteConnection", func(t *testing.T) { + t.Run("Auth runs after Interceptors on DriverRemoteConnection", func(t *testing.T) { order = nil remote, err := NewDriverRemoteConnection(server.URL, func(settings *DriverRemoteConnectionSettings) { settings.Auth = func(req *HttpRequest) error { order = append(order, 3); return nil } - settings.RequestInterceptors = []RequestInterceptor{ + settings.Interceptors = []RequestInterceptor{ func(req *HttpRequest) error { order = append(order, 1); return nil }, func(req *HttpRequest) error { order = append(order, 2); return nil }, } diff --git a/gremlin-go/driver/performance/performanceSuite.go b/gremlin-go/driver/performance/performanceSuite.go index c8677015684..da6552f63ea 100644 --- a/gremlin-go/driver/performance/performanceSuite.go +++ b/gremlin-go/driver/performance/performanceSuite.go @@ -378,7 +378,7 @@ func createConnection(host string, port, poolSize, buffersSize int) (*GraphTrave drc, err = gremlingo.NewDriverRemoteConnection(endpoint, func(settings *DriverRemoteConnectionSettings) { settings.LogVerbosity = GremlinWarning settings.TraversalSource = gratefulGraphAlias - settings.MaximumConcurrentConnections = poolSize + settings.MaxConnections = poolSize }) if err != nil { diff --git a/gremlin-go/driver/strategies_test.go b/gremlin-go/driver/strategies_test.go index 27df32e3362..3ccb532fc10 100644 --- a/gremlin-go/driver/strategies_test.go +++ b/gremlin-go/driver/strategies_test.go @@ -30,7 +30,7 @@ import ( func getModernGraph(t *testing.T, url string, tls *tls.Config) *GraphTraversalSource { remote, err := NewDriverRemoteConnection(url, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = tls + settings.Ssl = tls settings.TraversalSource = testServerModernGraphAlias }) assert.Nil(t, err) diff --git a/gremlin-go/driver/transaction_test.go b/gremlin-go/driver/transaction_test.go index 9c9cc17674d..e68e3638ac7 100644 --- a/gremlin-go/driver/transaction_test.go +++ b/gremlin-go/driver/transaction_test.go @@ -31,7 +31,7 @@ func newTxRemoteConnection(t *testing.T) *DriverRemoteConnection { url := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl) remote, err := NewDriverRemoteConnection(url, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = "gtx" }) assert.Nil(t, err) @@ -42,7 +42,7 @@ func newTxRemoteConnection(t *testing.T) *DriverRemoteConnection { func newTxClient(t *testing.T) *Client { url := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl) client, err := NewClient(url, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = "gtx" }) assert.Nil(t, err) @@ -356,7 +356,7 @@ func TestTransactionWithTraversalAPI(t *testing.T) { func TestTransactionRejectBeginOnNonTransactionalGraph(t *testing.T) { url := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl) client, err := NewClient(url, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = "gclassic" }) assert.Nil(t, err) @@ -371,7 +371,7 @@ func TestTransactionRejectBeginOnNonTransactionalGraph(t *testing.T) { func TestTransactionCleanUpOnBeginFailure(t *testing.T) { url := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl) client, err := NewClient(url, func(settings *ClientSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = "gclassic" }) assert.Nil(t, err) diff --git a/gremlin-go/driver/traversal_test.go b/gremlin-go/driver/traversal_test.go index 40a20d2904b..7697203814a 100644 --- a/gremlin-go/driver/traversal_test.go +++ b/gremlin-go/driver/traversal_test.go @@ -478,7 +478,7 @@ func newWithOptionsConnection(t *testing.T) *GraphTraversalSource { remote, err := NewDriverRemoteConnection(testNoAuthWithAliasUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = testNoAuthWithAliasTlsConfig + settings.Ssl = testNoAuthWithAliasTlsConfig settings.TraversalSource = "gmodern" }) assert.Nil(t, err) @@ -492,7 +492,7 @@ func newTestRemoteConnection(t *testing.T) *DriverRemoteConnection { remote, err := NewDriverRemoteConnection(testNoAuthWithAliasUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = testNoAuthWithAliasTlsConfig + settings.Ssl = testNoAuthWithAliasTlsConfig settings.TraversalSource = "gtx" }) assert.Nil(t, err) @@ -518,7 +518,7 @@ func TestProviderDefinedTypeTraversalAPIIntegration(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) remote, err := NewDriverRemoteConnection(testNoAuthUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = testServerModernGraphAlias }) require.NoError(t, err) @@ -552,7 +552,7 @@ func TestProviderDefinedTypeTraversalAPIIntegration(t *testing.T) { remote, err := NewDriverRemoteConnection(testNoAuthUrl, func(settings *DriverRemoteConnectionSettings) { - settings.TlsConfig = &tls.Config{} + settings.Ssl = &tls.Config{} settings.TraversalSource = testServerModernGraphAlias settings.PDTRegistry = registry }) diff --git a/gremlin-go/examples/connections.go b/gremlin-go/examples/connections.go index 790ed529640..4c42b71d419 100644 --- a/gremlin-go/examples/connections.go +++ b/gremlin-go/examples/connections.go @@ -68,8 +68,8 @@ func withConfigs() { driverRemoteConnection, err := gremlingo.NewDriverRemoteConnection(serverURL, func(settings *gremlingo.DriverRemoteConnectionSettings) { settings.TraversalSource = "g" - settings.MaximumConcurrentConnections = 4 - settings.EnableCompression = false + settings.MaxConnections = 4 + settings.Compression = gremlingo.CompressionNone }) if err != nil { From 8cc1543412b0ff1f1cba646fabe201d1b6c2c6c9 Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Thu, 18 Jun 2026 12:19:27 -0700 Subject: [PATCH 2/4] Added BulkResults connection option Assisted-by: Kiro: Claude Opus 4.8 --- CHANGELOG.asciidoc | 1 + docs/src/reference/gremlin-variants.asciidoc | 5 +- docs/src/upgrade/release-4.x.x.asciidoc | 2 + gremlin-go/driver/bulkResults_test.go | 127 +++++++++++++++++++ gremlin-go/driver/client.go | 21 ++- gremlin-go/driver/driverRemoteConnection.go | 8 ++ 6 files changed, 161 insertions(+), 3 deletions(-) create mode 100644 gremlin-go/driver/bulkResults_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 75e5178aaa7..578788d5e79 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -63,6 +63,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima ** Added `MaxResponseHeaderBytes`, exposing `http.Transport.MaxResponseHeaderBytes`. ** Added `Proxy` and set `http.Transport.Proxy` to `http.ProxyFromEnvironment` by default, fixing a regression where a custom transport silently dropped environment proxy configuration. ** Added `ReadTimeout`, an idle-read timeout reset on each read (via `SetReadDeadline`) that is re-armed across pooled-connection reuse and does not set `http.Client.Timeout`. +** Added `BulkResults` (default false), a connection-level default for `bulkResults` applied to every request unless overridden per-request, aligning gremlin-go with the other GLVs which already expose a connection-level setting; the `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. * Fixed `ByteBuf` leak in `GraphBinaryMessageSerializerV4` when serialization throws an `IOException`. * Changed `Tree` to no longer extend `HashMap`; it is now a final class with a tree-shaped API (`childAt`, `hasChild`, `contains`, `findSubtree`, `getOrCreateChild`, `getNodesAtDepth`, `getLeafNodes`, `nodeCount`) and is no longer a `Map`. * Changed `count(local)` on a `Tree` to return the total node count (`Tree.nodeCount()`) instead of the root-entry count. diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 757dab0b6bf..227bedcaa2e 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -220,8 +220,8 @@ The following options are allowed on a per-request basis in this fashion: `batch `evaluationTimeout`. NOTE: When submitting traversals through `DriverRemoteConnection`, `bulkResults` defaults to `true` per-request -to optimize result transfer. This does not apply to direct `Client.Submit()` calls, where `bulkResults` must be -set explicitly if desired. +to optimize result transfer. For direct `Client.Submit()` calls, set the connection-level `BulkResults` option to +apply a default to every request, or set `bulkResults` per-request to override it. anchor:go-imports[] [[gremlin-go-imports]] @@ -268,6 +268,7 @@ can be passed to the `NewClient` or `NewDriverRemoteConnection` functions as con |ReadTimeout | Idle-read timeout reset on each read of the response body. Set to `0` to disable. |0 |Compression |The wire compression negotiated with the server (`gremlingo.CompressionNone` or `gremlingo.CompressionDeflate`). |gremlingo.CompressionDeflate |DefaultBatchSize |The connection-level default batch size used when a request does not specify one. |64 +|BulkResults |Connection-level default for `bulkResults`, applied to every request unless overridden per-request. The `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. |false |MaxResponseHeaderBytes |Limit on the number of response header bytes the client will read. Maps to `http.Transport.MaxResponseHeaderBytes`. |0 (net/http default) |Proxy |Function returning the proxy URL to use for a request. When `nil`, uses `http.ProxyFromEnvironment`. |nil |EnableUserAgentOnConnect |Enables sending a user agent to the server during connection requests. diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index 0791fc76791..7f2e1b02155 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -73,6 +73,8 @@ across pooled-connection reuse, so it never fires while a pooled connection is i - `Proxy`: an explicit `func(*http.Request) (*url.URL, error)` proxy override for the transport. - `DefaultBatchSize` (default 64): a connection-level default that fills a request's `batchSize` when it is left unset. +- `BulkResults` (default false): a connection-level default for `bulkResults` applied to every request unless +overridden per-request. The `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. See: link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[dev list discussion on standardizing GLV connection options]. diff --git a/gremlin-go/driver/bulkResults_test.go b/gremlin-go/driver/bulkResults_test.go new file mode 100644 index 00000000000..a7b6c970183 --- /dev/null +++ b/gremlin-go/driver/bulkResults_test.go @@ -0,0 +1,127 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package gremlingo + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newBulkResultsCaptureServer returns an httptest server that captures the JSON +// request body of the most recent submission, plus a pointer to the captured map. +func newBulkResultsCaptureServer(t *testing.T) (*httptest.Server, *map[string]interface{}) { + t.Helper() + captured := new(map[string]interface{}) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if err == nil && len(body) > 0 { + var parsed map[string]interface{} + if json.Unmarshal(body, &parsed) == nil { + *captured = parsed + } + } + w.WriteHeader(http.StatusOK) + })) + return server, captured +} + +// TestConnectionLevelBulkResults verifies the connection-level BulkResults setting, +// which matches the other GLVs: a plain bool defaulting to false. When true it is +// applied to every request unless overridden per-request; the DriverRemoteConnection +// traversal path independently defaults to true regardless of the setting. +func TestConnectionLevelBulkResults(t *testing.T) { + t.Run("connection-level BulkResults=true sends bulkResults=true on script path", func(t *testing.T) { + server, captured := newBulkResultsCaptureServer(t) + defer server.Close() + + client, err := NewClient(server.URL, func(settings *ClientSettings) { + settings.BulkResults = true + }) + require.NoError(t, err) + defer client.Close() + + rs, err := client.SubmitWithOptions("g.V()", *new(RequestOptions)) + require.NoError(t, err) + _, _ = rs.All() + + assert.Equal(t, true, (*captured)["bulkResults"], + "connection-level BulkResults=true should be sent on the script path") + }) + + t.Run("per-request SetBulkResults(false) overrides connection-level true", func(t *testing.T) { + server, captured := newBulkResultsCaptureServer(t) + defer server.Close() + + client, err := NewClient(server.URL, func(settings *ClientSettings) { + settings.BulkResults = true + }) + require.NoError(t, err) + defer client.Close() + + opts := new(RequestOptionsBuilder).SetBulkResults(false).Create() + rs, err := client.SubmitWithOptions("g.V()", opts) + require.NoError(t, err) + _, _ = rs.All() + + assert.Equal(t, false, (*captured)["bulkResults"], + "per-request SetBulkResults(false) should override a connection-level true") + }) + + t.Run("default (false) connection-level leaves script-path bulkResults unset", func(t *testing.T) { + server, captured := newBulkResultsCaptureServer(t) + defer server.Close() + + client, err := NewClient(server.URL) + require.NoError(t, err) + defer client.Close() + + rs, err := client.SubmitWithOptions("g.V()", *new(RequestOptions)) + require.NoError(t, err) + _, _ = rs.All() + + _, present := (*captured)["bulkResults"] + assert.False(t, present, + "with the default (false) connection-level setting and no per-request value, bulkResults should not be sent on the script path") + }) + + t.Run("DRC traversal path defaults bulkResults to true regardless of connection-level false", func(t *testing.T) { + server, captured := newBulkResultsCaptureServer(t) + defer server.Close() + + // Connection-level BulkResults defaults to false, but the DRC traversal + // path still defaults to true, matching the other GLVs. + client, err := NewClient(server.URL) + require.NoError(t, err) + defer client.Close() + + rs, err := client.submitGremlinLang(NewGremlinLang(nil)) + require.NoError(t, err) + _, _ = rs.All() + + assert.Equal(t, true, (*captured)["bulkResults"], + "the DRC traversal path should default bulkResults to true") + }) +} diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index 72b4a86002d..37aecd47400 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -80,6 +80,13 @@ type ClientSettings struct { // Default: 64. Set to 0 to use the default. DefaultBatchSize int + // BulkResults is the connection-level default for bulkResults. When true, requests + // submitted on this connection bulk results unless overridden per-request via + // RequestOptionsBuilder.SetBulkResults. The DriverRemoteConnection traversal path + // defaults to true regardless of this setting. + // Default: false. + BulkResults bool + // MaxResponseHeaderBytes limits the number of response header bytes the client will // read. Maps to http.Transport.MaxResponseHeaderBytes. // Default: 0 (use net/http's default). @@ -110,6 +117,7 @@ type Client struct { logHandler *logHandler connectionSettings *connectionSettings conn *connection + bulkResults bool // connection-level default for bulkResults (default false) transactions sync.Map // tracks open transactions for cascade rollback on close } @@ -172,6 +180,7 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C logHandler: logHandler, connectionSettings: connSettings, conn: conn, + bulkResults: settings.BulkResults, } return client, nil @@ -216,6 +225,14 @@ func (client *Client) untrackTransaction(tx *Transaction) { // SubmitWithOptions submits a Gremlin script to the server with specified RequestOptions and returns a ResultSet. func (client *Client) SubmitWithOptions(traversalString string, requestOptions RequestOptions) (ResultSet, error) { client.logHandler.logf(Debug, submitStartedString, traversalString) + // Apply the connection-level bulkResults default when the request did not set it + // per-request. The script path only forces bulking when the connection-level + // setting is true; a false setting leaves the request untouched (matching the + // other GLVs, whose connection-level bulkResults defaults to false). + if requestOptions.bulkResults == nil && client.bulkResults { + bulk := true + requestOptions.bulkResults = &bulk + } request := MakeStringRequest(traversalString, client.traversalSource, requestOptions) rs, err := client.conn.submit(&request) return rs, err @@ -253,7 +270,9 @@ func (client *Client) submitGremlinLangWithBuilder(gremlinLang *GremlinLang, bui } // default bulkResults to true when using DRC through request options - // consistent with Java RequestOptions.getRequestOptions and Python extract_request_options + // consistent with Java RequestOptions.getRequestOptions and Python extract_request_options. + // The traversal path always defaults to true when unset, regardless of the + // connection-level BulkResults setting (matching the other GLVs). if builder.bulkResults == nil { builder.SetBulkResults(true) } diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go index 39d7f31553a..fa3eb503245 100644 --- a/gremlin-go/driver/driverRemoteConnection.go +++ b/gremlin-go/driver/driverRemoteConnection.go @@ -79,6 +79,13 @@ type DriverRemoteConnectionSettings struct { // Default: 64. Set to 0 to use the default. DefaultBatchSize int + // BulkResults is the connection-level default for bulkResults. When true, requests + // submitted on this connection bulk results unless overridden per-request via + // RequestOptionsBuilder.SetBulkResults. The DriverRemoteConnection traversal path + // defaults to true regardless of this setting. + // Default: false. + BulkResults bool + // MaxResponseHeaderBytes limits the number of response header bytes the client will // read. Maps to http.Transport.MaxResponseHeaderBytes. // Default: 0 (use net/http's default). @@ -170,6 +177,7 @@ func NewDriverRemoteConnection( logHandler: logHandler, connectionSettings: connSettings, conn: conn, + bulkResults: settings.BulkResults, } return &DriverRemoteConnection{client: client, isClosed: false, settings: settings}, nil From d4f7b9304b583352a9bcf76b5464b261a4fcf772 Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Wed, 24 Jun 2026 14:28:07 -0700 Subject: [PATCH 3/4] GLV parity based on comments in other PRs defaultBatchSize to batchSize Docs update Timeout and timeout millis Removed deprecated options Assisted-by: Kiro: Claude Opus 4.8 --- CHANGELOG.asciidoc | 8 +- docs/src/reference/gremlin-variants.asciidoc | 42 +++++- docs/src/upgrade/release-4.x.x.asciidoc | 19 ++- gremlin-go/driver/auth_deprecated.go | 117 ---------------- gremlin-go/driver/auth_deprecated_test.go | 132 ------------------- gremlin-go/driver/client.go | 77 ++++++++--- gremlin-go/driver/connection.go | 18 ++- gremlin-go/driver/connection_test.go | 52 +++++++- gremlin-go/driver/driverRemoteConnection.go | 77 ++++++++--- 9 files changed, 228 insertions(+), 314 deletions(-) delete mode 100644 gremlin-go/driver/auth_deprecated.go delete mode 100644 gremlin-go/driver/auth_deprecated_test.go diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 578788d5e79..a1e797ffe37 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -56,13 +56,13 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima ** Reconciled the `validationRequest` default: the builder default is now `g.inject(0)` to match the `Settings` default (it was previously `''`). * Added configurable CORS `allowedOrigins` setting to Gremlin Server; warns when wildcard origin is used alongside authentication. * Standardized `gremlin-go` connection options per the TinkerPop 4.x GLV proposal: -** Moved `BasicAuth`/`SigV4Auth`/`SigV4AuthWithCredentials` out of package `gremlingo` into a new `auth` sub-package as `auth.Basic`/`auth.SigV4`/`auth.SigV4WithCredentials`; the flat `gremlingo` functions are retained as deprecated delegators (idiomatic Go `// Deprecated:` doc comments) that produce interceptors equivalent to the `auth.*` constructors, so existing code keeps compiling. New code should use the `auth` sub-package. -** Renamed `MaximumConcurrentConnections` to `MaxConnections` (default 128), `IdleConnectionTimeout` to `IdleTimeout` (default 180s), `KeepAliveInterval` to `KeepAliveTime` (default 30s), `ConnectionTimeout` to `ConnectTimeout` (default 5s), `TlsConfig` to `Ssl` (`*tls.Config`), and `RequestInterceptors` to `Interceptors`. *(breaking)* +** Moved `BasicAuth`/`SigV4Auth`/`SigV4AuthWithCredentials` out of package `gremlingo` into a new `auth` sub-package as `auth.Basic`/`auth.SigV4`/`auth.SigV4WithCredentials`. The flat `gremlingo` functions have been removed; use the `auth` sub-package. +** Renamed `MaximumConcurrentConnections` to `MaxConnections` (default 128), `IdleConnectionTimeout` to `IdleTimeoutMillis` (default 180000), `KeepAliveInterval` to `KeepAliveTimeMillis` (default 30000), `ConnectionTimeout` to `ConnectTimeoutMillis` (default 5000), `TlsConfig` to `Ssl` (`*tls.Config`), and `RequestInterceptors` to `Interceptors`. Each timeout has a `time.Duration` companion (`IdleTimeout`/`KeepAliveTime`/`ConnectTimeout`); set only one form per option. *(breaking)* ** Renamed `EnableCompression` to `Compression`, now a typed `Compression` const (`CompressionNone`/`CompressionDeflate`) defaulting to `CompressionDeflate` (compression on by default; set `CompressionNone` to disable); `Accept-Encoding: deflate` is sent by default and the manual per-chunk deflate decode path is retained. *(breaking)* -** Added `DefaultBatchSize` (default 64), a connection-level default that fills the per-request `batchSize` when unset. +** Added `BatchSize` (default 64), a connection-level default that fills the per-request `batchSize` when unset. ** Added `MaxResponseHeaderBytes`, exposing `http.Transport.MaxResponseHeaderBytes`. ** Added `Proxy` and set `http.Transport.Proxy` to `http.ProxyFromEnvironment` by default, fixing a regression where a custom transport silently dropped environment proxy configuration. -** Added `ReadTimeout`, an idle-read timeout reset on each read (via `SetReadDeadline`) that is re-armed across pooled-connection reuse and does not set `http.Client.Timeout`. +** Added `ReadTimeoutMillis` (with a `time.Duration` companion `ReadTimeout`; set only one), an idle-read timeout reset on each read (via `SetReadDeadline`) that is re-armed across pooled-connection reuse and does not set `http.Client.Timeout`. ** Added `BulkResults` (default false), a connection-level default for `bulkResults` applied to every request unless overridden per-request, aligning gremlin-go with the other GLVs which already expose a connection-level setting; the `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. * Fixed `ByteBuf` leak in `GraphBinaryMessageSerializerV4` when serialization throws an `IOException`. * Changed `Tree` to no longer extend `HashMap`; it is now a final class with a tree-shaped API (`childAt`, `hasChild`, `contains`, `findSubtree`, `getOrCreateChild`, `getNodesAtDepth`, `getLeafNodes`, `nodeCount`) and is no longer a `Map`. diff --git a/docs/src/reference/gremlin-variants.asciidoc b/docs/src/reference/gremlin-variants.asciidoc index 227bedcaa2e..eb3e324a98f 100644 --- a/docs/src/reference/gremlin-variants.asciidoc +++ b/docs/src/reference/gremlin-variants.asciidoc @@ -260,14 +260,14 @@ can be passed to the `NewClient` or `NewDriverRemoteConnection` functions as con |Logger |Instance of logger. |log |Language |Language used for logging messages. |language.English |Ssl |TLS configuration. |empty -|ConnectTimeout | Timeout for establishing the connection (TCP connect plus TLS handshake). |5 seconds +|ConnectTimeoutMillis | Timeout in milliseconds for establishing the connection (TCP connect plus TLS handshake). Also settable as `ConnectTimeout` (a `time.Duration`); set only one. |5000 |MaxConnections | Maximum number of concurrent TCP connections to the server. |128 |MaxIdleConnections | Maximum number of idle (keep-alive) connections in the pool. |8 -|IdleTimeout | How long idle connections remain in the pool before being closed. |180 seconds -|KeepAliveTime | Idle time before TCP keep-alive probes begin. |30 seconds -|ReadTimeout | Idle-read timeout reset on each read of the response body. Set to `0` to disable. |0 +|IdleTimeoutMillis | How long in milliseconds idle connections remain in the pool before being closed. Also settable as `IdleTimeout` (a `time.Duration`); set only one. |180000 +|KeepAliveTimeMillis | Idle time in milliseconds before TCP keep-alive probes begin. Also settable as `KeepAliveTime` (a `time.Duration`); set only one. |30000 +|ReadTimeoutMillis | Idle-read timeout in milliseconds reset on each read of the response body. Set to `0` to disable. Also settable as `ReadTimeout` (a `time.Duration`); set only one. |0 |Compression |The wire compression negotiated with the server (`gremlingo.CompressionNone` or `gremlingo.CompressionDeflate`). |gremlingo.CompressionDeflate -|DefaultBatchSize |The connection-level default batch size used when a request does not specify one. |64 +|BatchSize |The connection-level default batch size used when a request does not specify one. |64 |BulkResults |Connection-level default for `bulkResults`, applied to every request unless overridden per-request. The `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. |false |MaxResponseHeaderBytes |Limit on the number of response header bytes the client will read. Maps to `http.Transport.MaxResponseHeaderBytes`. |0 (net/http default) |Proxy |Function returning the proxy URL to use for a request. When `nil`, uses `http.ProxyFromEnvironment`. |nil @@ -279,6 +279,38 @@ link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#_graph_driver_provide |Auth |A single RequestInterceptor for authentication (e.g. `auth.Basic`). Always appended to the end of the interceptor list so it runs last. |nil |========================================================= +Note that no driver timeout bounds the *total* duration of a request once it is under way. `ReadTimeout` only bounds +the gap between response chunks, so a response that keeps producing chunks will not time out no matter how long it +runs overall, and there is no client-side "overall" request timeout. If you need an absolute deadline, impose it in +your application around the call. Because `Submit` does not accept a `context.Context`, run it in a goroutine and +bound it with a `select`: + +[source,go] +---- +// bound the entire request (submit plus full result iteration) to 30 seconds +type outcome struct { + results []*gremlingo.Result + err error +} +done := make(chan outcome, 1) +go func() { + rs, err := client.Submit("g.V().out().out()") + if err != nil { + done <- outcome{nil, err} + return + } + results, err := rs.All() + done <- outcome{results, err} +}() + +select { +case o := <-done: + // use o.results / o.err +case <-time.After(30 * time.Second): + // deadline exceeded; stop waiting on the request +} +---- + [[gremlin-go-interceptors]] === RequestInterceptor diff --git a/docs/src/upgrade/release-4.x.x.asciidoc b/docs/src/upgrade/release-4.x.x.asciidoc index 7f2e1b02155..15c63c84000 100644 --- a/docs/src/upgrade/release-4.x.x.asciidoc +++ b/docs/src/upgrade/release-4.x.x.asciidoc @@ -44,17 +44,15 @@ Renames (breaking). The following settings fields have been renamed. Because the aliased, so existing code must be updated to the new names: - `MaximumConcurrentConnections` is now `MaxConnections` (default 128). -- `IdleConnectionTimeout` is now `IdleTimeout` (default 180s). -- `KeepAliveInterval` is now `KeepAliveTime` (default 30s). -- `ConnectionTimeout` is now `ConnectTimeout` (default 5s). +- `IdleConnectionTimeout` is now `IdleTimeoutMillis` (default 180000), an `int` of milliseconds. The `time.Duration` companion `IdleTimeout` may be set instead (set only one). +- `KeepAliveInterval` is now `KeepAliveTimeMillis` (default 30000), an `int` of milliseconds. The `time.Duration` companion `KeepAliveTime` may be set instead (set only one). +- `ConnectionTimeout` is now `ConnectTimeoutMillis` (default 5000), an `int` of milliseconds. The `time.Duration` companion `ConnectTimeout` may be set instead (set only one). - `TlsConfig` (`*tls.Config`) is now `Ssl`. - `RequestInterceptors` is now `Interceptors`. - `EnableCompression` is now `Compression`. - The `BasicAuth`, `SigV4Auth`, and `SigV4AuthWithCredentials` functions have moved out of package `gremlingo` into a new `auth` sub-package (`github.com/apache/tinkerpop/gremlin-go/v4/driver/auth`) as `auth.Basic`, `auth.SigV4`, and -`auth.SigV4WithCredentials`. The flat `gremlingo` functions are retained as deprecated delegators (marked with -idiomatic Go `// Deprecated:` doc comments) so existing code keeps compiling; new code should use the `auth` -sub-package. +`auth.SigV4WithCredentials`. The flat `gremlingo` functions have been removed; use the `auth` sub-package. Behavior changes. These change runtime behavior on upgrade, even if you do not change your configuration: @@ -67,16 +65,17 @@ any environment proxy configuration. New options: -- `ReadTimeout` (default 0, disabled): a per-read idle timeout reset on each read of the response body and re-armed -across pooled-connection reuse, so it never fires while a pooled connection is idle between requests. +- `ReadTimeoutMillis` (default 0, disabled), an `int` of milliseconds: a per-read idle timeout reset on each read of +the response body and re-armed across pooled-connection reuse, so it never fires while a pooled connection is idle +between requests. The `time.Duration` companion `ReadTimeout` may be set instead (set only one). - `MaxResponseHeaderBytes`: exposes `http.Transport.MaxResponseHeaderBytes` (native bytes). - `Proxy`: an explicit `func(*http.Request) (*url.URL, error)` proxy override for the transport. -- `DefaultBatchSize` (default 64): a connection-level default that fills a request's `batchSize` when it is left +- `BatchSize` (default 64): a connection-level default that fills a request's `batchSize` when it is left unset. - `BulkResults` (default false): a connection-level default for `bulkResults` applied to every request unless overridden per-request. The `DriverRemoteConnection` traversal path defaults to `true` regardless of this setting. -See: link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[dev list discussion on standardizing GLV connection options]. +See: link:https://lists.apache.org/thread/yqtr2wnb1kq2pqqq4002cz511q5o0bkg[[DISCUSS] Standardizing GLV connection options in TinkerPop 4]. ==== Standardizing Python Connection Options diff --git a/gremlin-go/driver/auth_deprecated.go b/gremlin-go/driver/auth_deprecated.go deleted file mode 100644 index 5f4f1a8143b..00000000000 --- a/gremlin-go/driver/auth_deprecated.go +++ /dev/null @@ -1,117 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package gremlingo - -import ( - "context" - "encoding/base64" - "fmt" - "sync" - "time" - - "github.com/aws/aws-sdk-go-v2/aws" - v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" - "github.com/aws/aws-sdk-go-v2/config" -) - -// This file retains the flat authentication functions that previously lived in -// package gremlingo. They are kept as deprecated wrappers so existing code keeps -// compiling, and they produce interceptors equivalent to the constructors in the -// auth sub-package. -// -// Note: these functions cannot literally call auth.Basic / auth.SigV4 because the -// auth sub-package imports this package (for RequestInterceptor, HttpRequest, etc.), -// so a back-import would create an import cycle. They are therefore re-implemented -// inline to mirror the auth sub-package behavior exactly. - -// Deprecated: As of 4.0.0, BasicAuth is replaced by auth.Basic in the auth sub-package. -func BasicAuth(username, password string) RequestInterceptor { - encoded := base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) - return func(req *HttpRequest) error { - req.Headers.Set(HeaderAuthorization, "Basic "+encoded) - return nil - } -} - -// Deprecated: As of 4.0.0, SigV4Auth is replaced by auth.SigV4 in the auth sub-package. -func SigV4Auth(region, service string) RequestInterceptor { - return SigV4AuthWithCredentials(region, service, nil) -} - -// Deprecated: As of 4.0.0, SigV4AuthWithCredentials is replaced by auth.SigV4WithCredentials in the auth sub-package. -func SigV4AuthWithCredentials(region, service string, credentialsProvider aws.CredentialsProvider) RequestInterceptor { - // Create signer once - it's stateless and safe to reuse - signer := v4.NewSigner() - - // Cache for resolved credentials provider (lazy initialization) - var cachedProvider aws.CredentialsProvider - var providerOnce sync.Once - var providerErr error - - return func(req *HttpRequest) error { - // Ensure body is serialized to JSON bytes before signing. - // SerializeBody is idempotent: safe to call even if already serialized. - if _, err := req.SerializeBody(); err != nil { - return fmt.Errorf("SigV4 signing requires a serialized body: %w", err) - } - - ctx := context.Background() - - // Resolve credentials provider once if not provided - provider := credentialsProvider - if provider == nil { - providerOnce.Do(func() { - cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) - if err != nil { - providerErr = err - return - } - cachedProvider = cfg.Credentials - }) - if providerErr != nil { - return providerErr - } - provider = cachedProvider - } - - // Retrieve credentials (the provider handles caching internally) - creds, err := provider.Retrieve(ctx) - if err != nil { - return err - } - - stdReq, err := req.ToStdRequest() - if err != nil { - return err - } - stdReq.Body = nil // Body is handled separately via payload hash - - if err := signer.SignHTTP(ctx, creds, stdReq, req.PayloadHash(), service, region, time.Now()); err != nil { - return err - } - - // Copy signed headers back to HttpRequest - for k, v := range stdReq.Header { - req.Headers[k] = v - } - - return nil - } -} diff --git a/gremlin-go/driver/auth_deprecated_test.go b/gremlin-go/driver/auth_deprecated_test.go deleted file mode 100644 index ac0ce7bf72a..00000000000 --- a/gremlin-go/driver/auth_deprecated_test.go +++ /dev/null @@ -1,132 +0,0 @@ -/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -*/ - -package gremlingo - -import ( - "context" - "encoding/base64" - "strings" - "testing" - - "github.com/aws/aws-sdk-go-v2/aws" - "github.com/stretchr/testify/assert" -) - -const deprecatedAuthMimeType = "application/vnd.graphbinary-v4.0" - -func newDeprecatedAuthRequest(t *testing.T) *HttpRequest { - req, err := NewHttpRequest("POST", "https://test_url:8182/gremlin") - assert.NoError(t, err) - req.Headers.Set("Content-Type", deprecatedAuthMimeType) - req.Headers.Set("Accept", deprecatedAuthMimeType) - req.Body = []byte(`{"gremlin":"g.V()"}`) - return req -} - -// mockAuthCredentialsProvider implements aws.CredentialsProvider for testing. -type mockAuthCredentialsProvider struct { - accessKey string - secretKey string - sessionToken string -} - -func (m *mockAuthCredentialsProvider) Retrieve(ctx context.Context) (aws.Credentials, error) { - return aws.Credentials{ - AccessKeyID: m.accessKey, - SecretAccessKey: m.secretKey, - SessionToken: m.sessionToken, - }, nil -} - -// TestDeprecatedBasicAuth verifies the deprecated BasicAuth delegator produces a -// working interceptor that sets the Authorization header, equivalent to auth.Basic. -func TestDeprecatedBasicAuth(t *testing.T) { - t.Run("adds authorization header", func(t *testing.T) { - req := newDeprecatedAuthRequest(t) - assert.Empty(t, req.Headers.Get(HeaderAuthorization)) - - interceptor := BasicAuth("username", "password") - err := interceptor(req) - - assert.NoError(t, err) - authHeader := req.Headers.Get(HeaderAuthorization) - assert.True(t, strings.HasPrefix(authHeader, "Basic ")) - - encoded := strings.TrimPrefix(authHeader, "Basic ") - decoded, err := base64.StdEncoding.DecodeString(encoded) - assert.NoError(t, err) - assert.Equal(t, "username:password", string(decoded)) - }) - - t.Run("matches expected encoding", func(t *testing.T) { - req := newDeprecatedAuthRequest(t) - err := BasicAuth("user", "pass")(req) - assert.NoError(t, err) - - expected := "Basic " + base64.StdEncoding.EncodeToString([]byte("user:pass")) - assert.Equal(t, expected, req.Headers.Get(HeaderAuthorization)) - }) -} - -// TestDeprecatedSigV4Auth verifies the deprecated SigV4 delegators produce working -// interceptors that sign requests, equivalent to auth.SigV4WithCredentials. -func TestDeprecatedSigV4Auth(t *testing.T) { - t.Run("adds signed headers", func(t *testing.T) { - req := newDeprecatedAuthRequest(t) - assert.Empty(t, req.Headers.Get("Authorization")) - assert.Empty(t, req.Headers.Get("X-Amz-Date")) - - provider := &mockAuthCredentialsProvider{ - accessKey: "MOCK_ID", - secretKey: "MOCK_KEY", - } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) - err := interceptor(req) - - assert.NoError(t, err) - assert.NotEmpty(t, req.Headers.Get("X-Amz-Date")) - authHeader := req.Headers.Get("Authorization") - assert.True(t, strings.HasPrefix(authHeader, "AWS4-HMAC-SHA256 Credential=MOCK_ID")) - assert.Contains(t, authHeader, "gremlin-east-1/tinkerpop-sigv4/aws4_request") - assert.Contains(t, authHeader, "Signature=") - }) - - t.Run("adds session token when provided", func(t *testing.T) { - req := newDeprecatedAuthRequest(t) - assert.Empty(t, req.Headers.Get("X-Amz-Security-Token")) - - provider := &mockAuthCredentialsProvider{ - accessKey: "MOCK_ID", - secretKey: "MOCK_KEY", - sessionToken: "MOCK_TOKEN", - } - interceptor := SigV4AuthWithCredentials("gremlin-east-1", "tinkerpop-sigv4", provider) - err := interceptor(req) - - assert.NoError(t, err) - assert.Equal(t, "MOCK_TOKEN", req.Headers.Get("X-Amz-Security-Token")) - }) - - t.Run("SigV4Auth delegates to credential chain variant", func(t *testing.T) { - // SigV4Auth with no explicit provider should still return a usable interceptor. - interceptor := SigV4Auth("gremlin-east-1", "tinkerpop-sigv4") - assert.NotNil(t, interceptor) - }) -} diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index 37aecd47400..9bc7a1ae293 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -40,15 +40,24 @@ type ClientSettings struct { // Ssl is the TLS configuration used for secure (wss/https) connections. Ssl *tls.Config - // ConnectTimeout is the TCP/transport-establishment timeout (TCP connect plus - // TLS handshake where applicable), not an HTTP request timeout. - // Default: 5 seconds. Set to 0 to use the default. + // ConnectTimeoutMillis is the TCP/transport-establishment timeout in milliseconds + // (TCP connect plus TLS handshake where applicable), not an HTTP request timeout. + // This is the canonical form; ConnectTimeout is the time.Duration companion. Set + // only one of the two. + // Default: 5000 (5 seconds). Set to 0 to use the default. + ConnectTimeoutMillis int + + // ConnectTimeout is the time.Duration companion to ConnectTimeoutMillis. ConnectTimeout time.Duration - // ReadTimeout is an idle-read timeout: it is reset on each read of the response - // body rather than bounding the whole request. Streaming-safe. The deadline is - // re-armed across pooled-connection reuse. + // ReadTimeoutMillis is an idle-read timeout in milliseconds: it is reset on each + // read of the response body rather than bounding the whole request. Streaming-safe. + // The deadline is re-armed across pooled-connection reuse. This is the canonical + // form; ReadTimeout is the time.Duration companion. Set only one of the two. // Default: 0 (disabled). + ReadTimeoutMillis int + + // ReadTimeout is the time.Duration companion to ReadTimeoutMillis. ReadTimeout time.Duration // Compression selects the content-encoding negotiated with the server. @@ -65,20 +74,30 @@ type ClientSettings struct { // Default: 8. Set to 0 to use the default. MaxIdleConnections int - // IdleTimeout is how long an idle connection remains in the pool before - // being closed. Set this to match your server's idle timeout if needed. - // Default: 180 seconds (3 minutes). Set to 0 to use the default. + // IdleTimeoutMillis is how long in milliseconds an idle connection remains in the + // pool before being closed. Set this to match your server's idle timeout if needed. + // This is the canonical form; IdleTimeout is the time.Duration companion. Set only + // one of the two. + // Default: 180000 (180 seconds). Set to 0 to use the default. + IdleTimeoutMillis int + + // IdleTimeout is the time.Duration companion to IdleTimeoutMillis. IdleTimeout time.Duration - // KeepAliveTime is the TCP keep-alive idle-before-probe interval on connections. - // This helps detect dead connections and keeps connections alive through firewalls. - // Default: 30 seconds. Set to 0 to use the default. + // KeepAliveTimeMillis is the TCP keep-alive idle-before-probe interval in + // milliseconds on connections. This helps detect dead connections and keeps + // connections alive through firewalls. This is the canonical form; KeepAliveTime is + // the time.Duration companion. Set only one of the two. + // Default: 30000 (30 seconds). Set to 0 to use the default. + KeepAliveTimeMillis int + + // KeepAliveTime is the time.Duration companion to KeepAliveTimeMillis. KeepAliveTime time.Duration - // DefaultBatchSize is the connection-level default that fills a request's batchSize + // BatchSize is the connection-level default that fills a request's batchSize // when it is not set per-request. // Default: 64. Set to 0 to use the default. - DefaultBatchSize int + BatchSize int // BulkResults is the connection-level default for bulkResults. When true, requests // submitted on this connection bulk results unless overridden per-request via @@ -130,7 +149,6 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C Logger: &defaultLogger{}, Language: language.English, Ssl: &tls.Config{}, - ConnectTimeout: defaultConnectTimeout, Compression: CompressionDeflate, EnableUserAgentOnConnect: true, @@ -138,23 +156,40 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C MaxIdleConnections: 0, // Use default (8) IdleTimeout: 0, // Use default (180s) KeepAliveTime: 0, // Use default (30s) - DefaultBatchSize: 0, // Use default (64) + BatchSize: 0, // Use default (64) } for _, configuration := range configurations { configuration(settings) } + connectTimeout, err := resolveTimeout(settings.ConnectTimeoutMillis, settings.ConnectTimeout, "ConnectTimeout") + if err != nil { + return nil, err + } + readTimeout, err := resolveTimeout(settings.ReadTimeoutMillis, settings.ReadTimeout, "ReadTimeout") + if err != nil { + return nil, err + } + idleTimeout, err := resolveTimeout(settings.IdleTimeoutMillis, settings.IdleTimeout, "IdleTimeout") + if err != nil { + return nil, err + } + keepAliveTime, err := resolveTimeout(settings.KeepAliveTimeMillis, settings.KeepAliveTime, "KeepAliveTime") + if err != nil { + return nil, err + } + connSettings := &connectionSettings{ ssl: settings.Ssl, - connectTimeout: settings.ConnectTimeout, - readTimeout: settings.ReadTimeout, + connectTimeout: connectTimeout, + readTimeout: readTimeout, maxConnsPerHost: settings.MaxConnections, maxIdleConnsPerHost: settings.MaxIdleConnections, - idleTimeout: settings.IdleTimeout, - keepAliveTime: settings.KeepAliveTime, + idleTimeout: idleTimeout, + keepAliveTime: keepAliveTime, compression: settings.Compression, maxResponseHeaderBytes: settings.MaxResponseHeaderBytes, - defaultBatchSize: settings.DefaultBatchSize, + batchSize: settings.BatchSize, proxy: settings.Proxy, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, pdtRegistry: settings.PDTRegistry, diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index e92a4f080d5..b00852fb650 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -56,7 +56,7 @@ type connectionSettings struct { keepAliveTime time.Duration compression Compression maxResponseHeaderBytes int64 - defaultBatchSize int + batchSize int proxy func(*http.Request) (*url.URL, error) enableUserAgentOnConnect bool pdtRegistry *PDTRegistry @@ -83,6 +83,20 @@ const ( defaultBatchSizeValue = 64 // Java: resultIterationBatchSize default ) +// resolveTimeout reconciles a duration option with its millisecond companion. The +// *Millis form is the canonical/documented option; the time.Duration form is the +// idiomatic Go companion. Supplying both (each non-zero) is a configuration error. +// A zero result means "unset", letting the caller apply its default. +func resolveTimeout(millis int, duration time.Duration, name string) (time.Duration, error) { + if millis != 0 && duration != 0 { + return 0, fmt.Errorf("set only one of %sMillis or %s, not both", name, name) + } + if millis != 0 { + return time.Duration(millis) * time.Millisecond, nil + } + return duration, nil +} + func newConnection(handler *logHandler, url string, connSettings *connectionSettings) *connection { // Apply defaults for zero values connectTimeout := connSettings.connectTimeout @@ -173,7 +187,7 @@ func (c *connection) applyDefaultBatchSize(req *RequestMessage) { if req == nil || c.connSettings == nil { return } - batchSize := c.connSettings.defaultBatchSize + batchSize := c.connSettings.batchSize if batchSize == 0 { batchSize = defaultBatchSizeValue } diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index d5a03a4db73..f3ca4094cae 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -1304,7 +1304,7 @@ func TestConnectionNewOptions(t *testing.T) { t.Run("custom default batch size fills unset request batchSize", func(t *testing.T) { conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ - defaultBatchSize: 256, + batchSize: 256, }) msg := &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{}} conn.applyDefaultBatchSize(msg) @@ -1313,7 +1313,7 @@ func TestConnectionNewOptions(t *testing.T) { t.Run("default batch size does not override an explicit request batchSize", func(t *testing.T) { conn := newConnection(newTestLogHandler(), "http://localhost/gremlin", &connectionSettings{ - defaultBatchSize: 256, + batchSize: 256, }) msg := &RequestMessage{Gremlin: "g.V()", Fields: map[string]interface{}{"batchSize": 10}} conn.applyDefaultBatchSize(msg) @@ -1440,6 +1440,54 @@ func TestDriverRemoteConnectionSettingsWiring(t *testing.T) { }) } +func TestTimeoutMillisOptions(t *testing.T) { + t.Run("Millis companions map to the duration connection settings", func(t *testing.T) { + client, err := NewClient("http://localhost:8182/gremlin", + func(settings *ClientSettings) { + settings.ConnectTimeoutMillis = 1500 + settings.ReadTimeoutMillis = 2500 + settings.IdleTimeoutMillis = 90000 + settings.KeepAliveTimeMillis = 15000 + }) + require.NoError(t, err) + defer client.Close() + + assert.Equal(t, 1500*time.Millisecond, client.connectionSettings.connectTimeout) + assert.Equal(t, 2500*time.Millisecond, client.connectionSettings.readTimeout) + assert.Equal(t, 90*time.Second, client.connectionSettings.idleTimeout) + assert.Equal(t, 15*time.Second, client.connectionSettings.keepAliveTime) + }) + + t.Run("Duration companions are honored when Millis is unset", func(t *testing.T) { + client, err := NewClient("http://localhost:8182/gremlin", + func(settings *ClientSettings) { + settings.ReadTimeout = 7 * time.Second + }) + require.NoError(t, err) + defer client.Close() + + assert.Equal(t, 7*time.Second, client.connectionSettings.readTimeout) + }) + + t.Run("setting both Millis and Duration for the same option is an error", func(t *testing.T) { + _, err := NewClient("http://localhost:8182/gremlin", + func(settings *ClientSettings) { + settings.ReadTimeoutMillis = 2500 + settings.ReadTimeout = 7 * time.Second + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "ReadTimeout") + + _, err = NewDriverRemoteConnection("http://localhost:8182/gremlin", + func(settings *DriverRemoteConnectionSettings) { + settings.IdleTimeoutMillis = 90000 + settings.IdleTimeout = 90 * time.Second + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "IdleTimeout") + }) +} + func TestInterceptorIntegration(t *testing.T) { testNoAuthUrl := getEnvOrDefaultString("GREMLIN_SERVER_URL", noAuthUrl) testNoAuthEnable := getEnvOrDefaultBool("RUN_INTEGRATION_TESTS", true) diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go index fa3eb503245..8c62063b906 100644 --- a/gremlin-go/driver/driverRemoteConnection.go +++ b/gremlin-go/driver/driverRemoteConnection.go @@ -39,15 +39,24 @@ type DriverRemoteConnectionSettings struct { // Ssl is the TLS configuration used for secure (wss/https) connections. Ssl *tls.Config - // ConnectTimeout is the TCP/transport-establishment timeout (TCP connect plus - // TLS handshake where applicable), not an HTTP request timeout. - // Default: 5 seconds. Set to 0 to use the default. + // ConnectTimeoutMillis is the TCP/transport-establishment timeout in milliseconds + // (TCP connect plus TLS handshake where applicable), not an HTTP request timeout. + // This is the canonical form; ConnectTimeout is the time.Duration companion. Set + // only one of the two. + // Default: 5000 (5 seconds). Set to 0 to use the default. + ConnectTimeoutMillis int + + // ConnectTimeout is the time.Duration companion to ConnectTimeoutMillis. ConnectTimeout time.Duration - // ReadTimeout is an idle-read timeout: it is reset on each read of the response - // body rather than bounding the whole request. Streaming-safe. The deadline is - // re-armed across pooled-connection reuse. + // ReadTimeoutMillis is an idle-read timeout in milliseconds: it is reset on each + // read of the response body rather than bounding the whole request. Streaming-safe. + // The deadline is re-armed across pooled-connection reuse. This is the canonical + // form; ReadTimeout is the time.Duration companion. Set only one of the two. // Default: 0 (disabled). + ReadTimeoutMillis int + + // ReadTimeout is the time.Duration companion to ReadTimeoutMillis. ReadTimeout time.Duration // Compression selects the content-encoding negotiated with the server. @@ -64,20 +73,30 @@ type DriverRemoteConnectionSettings struct { // Default: 8. Set to 0 to use the default. MaxIdleConnections int - // IdleTimeout is how long an idle connection remains in the pool before - // being closed. Set this to match your server's idle timeout if needed. - // Default: 180 seconds (3 minutes). Set to 0 to use the default. + // IdleTimeoutMillis is how long in milliseconds an idle connection remains in the + // pool before being closed. Set this to match your server's idle timeout if needed. + // This is the canonical form; IdleTimeout is the time.Duration companion. Set only + // one of the two. + // Default: 180000 (180 seconds). Set to 0 to use the default. + IdleTimeoutMillis int + + // IdleTimeout is the time.Duration companion to IdleTimeoutMillis. IdleTimeout time.Duration - // KeepAliveTime is the TCP keep-alive idle-before-probe interval on connections. - // This helps detect dead connections and keeps connections alive through firewalls. - // Default: 30 seconds. Set to 0 to use the default. + // KeepAliveTimeMillis is the TCP keep-alive idle-before-probe interval in + // milliseconds on connections. This helps detect dead connections and keeps + // connections alive through firewalls. This is the canonical form; KeepAliveTime is + // the time.Duration companion. Set only one of the two. + // Default: 30000 (30 seconds). Set to 0 to use the default. + KeepAliveTimeMillis int + + // KeepAliveTime is the time.Duration companion to KeepAliveTimeMillis. KeepAliveTime time.Duration - // DefaultBatchSize is the connection-level default that fills a request's batchSize + // BatchSize is the connection-level default that fills a request's batchSize // when it is not set per-request. // Default: 64. Set to 0 to use the default. - DefaultBatchSize int + BatchSize int // BulkResults is the connection-level default for bulkResults. When true, requests // submitted on this connection bulk results unless overridden per-request via @@ -127,7 +146,6 @@ func NewDriverRemoteConnection( Logger: &defaultLogger{}, Language: language.English, Ssl: &tls.Config{}, - ConnectTimeout: defaultConnectTimeout, Compression: CompressionDeflate, EnableUserAgentOnConnect: true, @@ -135,23 +153,40 @@ func NewDriverRemoteConnection( MaxIdleConnections: 0, // Use default (8) IdleTimeout: 0, // Use default (180s) KeepAliveTime: 0, // Use default (30s) - DefaultBatchSize: 0, // Use default (64) + BatchSize: 0, // Use default (64) } for _, configuration := range configurations { configuration(settings) } + connectTimeout, err := resolveTimeout(settings.ConnectTimeoutMillis, settings.ConnectTimeout, "ConnectTimeout") + if err != nil { + return nil, err + } + readTimeout, err := resolveTimeout(settings.ReadTimeoutMillis, settings.ReadTimeout, "ReadTimeout") + if err != nil { + return nil, err + } + idleTimeout, err := resolveTimeout(settings.IdleTimeoutMillis, settings.IdleTimeout, "IdleTimeout") + if err != nil { + return nil, err + } + keepAliveTime, err := resolveTimeout(settings.KeepAliveTimeMillis, settings.KeepAliveTime, "KeepAliveTime") + if err != nil { + return nil, err + } + connSettings := &connectionSettings{ ssl: settings.Ssl, - connectTimeout: settings.ConnectTimeout, - readTimeout: settings.ReadTimeout, + connectTimeout: connectTimeout, + readTimeout: readTimeout, maxConnsPerHost: settings.MaxConnections, maxIdleConnsPerHost: settings.MaxIdleConnections, - idleTimeout: settings.IdleTimeout, - keepAliveTime: settings.KeepAliveTime, + idleTimeout: idleTimeout, + keepAliveTime: keepAliveTime, compression: settings.Compression, maxResponseHeaderBytes: settings.MaxResponseHeaderBytes, - defaultBatchSize: settings.DefaultBatchSize, + batchSize: settings.BatchSize, proxy: settings.Proxy, enableUserAgentOnConnect: settings.EnableUserAgentOnConnect, pdtRegistry: settings.PDTRegistry, From 8279d5ad1bee81b46743106ad8d5ed2026660586 Mon Sep 17 00:00:00 2001 From: Guian Gumpac Date: Thu, 25 Jun 2026 11:19:13 -0700 Subject: [PATCH 4/4] Trigger CI