Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ var queryGetObjectsTables string
//go:embed queries/get_objects_terse_catalogs.sql
var queryGetObjectsTerseCatalogs string

const geographyGeoArrowJson = `{"crs":"EPSG:4326","edges":"spherical"}`

type snowflakeConn interface {
driver.Conn
driver.ConnBeginTx
Expand Down Expand Up @@ -524,12 +526,10 @@ func (c *connectionImpl) toArrowField(columnInfo driverbase.ColumnInfo) arrow.Fi
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
"ARROW:extension:metadata": geographyGeoArrowJson,
})
case "GEOMETRY":
// With GEOMETRY_OUTPUT_FORMAT=WKB, data arrives as binary WKB.
// TODO: SRID for GEOMETRY requires inspecting data or a separate query.
// Same cross-driver issue as adbc-drivers/redshift#2 and adbc-drivers/databricks#350.
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
Expand Down Expand Up @@ -582,7 +582,7 @@ func descToField(name, typ, isnull, primary string, comment sql.NullString, useH
field.Type = arrow.BinaryTypes.Binary
field.Metadata = arrow.MetadataFrom(map[string]string{
"ARROW:extension:name": "geoarrow.wkb",
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
"ARROW:extension:metadata": geographyGeoArrowJson,
})
case "GEOMETRY":
field.Type = arrow.BinaryTypes.Binary
Expand Down
94 changes: 67 additions & 27 deletions go/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,9 +553,12 @@ func (st *statement) executeIngest(ctx context.Context) (int64, error) {
// Build the COPY query. If the schema has geo columns, this is a COPY
// transform that converts WKB/WKT → GEOGRAPHY/GEOMETRY inline during
// COPY INTO; otherwise it is the plain copy query.
copyQ, geoOverrides := st.buildCopyQuery(schema)
copyQ, geoOverrides, err := st.buildCopyQuery(schema)
if err != nil {
return -1, err
}

err := st.initIngest(ctx, geoOverrides)
err = st.initIngest(ctx, geoOverrides)
if err != nil {
return -1, err
}
Expand All @@ -577,9 +580,9 @@ func (st *statement) executeIngest(ctx context.Context) (int64, error) {
// Geo column detection covers both arrow.EXTENSION types and
// ARROW:extension:name field metadata so that data arriving over the C Data
// Interface (where extension types are not registered) is also recognized.
func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]string) {
func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]string, error) {
if schema == nil {
return copyQuery, nil
return copyQuery, nil, nil
}

// Detect geo columns: either a registered arrow.ExtensionType or the
Expand All @@ -603,21 +606,21 @@ func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]st
}

switch extName {
case "geoarrow.wkb", "geoarrow.wkb_view", "geoarrow.wkt", "geoarrow.wkt_view":
case "geoarrow.wkb", "geoarrow.wkt":
geoCols = append(geoCols, geoCol{name: f.Name, extName: extName, extMeta: extMeta})
}
}

if len(geoCols) == 0 {
return copyQuery, nil
return copyQuery, nil, nil
}

