Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/submitqueue/orchestrator/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ go_library(
"//submitqueue/extension/changeprovider",
"//submitqueue/extension/changeprovider/fake",
"//submitqueue/extension/changeprovider/github",
"//submitqueue/extension/changeprovider/phabricator",
"//submitqueue/extension/changeprovider/routing",
"//submitqueue/extension/conflict",
"//submitqueue/extension/conflict/all",
"//submitqueue/extension/conflict/fake",
Expand Down
89 changes: 81 additions & 8 deletions example/submitqueue/orchestrator/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"net"
nethttp "net/http"
"os"
"os/signal"
"sync"
Expand Down Expand Up @@ -49,6 +50,8 @@ import (
"github.com/uber/submitqueue/submitqueue/extension/changeprovider"
cpfake "github.com/uber/submitqueue/submitqueue/extension/changeprovider/fake"
githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github"
phabprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/phabricator"
routingprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/routing"
"github.com/uber/submitqueue/submitqueue/extension/conflict"
"github.com/uber/submitqueue/submitqueue/extension/conflict/all"
conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake"
Expand Down Expand Up @@ -757,30 +760,100 @@ func parseTimeout(envVal string, defaultVal time.Duration) time.Duration {
return defaultVal
}

// newChangeProvider creates a ChangeProvider for GitHub (github.com), configured
// via GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. When GITHUB_TOKEN is
// unset it returns the fake change provider (one empty ChangeInfo per URI unless
// a URI carries a failure marker, see changeprovider/fake).
// newChangeProvider creates a routing ChangeProvider containing GitHub and Phab ChangeProviders.
// When neither GITHUB_TOKEN nor PHAB_API_TOKEN is set, falls back to the fake change provider.
func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) {
if os.Getenv("GITHUB_TOKEN") == "" {
logger.Warn("GITHUB_TOKEN not set; using fake change provider (empty change info unless URI-marked)")
ghProvider, err := newGitHubChangeProvider(logger, scope)
if err != nil {
return nil, err
}

phabProvider, err := newPhabChangeProvider(logger, scope)
if err != nil {
return nil, err
}

if ghProvider == nil && phabProvider == nil {
logger.Warn("no change provider tokens set; using fake change provider (empty change info unless URI-marked)")
return cpfake.New(), nil
}

routingProvider, err := routingprovider.NewProvider(routingprovider.Params{
GitHub: ghProvider,
Phabricator: phabProvider,
})
if err != nil {
return nil, fmt.Errorf("failed to create routing change provider: %w", err)
}
return routingProvider, nil
}

// newGitHubChangeProvider creates a GitHub ChangeProvider configured via
// GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. Returns nil when
// GITHUB_TOKEN is unset.
func newGitHubChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) {
if os.Getenv("GITHUB_TOKEN") == "" {
return nil, nil
}

client, err := http.NewClient(getEnv("GITHUB_BASE_URL", "https://api.github.com"))
if err != nil {
return nil, fmt.Errorf("failed to build GitHub HTTP client: %w", err)
}

ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: os.Getenv("GITHUB_TOKEN")})
client.Transport = &oauth2.Transport{Source: ts, Base: client.Transport}

client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second)

return githubprovider.NewProvider(githubprovider.Params{
HTTPClient: client,
Logger: logger.Sugar(),
MetricsScope: scope.SubScope("changeprovider"),
MetricsScope: scope.SubScope("changeprovider.github"),
}), nil
}

// apiTokenTransport injects a Phabricator API token as a query parameter in each request.
type apiTokenTransport struct {
token string
next nethttp.RoundTripper
}

func (t *apiTokenTransport) RoundTrip(req *nethttp.Request) (*nethttp.Response, error) {
r := req.Clone(req.Context())
q := r.URL.Query()
q.Set("api.token", t.token)
r.URL.RawQuery = q.Encode()
return t.next.RoundTrip(r)
}

// newPhabChangeProvider creates a Phabricator ChangeProvider configured via PHAB_API_ENDPOINT and PHAB_API_TOKEN.
// Returns nil when PHAB_API_TOKEN or PHAB_API_ENDPOINT are unset.
func newPhabChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) {
token := os.Getenv("PHAB_API_TOKEN")
if token == "" {
return nil, nil
}

endpoint := os.Getenv("PHAB_API_ENDPOINT")
if endpoint == "" {
return nil, nil
}

client, err := http.NewClient(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to build Phabricator HTTP client: %w", err)
}

baseTransport := client.Transport.(*http.BaseURLTransport)
baseTransport.Next = &apiTokenTransport{
token: token,
next: baseTransport.Next,
}

return phabprovider.NewProvider(phabprovider.Params{
HTTPClient: client,
Logger: logger.Sugar(),
MetricsScope: scope.SubScope("changeprovider.phabricator"),
}), nil
}

Expand Down
30 changes: 30 additions & 0 deletions submitqueue/extension/changeprovider/routing/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "routing",
srcs = ["provider.go"],
importpath = "github.com/uber/submitqueue/submitqueue/extension/changeprovider/routing",
visibility = ["//visibility:public"],
deps = [
"//platform/base/change/git",
"//platform/base/change/github",
"//platform/base/change/phabricator",
"//submitqueue/entity",
"//submitqueue/extension/changeprovider",
],
)

