Skip to content

Commit fbf4cb4

Browse files
authored
add clickhouse buffer (#93)
1 parent 3aaae75 commit fbf4cb4

File tree

3 files changed

+33
-1
lines changed

3 files changed

+33
-1
lines changed

gen/source/sink/Clickhouse.pkl.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkl/source/sink/Sinks.pkl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class Clickhouse extends Sink {
3737
asyncInsert: String = "1"
3838
waitForAsyncInsert: String = "1"
3939
maxPartitionsPerInsertBlock: Int = 1000
40+
channelBuffersize: UInt = 10_000
4041
}
4142

4243
typealias Method = "PATCH"|"POST"|"PUT"|"DELETE"|"GET"

sources/sink/clickhouse/clickhouse.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"errors"
1111
"fmt"
1212
"io"
13+
"sync"
1314
"time"
1415

1516
"github.com/ClickHouse/ch-go"
@@ -335,7 +336,29 @@ func (c *Clickhouse) Sink(val any) {
335336
chData <- v
336337
close(chData)
337338
case chan any:
338-
chData = v
339+
newCh := make(chan any, c.clickConfig.GetChannelBuffersize())
340+
wg := sync.WaitGroup{}
341+
for vals := range v {
342+
wg.Add(1)
343+
switch item := vals.(type) {
344+
case []map[string]interface{}:
345+
go func() {
346+
for _, chV := range item {
347+
newCh <- chV
348+
}
349+
wg.Done()
350+
}()
351+
continue
352+
default:
353+
newCh <- vals
354+
wg.Done()
355+
}
356+
}
357+
go func() {
358+
wg.Wait()
359+
close(newCh)
360+
}()
361+
chData = newCh
339362
default:
340363
c.Log().Error().Err(errors.New("unknown type val")).Msg("failed write clickhouse")
341364
return

0 commit comments

Comments
 (0)