Skip to content
This repository has been archived by the owner on May 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #3 from LF-Engineering/implement-kinesis-firehose-…
Browse files Browse the repository at this point in the history
…wrapper

Implement kinesis firehose wrapper with chunks
  • Loading branch information
aultron authored Sep 21, 2021
2 parents 02732f2 + 70426d2 commit 880c2cc
Show file tree
Hide file tree
Showing 8 changed files with 1,049 additions and 131 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ GO_LINT=golint -set_exit_status
GO_VET=go vet
GO_IMPORTS=goimports -w
GO_ERRCHECK=errcheck -asserts -ignore '[FS]?[Pp]rint*'
GO_FILES=context.go email.go error.go es.go exec.go json.go log.go mbox.go redacted.go request.go threads.go time.go utils.go uuid.go
GO_FILES=context.go email.go error.go es.go exec.go json.go log.go mbox.go redacted.go request.go threads.go time.go utils.go uuid.go firehose/firehose.go
all: check build
check: fmt lint imports vet errcheck
lint: ${GO_FILES}
Expand Down
277 changes: 277 additions & 0 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package firehose

import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"os"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/firehose"
"github.com/aws/aws-sdk-go-v2/service/firehose/types"
)

// Package-level settings used by the Firehose client wrapper.
const (
	// region is the name of the environment variable that supplies the AWS region.
	region = "AWS_REGION"
	// defaultRegion is the region used when that environment variable is empty.
	defaultRegion = "us-east-1"
	// maxChunkSize is the per-record size threshold, in bytes, applied to
	// JSON-encoded records.
	maxChunkSize = 1_020_000
)

// Config carries AWS connection settings: the service endpoint and the region.
// NOTE(review): nothing in the code visible here reads this type — confirm it
// is still used by callers.
type Config struct {
	Endpoint, Region string
}

// PutResponse describes the outcome of writing a single record.
type PutResponse struct {
	// RecordID is the record identifier returned by Firehose.
	RecordID string
	// Error carries the per-record error reported by Firehose, if any.
	Error error
}

// ClientProvider wraps a Kinesis Data Firehose client together with the
// region and the optional endpoint host it was configured with.
type ClientProvider struct {
	firehose         *firehose.Client
	region, endPoint string
}

// NewClientProvider builds a ClientProvider backed by a Firehose client.
//
// The region is read from the AWS_REGION environment variable and falls back
// to defaultRegion (with a log line) when that variable is empty. When
// LOCALSTACK_HOSTNAME is set, the endpoint resolver returns
// http://<LOCALSTACK_HOSTNAME>:4566; otherwise it reports
// EndpointNotFoundError so the SDK uses its default resolution. An error is
// returned when the AWS configuration cannot be loaded.
func NewClientProvider() (*ClientProvider, error) {
	provider := &ClientProvider{region: os.Getenv(region)}
	if provider.region == "" {
		log.Printf("No AWS Region found for env var AWS_REGION. setting defaultRegion=%s \n", defaultRegion)
		provider.region = defaultRegion
	}
	if host := os.Getenv("LOCALSTACK_HOSTNAME"); host != "" {
		provider.endPoint = host
	}

	resolve := func(_, _ string) (aws.Endpoint, error) {
		if provider.endPoint == "" {
			// EndpointNotFoundError lets the service fall back to its default resolution.
			return aws.Endpoint{}, &aws.EndpointNotFoundError{}
		}
		return aws.Endpoint{
			URL:           fmt.Sprintf("http://%s:4566", provider.endPoint),
			SigningRegion: provider.region,
		}, nil
	}

	cfg, err := config.LoadDefaultConfig(
		context.Background(),
		config.WithRegion(provider.region),
		config.WithEndpointResolver(aws.EndpointResolverFunc(resolve)),
	)
	if err != nil {
		return nil, err
	}

	provider.firehose = firehose.NewFromConfig(cfg)
	return provider, nil
}

// CreateDeliveryStream creates a Firehose delivery stream of type DirectPut
// whose name is channel. It returns nil on success, or the error reported by
// the Firehose client.
func (c *ClientProvider) CreateDeliveryStream(channel string) error {
	input := firehose.CreateDeliveryStreamInput{
		DeliveryStreamName: aws.String(channel),
		DeliveryStreamType: types.DeliveryStreamTypeDirectPut,
	}
	if _, err := c.firehose.CreateDeliveryStream(context.Background(), &input); err != nil {
		return err
	}
	return nil
}

