Skip to content

Commit f333880

Browse files
authored
chore: rename pubsub processor struct (#1389)
Signed-off-by: Alessandro Yuichi Okimoto <[email protected]>
1 parent 436963f commit f333880

12 files changed

+33
-33
lines changed

pkg/subscriber/cmd/server/server.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
319319
}
320320
defer batchClient.Close()
321321

322-
processors, err := s.registerProcessorMap(
322+
pubSubProcessors, err := s.registerPubSubProcessorMap(
323323
ctx,
324324
environmentClient,
325325
mysqlClient,
@@ -337,7 +337,7 @@ func (s *server) Run(ctx context.Context, metrics metrics.Metrics, logger *zap.L
337337
return err
338338
}
339339

340-
multiPubSub, err := s.startMultiPubSub(ctx, processors, logger)
340+
multiPubSub, err := s.startMultiPubSub(ctx, pubSubProcessors, logger)
341341
if err != nil {
342342
return err
343343
}
@@ -394,7 +394,7 @@ func (s *server) createMySQLClient(
394394

395395
func (s *server) startMultiPubSub(
396396
ctx context.Context,
397-
processors *processor.Processors,
397+
processors *processor.PubSubProcessors,
398398
logger *zap.Logger,
399399
) (*subscriber.MultiSubscriber, error) {
400400
multiSubscriber := subscriber.NewMultiSubscriber(
@@ -422,7 +422,7 @@ func (s *server) startMultiPubSub(
422422
// we should skip the error, just log it here
423423
continue
424424
}
425-
multiSubscriber.AddSubscriber(subscriber.NewSubscriber(
425+
multiSubscriber.AddSubscriber(subscriber.NewPubSubSubscriber(
426426
name, config, p,
427427
subscriber.WithLogger(logger),
428428
))
@@ -461,7 +461,7 @@ func (s *server) startMultiPubSub(
461461
return multiSubscriber, nil
462462
}
463463

464-
func (s *server) registerProcessorMap(
464+
func (s *server) registerPubSubProcessorMap(
465465
ctx context.Context,
466466
environmentClient environmentclient.Client,
467467
mysqlClient mysql.Client,
@@ -474,8 +474,8 @@ func (s *server) registerProcessorMap(
474474
sender notificationsender.Sender,
475475
registerer metrics.Registerer,
476476
logger *zap.Logger,
477-
) (*processor.Processors, error) {
478-
processors := processor.NewProcessors(registerer)
477+
) (*processor.PubSubProcessors, error) {
478+
processors := processor.NewPubSubProcessors(registerer)
479479
writer.RegisterMetrics(registerer)
480480

481481
processorsConfigBytes, err := os.ReadFile(*s.processorsConfig)

pkg/subscriber/processor/auditlog_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewAuditLogPersister(
5050
config interface{},
5151
mysqlClient mysql.Client,
5252
logger *zap.Logger,
53-
) (subscriber.Processor, error) {
53+
) (subscriber.PubSubProcessor, error) {
5454
auditLogPersisterJsonConfig, ok := config.(map[string]interface{})
5555
if !ok {
5656
logger.Error("AuditLogPersister: invalid config")

pkg/subscriber/processor/domain_event_informer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func NewDomainEventInformer(
5050
environmentClient environmentclient.Client,
5151
sender sender.Sender,
5252
logger *zap.Logger,
53-
) subscriber.Processor {
53+
) subscriber.PubSubProcessor {
5454
return &domainEventInformer{
5555
environmentClient: environmentClient,
5656
sender: sender,

pkg/subscriber/processor/evaluation_events_evaluation_count_event_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func NewEvaluationCountEventPersister(
7070
mysqlClient mysql.Client,
7171
evaluationCountCacher cache.MultiGetDeleteCountCache,
7272
logger *zap.Logger,
73-
) (subscriber.Processor, error) {
73+
) (subscriber.PubSubProcessor, error) {
7474
evaluationCountEventPersisterJsonConfig, ok := config.(map[string]interface{})
7575
if !ok {
7676
logger.Error("EvaluationCountEventPersister: invalid config")

pkg/subscriber/processor/events_dwh_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewEventsDWHPersister(
6363
ftClient featureclient.Client,
6464
persisterName string,
6565
logger *zap.Logger,
66-
) (subscriber.Processor, error) {
66+
) (subscriber.PubSubProcessor, error) {
6767
jsonConfig, ok := config.(map[string]interface{})
6868
if !ok {
6969
logger.Error("eventsDWHPersister: invalid config")

pkg/subscriber/processor/events_ops_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func NewEventsOPSPersister(
5858
ftClient featureclient.Client,
5959
persisterName string,
6060
logger *zap.Logger,
61-
) (subscriber.Processor, error) {
61+
) (subscriber.PubSubProcessor, error) {
6262
jsonConfig, ok := config.(map[string]interface{})
6363
if !ok {
6464
logger.Error("eventsOPSPersister: invalid config")

pkg/subscriber/processor/metrics_event_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type metricsEventPersister struct {
5151
func NewMetricsEventPersister(
5252
registerer metrics.Registerer,
5353
logger *zap.Logger,
54-
) subscriber.Processor {
54+
) subscriber.PubSubProcessor {
5555
return &metricsEventPersister{
5656
storage: storage.NewStorage(logger, registerer),
5757
logger: logger,

pkg/subscriber/processor/processors.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,22 +39,22 @@ var (
3939
unsupportedProcessorErr = errors.New("subscriber: unsupported processor")
4040
)
4141

42-
type Processors struct {
43-
processorMap map[string]subscriber.Processor
42+
type PubSubProcessors struct {
43+
processorMap map[string]subscriber.PubSubProcessor
4444
}
4545

46-
func NewProcessors(r metrics.Registerer) *Processors {
46+
func NewPubSubProcessors(r metrics.Registerer) *PubSubProcessors {
4747
registerMetrics(r)
48-
return &Processors{
49-
processorMap: make(map[string]subscriber.Processor),
48+
return &PubSubProcessors{
49+
processorMap: make(map[string]subscriber.PubSubProcessor),
5050
}
5151
}
5252

53-
func (p *Processors) RegisterProcessor(name string, processor subscriber.Processor) {
53+
func (p *PubSubProcessors) RegisterProcessor(name string, processor subscriber.PubSubProcessor) {
5454
p.processorMap[name] = processor
5555
}
5656

57-
func (p *Processors) GetProcessorByName(name string) (subscriber.Processor, error) {
57+
func (p *PubSubProcessors) GetProcessorByName(name string) (subscriber.PubSubProcessor, error) {
5858
if p, ok := p.processorMap[name]; ok {
5959
return p, nil
6060
}

pkg/subscriber/processor/push_sender.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func NewPushSender(
6363
batchClient btclient.Client,
6464
mysqlClient mysql.Client,
6565
logger *zap.Logger,
66-
) subscriber.Processor {
66+
) subscriber.PubSubProcessor {
6767
return &pushSender{
6868
featureClient: featureClient,
6969
batchClient: batchClient,

pkg/subscriber/processor/segment_user_persister.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func NewSegmentUserPersister(
6565
batchClient btclient.Client,
6666
mysqlClient mysql.Client,
6767
logger *zap.Logger,
68-
) (subscriber.Processor, error) {
68+
) (subscriber.PubSubProcessor, error) {
6969
segmentPersisterJsonConfig, ok := config.(map[string]interface{})
7070
if !ok {
7171
logger.Error("SegmentUserPersister: invalid config")

0 commit comments

Comments
 (0)