diff --git a/.gitignore b/.gitignore index 3db3c8c747..bfa04f037f 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,5 @@ coverage.html perf/perf pulsar-perf bin + +vendor/ diff --git a/go.mod b/go.mod index 6f09ee18bc..7cf845ab61 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/spf13/cobra v1.6.1 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.32.0 - go.uber.org/atomic v1.7.0 + go.uber.org/atomic v1.11.0 golang.org/x/mod v0.20.0 golang.org/x/oauth2 v0.11.0 golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 diff --git a/go.sum b/go.sum index e8237bb9e8..e63393260e 100644 --- a/go.sum +++ b/go.sum @@ -407,6 +407,8 @@ go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lI go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 24ffa401f9..13fa9b4c41 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -257,7 +257,7 @@ func (p *availablePermits) flowIfNeed() { availablePermits := current requestedPermits := current // check if permits changed - if !p.permits.CAS(current, 0) { + if !p.permits.CompareAndSwap(current, 0) { return } @@ -2084,13 +2084,13 @@ func (pc *partitionConsumer) expectMoreIncomingMessages() { if !pc.options.autoReceiverQueueSize { return } - if pc.scaleReceiverQueueHint.CAS(true, false) { + if pc.scaleReceiverQueueHint.CompareAndSwap(true, false) { oldSize := pc.currentQueueSize.Load() maxSize := int32(pc.options.receiverQueueSize) newSize := int32(math.Min(float64(maxSize), float64(oldSize*2))) usagePercent := pc.client.memLimit.CurrentUsagePercent() if usagePercent < receiverQueueExpansionMemThreshold && newSize > oldSize { - pc.currentQueueSize.CAS(oldSize, newSize) + pc.currentQueueSize.CompareAndSwap(oldSize, newSize) pc.availablePermits.add(newSize - oldSize) pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize) } @@ -2116,7 +2116,7 @@ func (pc *partitionConsumer) shrinkReceiverQueueSize() { minSize := int32(math.Min(float64(initialReceiverQueueSize), float64(pc.options.receiverQueueSize))) newSize := int32(math.Max(float64(minSize), float64(oldSize/2))) if newSize < oldSize { - pc.currentQueueSize.CAS(oldSize, newSize) + pc.currentQueueSize.CompareAndSwap(oldSize, newSize) pc.availablePermits.add(newSize - oldSize) pc.log.Debugf("update currentQueueSize from %d -> %d", oldSize, newSize) } diff --git a/pulsar/message_chunking_test.go b/pulsar/message_chunking_test.go index 59fdb5ec61..c2e1113df3 100644 --- a/pulsar/message_chunking_test.go +++ b/pulsar/message_chunking_test.go @@ -57,8 +57,6 @@ func TestInvalidChunkingConfig(t *testing.T) { } func TestLargeMessage(t *testing.T) { - rand.Seed(time.Now().Unix()) - client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -208,7 +206,6 @@ func TestMaxPendingChunkMessages(t *testing.T) { } func TestExpireIncompleteChunks(t *testing.T) { - rand.Seed(time.Now().Unix()) client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -240,8 +237,6 @@ func TestExpireIncompleteChunks(t *testing.T) { } func TestChunksEnqueueFailed(t *testing.T) { - rand.Seed(time.Now().Unix()) - client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -278,8 +273,6 @@ func TestChunksEnqueueFailed(t *testing.T) { } func TestSeekChunkMessages(t *testing.T) { - rand.Seed(time.Now().Unix()) - client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -343,8 +336,6 @@ func TestSeekChunkMessages(t *testing.T) { } func TestChunkAckAndNAck(t *testing.T) { - rand.Seed(time.Now().Unix()) - client, err := NewClient(ClientOptions{ URL: lookupURL, }) @@ -400,8 +391,6 @@ func TestChunkAckAndNAck(t *testing.T) { } func TestChunkSize(t *testing.T) { - rand.Seed(time.Now().Unix()) - client, err := NewClient(ClientOptions{ URL: lookupURL, }) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 4b950dff11..448f780cfd 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1089,7 +1089,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes doneCh := make(chan struct{}) p.internalSendAsync(ctx, msg, func(ID MessageID, _ *ProducerMessage, e error) { - if isDone.CAS(false, true) { + if isDone.CompareAndSwap(false, true) { err = e msgID = ID close(doneCh) @@ -1394,7 +1394,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) ) if sr.totalChunks > 1 { - if sr.chunkID == 0 { + switch sr.chunkID { + case 0: sr.chunkRecorder.setFirstChunkID( &messageID{ int64(response.MessageId.GetLedgerId()), @@ -1403,7 +1404,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) p.partitionIdx, 0, }) - } else if sr.chunkID == sr.totalChunks-1 { + case sr.totalChunks - 1: sr.chunkRecorder.setLastChunkID( &messageID{ int64(response.MessageId.GetLedgerId()), @@ -1546,7 +1547,7 @@ func (p *partitionProducer) setProducerState(state producerState) { // set a new producerState and return the last state // returns bool if the new state has been set or not func (p *partitionProducer) casProducerState(oldState, newState producerState) bool { - return p.state.CAS(int32(oldState), int32(newState)) + return p.state.CompareAndSwap(int32(oldState), int32(newState)) } func (p *partitionProducer) Close() { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 3af4a89c3d..b827929964 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -1977,8 +1977,8 @@ func TestWaitForExclusiveProducer(t *testing.T) { Topic: topicName, ProducerAccessMode: ProducerAccessModeWaitForExclusive, }) - defer producer2.Close() assert.NoError(t, err) + defer producer2.Close() assert.NotNil(t, producer2) id, err := producer2.Send(context.Background(), &ProducerMessage{ diff --git a/pulsar/transaction_coordinator_client.go b/pulsar/transaction_coordinator_client.go index 9382da6ec1..ec29b03226 100644 --- a/pulsar/transaction_coordinator_client.go +++ b/pulsar/transaction_coordinator_client.go @@ -319,7 +319,7 @@ func (t *transactionHandler) endTxn(op *endTxnOp) { } func (t *transactionHandler) close() { - if !t.state.CAS(txnHandlerReady, txnHandlerClosed) { + if !t.state.CompareAndSwap(txnHandlerReady, txnHandlerClosed) { return } close(t.closeCh) diff --git a/pulsaradmin/pkg/admin/admin.go b/pulsaradmin/pkg/admin/admin.go index 0b1b96254e..787c4a4593 100644 --- a/pulsaradmin/pkg/admin/admin.go +++ b/pulsaradmin/pkg/admin/admin.go @@ -57,6 +57,9 @@ type Client interface { ResourceQuotas() ResourceQuotas FunctionsWorker() FunctionsWorker Packages() Packages + Transactions() Transactions + Proxy() Proxy + LoadBalancer() LoadBalancer } type pulsarClient struct { diff --git a/pulsaradmin/pkg/admin/broker_stats.go b/pulsaradmin/pkg/admin/broker_stats.go index c9f9cb01e3..c46a344cb8 100644 --- a/pulsaradmin/pkg/admin/broker_stats.go +++ b/pulsaradmin/pkg/admin/broker_stats.go @@ -37,6 +37,15 @@ type BrokerStats interface { // GetAllocatorStats returns stats from broker GetAllocatorStats(allocatorName string) (*utils.AllocatorStats, error) + + // GetFunctionsMetrics returns Functions metrics from broker + GetFunctionsMetrics() ([]utils.Metrics, error) + + // GetBrokerResourceAvailability returns broker resource availability + GetBrokerResourceAvailability(namespace string) (map[string]utils.ResourceUsage, error) + + // GetPendingBookieOpsStats returns pending bookie operations stats + GetPendingBookieOpsStats() (map[string]interface{}, error) } type brokerStats struct { @@ -103,3 +112,24 @@ func (bs *brokerStats) GetAllocatorStats(allocatorName string) (*utils.Allocator } return &allocatorStats, nil } + +func (bs *brokerStats) GetFunctionsMetrics() ([]utils.Metrics, error) { + endpoint := bs.pulsar.endpoint(bs.basePath, "/functions-metrics") + var metrics []utils.Metrics + err := bs.pulsar.Client.Get(endpoint, &metrics) + return metrics, err +} + +func (bs *brokerStats) GetBrokerResourceAvailability(namespace string) (map[string]utils.ResourceUsage, error) { + var resources map[string]utils.ResourceUsage + endpoint := bs.pulsar.endpoint(bs.basePath, "/broker-resource-availability", namespace) + err := bs.pulsar.Client.Get(endpoint, &resources) + return resources, err +} + +func (bs *brokerStats) GetPendingBookieOpsStats() (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := bs.pulsar.endpoint(bs.basePath, "/pending-bookie-ops-stats") + err := bs.pulsar.Client.Get(endpoint, &stats) + return stats, err +} diff --git a/pulsaradmin/pkg/admin/load_balancer.go b/pulsaradmin/pkg/admin/load_balancer.go new file mode 100644 index 0000000000..cf54d23284 --- /dev/null +++ b/pulsaradmin/pkg/admin/load_balancer.go @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +// LoadBalancer is admin interface for load balancer management +type LoadBalancer interface { + // GetLoadBalancerBrokerRanking returns the broker ranking + GetLoadBalancerBrokerRanking() (map[string]interface{}, error) + + // GetBundleUnloadingMetrics returns bundle unloading metrics + GetBundleUnloadingMetrics() (*utils.BundleUnloadingMetrics, error) + + // GetLeaderBroker returns the leader broker for load balancing + GetLeaderBroker() (*utils.BrokerInfo, error) + + // UpdateLoadManagerLeader updates the load manager leader + UpdateLoadManagerLeader() error +} + +type loadBalancer struct { + pulsar *pulsarClient + basePath string +} + +// LoadBalancer is used to access the load balancer endpoints +func (c *pulsarClient) LoadBalancer() LoadBalancer { + return &loadBalancer{ + pulsar: c, + basePath: "/load-manager", + } +} + +func (lb *loadBalancer) GetLoadBalancerBrokerRanking() (map[string]interface{}, error) { + var ranking map[string]interface{} + endpoint := lb.pulsar.endpoint(lb.basePath, "brokerRanking") + err := lb.pulsar.Client.Get(endpoint, &ranking) + return ranking, err +} + +func (lb *loadBalancer) GetBundleUnloadingMetrics() (*utils.BundleUnloadingMetrics, error) { + var metrics utils.BundleUnloadingMetrics + endpoint := lb.pulsar.endpoint(lb.basePath, "bundle-unloading") + err := lb.pulsar.Client.Get(endpoint, &metrics) + return &metrics, err +} + +func (lb *loadBalancer) GetLeaderBroker() (*utils.BrokerInfo, error) { + var broker utils.BrokerInfo + endpoint := lb.pulsar.endpoint(lb.basePath, "leader") + err := lb.pulsar.Client.Get(endpoint, &broker) + return &broker, err +} + +func (lb *loadBalancer) UpdateLoadManagerLeader() error { + endpoint := lb.pulsar.endpoint(lb.basePath, "leader") + return lb.pulsar.Client.Post(endpoint, nil) +} \ No newline at end of file diff --git a/pulsaradmin/pkg/admin/namespace.go b/pulsaradmin/pkg/admin/namespace.go index e92f020cb2..bcb5fd4418 100644 --- a/pulsaradmin/pkg/admin/namespace.go +++ b/pulsaradmin/pkg/admin/namespace.go @@ -166,6 +166,9 @@ type Namespaces interface { // When deduplication is enabled, the broker will prevent to store the same Message multiple times SetDeduplicationStatus(namespace string, enableDeduplication bool) error + // GetDeduplicationStatus returns the deduplication status for all topics within a namespace + GetDeduplicationStatus(namespace string) (bool, error) + // SetPersistence sets the persistence configuration for all the topics on a namespace SetPersistence(namespace string, persistence utils.PersistencePolicies) error @@ -212,9 +215,15 @@ type Namespaces interface { // SetSubscriptionAuthMode sets the given subscription auth mode on all topics on a namespace SetSubscriptionAuthMode(namespace utils.NameSpaceName, mode utils.SubscriptionAuthMode) error + // GetSubscriptionAuthMode returns the subscription auth mode for a namespace + GetSubscriptionAuthMode(namespace utils.NameSpaceName) (utils.SubscriptionAuthMode, error) + // SetEncryptionRequiredStatus sets the encryption required status for all topics within a namespace SetEncryptionRequiredStatus(namespace utils.NameSpaceName, encrypt bool) error + // GetEncryptionRequiredStatus returns the encryption required status for all topics within a namespace + GetEncryptionRequiredStatus(namespace utils.NameSpaceName) (bool, error) + // UnsubscribeNamespace unsubscribe the given subscription on all topics on a namespace UnsubscribeNamespace(namespace utils.NameSpaceName, sName string) error @@ -295,6 +304,12 @@ type Namespaces interface { // RemoveSubscriptionExpirationTime removes subscription expiration time from a namespace, // defaulting to broker settings RemoveSubscriptionExpirationTime(namespace utils.NameSpaceName) error + + // GetBundles returns the bundles for a namespace + GetBundles(namespace utils.NameSpaceName) (*utils.BundlesData, error) + + // GetNamespaceStats returns stats for a namespace + GetNamespaceStats(namespace utils.NameSpaceName) (map[string]interface{}, error) } type namespaces struct { @@ -940,3 +955,48 @@ func (n *namespaces) RemoveSubscriptionExpirationTime(namespace utils.NameSpaceN endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionExpirationTime") return n.pulsar.Client.Delete(endpoint) } + +func (n *namespaces) GetDeduplicationStatus(namespace string) (bool, error) { + var result bool + nsName, err := utils.GetNamespaceName(namespace) + if err != nil { + return false, err + } + endpoint := n.pulsar.endpoint(n.basePath, nsName.String(), "deduplication") + err = n.pulsar.Client.Get(endpoint, &result) + return result, err +} + +func (n *namespaces) GetSubscriptionAuthMode(namespace utils.NameSpaceName) (utils.SubscriptionAuthMode, error) { + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "subscriptionAuthMode") + data, err := n.pulsar.Client.GetWithQueryParams(endpoint, nil, nil, false) + if err != nil { + return "", err + } + mode, err := utils.ParseSubscriptionAuthMode(strings.ReplaceAll(string(data), "\"", "")) + if err != nil { + return "", err + } + return mode, nil +} + +func (n *namespaces) GetEncryptionRequiredStatus(namespace utils.NameSpaceName) (bool, error) { + var result bool + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "encryptionRequired") + err := n.pulsar.Client.Get(endpoint, &result) + return result, err +} + +func (n *namespaces) GetBundles(namespace utils.NameSpaceName) (*utils.BundlesData, error) { + var result utils.BundlesData + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "bundles") + err := n.pulsar.Client.Get(endpoint, &result) + return &result, err +} + +func (n *namespaces) GetNamespaceStats(namespace utils.NameSpaceName) (map[string]interface{}, error) { + var result map[string]interface{} + endpoint := n.pulsar.endpoint(n.basePath, namespace.String(), "stats") + err := n.pulsar.Client.Get(endpoint, &result) + return result, err +} diff --git a/pulsaradmin/pkg/admin/namespace_test.go b/pulsaradmin/pkg/admin/namespace_test.go index 8fa687ff41..9604b3e117 100644 --- a/pulsaradmin/pkg/admin/namespace_test.go +++ b/pulsaradmin/pkg/admin/namespace_test.go @@ -341,3 +341,102 @@ func TestNamespaces_GetOffloadThresholdInSeconds(t *testing.T) { expected := int64(60) assert.Equal(t, expected, offloadThresholdInSeconds) } + + +func TestNamespaces_DeduplicationStatus(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace := "public/default" + + // set deduplication status and get it + err = admin.Namespaces().SetDeduplicationStatus(namespace, true) + assert.Equal(t, nil, err) + status, err := admin.Namespaces().GetDeduplicationStatus(namespace) + assert.Equal(t, nil, err) + assert.Equal(t, true, status) + + // set to false and verify + err = admin.Namespaces().SetDeduplicationStatus(namespace, false) + assert.Equal(t, nil, err) + status, err = admin.Namespaces().GetDeduplicationStatus(namespace) + assert.Equal(t, nil, err) + assert.Equal(t, false, status) +} + +func TestNamespaces_EncryptionRequiredStatus(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set encryption required status and get it + err = admin.Namespaces().SetEncryptionRequiredStatus(*namespace, true) + assert.Equal(t, nil, err) + status, err := admin.Namespaces().GetEncryptionRequiredStatus(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, true, status) + + // set to false and verify + err = admin.Namespaces().SetEncryptionRequiredStatus(*namespace, false) + assert.Equal(t, nil, err) + status, err = admin.Namespaces().GetEncryptionRequiredStatus(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, false, status) +} + +func TestNamespaces_SubscriptionAuthMode(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // set subscription auth mode and get it + err = admin.Namespaces().SetSubscriptionAuthMode(*namespace, utils.Prefix) + assert.Equal(t, nil, err) + mode, err := admin.Namespaces().GetSubscriptionAuthMode(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, utils.Prefix, mode) + + // set to None and verify + err = admin.Namespaces().SetSubscriptionAuthMode(*namespace, utils.None) + assert.Equal(t, nil, err) + mode, err = admin.Namespaces().GetSubscriptionAuthMode(*namespace) + assert.Equal(t, nil, err) + assert.Equal(t, utils.None, mode) +} + +func TestNamespaces_GetBundles(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // get bundles for namespace + bundles, err := admin.Namespaces().GetBundles(*namespace) + assert.Equal(t, nil, err) + assert.NotNil(t, bundles) + assert.NotEmpty(t, bundles.Boundaries) +} + +func TestNamespaces_GetNamespaceStats(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + namespace, _ := utils.GetNamespaceName("public/default") + + // get namespace stats + stats, err := admin.Namespaces().GetNamespaceStats(*namespace) + assert.Equal(t, nil, err) + assert.NotNil(t, stats) +} diff --git a/pulsaradmin/pkg/admin/proxy.go b/pulsaradmin/pkg/admin/proxy.go new file mode 100644 index 0000000000..e6f541356e --- /dev/null +++ b/pulsaradmin/pkg/admin/proxy.go @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +// Proxy is admin interface for proxy management +type Proxy interface { + // GetProxies returns the list of active proxies in the cluster + GetProxies() ([]string, error) + + // GetProxyStats returns the stats of all active proxies in the cluster + GetProxyStats() ([]utils.ProxyStats, error) + + // GetConnectionsStats returns connection stats for all active proxies + GetConnectionsStats() ([]utils.ConnectionStats, error) + + // GetTopicsStats returns topic stats for all active proxies + GetTopicsStats() ([]utils.ProxyTopicStats, error) +} + +type proxy struct { + pulsar *pulsarClient + basePath string +} + +// Proxy is used to access the proxy endpoints +func (c *pulsarClient) Proxy() Proxy { + return &proxy{ + pulsar: c, + basePath: "/proxy-stats", + } +} + +func (p *proxy) GetProxies() ([]string, error) { + var proxies []string + endpoint := p.pulsar.endpoint(p.basePath, "proxies") + err := p.pulsar.Client.Get(endpoint, &proxies) + return proxies, err +} + +func (p *proxy) GetProxyStats() ([]utils.ProxyStats, error) { + var stats []utils.ProxyStats + endpoint := p.pulsar.endpoint(p.basePath, "stats") + err := p.pulsar.Client.Get(endpoint, &stats) + return stats, err +} + +func (p *proxy) GetConnectionsStats() ([]utils.ConnectionStats, error) { + var stats []utils.ConnectionStats + endpoint := p.pulsar.endpoint(p.basePath, "connections") + err := p.pulsar.Client.Get(endpoint, &stats) + return stats, err +} + +func (p *proxy) GetTopicsStats() ([]utils.ProxyTopicStats, error) { + var stats []utils.ProxyTopicStats + endpoint := p.pulsar.endpoint(p.basePath, "topics") + err := p.pulsar.Client.Get(endpoint, &stats) + return stats, err +} \ No newline at end of file diff --git a/pulsaradmin/pkg/admin/proxy_test.go b/pulsaradmin/pkg/admin/proxy_test.go new file mode 100644 index 0000000000..49afde238a --- /dev/null +++ b/pulsaradmin/pkg/admin/proxy_test.go @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestProxy_GetProxies(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get list of proxies + proxies, err := admin.Proxy().GetProxies() + assert.NoError(t, err) + assert.NotNil(t, proxies) +} + +func TestProxy_GetProxyStats(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get proxy stats + stats, err := admin.Proxy().GetProxyStats() + assert.NoError(t, err) + assert.NotNil(t, stats) +} + +func TestProxy_GetConnectionsStats(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get connection stats + stats, err := admin.Proxy().GetConnectionsStats() + assert.NoError(t, err) + assert.NotNil(t, stats) +} + +func TestProxy_GetTopicsStats(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get topic stats + stats, err := admin.Proxy().GetTopicsStats() + assert.NoError(t, err) + assert.NotNil(t, stats) +} \ No newline at end of file diff --git a/pulsaradmin/pkg/admin/transactions.go b/pulsaradmin/pkg/admin/transactions.go new file mode 100644 index 0000000000..567076a9d0 --- /dev/null +++ b/pulsaradmin/pkg/admin/transactions.go @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "strconv" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" +) + +// Transactions is admin interface for transaction coordinator management +type Transactions interface { + // GetCoordinatorStats returns the stats of transaction coordinators + GetCoordinatorStats() (map[string]utils.TransactionCoordinatorStats, error) + + // GetCoordinatorStatsWithCoordinatorID returns the stats of a specific transaction coordinator + GetCoordinatorStatsWithCoordinatorID(coordinatorID int) (*utils.TransactionCoordinatorStats, error) + + // GetCoordinatorInternalStats returns internal stats of transaction coordinators + GetCoordinatorInternalStats(coordinatorID int, metadata bool) (map[string]interface{}, error) + + // GetPendingAckStats returns pending ack stats for a subscription + GetPendingAckStats(topic utils.TopicName, subName string, metadata bool) (map[string]interface{}, error) + + // GetPendingAckInternalStats returns internal pending ack stats for a subscription + GetPendingAckInternalStats(topic utils.TopicName, subName string, metadata bool) (map[string]interface{}, error) + + // GetTransactionInBufferStats returns transaction in buffer stats for a topic + GetTransactionInBufferStats(topic utils.TopicName, mostSigBits, leastSigBits int64, metadata bool) (map[string]interface{}, error) + + // GetSlowTransactions returns slow transactions + GetSlowTransactions(coordinatorID int, timeout int64) (map[string]interface{}, error) +} + +type transactions struct { + pulsar *pulsarClient + basePath string +} + +// Transactions is used to access the transactions endpoints +func (c *pulsarClient) Transactions() Transactions { + return &transactions{ + pulsar: c, + basePath: "/transactions", + } +} + +func (t *transactions) GetCoordinatorStats() (map[string]utils.TransactionCoordinatorStats, error) { + var stats map[string]utils.TransactionCoordinatorStats + endpoint := t.pulsar.endpoint(t.basePath, "coordinatorStats") + err := t.pulsar.Client.Get(endpoint, &stats) + return stats, err +} + +func (t *transactions) GetCoordinatorStatsWithCoordinatorID(coordinatorID int) (*utils.TransactionCoordinatorStats, error) { + var stats utils.TransactionCoordinatorStats + endpoint := t.pulsar.endpoint(t.basePath, "coordinatorStats", strconv.Itoa(coordinatorID)) + err := t.pulsar.Client.Get(endpoint, &stats) + return &stats, err +} + +func (t *transactions) GetCoordinatorInternalStats(coordinatorID int, metadata bool) (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := t.pulsar.endpoint(t.basePath, "coordinatorInternalStats", strconv.Itoa(coordinatorID)) + params := map[string]string{ + "metadata": strconv.FormatBool(metadata), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, false) + return stats, err +} + +func (t *transactions) GetPendingAckStats(topic utils.TopicName, subName string, metadata bool) (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := t.pulsar.endpoint(t.basePath, "pendingAckStats", topic.GetRestPath(), "subscription", subName) + params := map[string]string{ + "metadata": strconv.FormatBool(metadata), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, false) + return stats, err +} + +func (t *transactions) GetPendingAckInternalStats(topic utils.TopicName, subName string, metadata bool) (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := t.pulsar.endpoint(t.basePath, "pendingAckInternalStats", topic.GetRestPath(), "subscription", subName) + params := map[string]string{ + "metadata": strconv.FormatBool(metadata), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, false) + return stats, err +} + +func (t *transactions) GetTransactionInBufferStats(topic utils.TopicName, mostSigBits, leastSigBits int64, metadata bool) (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := t.pulsar.endpoint(t.basePath, "transactionInBufferStats", topic.GetRestPath(), strconv.FormatInt(mostSigBits, 10), strconv.FormatInt(leastSigBits, 10)) + params := map[string]string{ + "metadata": strconv.FormatBool(metadata), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, false) + return stats, err +} + +func (t *transactions) GetSlowTransactions(coordinatorID int, timeout int64) (map[string]interface{}, error) { + var stats map[string]interface{} + endpoint := t.pulsar.endpoint(t.basePath, "slowTransactions", strconv.Itoa(coordinatorID)) + params := map[string]string{ + "timeout": strconv.FormatInt(timeout, 10), + } + _, err := t.pulsar.Client.GetWithQueryParams(endpoint, &stats, params, false) + return stats, err +} \ No newline at end of file diff --git a/pulsaradmin/pkg/admin/transactions_test.go b/pulsaradmin/pkg/admin/transactions_test.go new file mode 100644 index 0000000000..3fa7fdd3d1 --- /dev/null +++ b/pulsaradmin/pkg/admin/transactions_test.go @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package admin + +import ( + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTransactions_GetCoordinatorStats(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get coordinator stats + stats, err := admin.Transactions().GetCoordinatorStats() + assert.NoError(t, err) + assert.NotNil(t, stats) +} + +func TestTransactions_GetCoordinatorStatsWithID(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get coordinator stats for coordinator ID 0 + stats, err := admin.Transactions().GetCoordinatorStatsWithCoordinatorID(0) + assert.NoError(t, err) + assert.NotNil(t, stats) +} + +func TestTransactions_GetSlowTransactions(t *testing.T) { + config := &config.Config{} + admin, err := New(config) + require.NoError(t, err) + require.NotNil(t, admin) + + // get slow transactions + stats, err := admin.Transactions().GetSlowTransactions(0, 1000) + assert.NoError(t, err) + assert.NotNil(t, stats) +} \ No newline at end of file diff --git a/pulsaradmin/pkg/utils/proxy_stats.go b/pulsaradmin/pkg/utils/proxy_stats.go new file mode 100644 index 0000000000..9e3000b5b3 --- /dev/null +++ b/pulsaradmin/pkg/utils/proxy_stats.go @@ -0,0 +1,76 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +// ProxyStats represents statistics for a Pulsar proxy +type ProxyStats struct { + ProxyName string `json:"proxyName"` + DirectMemoryUsed int64 `json:"directMemoryUsed"` + JvmMemoryUsed int64 `json:"jvmMemoryUsed"` + SystemCpuUsage float64 `json:"systemCpuUsage"` + ProcessCpuUsage float64 `json:"processCpuUsage"` + ActiveConnections int64 `json:"activeConnections"` + TotalConnections int64 `json:"totalConnections"` + BytesIn int64 `json:"bytesIn"` + BytesOut int64 `json:"bytesOut"` + MsgIn int64 `json:"msgIn"` + MsgOut int64 `json:"msgOut"` + TopicLoadTimeMs int64 `json:"topicLoadTimeMs"` + BrokerCount int `json:"brokerCount"` + BundleCount int `json:"bundleCount"` + ConsumerCount int64 `json:"consumerCount"` + ProducerCount int64 `json:"producerCount"` + RequestParseTimeMs int64 `json:"requestParseTimeMs"` + RequestProcessTimeMs int64 `json:"requestProcessTimeMs"` +} + +// ConnectionStats represents connection statistics +type ConnectionStats struct { + Address string `json:"address"` + ConnectedSince string `json:"connectedSince"` + ClientVersion string `json:"clientVersion"` + ConnectTime int64 `json:"connectTime"` + RemoteAddress string `json:"remoteAddress"` + MsgRateIn float64 `json:"msgRateIn"` + MsgRateOut float64 `json:"msgRateOut"` + MsgThroughputIn float64 `json:"msgThroughputIn"` + MsgThroughputOut float64 `json:"msgThroughputOut"` + ConsumerCount int `json:"consumerCount"` + ProducerCount int `json:"producerCount"` +} + +// ProxyTopicStats represents topic statistics at proxy level +type ProxyTopicStats struct { + TopicName string `json:"topicName"` + ConsumerCount int `json:"consumerCount"` + ProducerCount int `json:"producerCount"` + MsgRateIn float64 `json:"msgRateIn"` + MsgRateOut float64 `json:"msgRateOut"` + MsgThroughputIn float64 `json:"msgThroughputIn"` + MsgThroughputOut float64 `json:"msgThroughputOut"` +} + +// BundleUnloadingMetrics represents bundle unloading metrics for load balancing +type BundleUnloadingMetrics struct { + LoadBalanceSuccessCount int64 `json:"loadBalanceSuccessCount"` + LoadBalanceFailCount int64 `json:"loadBalanceFailCount"` + UnloadBundleTotal int64 `json:"unloadBundleTotal"` + LoadAvg float64 `json:"loadAvg"` + OverloadedBrokerCount int `json:"overloadedBrokerCount"` + UnderLoadedBrokerCount int `json:"underLoadedBrokerCount"` +} \ No newline at end of file diff --git a/pulsaradmin/pkg/utils/transaction_stats.go b/pulsaradmin/pkg/utils/transaction_stats.go new file mode 100644 index 0000000000..e0733289c2 --- /dev/null +++ b/pulsaradmin/pkg/utils/transaction_stats.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package utils + +// TransactionCoordinatorStats represents transaction coordinator statistics +type TransactionCoordinatorStats struct { + CoordinatorID int `json:"coordinatorId"` + State string `json:"state"` + LeastSigBits int64 `json:"leastSigBits"` + MostSigBits int64 `json:"mostSigBits"` + TotalSize int64 `json:"totalSize"` + Clusters []string `json:"clusters"` + LowWaterMark int64 `json:"lowWaterMark"` + HighWaterMark int64 `json:"highWaterMark"` + TransactionCount int64 `json:"transactionCount"` + ProducedCount int64 `json:"producedCount"` + AckedCount int64 `json:"ackedCount"` + OngoingTxnCount int64 `json:"ongoingTxnCount"` + RecoveringTxnCount int64 `json:"recoveringTxnCount"` + TimeoutTxnCount int64 `json:"timeoutTxnCount"` + CommittedTxnCount int64 `json:"committedTxnCount"` + AbortedTxnCount int64 `json:"abortedTxnCount"` + ExecutionLatencyPercentiles map[string]float64 `json:"executionLatencyPercentiles"` + CommitLatencyPercentiles map[string]float64 `json:"commitLatencyPercentiles"` + AbortLatencyPercentiles map[string]float64 `json:"abortLatencyPercentiles"` +} \ No newline at end of file