// PutRecordBatch writes records to the delivery stream named channel using the
// Firehose PutRecordBatch operation, which can achieve higher throughput per
// producer than writing single records.
//
// Each PutRecordBatch request supports up to 500 records. Each record in the
// request can be as large as 1,000 KB (before base64 encoding), up to a limit
// of 4 MB for the entire request. To stay inside those limits every record is
// JSON-encoded to measure it, and the records are grouped into batches of at
// most 500 records and fewer than 3,670,016 encoded bytes. Each batch is then
// sent concurrently in its own goroutine.
//
// All records are checked before anything is sent: a record that cannot be
// marshalled, or whose JSON encoding is larger than maxChunkSize bytes, makes
// the call return an empty slice and an error without dispatching any batch.
// Oversized records are rejected rather than split, because a JSON document
// cut at arbitrary byte offsets does not yield pieces that are valid JSON on
// their own; PutRecord rejects oversized records in the same way.
//
// The returned responses are the per-batch results concatenated in the order
// the batches complete, which is not necessarily the input order. If a batch
// fails, an empty slice and that batch's error are returned; other batches may
// still have been delivered.
//
// Data records sent to Kinesis Data Firehose are stored for 24 hours from the
// time they are added to a delivery stream as it attempts to send the records
// to the destination. If the destination is unreachable for more than 24 hours,
// the data is no longer available.
//
// Don't concatenate two or more base64 strings to form the data fields of your
// records. Instead, concatenate the raw data, then perform base64 encoding.
func (c *ClientProvider) PutRecordBatch(channel string, records []interface{}) ([]*PutResponse, error) {
	const (
		// maxBatchBytes keeps a request safely below the 4 MB request limit.
		maxBatchBytes = 3670016
		// maxBatchRecords is the per-request record limit.
		maxBatchRecords = 500
	)

	// Phase 1: validate every record and partition the input into batches.
	// Nothing is sent until the whole input is known to be acceptable.
	var batches [][]interface{}
	batch := make([]interface{}, 0)
	batchBytes := 0
	for i, record := range records {
		encoded, err := json.Marshal(record)
		if err != nil {
			return []*PutResponse{}, err
		}
		if len(encoded) > maxChunkSize {
			return []*PutResponse{}, fmt.Errorf("record %d exceeded the limit of 1 mb", i)
		}

		// Close the current batch when adding this record would break a limit.
		full := batchBytes+len(encoded) >= maxBatchBytes || len(batch) >= maxBatchRecords
		if full && len(batch) > 0 {
			batches = append(batches, batch)
			batch = make([]interface{}, 0)
			batchBytes = 0
		}
		batch = append(batch, record)
		batchBytes += len(encoded)
	}
	if len(batch) > 0 {
		batches = append(batches, batch)
	}

	// Phase 2: send the batches concurrently. The channel is buffered with one
	// slot per batch so that no sender can block, even if this function
	// returns early on the first error.
	ch := make(chan *chanPutResponse, len(batches))
	for _, b := range batches {
		// The batch is passed by value so each goroutine owns its own slice.
		go func(b []interface{}) {
			result, err := c.send(channel, b)
			ch <- &chanPutResponse{Result: result, Error: err}
		}(b)
	}

	var res []*PutResponse
	for range batches {
		r := <-ch
		if r.Error != nil {
			return []*PutResponse{}, r.Error
		}
		res = append(res, r.Result...)
	}

	return res, nil
}

// chunkSlice splits slice into consecutive pieces of at most chunkSize bytes
// and returns them in order; the final piece may be shorter.
//
// It returns nil for an empty slice. When chunkSize is not positive, or is at
// least len(slice), no splitting is possible or needed and the whole input is
// returned as a single piece. The pieces alias the input's backing array (no
// bytes are copied), but each has its capacity clamped to its length so that
// appending to one piece cannot overwrite the next.
func (c *ClientProvider) chunkSlice(slice []byte, chunkSize int) [][]byte {
	if len(slice) == 0 {
		return nil
	}
	// Guarding here also keeps start+chunkSize below from overflowing and
	// prevents the loop from never advancing when chunkSize is zero.
	if chunkSize <= 0 || chunkSize >= len(slice) {
		return [][]byte{slice[:len(slice):len(slice)]}
	}

	chunks := make([][]byte, 0, (len(slice)+chunkSize-1)/chunkSize)
	for start := 0; start < len(slice); start += chunkSize {
		end := start + chunkSize
		if end > len(slice) {
			end = len(slice)
		}
		chunks = append(chunks, slice[start:end:end])
	}
	return chunks
}

