Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require (
github.com/confluentinc/go-editor v0.11.0
github.com/confluentinc/go-prompt v0.2.40
github.com/confluentinc/go-ps1 v1.0.2
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e
github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9
github.com/confluentinc/mds-sdk-go-public/mdsv2alpha1 v0.0.0-20240923163156-b922b35891f9
github.com/confluentinc/properties v0.0.0-20190814194548-42c10394a787
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ github.com/confluentinc/go-prompt v0.2.40 h1:tveghQJ+FVOVvF0dgQaZEm7YZSQ3r3tyuLM
github.com/confluentinc/go-prompt v0.2.40/go.mod h1:Bc4kUldoYFpXx7frWasxAG0wpp+xOcu/Ehq930/FJBw=
github.com/confluentinc/go-ps1 v1.0.2 h1:+4cKOzWs3AWmxL2s96oHu0QutZESDRXECnhFzm2ic4o=
github.com/confluentinc/go-ps1 v1.0.2/go.mod h1:qmgG9xQgFd4u7/CS6eA9nNP8eQqOtXuRHmZ4+w7Vprs=
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18 h1:M9/yzpG+eOTp/peiHNHRQWax0If6+GAJIC/NcvbzAOI=
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18/go.mod h1:qJAUraWU9BERQWyalrPsxt+ECELWPV+qsyOaP00VidY=
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e h1:3JFHt6KP6eI1ZyUppvJ+BOA7fjPLGy58ocAs0/LfGSw=
github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e/go.mod h1:qJAUraWU9BERQWyalrPsxt+ECELWPV+qsyOaP00VidY=
github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9 h1:RQmmuDAQOmNFqPH+h3DkhLH44A9R/pp5gSGaf0kTOYk=
github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9/go.mod h1:2Ug8TKPkJXnazoTwlf72RawtttFZYNB3WB/8w3Hgfro=
github.com/confluentinc/mds-sdk-go-public/mdsv2alpha1 v0.0.0-20240923163156-b922b35891f9 h1:YfBq6AtVqOipWYfoIK6SN8fRPb6jTq7LiIc9BWDnXPo=
Expand Down
2 changes: 1 addition & 1 deletion internal/kafka/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd.AddCommand(newQuotaCommand(prerunner))
cmd.AddCommand(newRegionCommand(prerunner))
cmd.AddCommand(newReplicaCommand(prerunner))
cmd.AddCommand(newShareGroupCommand(prerunner))
cmd.AddCommand(newShareGroupCommand(cfg, prerunner))
cmd.AddCommand(newTopicCommand(cfg, prerunner))

return cmd
Expand Down
27 changes: 19 additions & 8 deletions internal/kafka/command_share_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/config"
)

type shareGroupCommand struct {
Expand All @@ -20,18 +21,28 @@ type shareGroupOut struct {
TopicSubscriptions []string `human:"Topic Subscriptions,omitempty" serialized:"topic_subscriptions,omitempty"`
}

func newShareGroupCommand(prerunner pcmd.PreRunner) *cobra.Command {
func newShareGroupCommand(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command {
cmd := &cobra.Command{
Use: "share-group",
Short: "Manage Kafka share groups.",
Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin},
Use: "share-group",
Short: "Manage Kafka share groups.",
}

c := &shareGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)}
c := &shareGroupCommand{}

cmd.AddCommand(c.newConsumerCommand())
cmd.AddCommand(c.newDescribeCommand())
cmd.AddCommand(c.newListCommand())
if cfg.IsCloudLogin() {
c.AuthenticatedCLICommand = pcmd.NewAuthenticatedCLICommand(cmd, prerunner)

cmd.AddCommand(c.newConsumerCommand())
cmd.AddCommand(c.newDescribeCommand())
cmd.AddCommand(c.newListCommand())
} else {
c.AuthenticatedCLICommand = pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner)
c.PersistentPreRunE = prerunner.InitializeOnPremKafkaRest(c.AuthenticatedCLICommand)

cmd.AddCommand(c.newConsumerCommandOnPrem())
cmd.AddCommand(c.newDescribeCommandOnPrem())
cmd.AddCommand(c.newListCommandOnPrem())
}

