-
Notifications
You must be signed in to change notification settings - Fork 3
/
main.go
98 lines (86 loc) · 2.72 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package main
import (
"fmt"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/gocql/gocql"
"github.com/pranjalmohansaxena/TKPModellingTechWorkshop/delivery"
"github.com/pranjalmohansaxena/TKPModellingTechWorkshop/repository"
"github.com/pranjalmohansaxena/TKPModellingTechWorkshop/usecase"
"go.uber.org/ratelimit"
)
// main wires the streaming pipeline together and starts consuming events.
//
// It creates a Kafka consumer subscribed to the pipeline topic, opens a
// Cassandra session, builds the repository, usecase and delivery layers on top
// of them, and finally hands control to the delivery layer's ConsumeEvents.
// Any initialization failure is printed and main returns without starting
// consumption.
//
// The Kafka consumer and the Cassandra session are closed via defer as soon as
// they have been created successfully, so they are released on every return
// path (early error returns as well as ConsumeEvents returning).
func main() {
	// Kafka connection settings.
	topicName := "streaming-pipeline"
	bootstrapServers := "127.0.0.1"
	groupID := "sample_group"
	autoOffsetReset := "latest"
	timeout := "10s"
	batchSize := 2

	// Cassandra connection settings.
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Port = 9042
	cluster.Keyspace = "streaming_pipeline"
	cluster.Consistency = gocql.Quorum
	cluster.ConnectTimeout = time.Second * 10
	cluster.Timeout = time.Second * 10
	tableName := "pipeline_data"
	rate := 5 // passed to ratelimit.New for the repository layer
	keyspace := "streaming_pipeline"

	fmt.Println("Initializing Kafka Consumer... ")
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		"group.id":          groupID,
		"auto.offset.reset": autoOffsetReset,
	})
	if err != nil {
		fmt.Println("Error occurred while creating Kafka Consumer Client: ", err)
		return
	}
	// Release the consumer on every exit path from here on.
	// NOTE(review): assumes the delivery layer does not close the consumer
	// itself before ConsumeEvents returns — confirm against the delivery package.
	defer func() {
		if closeErr := consumer.Close(); closeErr != nil {
			fmt.Println("Error occurred while closing Kafka Consumer: ", closeErr)
		}
	}()
	fmt.Println("Initialized Kafka Consumer: ", consumer)

	fmt.Println("Subscribing to the topic... ")
	err = consumer.SubscribeTopics([]string{topicName}, nil)
	if err != nil {
		fmt.Println("Error occurred while subscribing to the topic", err)
		return
	}
	fmt.Println("Subscribed to Kafka Topic: ", topicName)

	fmt.Println("Initializing Datastore Repository Layer... ")
	session, err := cluster.CreateSession()
	if err != nil {
		fmt.Println("Error occurred while creating cassandra session as: ", err)
		return
	}
	// Release the Cassandra session on every exit path from here on.
	defer session.Close()

	repositoryLayer, err := repository.NewCassandraRepo(repository.CassandraParams{
		Session:   session,
		TableName: tableName,
		Keyspace:  keyspace,
		Rl:        ratelimit.New(rate),
	})
	if err != nil {
		fmt.Println("Error occurred while initializing Repository Layer as: ", err)
		return
	}

	fmt.Println("Initializing Usecase Layer... ")
	usecaseLayer, err := usecase.NewPipelineUsecase(usecase.Param{
		Repository: repositoryLayer,
	})
	if err != nil {
		fmt.Println("Error occurred while initializing Usecase Layer as: ", err)
		return
	}
	fmt.Println("Initialized Usecase Layer: ", usecaseLayer)

	fmt.Println("Initializing Kafka Delivery Layer... ")
	deliveryLayer, err := delivery.NewKafkaDeliveryLayer(delivery.KafkaDeliveryParams{
		Consumer:  consumer,
		Usecase:   usecaseLayer,
		Timeout:   timeout,
		BatchSize: batchSize,
	})
	if err != nil {
		fmt.Println("Error occurred while initializing Kafka Delivery Layer as: ", err)
		return
	}
	fmt.Println("Initialized Kafka Delivery Layer: ", deliveryLayer)

	deliveryLayer.ConsumeEvents(topicName)
}