// PutRecord writes a single data record into the Amazon Kinesis Data Firehose
// delivery stream named channel.
//
// By default, each delivery stream can take in up to 2,000 transactions per
// second, 5,000 records per second, or 5 MB per second.
//
// You must specify the name of the delivery stream and the data record when
// using PutRecord. The data record consists of a data blob that can be up to
// 1,000 KB in size, and any kind of data. Here the record is JSON-encoded
// first, and an encoding larger than maxChunkSize bytes is rejected with an
// error before any request is made.
//
// Kinesis Data Firehose buffers records before delivering them to the destination.
// To disambiguate the data blobs at the destination, a common solution is to
// use delimiters in the data, such as a newline (\n) or some other character
// unique within the data. This allows the consumer application to parse individual
// data items when reading the data from the destination.
//
// On success the returned PutResponse carries the RecordId assigned by
// Firehose, which is a unique string assigned to each record. On failure an
// empty PutResponse is returned together with the marshalling, size-limit, or
// client error.
func (c *ClientProvider) PutRecord(channel string, record interface{}) (*PutResponse, error) {
	b, err := json.Marshal(record)
	if err != nil {
		return &PutResponse{}, err
	}
	if len(b) > maxChunkSize {
		return &PutResponse{}, errors.New("record exceeded the limit of 1 mb")
	}

	params := &firehose.PutRecordInput{
		DeliveryStreamName: aws.String(channel),
		Record:             &types.Record{Data: b},
	}
	res, err := c.firehose.PutRecord(context.Background(), params)
	if err != nil {
		return &PutResponse{}, err
	}
	// RecordId is a pointer in the SDK output; aws.ToString yields "" instead
	// of panicking should it ever be nil.
	return &PutResponse{RecordID: aws.ToString(res.RecordId)}, nil
}

// send JSON-encodes records and writes them to the delivery stream named
// channel with a single Firehose PutRecordBatch request.
//
// It returns an empty slice and an error when a record cannot be marshalled or
// when the request itself fails. Otherwise it returns one PutResponse per
// entry of the service response, in response order: RecordID is set when the
// entry carries a record id, and Error is set when the entry carries an error
// message (or, failing that, an error code). Entries without a record id are
// kept rather than skipped so that per-record failures reach the caller.
func (c *ClientProvider) send(channel string, records []interface{}) ([]*PutResponse, error) {
	inputs := make([]types.Record, 0, len(records))
	for _, r := range records {
		b, err := json.Marshal(r)
		if err != nil {
			return []*PutResponse{}, err
		}
		inputs = append(inputs, types.Record{Data: b})
	}

	params := &firehose.PutRecordBatchInput{
		DeliveryStreamName: aws.String(channel),
		Records:            inputs,
	}
	recordBatch, err := c.firehose.PutRecordBatch(context.Background(), params)
	if err != nil {
		return []*PutResponse{}, err
	}

	res := make([]*PutResponse, 0, len(recordBatch.RequestResponses))
	for _, r := range recordBatch.RequestResponses {
		entry := &PutResponse{}
		if r.RecordId != nil {
			entry.RecordID = *r.RecordId
		}
		switch {
		case r.ErrorMessage != nil:
			entry.Error = errors.New(*r.ErrorMessage)
		case r.ErrorCode != nil:
			// No message supplied; fall back to the code so the failure is not lost.
			entry.Error = errors.New(*r.ErrorCode)
		}
		res = append(res, entry)
	}
	return res, nil
}

// chanPutResponse is the message a batch-sending goroutine passes back to
// PutRecordBatch over a channel.
type chanPutResponse struct {
	// Result holds the per-record responses returned by send.
	Result []*PutResponse
	// Error is the error returned by send, if any.
	Error error
}
14 changes: 5 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
module github.com/LF-Engineering/insights-datasource-shared

go 1.16
go 1.15

