Skip to content

Commit

Permalink
kafka consumer done
Browse files Browse the repository at this point in the history
  • Loading branch information
hirowf committed Apr 10, 2022
1 parent 3b42e74 commit 356e16b
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 8 deletions.
19 changes: 19 additions & 0 deletions application/factory/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package factory

import (
"github.com/jinzhu/gorm"
"github.com/santosant/codepix/application/usecase"
"github.com/santosant/codepix/infrastructure/repository"
)

func TransactionUseCaseFactory(database *gorm.DB) usecase.TransactionUseCase {
pixRepository := repository.PixKeyRepositoryDb{Db: database}
transactionRepository := repository.TransactionRepositoryDb{Db: database}

transactionUseCase := usecase.TransactionUseCase{
TransactionRepository: &transactionRepository,
PixRepository: &pixRepository,
}

return transactionUseCase
}
113 changes: 109 additions & 4 deletions application/kafka/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"fmt"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/jinzhu/gorm"
"github.com/santosant/codepix/application/factory"
appmodel "github.com/santosant/codepix/application/model"
"github.com/santosant/codepix/application/usecase"
"github.com/santosant/codepix/domain/model"
"os"
)

type KafkaProcessor struct {
Expand All @@ -20,25 +25,125 @@ func NewKafkaProcessor(database *gorm.DB, producer *ckafka.Producer, deliveryCha
}
}