// Build a COPY transform with inline geo conversion. Each geo column's
// target type is resolved per-column so a non-4326 CRS can promote that
// column to GEOMETRY while sibling 4326 columns stay GEOGRAPHY.
geoOverrides := make(map[string]string, len(geoCols))
var selectCols []string
for _, f := range schema.Fields() {
for fieldIndex, f := range schema.Fields() {
quoted := quoteIdentifier(f.Name)
parqRef := fmt.Sprintf("$1:%s", quoted)

Expand All @@ -638,7 +641,11 @@ func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]st

// Geo column: apply conversion function.
isWKB := strings.Contains(gc.extName, "wkb")
geoType := st.ingestOptions.resolveGeoType(gc.extMeta)
geoType, err := st.ingestOptions.resolveGeoType(fieldIndex, f.Name, gc.extMeta)
if err != nil {
return "", nil, err
}

geoOverrides[gc.name] = geoType
var expr string
if geoType == "geography" {
Expand All @@ -648,7 +655,7 @@ func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]st
expr = fmt.Sprintf("TRY_TO_GEOGRAPHY(%s::VARCHAR) AS %s", parqRef, quoted)
}
} else {
srid := extractSRIDFromMeta(gc.extMeta)
srid, _ := extractSRIDFromMeta(gc.extMeta)
if srid != 0 {
if isWKB {
expr = fmt.Sprintf("ST_SETSRID(TO_GEOMETRY(%s::BINARY), %d) AS %s", parqRef, srid, quoted)
Expand All @@ -671,35 +678,45 @@ func (st *statement) buildCopyQuery(schema *arrow.Schema) (string, map[string]st
strings.Join(selectCols, ", "),
bindStageName,
)
return transformQ, geoOverrides
return transformQ, geoOverrides, nil
}

// resolveGeoType picks the Snowflake target type for a single geoarrow column.
// When the user has set ingest_geo_type explicitly, that value is honored for
// every column (current behavior). Otherwise the column's CRS metadata decides:
// any non-EPSG:4326 SRID promotes the column to GEOMETRY so the SRID survives
// the round trip; missing CRS, EPSG:4326, or unparsable CRS stays GEOGRAPHY.
func (opts *ingestOptions) resolveGeoType(extMeta string) string {
func (opts *ingestOptions) resolveGeoType(fieldIndex int, fieldName string, extMeta string) (string, error) {
if opts.geoTypeExplicit {
return opts.geoType
return opts.geoType, nil
}
srid := extractSRIDFromMeta(extMeta)
if srid != 0 && srid != 4326 {
return "geometry"
srid, edges := extractSRIDFromMeta(extMeta)
if srid == 4326 && edges == "spherical" {
return "geography", nil
} else if edges == "spherical" {
// Snowflake GEOGRAPHY is always SRID 4326, so if the user
// specified spherical edges but a different SRID, we should
// error/ask them to explicitly set the geo type
return "", adbc.Error{
Msg: fmt.Sprintf("[snowflake] field #%d (%s) is a GeoArrow array with spherical edges but an SRID of %d; Snowflake GEOGRAPHY is always SRID 4326, so explicitly set %s to choose whether to ingest this as GEOGRAPHY or GEOMETRY", fieldIndex+1, quoteIdentifier(fieldName), srid, OptionStatementIngestGeoType),
Code: adbc.StatusInvalidData,
}
}
return "geography"
return "geometry", nil
}

// extractSRIDFromMeta extracts the SRID from geoarrow extension metadata string.
// The metadata is a JSON string that may contain a "crs" field.
// extractSRIDFromMeta extracts the SRID and edges from GeoArrow extension
// metadata string. The metadata is a JSON string that may contain a "crs"
// field.
//
// Supported formats:
// - PROJJSON: {"crs": {"id": {"authority": "EPSG", "code": 4326}}}
// - Simple string: "EPSG:4326" (as CRS value)
//
// Returns 0 if no SRID can be determined.
func extractSRIDFromMeta(metadata string) int {
func extractSRIDFromMeta(metadata string) (int, string) {
if metadata == "" {
return 0
return 0, ""
}

type projID struct {
Expand All @@ -709,38 +726,61 @@ func extractSRIDFromMeta(metadata string) int {
type projCRS struct {
ID projID `json:"id"`
}

type projIDString struct {
Authority string `json:"authority"`
Code string `json:"code"`
}
type projCRSString struct {
ID projIDString `json:"id"`
}

type geoarrowMeta struct {
CRS json.RawMessage `json:"crs"`
CRS json.RawMessage `json:"crs"`
Edges string `json:"edges"`
}

var meta geoarrowMeta
if err := json.Unmarshal([]byte(metadata), &meta); err != nil {
return 0
return 0, ""
}

if len(meta.CRS) == 0 {
return 0
return 0, meta.Edges
}

// CRS can be a string like "EPSG:4326" or a PROJJSON object
var crsStr string
if err := json.Unmarshal(meta.CRS, &crsStr); err == nil {
if strings.HasPrefix(crsStr, "EPSG:") {
if code, err := strconv.Atoi(crsStr[5:]); err == nil {
return code
return code, meta.Edges
}
} else if crsStr == "OGC:CRS84" {
return 4326, meta.Edges
}
return 0
return 0, meta.Edges
}
Comment on lines 754 to 763

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other one that will show up here that is identical to "EPSG:4326" is "OGC:CRS84" (I wish this weren't true for what it's worth 😬 ). You can return an SRID of 4326 for this case.


var crs projCRS
if err := json.Unmarshal(meta.CRS, &crs); err == nil {
if strings.EqualFold(crs.ID.Authority, "EPSG") && crs.ID.Code != 0 {
return crs.ID.Code
return crs.ID.Code, meta.Edges
}
}
Comment on lines 766 to 770

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other one that will show up here is {"authority":"OGC","code":"CRS84"}. In Arrow C++ we also handle {"authority":"EPSG","code":"4326"} (where 4326 is a string) but I've never seen that in the wild.


var crsString projCRSString
if err := json.Unmarshal(meta.CRS, &crsString); err == nil {
if strings.EqualFold(crsString.ID.Authority, "EPSG") {
if code, err := strconv.Atoi(crsString.ID.Code); err == nil {
return code, meta.Edges
}
} else if strings.EqualFold(crsString.ID.Authority, "OGC") && strings.EqualFold(crsString.ID.Code, "CRS84") {
return 4326, meta.Edges
}
}

return 0
return 0, meta.Edges
}

// ExecuteQuery executes the current query or prepared statement
Expand Down
78 changes: 65 additions & 13 deletions go/statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ func TestToSnowflakeType(t *testing.T) {
func TestBuildCopyQueryDetectsGeoViaFieldMetadata(t *testing.T) {
geoMeta := arrow.NewMetadata(
[]string{"ARROW:extension:name", "ARROW:extension:metadata"},
[]string{"geoarrow.wkb", ""},
[]string{"geoarrow.wkb", `{"crs":"EPSG:4326", "edges":"spherical"}`},
)
schema := arrow.NewSchema([]arrow.Field{
{Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
{Name: "geom", Type: arrow.BinaryTypes.Binary, Nullable: true, Metadata: geoMeta},
}, nil)

st := &statement{ingestOptions: DefaultIngestOptions()}
copyQ, overrides := st.buildCopyQuery(schema)
copyQ, overrides, err := st.buildCopyQuery(schema)
assert.NoError(t, err)

assert.Equal(t, "geography", overrides["geom"], "geo column must be created as GEOGRAPHY")
assert.NotEqual(t, copyQuery, copyQ, "must not fall back to plain copyQuery when geo cols are present")
Expand All @@ -113,7 +114,8 @@ func TestBuildCopyQueryGeometryWithSRID(t *testing.T) {
}, nil)

st := &statement{ingestOptions: DefaultIngestOptions()}
copyQ, overrides := st.buildCopyQuery(schema)
copyQ, overrides, err := st.buildCopyQuery(schema)
assert.NoError(t, err)

assert.Equal(t, "geometry", overrides["geom"])
assert.Contains(t, copyQ, "ST_SETSRID(TO_GEOMETRY")
Expand All @@ -138,7 +140,8 @@ func TestBuildCopyQueryExplicitGeoTypeOverrides(t *testing.T) {
opts.geoType = "geometry"
opts.geoTypeExplicit = true
st := &statement{ingestOptions: opts}
copyQ, overrides := st.buildCopyQuery(schema)
copyQ, overrides, err := st.buildCopyQuery(schema)
assert.NoError(t, err)

assert.Equal(t, "geometry", overrides["geom"], "explicit geoType must override CRS-derived default")
assert.Contains(t, copyQ, "TO_GEOMETRY")
Expand All @@ -158,7 +161,8 @@ func TestBuildCopyQueryQuotesExoticColumnNames(t *testing.T) {
}, nil)

st := &statement{ingestOptions: DefaultIngestOptions()}
copyQ, _ := st.buildCopyQuery(schema)
copyQ, _, err := st.buildCopyQuery(schema)
assert.NoError(t, err)

// Snowflake escapes embedded " by doubling: weird"geom → "weird""geom"
assert.Contains(t, copyQ, `"weird""geom"`, "column with embedded quote must use Snowflake doubled-quote escaping")
Expand All @@ -174,7 +178,8 @@ func TestBuildCopyQueryNoGeoColumnsReturnsPlainCopy(t *testing.T) {
}, nil)

st := &statement{ingestOptions: DefaultIngestOptions()}
copyQ, overrides := st.buildCopyQuery(schema)
copyQ, overrides, err := st.buildCopyQuery(schema)
assert.NoError(t, err)

assert.Empty(t, overrides)
assert.Equal(t, copyQuery, copyQ, "non-geo schemas must use the plain copyQuery constant verbatim")
Expand All @@ -185,18 +190,65 @@ func TestExtractSRIDFromMeta(t *testing.T) {
name string
metadata string
expected int
edges string
}{
{"empty", "", 0},
{"PROJJSON 4326", `{"crs":{"id":{"authority":"EPSG","code":4326}}}`, 4326},
{"EPSG string", `{"crs":"EPSG:3857"}`, 3857},
{"no CRS", `{"edges":"planar"}`, 0},
{"null CRS", `{"crs":null}`, 0},
{"invalid JSON", `not json`, 0},
{"empty", "", 0, ""},
{"PROJJSON 4326", `{"crs":{"id":{"authority":"EPSG","code":4326}}}`, 4326, ""},
{"PROJJSON 4326 with edges", `{"crs":{"id":{"authority":"EPSG","code":4326}},"edges":"spherical"}`, 4326, "spherical"},
{"PROJJSON CRS84", `{"crs":{"id":{"authority":"OGC","code":"CRS84"}},"edges":"spherical"}`, 4326, "spherical"},
{"PROJJSON EPSG", `{"crs":{"id":{"authority":"EPSG","code":"4326"}},"edges":"spherical"}`, 4326, "spherical"},
{"EPSG string", `{"crs":"EPSG:3857"}`, 3857, ""},
{"EPSG string with edges", `{"crs":"EPSG:3857","edges":"spherical"}`, 3857, "spherical"},
{"no CRS", `{"edges":"planar"}`, 0, "planar"},
{"null CRS", `{"crs":null}`, 0, ""},
{"OGC", `{"crs":"OGC:CRS84"}`, 4326, ""},
{"OGC with edges", `{"crs":"OGC:CRS84","edges":"spherical"}`, 4326, "spherical"},
{"invalid JSON", `not json`, 0, ""},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, extractSRIDFromMeta(tt.metadata))
srid, edges := extractSRIDFromMeta(tt.metadata)
assert.Equal(t, tt.expected, srid)
assert.Equal(t, tt.edges, edges)
})
}
}

func TestResolveGeoType(t *testing.T) {
opts := &ingestOptions{}

ty, err := opts.resolveGeoType(0, "geom", `{"crs":"EPSG:4326"}`)
assert.NoError(t, err)
assert.Equal(t, "geometry", ty)

ty, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:3857"}`)
assert.NoError(t, err)
assert.Equal(t, "geometry", ty)

ty, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:4326", "edges":"spherical"}`)
assert.NoError(t, err)
assert.Equal(t, "geography", ty)

_, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:3857", "edges":"spherical"}`)
assert.ErrorContains(t, err, `field #1 ("geom") is a GeoArrow array with spherical edges`)

// explicit geoType should override CRS-derived default
opts.geoType = "geometry"
opts.geoTypeExplicit = true
ty, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:4326"}`)
assert.NoError(t, err)
assert.Equal(t, "geometry", ty)

opts.geoType = "geography"
opts.geoTypeExplicit = true
ty, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:3857"}`)
assert.NoError(t, err)
assert.Equal(t, "geography", ty)

opts.geoType = "geography"
opts.geoTypeExplicit = true
ty, err = opts.resolveGeoType(0, "geom", `{"crs":"EPSG:3857", "edges":"spherical"}`)
assert.NoError(t, err)
assert.Equal(t, "geography", ty)
}
Loading