Skip to content

Commit 95514a2

Browse files
committed
refactor: Update event and order processing logic to improve subscription handling and logging
1 parent bd8124d commit 95514a2

2 files changed

Lines changed: 38 additions & 14 deletions

File tree

internal/kafka/event_consumer.go

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,19 +79,41 @@ func (c *EventConsumer) processEventEvent(value []byte) error {
7979

8080
// Handle different operations
8181
switch rawEvent.Payload.Op {
82-
case "c": // Event creation - notify organization subscribers
83-
if err := c.SubscriberService.ProcessEventCreation(&eventEvent); err != nil {
84-
log.Printf("Error processing event creation notification from Debezium: %v", err)
85-
return err
82+
case "c": // Skip event creation notification
83+
log.Printf("Skipping notification for event creation: %s", eventID)
84+
85+
case "u": // Event update - notify only if before=PENDING and after=APPROVED
86+
// Check status transition for updates
87+
if rawEvent.Payload.Before != nil && rawEvent.Payload.After != nil {
88+
beforeStatus := rawEvent.Payload.Before.Status
89+
afterStatus := rawEvent.Payload.After.Status
90+
91+
if beforeStatus == "PENDING" && afterStatus == "APPROVED" {
92+
// This is a status change from PENDING to APPROVED - treat as creation
93+
if err := c.SubscriberService.ProcessEventCreation(&eventEvent); err != nil {
94+
log.Printf("Error processing event approval notification from Debezium: %v", err)
95+
return err
96+
}
97+
log.Printf("Successfully processed event approval (PENDING->APPROVED) notification for event %s", eventID)
98+
} else if afterStatus == "APPROVED" {
99+
// Other changes but final status is still APPROVED - process as update
100+
if err := c.SubscriberService.ProcessEventUpdate(&eventEvent); err != nil {
101+
log.Printf("Error processing event update notification from Debezium: %v", err)
102+
return err
103+
}
104+
log.Printf("Successfully processed event update notification for approved event %s", eventID)
105+
} else {
106+
// Status is not APPROVED - skip notification
107+
log.Printf("Skipping notification for event %s - status is %s (not APPROVED)", eventID, afterStatus)
108+
}
86109
}
87-
log.Printf("Successfully processed event creation notification for event %s", eventID)
88110

89-
case "u", "d": // Event update/delete - notify event subscribers
111+
case "d": // Event deletion - process normally for subscribers
90112
if err := c.SubscriberService.ProcessEventUpdate(&eventEvent); err != nil {
91-
log.Printf("Error processing event update notification from Debezium: %v", err)
113+
log.Printf("Error processing event deletion notification from Debezium: %v", err)
92114
return err
93115
}
94-
log.Printf("Successfully processed event update notification for event %s", eventID)
116+
log.Printf("Successfully processed event deletion notification for event %s", eventID)
95117

96118
default:
97119
log.Printf("Unhandled operation '%s' for event %s", rawEvent.Payload.Op, eventID)

internal/kafka/order_consumer.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -150,22 +150,24 @@ func (c *OrderConsumer) processOrderUpdated(value []byte) error {
150150
// For orders changing to 'completed' status, add subscriptions
151151
if order.Status == "completed" {
152152
// Add subscription to the event and session
153+
log.Printf("Order %s is now completed - adding subscriptions", order.OrderID)
154+
155+
log.Printf("Adding event subscription for event %s", order.EventID)
153156
if err := c.SubscriberService.AddSubscription(subscriber.SubscriberID, models.SubscriptionCategoryEvent, order.EventID); err != nil {
154157
log.Printf("Error adding event subscription: %v", err)
155158
}
156159

160+
log.Printf("Adding session subscription for session %s", order.SessionID)
157161
if err := c.SubscriberService.AddSubscription(subscriber.SubscriberID, models.SubscriptionCategorySession, order.SessionID); err != nil {
158162
log.Printf("Error adding session subscription: %v", err)
159163
}
160164

161-
if order.OrganizationID != "" {
162-
if err := c.SubscriberService.AddSubscription(subscriber.SubscriberID, models.SubscriptionCategoryOrganization, order.OrganizationID); err != nil {
163-
log.Printf("Error adding organization subscription: %v", err)
164-
}
165+
log.Printf("Adding organization subscription for organization %s", order.OrganizationID)
166+
if err := c.SubscriberService.AddSubscription(subscriber.SubscriberID, models.SubscriptionCategoryOrganization, order.OrganizationID); err != nil {
167+
log.Printf("Error adding organization subscription: %v", err)
165168
}
166-
167-
log.Printf("Added subscriptions for completed order %s", order.OrderID)
168169
}
170+
log.Printf("Added subscriptions for completed order %s", order.OrderID)
169171

170172
// Send appropriate order email based on status
171173
if err := c.SubscriberService.SendOrderConfirmationEmail(subscriber, &order); err != nil {

0 commit comments

Comments
 (0)