Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion csharp/arrow-adbc
Submodule arrow-adbc updated 588 files
4 changes: 2 additions & 2 deletions go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ func (c *connectionImpl) GetObjects(ctx context.Context, depth adbc.ObjectDepth,
xdbcDataType := driverbase.ToXdbcDataType(field.Type)

if field.Type != nil {
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = new(int16(field.Type.ID()))
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcDataType = driverbase.Nullable(int16(field.Type.ID()))
}
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = new(int16(xdbcDataType))
getObjectsCatalog.CatalogDbSchemas[i].DbSchemaTables[j].TableColumns[k].XdbcSqlDataType = driverbase.Nullable(int16(xdbcDataType))
}
}
}
Expand Down
27 changes: 11 additions & 16 deletions go/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type databaseImpl struct {
defaultAppName string
}

//nolint:staticcheck // ignore snowflake deprecated warnings for now
func (d *databaseImpl) GetOption(ctx context.Context, key string) (string, error) {
switch key {
case adbc.OptionKeyUsername:
Expand Down Expand Up @@ -121,13 +120,13 @@ func (d *databaseImpl) GetOption(ctx context.Context, key string) (string, error
case OptionAuthType:
return d.cfg.Authenticator.String(), nil
case OptionLoginTimeout:
return strconv.FormatFloat(d.cfg.LoginTimeout.Seconds(), 'f', -1, 64), nil
return strconv.FormatFloat(d.cfg.LoginTimeout.Seconds(), 'f', -1, 64), nil //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionRequestTimeout:
return strconv.FormatFloat(d.cfg.RequestTimeout.Seconds(), 'f', -1, 64), nil
return strconv.FormatFloat(d.cfg.RequestTimeout.Seconds(), 'f', -1, 64), nil //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionJwtExpireTimeout:
return strconv.FormatFloat(d.cfg.JWTExpireTimeout.Seconds(), 'f', -1, 64), nil
return strconv.FormatFloat(d.cfg.JWTExpireTimeout.Seconds(), 'f', -1, 64), nil //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionClientTimeout:
return strconv.FormatFloat(d.cfg.ClientTimeout.Seconds(), 'f', -1, 64), nil
return strconv.FormatFloat(d.cfg.ClientTimeout.Seconds(), 'f', -1, 64), nil //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionApplicationName:
return d.cfg.Application, nil
case OptionSSLSkipVerify:
Expand Down Expand Up @@ -164,7 +163,7 @@ func (d *databaseImpl) GetOption(ctx context.Context, key string) (string, error
}
return adbc.OptionValueDisabled, nil
case OptionLogTracing:
return d.cfg.Tracing, nil
return d.cfg.Tracing, nil //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionClientConfigFile:
return d.cfg.ClientConfigFile, nil
case OptionUseHighPrecision:
Expand Down Expand Up @@ -209,7 +208,6 @@ func ParseSnowflakeURI(uri string) (*gosnowflake.Config, error) {
return gosnowflake.ParseDSN(uri)
}

//nolint:staticcheck // ignore snowflake deprecated warnings for now
func (d *databaseImpl) SetOptions(ctx context.Context, cnOptions map[string]string) error {
uri, ok := cnOptions[adbc.OptionKeyURI]
if ok {
Expand All @@ -227,8 +225,7 @@ func (d *databaseImpl) SetOptions(ctx context.Context, cnOptions map[string]stri
}
// XXX(https://github.com/apache/arrow-adbc/issues/2792): Snowflake
// has a tendency to spam the log by default, so set the log level

d.cfg.Tracing = "fatal"
d.cfg.Tracing = "fatal" //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now

// set default application name to track
// unless user overrides it
Expand All @@ -246,8 +243,6 @@ func (d *databaseImpl) SetOptions(ctx context.Context, cnOptions map[string]stri
// SetOptionInternal sets the option for the database.
//
// cnOptions is nil if the option is being set post-initialiation.
//
//nolint:staticcheck // ignore snowflake deprecated warnings for now
func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[string]string) error {
var err error
var ok bool
Expand Down Expand Up @@ -301,7 +296,7 @@ func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[stri
if dur < 0 {
dur = -dur
}
d.cfg.LoginTimeout = dur
d.cfg.LoginTimeout = dur //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionRequestTimeout:
dur, err := time.ParseDuration(v)
if err != nil {
Expand All @@ -313,7 +308,7 @@ func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[stri
if dur < 0 {
dur = -dur
}
d.cfg.RequestTimeout = dur
d.cfg.RequestTimeout = dur //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionJwtExpireTimeout:
dur, err := time.ParseDuration(v)
if err != nil {
Expand All @@ -325,7 +320,7 @@ func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[stri
if dur < 0 {
dur = -dur
}
d.cfg.JWTExpireTimeout = dur
d.cfg.JWTExpireTimeout = dur //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionClientTimeout:
dur, err := time.ParseDuration(v)
if err != nil {
Expand All @@ -337,7 +332,7 @@ func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[stri
if dur < 0 {
dur = -dur
}
d.cfg.ClientTimeout = dur
d.cfg.ClientTimeout = dur //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionApplicationName:
if !strings.HasPrefix(v, "[ADBC]") {
v = d.defaultAppName + v
Expand Down Expand Up @@ -513,7 +508,7 @@ func (d *databaseImpl) SetOptionInternal(k string, v string, cnOptions *map[stri
}
}
case OptionLogTracing:
d.cfg.Tracing = v
d.cfg.Tracing = v //nolint:staticcheck,nolintlint // ignore snowflake deprecated warnings for now
case OptionClientConfigFile:
d.cfg.ClientConfigFile = v
case OptionUseHighPrecision:
Expand Down
1 change: 1 addition & 0 deletions go/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (s *SnowflakeQuirks) SupportsConcurrentStatements() bool { return
func (s *SnowflakeQuirks) SupportsCurrentCatalogSchema() bool { return true }
func (s *SnowflakeQuirks) SupportsExecuteSchema() bool { return true }
func (s *SnowflakeQuirks) SupportsGetSetOptions() bool { return true }
func (s *SnowflakeQuirks) SupportsGetTableSchema() bool { return true }
func (s *SnowflakeQuirks) SupportsPartitionedData() bool { return false }
func (s *SnowflakeQuirks) SupportsStatistics() bool { return true }
func (s *SnowflakeQuirks) SupportsTransactions() bool { return true }
Expand Down
10 changes: 5 additions & 5 deletions go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ module github.com/adbc-drivers/snowflake/go
go 1.26.1

require (
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260531215508-9a56b1c7bd6d
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260427080211-5b908ab0cfd8
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260427080211-5b908ab0cfd8
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260608064711-7f3f9a9f3990
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260608002410-49f9e21a1d4a
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260608064711-7f3f9a9f3990
github.com/apache/arrow-adbc/go/adbc v1.11.0
github.com/apache/arrow-go/v18 v18.6.0
github.com/google/uuid v1.6.0
github.com/snowflakedb/gosnowflake/v2 v2.0.2
github.com/snowflakedb/gosnowflake/v2 v2.1.0
github.com/stretchr/testify v1.11.1
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78
go.opentelemetry.io/otel v1.44.0
Expand Down Expand Up @@ -89,7 +89,7 @@ require (
go.opentelemetry.io/otel/sdk v1.44.0 // indirect
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
golang.org/x/crypto v0.52.0 // indirect
golang.org/x/exp v0.0.0-20260529124908-c761662dc8c9 // indirect
golang.org/x/exp v0.0.0-20260603202125-055de637280b // indirect
golang.org/x/mod v0.36.0 // indirect
golang.org/x/net v0.55.0 // indirect
golang.org/x/oauth2 v0.36.0 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ github.com/AzureAD/microsoft-authentication-library-for-go v1.7.2 h1:RHK7bS+HQMs
github.com/AzureAD/microsoft-authentication-library-for-go v1.7.2/go.mod h1:HKpQxkWaGLJ+D/5H8QRpyQXA1eKjxkFlOMwck5+33Jk=
github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk=
github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260531215508-9a56b1c7bd6d h1:xv3wwpcS1ByVK0KYqUmNsxVUvTuZtfoAm5foFQ9a8oQ=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260531215508-9a56b1c7bd6d/go.mod h1:doFujhe7BcZTCWPPFlzT34PEAKyV7uqQgGIa2Teoxg8=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260427080211-5b908ab0cfd8 h1:cMopE5au+VwajOS3B3netWOJb4d/1hJ1O/1ez6ilsTM=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260427080211-5b908ab0cfd8/go.mod h1:uKkYVA+iUXij7mPFproEi6sOIWFVNOT87UcqucqqnBg=
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260427080211-5b908ab0cfd8 h1:AbVlFTLcSlVOrVuPC6guN5dhuEWFW1Nof/eTm8xdkog=
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260427080211-5b908ab0cfd8/go.mod h1:Tq+xMUZfJqVDBtdvzzn8BQoj6psg6INAngiAUZQhmTA=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260608064711-7f3f9a9f3990 h1:b8vaQHcgHqmalpmIqqPF+bVWrMQISUUjxwFfRbT0uy0=
github.com/adbc-drivers/driverbase-go/driverbase v0.0.0-20260608064711-7f3f9a9f3990/go.mod h1:rCDugBvob6aEFZu7zJ9SqPSyyys6ipNWcsfvzF/Ns74=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260608002410-49f9e21a1d4a h1:j1a8iNxRTV6ZaHnEuH8/NVlBMrBejVTBPxu4wueB72c=
github.com/adbc-drivers/driverbase-go/testutil v0.0.0-20260608002410-49f9e21a1d4a/go.mod h1:dUNGWra6WmjWWeqEC6WcFZFuw6wrbJNjcZIiS0REizQ=
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260608064711-7f3f9a9f3990 h1:pHtKruOLTXwNrK1IJVjt+f00lGPOXFQaWFuUjvISZjQ=
github.com/adbc-drivers/driverbase-go/validation v0.0.0-20260608064711-7f3f9a9f3990/go.mod h1:UDTbsQIddSqov8hUA+rrchZEyzv9P5pqDOZGZR3kthk=
github.com/andybalholm/brotli v1.2.1 h1:R+f5xP285VArJDRgowrfb9DqL18yVK0gKAW/F+eTWro=
github.com/andybalholm/brotli v1.2.1/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY=
github.com/apache/arrow-adbc/go/adbc v1.11.0 h1:zcSLtV8CQ27chkYWZmySvd4+pkkDtWhRtHz0LpglRAU=
Expand Down Expand Up @@ -126,8 +126,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/snowflakedb/gosnowflake/v2 v2.0.2 h1:8UZo+v1T2Y9sgoPk3JYT3RatAUd9o6q6yjL40TyHluA=
github.com/snowflakedb/gosnowflake/v2 v2.0.2/go.mod h1:c0hIqJ/dxgaMl7g1o8n4Ca3Mf5YCiiVx9igio/PNqC8=
github.com/snowflakedb/gosnowflake/v2 v2.1.0 h1:rfjs6NAMnbLKCBYlOarqQX/UKgQVrXi43TZNHCP5/jw=
github.com/snowflakedb/gosnowflake/v2 v2.1.0/go.mod h1:c0hIqJ/dxgaMl7g1o8n4Ca3Mf5YCiiVx9igio/PNqC8=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
Expand Down Expand Up @@ -166,8 +166,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.52.0 h1:RMs7fP2rXdep0CftQlK8Uf+kibLm7qkCcradZWYz988=
golang.org/x/crypto v0.52.0/go.mod h1:1QgfPxDqh0T2M/elOJtp9RvuR95kVjir0e6/BvEmGbc=
golang.org/x/exp v0.0.0-20260529124908-c761662dc8c9 h1:4d4PbuBNwaxMXkXI8yiIYjydtMU+04RHeuSxJdgKftM=
golang.org/x/exp v0.0.0-20260529124908-c761662dc8c9/go.mod h1:d2fgXJLVs4dYDHUk5lwMIfzRzSrWCfGZb0ZqeLa/Vcw=
golang.org/x/exp v0.0.0-20260603202125-055de637280b h1:v1uXiEBHo8QA0LiGCo7UgHMzHT4Kdfpl2zmtH5vaP1Q=
golang.org/x/exp v0.0.0-20260603202125-055de637280b/go.mod h1:d2fgXJLVs4dYDHUk5lwMIfzRzSrWCfGZb0ZqeLa/Vcw=
golang.org/x/mod v0.36.0 h1:JJjpVx6myfUsUdAzZuOSTTmRE0PfZeNWzzvKrP7amb4=
golang.org/x/mod v0.36.0/go.mod h1:moc6ELqsWcOw5Ef3xVprK5ul/MvtVvkIXLziUOICjUQ=
golang.org/x/net v0.55.0 h1:bcvxaJn3e1U6InsFWt1JUq1aSjnRxLzT2rtD2KfkDF8=
Expand Down
1 change: 1 addition & 0 deletions go/record_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,6 +1257,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
trace.SpanFromContext(ctx).AddEvent("newRecordReader", trace.WithAttributes(
attribute.Int("batches", len(batches)),
attribute.Int64("totalRows", ld.TotalRows()),
attribute.Bool("streamRetryEnabled", streamRetryEnabled),
attribute.Int("jsonDataLen", len(ld.JSONData())),
))

Expand Down
146 changes: 146 additions & 0 deletions go/record_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -807,3 +808,148 @@ func TestReaderCancellationSetsErrBeforeNextAndReleaseReturns(t *testing.T) {
require.FailNow(t, "reader.Release should not block after cancellation")
}
}

// shortReadStream delivers data in chunks no larger than chunkSize per Read.
// If errAtOffset > 0, it returns injectedErr once that many bytes have been
// delivered, simulating mid-frame truncation (gosnowflake#1781).
type shortReadStream struct {
data []byte
pos int
chunkSize int
errAtOffset int
injectedErr error
}

func (s *shortReadStream) Read(p []byte) (int, error) {
if s.errAtOffset > 0 && s.pos >= s.errAtOffset {
return 0, s.injectedErr
}
if s.pos >= len(s.data) {
return 0, io.EOF
}
n := min(len(p), s.chunkSize)
remaining := len(s.data) - s.pos
if n > remaining {
n = remaining
}
if s.errAtOffset > 0 && s.pos+n > s.errAtOffset {
n = s.errAtOffset - s.pos
}
copy(p, s.data[s.pos:s.pos+n])
s.pos += n
return n, nil
}

func (s *shortReadStream) Close() error { return nil }

func streamShortReads(data []byte, chunkSize int) func(context.Context) (io.ReadCloser, error) {
return func(context.Context) (io.ReadCloser, error) {
return &shortReadStream{data: data, chunkSize: chunkSize}, nil
}
}

func streamShortReadsThenError(data []byte, chunkSize, errAtOffset int, err error) func(context.Context) (io.ReadCloser, error) {
return func(context.Context) (io.ReadCloser, error) {
return &shortReadStream{
data: data,
chunkSize: chunkSize,
errAtOffset: errAtOffset,
injectedErr: err,
}, nil
}
}

// Sanity check that legal short reads alone (n < len(p), nil err) don't break
// IPC decoding — the failure in gosnowflake#1781 requires a mid-frame error.
func TestTryReadBatch_ShortReadsSucceed(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

schema := testSchema()
rec := buildTestRecord(alloc, schema, []int64{1, 2, 3, 4, 5})
defer rec.Release()

data := buildIPCBytes(alloc, schema, []arrow.RecordBatch{rec})
batch := &mockBatch{numRows: 5, streams: []func(context.Context) (io.ReadCloser, error){
streamShortReads(data, 1),
}}

recs, err := tryReadBatch(context.Background(), batch, alloc, identityTransform)
require.NoError(t, err)
require.Len(t, recs, 1)
defer recs[0].Release()
assert.EqualValues(t, 5, recs[0].NumRows())
}

// Reproduces gosnowflake#1781 and validates the streamRetryEnabled flag:
// with retries disabled (matching the production "no retry" branch in
// newRecordReader) the mid-frame IPC error surfaces; with retries enabled
// the same broken first stream is recovered by a second attempt.
func TestStreamRetryEnabled_RecoversFromShortReadMidFrameError(t *testing.T) {
schema := testSchema()

cases := []struct {
name string
streamRetryEnabled bool
expectErr bool
}{
{name: "disabled_surfacesIPCError", streamRetryEnabled: false, expectErr: true},
{name: "enabled_recovers", streamRetryEnabled: true, expectErr: false},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
alloc := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer alloc.AssertSize(t, 0)

rec := buildTestRecord(alloc, schema, []int64{11, 22, 33, 44})
defer rec.Release()

data := buildIPCBytes(alloc, schema, []arrow.RecordBatch{rec})
require.Greater(t, len(data), 32)
truncationOffset := len(data) - 8

batch := &mockBatch{numRows: 4, streams: []func(context.Context) (io.ReadCloser, error){
streamShortReadsThenError(data, 16, truncationOffset, io.ErrUnexpectedEOF),
streamShortReads(data, 16),
}}

// Mirror the production dispatch in newRecordReader.
ctx := context.Background()
var recs []arrow.RecordBatch
var err error
if tc.streamRetryEnabled {
recs, err = readBatchRecords(ctx, batch, alloc, identityTransform, defaultStreamMaxRetries)
} else {
out := make(chan arrow.RecordBatch, 4)
target := newBatchStreamTarget(0, batch, out, nil)
err = streamBatchToChannel(ctx, 0, batch, alloc, identityTransform, target)
close(out)
for r := range out {
recs = append(recs, r)
}
}

if tc.expectErr {
require.Error(t, err, "expected IPC error to surface without retry")
msg := err.Error()
assert.True(t,
strings.Contains(msg, "could not read message body") ||
strings.Contains(msg, "unexpected EOF") ||
strings.Contains(msg, "row count mismatch"),
"expected IPC body-read failure, got: %s", msg,
)
for _, r := range recs {
r.Release()
}
return
}

require.NoError(t, err, "retry should recover from mid-frame truncation")
require.Len(t, recs, 1)
defer recs[0].Release()
assert.EqualValues(t, 4, recs[0].NumRows())
assert.Equal(t, 2, batch.call, "retry should have been invoked")
})
}
}
5 changes: 5 additions & 0 deletions go/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ func (st *statement) GetOption(ctx context.Context, key string) (string, error)
switch key {
case OptionStatementQueryTag:
return st.queryTag, nil
case OptionStreamRetryEnabled:
if st.streamRetryEnabled {
return adbc.OptionValueEnabled, nil
}
return adbc.OptionValueDisabled, nil
default:
return st.Base().GetOption(ctx, key)
}
Expand Down
Loading