Skip to content

Commit

Permalink
refactor: changed transaction handling related to Notification package
Browse files Browse the repository at this point in the history
  • Loading branch information
kakcy committed Jan 21, 2025
1 parent 8dd5e57 commit c6f4aae
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 301 deletions.
73 changes: 11 additions & 62 deletions pkg/notification/api/admin_subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,9 @@ func (s *NotificationService) CreateAdminSubscription(
return nil, dt.Err()
}
var handler command.Handler = command.NewEmptyAdminSubscriptionCommandHandler()
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(tx)
if err := adminSubscriptionStorage.CreateAdminSubscription(ctx, subscription); err != nil {
err = s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(s.mysqlClient)
if err := adminSubscriptionStorage.CreateAdminSubscription(contextWithTx, subscription); err != nil {
return err
}
handler, err = command.NewAdminSubscriptionCommandHandler(editor, subscription)
Expand Down Expand Up @@ -388,26 +371,9 @@ func (s *NotificationService) updateAdminSubscription(
localizer locale.Localizer,
) error {
var handler command.Handler = command.NewEmptyAdminSubscriptionCommandHandler()
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return statusInternal.Err()
}
return dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(tx)
subscription, err := adminSubscriptionStorage.GetAdminSubscription(ctx, id)
err := s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(s.mysqlClient)
subscription, err := adminSubscriptionStorage.GetAdminSubscription(contextWithTx, id)
if err != nil {
return err
}
Expand All @@ -420,7 +386,7 @@ func (s *NotificationService) updateAdminSubscription(
return err
}
}
if err = adminSubscriptionStorage.UpdateAdminSubscription(ctx, subscription); err != nil {
if err = adminSubscriptionStorage.UpdateAdminSubscription(contextWithTx, subscription); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -485,26 +451,9 @@ func (s *NotificationService) DeleteAdminSubscription(
return nil, err
}
var handler command.Handler = command.NewEmptyAdminSubscriptionCommandHandler()
tx, err := s.mysqlClient.BeginTx(ctx)
if err != nil {
s.logger.Error(
"Failed to begin transaction",
log.FieldsFromImcomingContext(ctx).AddFields(
zap.Error(err),
)...,
)
dt, err := statusInternal.WithDetails(&errdetails.LocalizedMessage{
Locale: localizer.GetLocale(),
Message: localizer.MustLocalize(locale.InternalServerError),
})
if err != nil {
return nil, statusInternal.Err()
}
return nil, dt.Err()
}
err = s.mysqlClient.RunInTransaction(ctx, tx, func() error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(tx)
subscription, err := adminSubscriptionStorage.GetAdminSubscription(ctx, req.Id)
err = s.mysqlClient.RunInTransactionV2(ctx, func(contextWithTx context.Context, _ mysql.Transaction) error {
adminSubscriptionStorage := v2ss.NewAdminSubscriptionStorage(s.mysqlClient)
subscription, err := adminSubscriptionStorage.GetAdminSubscription(contextWithTx, req.Id)
if err != nil {
return err
}
Expand All @@ -515,7 +464,7 @@ func (s *NotificationService) DeleteAdminSubscription(
if err := handler.Handle(ctx, req.Command); err != nil {
return err
}
if err = adminSubscriptionStorage.DeleteAdminSubscription(ctx, req.Id); err != nil {
if err = adminSubscriptionStorage.DeleteAdminSubscription(contextWithTx, req.Id); err != nil {
return err
}
return nil
Expand Down
63 changes: 34 additions & 29 deletions pkg/notification/api/admin_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
v2ss "github.com/bucketeer-io/bucketeer/pkg/notification/storage/v2"
publishermock "github.com/bucketeer-io/bucketeer/pkg/pubsub/publisher/mock"
"github.com/bucketeer-io/bucketeer/pkg/rpc"
"github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql/mock"
mysqlmock "github.com/bucketeer-io/bucketeer/pkg/storage/v2/mysql/mock"
"github.com/bucketeer-io/bucketeer/pkg/token"
proto "github.com/bucketeer-io/bucketeer/proto/notification"
Expand Down Expand Up @@ -163,9 +164,8 @@ func TestCreateAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -272,9 +272,8 @@ func TestUpdateAdminSubscriptionMySQL(t *testing.T) {
{
desc: "err: ErrNotFound",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(v2ss.ErrAdminSubscriptionNotFound)
},
isSystemAdmin: true,
Expand All @@ -292,9 +291,8 @@ func TestUpdateAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success: addSourceTypes",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand All @@ -314,9 +312,8 @@ func TestUpdateAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success: deleteSourceTypes",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand All @@ -336,9 +333,8 @@ func TestUpdateAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success: all commands",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -426,9 +422,8 @@ func TestEnableAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -499,9 +494,8 @@ func TestDisableAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -571,9 +565,8 @@ func TestDeleteAdminSubscriptionMySQL(t *testing.T) {
{
desc: "success",
setup: func(s *NotificationService) {
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().BeginTx(gomock.Any()).Return(nil, nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransaction(
gomock.Any(), gomock.Any(), gomock.Any(),
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().RunInTransactionV2(
gomock.Any(), gomock.Any(),
).Return(nil)
s.domainEventPublisher.(*publishermock.MockPublisher).EXPECT().PublishMulti(
gomock.Any(), gomock.Any(),
Expand Down Expand Up @@ -643,7 +636,11 @@ func TestGetAdminSubscriptionMySQL(t *testing.T) {
setup: func(s *NotificationService) {
row := mysqlmock.NewMockRow(mockController)
row.EXPECT().Scan(gomock.Any()).Return(nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().QueryRowContext(
qe := mock.NewMockQueryExecer(mockController)
s.mysqlClient.(*mock.MockClient).EXPECT().Qe(
gomock.Any(),
).Return(qe)
qe.EXPECT().QueryRowContext(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(row)
},
Expand Down Expand Up @@ -710,12 +707,16 @@ func TestListAdminSubscriptionsMySQL(t *testing.T) {
rows.EXPECT().Close().Return(nil)
rows.EXPECT().Next().Return(false)
rows.EXPECT().Err().Return(nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().QueryContext(
qe := mock.NewMockQueryExecer(mockController)
s.mysqlClient.(*mock.MockClient).EXPECT().Qe(
gomock.Any(),
).Return(qe).AnyTimes()
qe.EXPECT().QueryContext(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(rows, nil)
row := mysqlmock.NewMockRow(mockController)
row.EXPECT().Scan(gomock.Any()).Return(nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().QueryRowContext(
qe.EXPECT().QueryRowContext(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(row)
},
Expand Down Expand Up @@ -788,12 +789,16 @@ func TestListEnabledAdminSubscriptionsMySQL(t *testing.T) {
rows.EXPECT().Close().Return(nil)
rows.EXPECT().Next().Return(false)
rows.EXPECT().Err().Return(nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().QueryContext(
qe := mock.NewMockQueryExecer(mockController)
s.mysqlClient.(*mock.MockClient).EXPECT().Qe(
gomock.Any(),
).Return(qe).AnyTimes()
qe.EXPECT().QueryContext(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(rows, nil)
row := mysqlmock.NewMockRow(mockController)
row.EXPECT().Scan(gomock.Any()).Return(nil)
s.mysqlClient.(*mysqlmock.MockClient).EXPECT().QueryRowContext(
qe.EXPECT().QueryRowContext(
gomock.Any(), gomock.Any(), gomock.Any(),
).Return(row)
},
Expand Down
Loading

0 comments on commit c6f4aae

Please sign in to comment.