return cmd
}
Expand Down
62 changes: 62 additions & 0 deletions internal/kafka/command_share_group_consumer_list_onprem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/examples"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *shareGroupCommand) newConsumerListCommandOnPrem() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Kafka share group consumers.",
Args: cobra.NoArgs,
RunE: c.consumerListOnPrem,
Example: examples.BuildExampleString(
examples.Example{
Text: `List all consumers in share group "my-share-group".`,
Code: "confluent kafka share-group consumer list --group my-share-group",
},
),
}

cmd.Flags().String("group", "", "Share group ID.")
cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet())
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)

cobra.CheckErr(cmd.MarkFlagRequired("group"))

return cmd
}

func (c *shareGroupCommand) consumerListOnPrem(cmd *cobra.Command, _ []string) error {
restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd)
if err != nil {
return err
}

group, err := cmd.Flags().GetString("group")
if err != nil {
return err
}

consumers, resp, err := restClient.ShareGroupV3Api.ListKafkaShareGroupConsumers(restContext, clusterId, group)
if err != nil {
return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp)
}

list := output.NewList(cmd)
for _, consumer := range consumers.Data {
list.Add(&shareGroupConsumerOut{
Cluster: consumer.ClusterId,
ShareGroup: group,
Consumer: consumer.ConsumerId,
Client: consumer.ClientId,
})
}
return list.Print()
}
16 changes: 16 additions & 0 deletions internal/kafka/command_share_group_consumer_onprem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kafka

import (
"github.com/spf13/cobra"
)

func (c *shareGroupCommand) newConsumerCommandOnPrem() *cobra.Command {
cmd := &cobra.Command{
Use: "consumer",
Short: "Manage Kafka share group consumers.",
}

cmd.AddCommand(c.newConsumerListCommandOnPrem())

return cmd
}
75 changes: 75 additions & 0 deletions internal/kafka/command_share_group_describe_onprem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package kafka

import (
"sort"

"github.com/spf13/cobra"

"github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *shareGroupCommand) newDescribeCommandOnPrem() *cobra.Command {
cmd := &cobra.Command{
Use: "describe <group>",
Short: "Describe a Kafka share group.",
Args: cobra.ExactArgs(1),
RunE: c.describeOnPrem,
}

cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet())
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *shareGroupCommand) describeOnPrem(cmd *cobra.Command, args []string) error {
restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd)
if err != nil {
return err
}

shareGroup, resp, err := restClient.ShareGroupV3Api.GetKafkaShareGroup(restContext, clusterId, args[0])
if err != nil {
return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp)
}

table := output.NewTable(cmd)
table.Add(&shareGroupOut{
Cluster: shareGroup.ClusterId,
ShareGroup: shareGroup.ShareGroupId,
Coordinator: getStringBroker(shareGroup.Coordinator.Related),
State: shareGroup.State,
ConsumerCount: shareGroup.ConsumerCount,
PartitionCount: shareGroup.PartitionCount,
TopicSubscriptions: getShareGroupTopicNamesOnPrem(shareGroup),
})
return table.Print()
}

func getShareGroupTopicNamesOnPrem(shareGroup kafkarestv3.ShareGroupData) []string {
if len(shareGroup.AssignedTopicPartitions) == 0 {
Comment thread
cqin-confluent marked this conversation as resolved.
return []string{}
}

topicSet := make(map[string]bool)
for _, tp := range shareGroup.AssignedTopicPartitions {
topicSet[tp.TopicName] = true
Comment thread
cqin-confluent marked this conversation as resolved.
}

if len(topicSet) == 0 {
return []string{}
}

topics := make([]string, 0, len(topicSet))
for topic := range topicSet {
topics = append(topics, topic)
}

sort.Strings(topics)
return topics
Comment on lines +59 to +74
Copy link

Copilot AI Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getShareGroupTopicNamesOnPrem builds topics by iterating over a Go map (for topic := range topicSet), which yields a non-deterministic order. This can make describe output unstable (especially for -o json|yaml) and can cause flaky tests/users seeing different ordering. Collect the topic names and sort them before returning (e.g., sort.Strings(topics) or slices.Sort(topics)).

Copilot uses AI. Check for mistakes.
}
49 changes: 49 additions & 0 deletions internal/kafka/command_share_group_list_onprem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package kafka