func (l *KafkaProcessor) Consume() {
func (k *KafkaProcessor) Consume() {
configMap := &ckafka.ConfigMap{
"bootstrap.servers": "kafka:9092",
"group.id": "consumergroup",
"bootstrap.servers": os.Getenv("kafkaBootstrapServers"),
"group.id": os.Getenv("kafkaConsumerGroupId"),
"auto.offset.reset": "earliest",
}
c, err := ckafka.NewConsumer(configMap)
if err != nil {
panic(err)
}

topics := []string{"test"}
topics := []string{os.Getenv("kafkaTransactionTopic"), os.Getenv("kafkaTransactionConfirmationTopic")}
c.SubscribeTopics(topics, nil)

fmt.Println("kafka consume has been started")
for {
msg, err := c.ReadMessage(-1)
if err == nil {
k.processMessage(msg)
fmt.Println(string(msg.Value))
}
}
}

func (k *KafkaProcessor) processMessage(msg *ckafka.Message) {
transactionsTopic := "transactions"
transactionsConfirmationTopic := "transaction_confirmation"

switch topic := *msg.TopicPartition.Topic; topic {
case transactionsTopic:
k.processTransaction(msg)
case transactionsConfirmationTopic:
k.processTransactionConfirmation(msg)
default:
fmt.Println("not a valid topic", string(msg.Value))
}
}

func (k *KafkaProcessor) processTransaction(msg *ckafka.Message) error {
transaction := appmodel.NewTransction()
err := transaction.ParseJson(msg.Value)
if err != nil {
return err
}

transactionUseCase := factory.TransactionUseCaseFactory(k.Database)

createdTransaction, err := transactionUseCase.Register(
transaction.AccountID,
transaction.Amount,
transaction.PixKeyTo,
transaction.PixKeyKindTo,
transaction.Description,
transaction.ID,
)

if err != nil {
fmt.Println("error registering transaction", err)
return err
}

topic := "bank" + createdTransaction.PixKeyTo.Account.Bank.Code
transaction.ID = createdTransaction.ID
transaction.Status = model.TransactionPending
transactionJson, err := transaction.ToJson()

if err != nil {
return err
}

err = Publish(string(transactionJson), topic, k.Producer, k.DeliveryChan)
if err != nil {
return err
}

return nil
}

func (k *KafkaProcessor) processTransactionConfirmation(msg *ckafka.Message) error {
transaction := appmodel.NewTransction()
err := transaction.ParseJson(msg.Value)
if err != nil {
return err
}

transactionUseCase := factory.TransactionUseCaseFactory(k.Database)

if transaction.Status == model.TransactionConfirmed {
err = k.confirmTransaction(transaction, transactionUseCase)
if err != nil {
return err
}
return nil
} else if transaction.Status == model.TransactionCompleted {
_, err := transactionUseCase.Complete(transaction.ID)
if err != nil {
return err
}
return nil
}
return err
}

func (k *KafkaProcessor) confirmTransaction(transaction *appmodel.Transaction, trasactionUseCase usecase.TransactionUseCase) error {
confirmedTransaction, err := trasactionUseCase.Confirm(transaction.ID)
if err != nil {
return err
}

topic := "bank" + confirmedTransaction.AccountFrom.Bank.Code
transactionJson, err := transaction.ToJson()
if err != nil {
return err
}

err = Publish(string(transactionJson), topic, k.Producer, k.DeliveryChan)
if err != nil {
return err
}

return nil
}
3 changes: 2 additions & 1 deletion application/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package kafka
import (
"fmt"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"os"
)

func NewKafkaProducer() *ckafka.Producer {
configMap := &ckafka.ConfigMap{
"bootstrap.servers": "kafka:9092",
"bootstrap.servers": os.Getenv("kafkaBootstrapServers"),
}
p, err := ckafka.NewProducer(configMap)
if err != nil {
Expand Down
58 changes: 58 additions & 0 deletions application/model/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package model

import (
"encoding/json"
"github.com/go-playground/validator/v10"
"github.com/google/martian/log"
)

type Transaction struct {
ID string `json:"id" validate:"required,uuid4"`
AccountID string `json:"accountId" validate:"required,uuid4"`
Amount float64 `json:"amount" validate:"required,numeric"`
PixKeyTo string `json:"pixKeyTo" validate:"required"`
PixKeyKindTo string `json:"pixKeyKindTo" validate:"required"`
Description string `json:"description" validate:"required"`
Status string `json:"status" validate:"-"`
Error string `json:"error"`
}

func (t *Transaction) isValid() error {
v := validator.New()
err := v.Struct(t)
if err != nil {
log.Errorf("error during Transaction validation: %s", err.Error())
return err
}
return nil
}

func (t *Transaction) ParseJson(data []byte) error {
err := json.Unmarshal(data, t)
if err != nil {
return err
}
err = t.isValid()
if err != nil {
return err
}
return nil
}

func (t *Transaction) ToJson() ([]byte, error) {
err := t.isValid()
if err != nil {
return nil, err
}

result, err := json.Marshal(t)
if err != nil {
return nil, err
}

return result, nil
}

func NewTransction() *Transaction {
return &Transaction{}
}
37 changes: 37 additions & 0 deletions cmd/all.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cmd

import (
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/santosant/codepix/application/grpc"
"github.com/santosant/codepix/application/kafka"
"github.com/santosant/codepix/infrastructure/db"
"os"

"github.com/spf13/cobra"
)

var (
gRPCPortNumber int
)

// allCmd represents the all command
var allCmd = &cobra.Command{
Use: "all",
Short: "Run gRPC and Kafka Consumer",
Run: func(cmd *cobra.Command, args []string) {
database := db.ConnectDB(os.Getenv("env"))
go grpc.StartGrpcServer(database, portNumber)

deliveryChan := make(chan ckafka.Event)
producer := kafka.NewKafkaProducer()

go kafka.DeliveryReport(deliveryChan)
KafkaProcessor := kafka.NewKafkaProcessor(database, producer, deliveryChan)
KafkaProcessor.Consume()
},
}

func init() {
rootCmd.AddCommand(allCmd)
allCmd.Flags().IntVarP(&gRPCPortNumber, "grpc-port", "p", 50051, "gRPC Port")
}
5 changes: 2 additions & 3 deletions cmd/kafka.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cmd

import (
"fmt"
ckafka "github.com/confluentinc/confluent-kafka-go/kafka"
kafka "github.com/santosant/codepix/application/kafka"
"github.com/santosant/codepix/infrastructure/db"
Expand All @@ -14,12 +13,12 @@ var kafkaCmd = &cobra.Command{
Use: "kafka",
Short: "Start consuming transactions using Apache kafka",
Run: func(cmd *cobra.Command, args []string) {
fmt.Println("Production message")
//fmt.Println("Production message")
deliveryChan := make(chan ckafka.Event)
database := db.ConnectDB(os.Getenv("env"))
producer := kafka.NewKafkaProducer()

kafka.Publish("Hello Kafka", "test", producer, deliveryChan)
//kafka.Publish("Hello Kafka", "test", producer, deliveryChan)
go kafka.DeliveryReport(deliveryChan)

KafkaProcessor := kafka.NewKafkaProcessor(database, producer, deliveryChan)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/go-playground/validator/v10 v10.4.1
github.com/golang/protobuf v1.5.2
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
Expand Down

0 comments on commit 356e16b

Please sign in to comment.