require (
github.com/LF-Engineering/dev-analytics-libraries v1.1.20
github.com/avast/retry-go v3.0.0+incompatible
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/elastic/go-elasticsearch/v8 v8.0.0-20210817124755-97fca1753fd7
github.com/google/uuid v1.3.0
github.com/aws/aws-sdk-go-v2 v1.8.0
github.com/aws/aws-sdk-go-v2/config v1.6.0
github.com/aws/aws-sdk-go-v2/service/firehose v1.4.2
github.com/json-iterator/go v1.1.11
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.6.1
golang.org/x/text v0.3.7
gopkg.in/resty.v1 v1.12.0
)
16 changes: 11 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,31 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/LF-Engineering/dev-analytics-libraries v1.1.20 h1:jDm0J9Cba7wu50Kt4yVEhNAklC9nUGGqsywtiWewXFc=
github.com/LF-Engineering/dev-analytics-libraries v1.1.20/go.mod h1:O+9mOX1nf6qGKrZne33F6speSzrGj6+Y1tPF6jh/mcw=
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.36.15/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro=
github.com/aws/aws-sdk-go-v2 v1.3.1/go.mod h1:5SmWRTjN6uTRFNCc7rR69xHsdcUJnthmaRHGDsYhpTE=
github.com/aws/aws-sdk-go-v2 v1.8.0/go.mod h1:xEFuWz+3TYdlPRuo+CqATbeDWIWyaT5uAPwPaWtgse0=
github.com/aws/aws-sdk-go-v2/config v1.1.4/go.mod h1:op05ummoVoAqctpA80jVt/+hvEtLfuKmDyx0bIuvfbE=
github.com/aws/aws-sdk-go-v2/config v1.6.0/go.mod h1:TNtBVmka80lRPk5+S9ZqVfFszOQAGJJ9KbT3EM3CHNU=
github.com/aws/aws-sdk-go-v2/credentials v1.1.4/go.mod h1:UQwsT2w2XelrWoVV2v/zL2uce1RxmVCiHaZsoKLamZg=
github.com/aws/aws-sdk-go-v2/credentials v1.3.2/go.mod h1:PACKuTJdt6AlXvEq8rFI4eDmoqDFC5DpVKQbWysaDgM=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.5/go.mod h1:z/NKNlYxMzphl7TzjV+ctUebHF4CFNGGlSvmV/NKcJU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.4.0/go.mod h1:Mj/U8OpDbcVcoctrYwA2bak8k/HFPdcLzI/vaiXMwuM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.2.0/go.mod h1:Q5jATQc+f1MfZp3PDMhn6ry18hGvE0i8yvbXoKbnZaE=
github.com/aws/aws-sdk-go-v2/service/firehose v1.4.2/go.mod h1:3e5IJyFRA/3yCGkJE8cDndXvbC8qDhNeHuhAAOSI7o0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.5/go.mod h1:MW0O/RpmVpS6MWKn6W03XEJmqXlG7+d3iaYLzkd2fAc=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.2/go.mod h1:NXmNI41bdEsJMrD0v9rUvbGCB5GwdBEpKvUvIY3vTFg=
github.com/aws/aws-sdk-go-v2/service/ssm v1.3.0/go.mod h1:WX5OhauvURAo0+ljp29uEIitBkt3+Y3RGnVa9ix2xc8=
github.com/aws/aws-sdk-go-v2/service/sso v1.1.4/go.mod h1:yQayEbOWH75NaKFylsFocBc3yanYEGndlOaH4i/Lvno=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.2/go.mod h1:J21I6kF+d/6XHVk7kp/cx9YVD2TMD2TbLwtRGVcinXo=
github.com/aws/aws-sdk-go-v2/service/sts v1.2.1/go.mod h1:L1LH5nHMXxdkKj057ZUx7Wi50CCrkZ+9jkTnBnY2j/w=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.1/go.mod h1:hLZ/AnkIKHLuPGjEiyghNEdvJ2PP0MgOxcmv9EBJ4xs=
github.com/aws/smithy-go v1.3.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/aws/smithy-go v1.7.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
Expand All @@ -61,7 +70,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20201229214741-2366c2514674/go.mod h1:xe9a/L2aeOgFKKgrO3ibQTnMdpAeL0GC+5/HpGScSa4=
github.com/elastic/go-elasticsearch/v8 v8.0.0-20210817124755-97fca1753fd7/go.mod h1:xe9a/L2aeOgFKKgrO3ibQTnMdpAeL0GC+5/HpGScSa4=
github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -110,6 +118,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-github/v33 v33.0.0/go.mod h1:GMdDnVZY/2TsWgp/lkYnpSAh6TrzhANBBwm6k6TTEXg=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand All @@ -124,7 +133,6 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.4/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -169,8 +177,6 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/xanzy/ssh-agent v0.2.1/go.mod h1:mLlQY/MoOhWBj+gOGMQkOeiEvkx+8pJSI+0Bx9h2kr4=
Expand Down
Loading

0 comments on commit 880c2cc

Please sign in to comment.