Skip to content

Commit

Permalink
Merge pull request #58 from kubescape/delay
Browse files Browse the repository at this point in the history
add delivery delay option to produceMessage
  • Loading branch information
kooomix authored Jan 7, 2025
2 parents a4b52e1 + 590146a commit 2b7edfe
Showing 1 changed file with 18 additions and 12 deletions.
30 changes: 18 additions & 12 deletions pulsar/connector/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,11 +89,12 @@ func newProducer(pulsarClient Client, createProducerOption ...CreateProducerOpti
}

type produceMessageOptions struct {
msgToSend interface{}
pulsarClient Client
ctx context.Context
properties map[string]string
key string
msgToSend interface{}
pulsarClient Client
ctx context.Context
properties map[string]string
key string
deliveryDelay time.Duration
}

type ProduceMessageOption func(*produceMessageOptions)
Expand Down Expand Up @@ -127,6 +129,13 @@ func WithProperties(properties map[string]string) ProduceMessageOption {
}
}

// through a `SubscriptionType=Shared` subscription. With other subscription types, the messages will still be delivered immediately.
func WithDelay(deliveryDelay time.Duration) ProduceMessageOption {
return func(o *produceMessageOptions) {
o.deliveryDelay = deliveryDelay
}
}

func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOption) error {
opts := &produceMessageOptions{}
for _, o := range producerOpts {
Expand All @@ -138,13 +147,10 @@ func ProduceMessage(producer pulsar.Producer, producerOpts ...ProduceMessageOpti
}

msg := pulsar.ProducerMessage{
Payload: msgBytes,
Properties: opts.properties,
Key: opts.key,
}

if err != nil {
return fmt.Errorf("ProduceMessage: failed to create producer: %w", err)
Payload: msgBytes,
Properties: opts.properties,
Key: opts.key,
DeliverAfter: opts.deliveryDelay,
}

// Publish the mock message to the topic
Expand Down

0 comments on commit 2b7edfe

Please sign in to comment.