import (
"github.com/spf13/cobra"

pcmd "github.com/confluentinc/cli/v4/pkg/cmd"
"github.com/confluentinc/cli/v4/pkg/kafkarest"
"github.com/confluentinc/cli/v4/pkg/output"
)

func (c *shareGroupCommand) newListCommandOnPrem() *cobra.Command {
cmd := &cobra.Command{
Use: "list",
Short: "List Kafka share groups.",
Args: cobra.NoArgs,
RunE: c.listOnPrem,
}

cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet())
pcmd.AddContextFlag(cmd, c.CLICommand)
pcmd.AddOutputFlag(cmd)

return cmd
}

func (c *shareGroupCommand) listOnPrem(cmd *cobra.Command, _ []string) error {
restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd)
if err != nil {
return err
}

groups, resp, err := restClient.ShareGroupV3Api.ListKafkaShareGroups(restContext, clusterId)
if err != nil {
return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp)
}

list := output.NewList(cmd)
for _, group := range groups.Data {
list.Add(&shareGroupOut{
Cluster: group.ClusterId,
ShareGroup: group.ShareGroupId,
Coordinator: getStringBroker(group.Coordinator.Related),
State: group.State,
ConsumerCount: group.ConsumerCount,
})
}
list.Filter([]string{"Cluster", "ShareGroup", "Coordinator", "State", "ConsumerCount"})
return list.Print()
}
1 change: 1 addition & 0 deletions test/fixtures/output/kafka/help-onprem.golden
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Available Commands:
link Manage inter-cluster links.
partition Manage Kafka partitions.
replica Manage Kafka replicas.
share-group Manage Kafka share groups.
topic Manage Kafka topics.

Global Flags:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Manage Kafka share group consumers.

Usage:
confluent kafka share-group consumer [command]

Available Commands:
list List Kafka share group consumers.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent kafka share-group consumer [command] --help" for more information about a command.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: REST request failed: This server does not host this share group.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
List Kafka share group consumers.

Usage:
confluent kafka share-group consumer list [flags]

Examples:
List all consumers in share group "my-share-group".

$ confluent kafka share-group consumer list --group my-share-group

Flags:
--group string REQUIRED: Share group ID.
--url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy.
--client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication.
--client-key-path string Path to client private key, include for mTLS authentication.
--no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials.
--prompt Bypass use of available login credentials and prompt for Kafka Rest credentials.
--context string CLI context name.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Cluster | Share Group | Consumer | Client
------------+---------------+------------+-----------
cluster-1 | share-group-1 | consumer-1 | client-1
cluster-1 | share-group-1 | consumer-2 | client-2
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Error: REST request failed: This server does not host this share group.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Describe a Kafka share group.

Usage:
confluent kafka share-group describe <group> [flags]

Flags:
--url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy.
--client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication.
--client-key-path string Path to client private key, include for mTLS authentication.
--no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials.
--prompt Bypass use of available login credentials and prompt for Kafka Rest credentials.
--context string CLI context name.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
16 changes: 16 additions & 0 deletions test/fixtures/output/kafka/share-group/help-onprem.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Manage Kafka share groups.

Usage:
confluent kafka share-group [command]

Available Commands:
consumer Manage Kafka share group consumers.
describe Describe a Kafka share group.
list List Kafka share groups.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).

Use "confluent kafka share-group [command] --help" for more information about a command.
19 changes: 19 additions & 0 deletions test/fixtures/output/kafka/share-group/list-help-onprem.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
List Kafka share groups.

Usage:
confluent kafka share-group list [flags]

Flags:
--url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL.
--certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy.
--client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication.
--client-key-path string Path to client private key, include for mTLS authentication.
--no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials.
--prompt Bypass use of available login credentials and prompt for Kafka Rest credentials.
--context string CLI context name.
-o, --output string Specify the output format as "human", "json", or "yaml". (default "human")

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
-v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace).
4 changes: 4 additions & 0 deletions test/fixtures/output/kafka/share-group/list-onprem.golden
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Cluster | Share Group | Coordinator | State | Consumer Count
------------+---------------+-------------+--------+-----------------
cluster-1 | share-group-1 | broker-1 | STABLE | 2
cluster-1 | share-group-2 | broker-2 | EMPTY | 0
Loading