diff --git a/go.mod b/go.mod index 77a4f6c6a7..ac7cdda286 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/confluentinc/go-editor v0.11.0 github.com/confluentinc/go-prompt v0.2.40 github.com/confluentinc/go-ps1 v1.0.2 - github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18 + github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9 github.com/confluentinc/mds-sdk-go-public/mdsv2alpha1 v0.0.0-20240923163156-b922b35891f9 github.com/confluentinc/properties v0.0.0-20190814194548-42c10394a787 diff --git a/go.sum b/go.sum index e761d8c46f..34efc271f8 100644 --- a/go.sum +++ b/go.sum @@ -272,8 +272,8 @@ github.com/confluentinc/go-prompt v0.2.40 h1:tveghQJ+FVOVvF0dgQaZEm7YZSQ3r3tyuLM github.com/confluentinc/go-prompt v0.2.40/go.mod h1:Bc4kUldoYFpXx7frWasxAG0wpp+xOcu/Ehq930/FJBw= github.com/confluentinc/go-ps1 v1.0.2 h1:+4cKOzWs3AWmxL2s96oHu0QutZESDRXECnhFzm2ic4o= github.com/confluentinc/go-ps1 v1.0.2/go.mod h1:qmgG9xQgFd4u7/CS6eA9nNP8eQqOtXuRHmZ4+w7Vprs= -github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18 h1:M9/yzpG+eOTp/peiHNHRQWax0If6+GAJIC/NcvbzAOI= -github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.18/go.mod h1:qJAUraWU9BERQWyalrPsxt+ECELWPV+qsyOaP00VidY= +github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e h1:3JFHt6KP6eI1ZyUppvJ+BOA7fjPLGy58ocAs0/LfGSw= +github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3 v0.3.20-0.20260316174929-09a1f69c2b9e/go.mod h1:qJAUraWU9BERQWyalrPsxt+ECELWPV+qsyOaP00VidY= github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9 h1:RQmmuDAQOmNFqPH+h3DkhLH44A9R/pp5gSGaf0kTOYk= github.com/confluentinc/mds-sdk-go-public/mdsv1 v0.0.0-20240923163156-b922b35891f9/go.mod h1:2Ug8TKPkJXnazoTwlf72RawtttFZYNB3WB/8w3Hgfro= github.com/confluentinc/mds-sdk-go-public/mdsv2alpha1 v0.0.0-20240923163156-b922b35891f9 h1:YfBq6AtVqOipWYfoIK6SN8fRPb6jTq7LiIc9BWDnXPo= diff --git a/internal/kafka/command.go b/internal/kafka/command.go index 5d3736d4b3..92dc3776ca 100644 --- a/internal/kafka/command.go +++ b/internal/kafka/command.go @@ -24,7 +24,7 @@ func New(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { cmd.AddCommand(newQuotaCommand(prerunner)) cmd.AddCommand(newRegionCommand(prerunner)) cmd.AddCommand(newReplicaCommand(prerunner)) - cmd.AddCommand(newShareGroupCommand(prerunner)) + cmd.AddCommand(newShareGroupCommand(cfg, prerunner)) cmd.AddCommand(newTopicCommand(cfg, prerunner)) return cmd diff --git a/internal/kafka/command_share_group.go b/internal/kafka/command_share_group.go index 7c6e20f66c..62b352a871 100644 --- a/internal/kafka/command_share_group.go +++ b/internal/kafka/command_share_group.go @@ -4,6 +4,7 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/config" ) type shareGroupCommand struct { @@ -20,18 +21,28 @@ type shareGroupOut struct { TopicSubscriptions []string `human:"Topic Subscriptions,omitempty" serialized:"topic_subscriptions,omitempty"` } -func newShareGroupCommand(prerunner pcmd.PreRunner) *cobra.Command { +func newShareGroupCommand(cfg *config.Config, prerunner pcmd.PreRunner) *cobra.Command { cmd := &cobra.Command{ - Use: "share-group", - Short: "Manage Kafka share groups.", - Annotations: map[string]string{pcmd.RunRequirement: pcmd.RequireCloudLogin}, + Use: "share-group", + Short: "Manage Kafka share groups.", } - c := &shareGroupCommand{pcmd.NewAuthenticatedCLICommand(cmd, prerunner)} + c := &shareGroupCommand{} - cmd.AddCommand(c.newConsumerCommand()) - cmd.AddCommand(c.newDescribeCommand()) - cmd.AddCommand(c.newListCommand()) + if cfg.IsCloudLogin() { + c.AuthenticatedCLICommand = pcmd.NewAuthenticatedCLICommand(cmd, prerunner) + + cmd.AddCommand(c.newConsumerCommand()) + cmd.AddCommand(c.newDescribeCommand()) + cmd.AddCommand(c.newListCommand()) + } else { + c.AuthenticatedCLICommand = pcmd.NewAuthenticatedWithMDSCLICommand(cmd, prerunner) + c.PersistentPreRunE = prerunner.InitializeOnPremKafkaRest(c.AuthenticatedCLICommand) + + cmd.AddCommand(c.newConsumerCommandOnPrem()) + cmd.AddCommand(c.newDescribeCommandOnPrem()) + cmd.AddCommand(c.newListCommandOnPrem()) + } return cmd } diff --git a/internal/kafka/command_share_group_consumer_list_onprem.go b/internal/kafka/command_share_group_consumer_list_onprem.go new file mode 100644 index 0000000000..efdb91afe1 --- /dev/null +++ b/internal/kafka/command_share_group_consumer_list_onprem.go @@ -0,0 +1,62 @@ +package kafka + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/examples" + "github.com/confluentinc/cli/v4/pkg/kafkarest" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *shareGroupCommand) newConsumerListCommandOnPrem() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Kafka share group consumers.", + Args: cobra.NoArgs, + RunE: c.consumerListOnPrem, + Example: examples.BuildExampleString( + examples.Example{ + Text: `List all consumers in share group "my-share-group".`, + Code: "confluent kafka share-group consumer list --group my-share-group", + }, + ), + } + + cmd.Flags().String("group", "", "Share group ID.") + cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet()) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddOutputFlag(cmd) + + cobra.CheckErr(cmd.MarkFlagRequired("group")) + + return cmd +} + +func (c *shareGroupCommand) consumerListOnPrem(cmd *cobra.Command, _ []string) error { + restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd) + if err != nil { + return err + } + + group, err := cmd.Flags().GetString("group") + if err != nil { + return err + } + + consumers, resp, err := restClient.ShareGroupV3Api.ListKafkaShareGroupConsumers(restContext, clusterId, group) + if err != nil { + return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp) + } + + list := output.NewList(cmd) + for _, consumer := range consumers.Data { + list.Add(&shareGroupConsumerOut{ + Cluster: consumer.ClusterId, + ShareGroup: group, + Consumer: consumer.ConsumerId, + Client: consumer.ClientId, + }) + } + return list.Print() +} diff --git a/internal/kafka/command_share_group_consumer_onprem.go b/internal/kafka/command_share_group_consumer_onprem.go new file mode 100644 index 0000000000..1affdc98e3 --- /dev/null +++ b/internal/kafka/command_share_group_consumer_onprem.go @@ -0,0 +1,16 @@ +package kafka + +import ( + "github.com/spf13/cobra" +) + +func (c *shareGroupCommand) newConsumerCommandOnPrem() *cobra.Command { + cmd := &cobra.Command{ + Use: "consumer", + Short: "Manage Kafka share group consumers.", + } + + cmd.AddCommand(c.newConsumerListCommandOnPrem()) + + return cmd +} diff --git a/internal/kafka/command_share_group_describe_onprem.go b/internal/kafka/command_share_group_describe_onprem.go new file mode 100644 index 0000000000..fe2c6a6995 --- /dev/null +++ b/internal/kafka/command_share_group_describe_onprem.go @@ -0,0 +1,75 @@ +package kafka + +import ( + "sort" + + "github.com/spf13/cobra" + + "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/kafkarest" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *shareGroupCommand) newDescribeCommandOnPrem() *cobra.Command { + cmd := &cobra.Command{ + Use: "describe ", + Short: "Describe a Kafka share group.", + Args: cobra.ExactArgs(1), + RunE: c.describeOnPrem, + } + + cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet()) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *shareGroupCommand) describeOnPrem(cmd *cobra.Command, args []string) error { + restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd) + if err != nil { + return err + } + + shareGroup, resp, err := restClient.ShareGroupV3Api.GetKafkaShareGroup(restContext, clusterId, args[0]) + if err != nil { + return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp) + } + + table := output.NewTable(cmd) + table.Add(&shareGroupOut{ + Cluster: shareGroup.ClusterId, + ShareGroup: shareGroup.ShareGroupId, + Coordinator: getStringBroker(shareGroup.Coordinator.Related), + State: shareGroup.State, + ConsumerCount: shareGroup.ConsumerCount, + PartitionCount: shareGroup.PartitionCount, + TopicSubscriptions: getShareGroupTopicNamesOnPrem(shareGroup), + }) + return table.Print() +} + +func getShareGroupTopicNamesOnPrem(shareGroup kafkarestv3.ShareGroupData) []string { + if len(shareGroup.AssignedTopicPartitions) == 0 { + return []string{} + } + + topicSet := make(map[string]bool) + for _, tp := range shareGroup.AssignedTopicPartitions { + topicSet[tp.TopicName] = true + } + + if len(topicSet) == 0 { + return []string{} + } + + topics := make([]string, 0, len(topicSet)) + for topic := range topicSet { + topics = append(topics, topic) + } + + sort.Strings(topics) + return topics +} diff --git a/internal/kafka/command_share_group_list_onprem.go b/internal/kafka/command_share_group_list_onprem.go new file mode 100644 index 0000000000..8f9f911c75 --- /dev/null +++ b/internal/kafka/command_share_group_list_onprem.go @@ -0,0 +1,49 @@ +package kafka + +import ( + "github.com/spf13/cobra" + + pcmd "github.com/confluentinc/cli/v4/pkg/cmd" + "github.com/confluentinc/cli/v4/pkg/kafkarest" + "github.com/confluentinc/cli/v4/pkg/output" +) + +func (c *shareGroupCommand) newListCommandOnPrem() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List Kafka share groups.", + Args: cobra.NoArgs, + RunE: c.listOnPrem, + } + + cmd.Flags().AddFlagSet(pcmd.OnPremKafkaRestSet()) + pcmd.AddContextFlag(cmd, c.CLICommand) + pcmd.AddOutputFlag(cmd) + + return cmd +} + +func (c *shareGroupCommand) listOnPrem(cmd *cobra.Command, _ []string) error { + restClient, restContext, clusterId, err := initKafkaRest(c.AuthenticatedCLICommand, cmd) + if err != nil { + return err + } + + groups, resp, err := restClient.ShareGroupV3Api.ListKafkaShareGroups(restContext, clusterId) + if err != nil { + return kafkarest.NewError(restClient.GetConfig().BasePath, err, resp) + } + + list := output.NewList(cmd) + for _, group := range groups.Data { + list.Add(&shareGroupOut{ + Cluster: group.ClusterId, + ShareGroup: group.ShareGroupId, + Coordinator: getStringBroker(group.Coordinator.Related), + State: group.State, + ConsumerCount: group.ConsumerCount, + }) + } + list.Filter([]string{"Cluster", "ShareGroup", "Coordinator", "State", "ConsumerCount"}) + return list.Print() +} diff --git a/test/fixtures/output/kafka/help-onprem.golden b/test/fixtures/output/kafka/help-onprem.golden index 676a20318b..2f991b8011 100644 --- a/test/fixtures/output/kafka/help-onprem.golden +++ b/test/fixtures/output/kafka/help-onprem.golden @@ -11,6 +11,7 @@ Available Commands: link Manage inter-cluster links. partition Manage Kafka partitions. replica Manage Kafka replicas. + share-group Manage Kafka share groups. topic Manage Kafka topics. Global Flags: diff --git a/test/fixtures/output/kafka/share-group/consumer/help-onprem.golden b/test/fixtures/output/kafka/share-group/consumer/help-onprem.golden new file mode 100644 index 0000000000..bcee4c9223 --- /dev/null +++ b/test/fixtures/output/kafka/share-group/consumer/help-onprem.golden @@ -0,0 +1,14 @@ +Manage Kafka share group consumers. + +Usage: + confluent kafka share-group consumer [command] + +Available Commands: + list List Kafka share group consumers. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent kafka share-group consumer [command] --help" for more information about a command. diff --git a/test/fixtures/output/kafka/share-group/consumer/list-dne-onprem.golden b/test/fixtures/output/kafka/share-group/consumer/list-dne-onprem.golden new file mode 100644 index 0000000000..29de8214d2 --- /dev/null +++ b/test/fixtures/output/kafka/share-group/consumer/list-dne-onprem.golden @@ -0,0 +1 @@ +Error: REST request failed: This server does not host this share group. diff --git a/test/fixtures/output/kafka/share-group/consumer/list-help-onprem.golden b/test/fixtures/output/kafka/share-group/consumer/list-help-onprem.golden new file mode 100644 index 0000000000..b7da09c14b --- /dev/null +++ b/test/fixtures/output/kafka/share-group/consumer/list-help-onprem.golden @@ -0,0 +1,25 @@ +List Kafka share group consumers. + +Usage: + confluent kafka share-group consumer list [flags] + +Examples: +List all consumers in share group "my-share-group". + + $ confluent kafka share-group consumer list --group my-share-group + +Flags: + --group string REQUIRED: Share group ID. + --url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy. + --client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication. + --client-key-path string Path to client private key, include for mTLS authentication. + --no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials. + --prompt Bypass use of available login credentials and prompt for Kafka Rest credentials. + --context string CLI context name. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/kafka/share-group/consumer/list-onprem.golden b/test/fixtures/output/kafka/share-group/consumer/list-onprem.golden new file mode 100644 index 0000000000..561652e2a0 --- /dev/null +++ b/test/fixtures/output/kafka/share-group/consumer/list-onprem.golden @@ -0,0 +1,4 @@ + Cluster | Share Group | Consumer | Client +------------+---------------+------------+----------- + cluster-1 | share-group-1 | consumer-1 | client-1 + cluster-1 | share-group-1 | consumer-2 | client-2 diff --git a/test/fixtures/output/kafka/share-group/describe-dne-onprem.golden b/test/fixtures/output/kafka/share-group/describe-dne-onprem.golden new file mode 100644 index 0000000000..29de8214d2 --- /dev/null +++ b/test/fixtures/output/kafka/share-group/describe-dne-onprem.golden @@ -0,0 +1 @@ +Error: REST request failed: This server does not host this share group. diff --git a/test/fixtures/output/kafka/share-group/describe-help-onprem.golden b/test/fixtures/output/kafka/share-group/describe-help-onprem.golden new file mode 100644 index 0000000000..be4ce4ab3b --- /dev/null +++ b/test/fixtures/output/kafka/share-group/describe-help-onprem.golden @@ -0,0 +1,19 @@ +Describe a Kafka share group. + +Usage: + confluent kafka share-group describe [flags] + +Flags: + --url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy. + --client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication. + --client-key-path string Path to client private key, include for mTLS authentication. + --no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials. + --prompt Bypass use of available login credentials and prompt for Kafka Rest credentials. + --context string CLI context name. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/kafka/share-group/help-onprem.golden b/test/fixtures/output/kafka/share-group/help-onprem.golden new file mode 100644 index 0000000000..78b4e3a8de --- /dev/null +++ b/test/fixtures/output/kafka/share-group/help-onprem.golden @@ -0,0 +1,16 @@ +Manage Kafka share groups. + +Usage: + confluent kafka share-group [command] + +Available Commands: + consumer Manage Kafka share group consumers. + describe Describe a Kafka share group. + list List Kafka share groups. + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). + +Use "confluent kafka share-group [command] --help" for more information about a command. diff --git a/test/fixtures/output/kafka/share-group/list-help-onprem.golden b/test/fixtures/output/kafka/share-group/list-help-onprem.golden new file mode 100644 index 0000000000..13990b4686 --- /dev/null +++ b/test/fixtures/output/kafka/share-group/list-help-onprem.golden @@ -0,0 +1,19 @@ +List Kafka share groups. + +Usage: + confluent kafka share-group list [flags] + +Flags: + --url string Base URL of REST Proxy Endpoint of Kafka Cluster (include "/kafka" for embedded Rest Proxy). Must set flag or CONFLUENT_REST_URL. + --certificate-authority-path string Path to a PEM-encoded Certificate Authority to verify the Confluent REST Proxy. + --client-cert-path string Path to client cert to be verified by Confluent REST Proxy. Include for mTLS authentication. + --client-key-path string Path to client private key, include for mTLS authentication. + --no-authentication Include if requests should be made without authentication headers and user will not be prompted for credentials. + --prompt Bypass use of available login credentials and prompt for Kafka Rest credentials. + --context string CLI context name. + -o, --output string Specify the output format as "human", "json", or "yaml". (default "human") + +Global Flags: + -h, --help Show help for this command. + --unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets. + -v, --verbose count Increase verbosity (-v for warn, -vv for info, -vvv for debug, -vvvv for trace). diff --git a/test/fixtures/output/kafka/share-group/list-onprem.golden b/test/fixtures/output/kafka/share-group/list-onprem.golden new file mode 100644 index 0000000000..e957f5894b --- /dev/null +++ b/test/fixtures/output/kafka/share-group/list-onprem.golden @@ -0,0 +1,4 @@ + Cluster | Share Group | Coordinator | State | Consumer Count +------------+---------------+-------------+--------+----------------- + cluster-1 | share-group-1 | broker-1 | STABLE | 2 + cluster-1 | share-group-2 | broker-2 | EMPTY | 0 diff --git a/test/kafka_test.go b/test/kafka_test.go index 750388175d..c80b845bdc 100644 --- a/test/kafka_test.go +++ b/test/kafka_test.go @@ -790,6 +790,20 @@ func (s *CLITestSuite) TestKafkaShareGroup() { test.login = "cloud" s.runIntegrationTest(test) } + + kafkaRestURL := s.TestBackend.GetKafkaRestUrl() + tests = []CLITest{ + {args: "kafka share-group list", fixture: "kafka/share-group/list-onprem.golden"}, + {args: "kafka share-group describe share-group-1", contains: shareGroupTopic1, notContains: ""}, + {args: "kafka share-group describe share-group-1", contains: shareGroupTopic2, notContains: ""}, + {args: "kafka share-group describe share-group-1234", fixture: "kafka/share-group/describe-dne-onprem.golden", exitCode: 1}, + } + + for _, test := range tests { + test.login = "onprem" + test.env = []string{"CONFLUENT_REST_URL=" + kafkaRestURL} + s.runIntegrationTest(test) + } } func (s *CLITestSuite) TestKafkaShareGroupConsumer() { @@ -804,6 +818,18 @@ func (s *CLITestSuite) TestKafkaShareGroupConsumer() { test.login = "cloud" s.runIntegrationTest(test) } + + kafkaRestURL := s.TestBackend.GetKafkaRestUrl() + tests = []CLITest{ + {args: "kafka share-group consumer list --group share-group-1", fixture: "kafka/share-group/consumer/list-onprem.golden"}, + {args: "kafka share-group consumer list --group share-group-1234", fixture: "kafka/share-group/consumer/list-dne-onprem.golden", exitCode: 1}, + } + + for _, test := range tests { + test.login = "onprem" + test.env = []string{"CONFLUENT_REST_URL=" + kafkaRestURL} + s.runIntegrationTest(test) + } } func (s *CLITestSuite) TestKafka_Autocomplete() {