go_test(
name = "routing_test",
srcs = ["provider_test.go"],
embed = [":routing"],
deps = [
"//platform/base/change",
"//submitqueue/entity",
"//submitqueue/extension/changeprovider",
"//submitqueue/extension/changeprovider/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_mock//gomock",
],
)
145 changes: 145 additions & 0 deletions submitqueue/extension/changeprovider/routing/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) 2026 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package routing provides a ChangeProvider that dispatches URIs to
// downstream change providers based on the URI's change type.
package routing

import (
"context"
"fmt"

"github.com/uber/submitqueue/platform/base/change/git"
"github.com/uber/submitqueue/platform/base/change/github"
"github.com/uber/submitqueue/platform/base/change/phabricator"
"github.com/uber/submitqueue/submitqueue/entity"
"github.com/uber/submitqueue/submitqueue/extension/changeprovider"
)

// Params holds the optional downstream providers, keyed by change type.
// At least one must be non-nil.
type Params struct {
// GitHub handles URIs that parse as GitHub change IDs.
GitHub changeprovider.ChangeProvider
// Phabricator handles URIs that parse as Phabricator change IDs.
Phabricator changeprovider.ChangeProvider
// Git handles URIs that parse as git change IDs.
Git changeprovider.ChangeProvider
}

// matchedURI pairs a URI with its original position in the input slice.
type matchedURI struct {
// index is the position of this URI in the original request.Change.URIs slice.
index int
// uri is the raw URI string.
uri string
}

type provider struct {
github changeprovider.ChangeProvider
phabricator changeprovider.ChangeProvider
git changeprovider.ChangeProvider
}

// NewProvider creates a ChangeProvider that routes URIs to the appropriate
// downstream provider based on the URI's change type.
// Returns an error if all providers are nil.
func NewProvider(params Params) (changeprovider.ChangeProvider, error) {
if params.GitHub == nil && params.Phabricator == nil && params.Git == nil {
return nil, fmt.Errorf("at least one change provider must be configured")
}
return &provider{
github: params.GitHub,
phabricator: params.Phabricator,
git: params.Git,
}, nil
}

// Get classifies each URI in the request by change type, groups them by provider,
// calls each provider once with its subset of changes, and reassembles results in the original order.
func (p *provider) Get(ctx context.Context, request entity.Request) ([]entity.ChangeInfo, error) {
changesByProvider, err := p.groupChangesByProvider(request.Change.URIs)
if err != nil {
return nil, err
}

results := make([]entity.ChangeInfo, len(request.Change.URIs))
for changeProvider, changes := range changesByProvider {
uris := make([]string, 0, len(changes))
for _, c := range changes {
uris = append(uris, c.uri)
}

// Subrequest for each provider containing only its subset of changes.
subRequest := request
subRequest.Change.URIs = uris

infos, getErr := changeProvider.Get(ctx, subRequest)
if getErr != nil {
return nil, getErr
}

if len(infos) != len(changes) {
return nil, fmt.Errorf("provider returned %d results for %d URIs", len(infos), len(changes))
}

// Put the changes back in their original positions in the results.
for i, changeInfo := range infos {
results[changes[i].index] = changeInfo
}
}

return results, nil
}

// groupChangesByProvider classifies each URI by trying ParseChangeID functions and groups them by the matched provider.
// Returns an error if a URI matches no known type or matches a type whose provider was not configured.
func (p *provider) groupChangesByProvider(uris []string) (map[changeprovider.ChangeProvider][]matchedURI, error) {
grouped := make(map[changeprovider.ChangeProvider][]matchedURI)
for i, uri := range uris {
matchedProvider, err := p.matchURIToChangeProvider(uri)
if err != nil {
return nil, err
}
grouped[matchedProvider] = append(grouped[matchedProvider], matchedURI{
index: i,
uri: uri,
})
}

return grouped, nil
}

// matchURIToChangeProvider returns the provider for the given URI by trying each ParseChangeID function.
// Returns an error if no parser matches, or if a parser matches, but the corresponding provider was not configured.
func (p *provider) matchURIToChangeProvider(uri string) (changeprovider.ChangeProvider, error) {
if _, err := github.ParseChangeID(uri); err == nil {
if p.github == nil {
return nil, fmt.Errorf("URI %q is a GitHub change but no GitHub provider is configured", uri)
}
return p.github, nil
} else if _, err := phabricator.ParseChangeID(uri); err == nil {
if p.phabricator == nil {
return nil, fmt.Errorf("URI %q is a Phabricator change but no Phabricator provider is configured", uri)
}
return p.phabricator, nil
} else if _, err := git.ParseChangeID(uri); err == nil {
if p.git == nil {
return nil, fmt.Errorf("URI %q is a git change but no git provider is configured", uri)
}
return p.git, nil
}

return nil, fmt.Errorf("URI %q does not match any known change type", uri)
}
Loading
Loading