
[RFC] Support Array type: Calcite Schema-on-Read #4943

@LantaoJin


Problem Statement
With Calcite enabled, PPL cannot handle fields that contain array values. The issue is described in #4173.

Current State
#4909 provided a short-term mitigation that handles array columns by picking the first value instead of failing. However, array values are common in OpenTelemetry metrics and logs, so the long-term solution is for Calcite to identify whether a field/column is a real array type. This information is not available in the OpenSearch mapping. For example, given the mapping

{
  "mappings": {
    "properties": {
      "nums": {
        "type": "long"
      },
      "others": {
        "type": "keyword"
      }
    }
  }
}

the values actually stored in a document may be

{ "nums": [1, 2, 3], "others": ["a", "b", "c"] }

Assumption
We assume that a field is either always an array or never an array. Mixed cases, where a field holds arrays in some documents and scalar values in others, are out of scope for this solution, at least for the OpenTelemetry scenario.
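To make the assumption concrete, here is a minimal Python sketch (the function name `classify_field` is hypothetical) that classifies a field from a few sampled `_source` values. Under the assumption, the "mixed" outcome never occurs, which is what allows the approach below to inspect a single document per field.

```python
from typing import Any, Iterable

def classify_field(values: Iterable[Any]) -> str:
    """Classify a field from sampled _source values.

    Returns "array" if every non-null sample is a list, "scalar" if none is,
    "mixed" otherwise (the out-of-scope case), and "unknown" with no samples.
    """
    kinds = {isinstance(v, list) for v in values if v is not None}
    if not kinds:
        return "unknown"
    if kinds == {True}:
        return "array"
    if kinds == {False}:
        return "scalar"
    return "mixed"

docs = [{"nums": [1, 2, 3], "others": ["a", "b", "c"]},
        {"nums": [4], "others": "d"}]
print(classify_field(d.get("nums") for d in docs))    # array
print(classify_field(d.get("others") for d in docs))  # mixed
```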

Long-Term Goals

  • The Calcite schema (a table name (String) with its RelDataType) can restore the correct data type (array or non-array) on the fly.
  • The feature can be disabled, falling back to a strong schema mode that always uses the existing OpenSearch mapping.
  • Enabling the feature should cause no significant performance degradation; an overhead of 5% or less is acceptable.

Proposal
Build schema-on-read for Calcite tables/indices backed by the OpenSearch data source.

Approach
Background: Currently, Calcite builds its catalog in memory lazily, registering a table and its schema the first time a user queries it. During table registration, we send a GetMappingsRequest to the server via the node client and convert the mapping to a RelDataType. Since OpenSearch has no array type, in the example above the column nums is converted to Long and others is converted to String.
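As an illustration of the current conversion, the following Python sketch turns the example mapping into per-column types. The `OPENSEARCH_TO_TYPE` table and `mapping_to_row_type` are hypothetical simplifications (the real plugin builds a Calcite RelDataType and supports many more field types); the point is that nothing in the mapping alone can yield `Array<Long>`.

```python
# Hypothetical, simplified type table using the type names from this RFC.
OPENSEARCH_TO_TYPE = {
    "long": "Long",
    "integer": "Integer",
    "double": "Double",
    "boolean": "Boolean",
    "keyword": "String",
}

def mapping_to_row_type(mapping: dict) -> dict:
    """Convert a mapping body into {column: type}; it carries no array information."""
    properties = mapping["mappings"]["properties"]
    return {name: OPENSEARCH_TO_TYPE[spec["type"]] for name, spec in properties.items()}

MAPPING = {"mappings": {"properties": {"nums": {"type": "long"},
                                       "others": {"type": "keyword"}}}}
print(mapping_to_row_type(MAPPING))  # {'nums': 'Long', 'others': 'String'}
```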

The new approach consists of several steps:

  1. Send a GetMappingsRequest to the server.
  2. When the feature is enabled, also send a MultiSearchRequest to the server, with one exists query per mapped field:
GET /_msearch
{ "index": "test"}
{ "query": {"exists": { "field": "nums" } }, "size": 1, "_source": "nums" }
{ "index": "test"}
{ "query": {"exists": { "field": "others" } }, "size": 1, "_source": "others" }
  3. If the SearchResponse for a field returns an array value, map the field's type to the corresponding array type (e.g. Long to Array<Long>).
  4. If the SearchResponse is empty, the field exists in the mapping but has no data yet. In that case, submit the same search request for this field from an async scheduler, which refreshes the schema (replacing the existing table schema) at a configurable period (e.g. 10 min, 30 min, 1 hour) until the request returns data or the maximum number of refreshes (e.g. 3) is exceeded.
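Steps 2 to 4 can be sketched in Python as follows. This is a minimal sketch under several assumptions: responses are plain dicts shaped like the `_msearch` JSON (a `responses` array in request order, each with `hits.hits[]._source`), only top-level fields are handled, types are the string names used in this RFC, and all function names are hypothetical. `refresh_tick` stands in for one run of the async scheduler; wiring it to a real timer is left out.

```python
import json
from typing import Callable, Optional

def build_msearch_body(index: str, fields: list) -> str:
    """Step 2: NDJSON body for GET /_msearch, one header + exists query per field."""
    lines = []
    for field in fields:
        lines.append(json.dumps({"index": index}))
        lines.append(json.dumps({"query": {"exists": {"field": field}},
                                 "size": 1, "_source": field}))
    return "\n".join(lines) + "\n"  # the NDJSON body must end with a newline

def infer_type(base: str, response: dict, field: str) -> Optional[str]:
    """Step 3: Array<base> if the sampled value is a list; None if there are no hits."""
    hits = response["hits"]["hits"]
    if not hits:
        return None  # field is mapped but has no data yet
    value = hits[0]["_source"].get(field)  # top-level fields only in this sketch
    return f"Array<{base}>" if isinstance(value, list) else base

def resolve_schema(row_type: dict, msearch_response: dict, enabled: bool = True):
    """Return (schema, pending); pending maps field -> refreshes attempted so far."""
    if not enabled:  # strong schema mode: use the mapping types unchanged
        return dict(row_type), {}
    schema, pending = {}, {}
    # _msearch returns one sub-response per sub-request, in request order
    for (field, base), resp in zip(row_type.items(), msearch_response["responses"]):
        inferred = infer_type(base, resp, field)
        schema[field] = inferred or base
        if inferred is None:
            pending[field] = 0
    return schema, pending

def refresh_tick(schema: dict, row_type: dict, pending: dict,
                 search_one: Callable[[str], dict], max_refreshes: int = 3) -> None:
    """Step 4: one scheduled refresh; re-run the search for each pending field."""
    for field in list(pending):
        inferred = infer_type(row_type[field], search_one(field), field)
        if inferred is not None:
            schema[field] = inferred  # replace the field's type in the table schema
            del pending[field]
        else:
            pending[field] += 1
            if pending[field] >= max_refreshes:
                del pending[field]  # give up and keep the mapping type
```

For the example index, `build_msearch_body("test", ["nums", "others"])` produces a body equivalent to the request shown in step 2. Because of the assumption stated earlier, a single hit (`"size": 1`) is enough to decide each field's type.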

Alternative 1
Add an ActionFilter to monitor index actions such as IndexCreate, IndexDelete and Bulk, instead of the query-based approach above.
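A minimal Python sketch of the event-driven idea behind Alternative 1, assuming a hypothetical `SchemaCache` whose `on_action` hook is invoked for every intercepted action. The action names mirror the examples above and are not real OpenSearch transport action names.

```python
from typing import Callable, Dict

class SchemaCache:
    """Hypothetical per-index schema cache, invalidated by observed actions."""

    # Hypothetical action names taken from the examples in Alternative 1.
    INVALIDATING_ACTIONS = {"IndexCreate", "IndexDelete", "Bulk"}

    def __init__(self, load_schema: Callable[[str], dict]):
        self._load = load_schema
        self._schemas: Dict[str, dict] = {}

    def get(self, index: str) -> dict:
        if index not in self._schemas:  # lazy load, like the current catalog registration
            self._schemas[index] = self._load(index)
        return self._schemas[index]

    def on_action(self, action: str, index: str) -> None:
        """Called for every intercepted action; drops the cached schema if relevant."""
        if action in self.INVALIDATING_ACTIONS:
            self._schemas.pop(index, None)
```

Every action passes through `on_action`, including the ones it ignores; this per-action check is the transport-layer cost noted in the implementation discussion.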
Alternative 2
Store all columns as VARIANT type.
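For comparison, a minimal Python sketch of what the VARIANT approach implies: instead of fixing one type per column, a type tag is derived for each value at read time. `variant_type` and its tag names are hypothetical.

```python
from typing import Any

def variant_type(value: Any) -> str:
    """Derive a type tag for a single value at read time (VARIANT-style)."""
    if value is None:
        return "Null"
    if isinstance(value, list):
        return "Array"
    if isinstance(value, bool):  # check bool before int: bool is a subclass of int
        return "Boolean"
    if isinstance(value, int):
        return "Long"
    if isinstance(value, float):
        return "Double"
    if isinstance(value, str):
        return "String"
    return "Unknown"
```

Because this check runs for every value scanned, its cost grows with the size of the dataset, which is the performance risk discussed below.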
Implementation Discussion

  • The ActionFilter approach (Alternative 1) can reduce the frequency of async queries, but intercepting all actions in the transport layer may add overhead.
  • The VARIANT approach (Alternative 2) is a large change that also risks performance degradation, especially on large datasets.
  • The proposed solution requires fewer changes and causes no significant performance degradation on large datasets.
