Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SPath extends UnresolvedPlan {

@Nullable private final String outField;

private final String path;
@Nullable private final String path;

@Override
public UnresolvedPlan attach(UnresolvedPlan child) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,50 @@ public RelNode visitParse(Parse node, CalcitePlanContext context) {

@Override
public RelNode visitSpath(SPath node, CalcitePlanContext context) {
return visitEval(node.rewriteAsEval(), context);
if (node.getPath() != null) {
return visitEval(node.rewriteAsEval(), context);
} else {
return spathExtractAll(node, context);
}
}

private RelNode spathExtractAll(SPath node, CalcitePlanContext context) {
visitChildren(node, context);

// 1. Extract all fields from JSON in `inField` and merge with existing dynamic fields.
// _MAP = MAP_APPEND(_MAP, JSON_EXTRACT_ALL(inField))
RexNode inField = QualifiedNameResolver.resolveFieldOrThrow(1, 0, node.getInField(), context);
RexNode map = context.rexBuilder.makeCall(BuiltinFunctionName.JSON_EXTRACT_ALL, inField);
if (context.fieldBuilder.isDynamicFieldsExist()) {
map =
context.rexBuilder.makeCall(
BuiltinFunctionName.MAP_APPEND, context.fieldBuilder.getDynamicFieldsMap(), map);
}

// 2. Merge dynamic fields with static fields.
// static_field1 = APPEND(static_field1, _MAP[static_field1]), static_field2 = ...
List<String> staticFieldNames = context.fieldBuilder.getStaticFieldNames();
List<RexNode> fields = new ArrayList<>();
Collaborator

I mentioned in standup this morning that I think we're making this more complex by working directly on the List of fields.

I'd like to consider putting the bulk of this logic in a data class that can start supplying a higher-level interface for most field operations. We can incrementally move more commands to use it over time.

Collaborator Author

This is very specific logic for spath: it appends values to static fields and removes the appended ones from the dynamic fields. I don't think it is reusable for other use cases, so it should be coded in the spath implementation rather than in an abstraction layer.
The spath command is special because it generates new dynamic fields, while other commands only consume them.

Collaborator

If the spath command is special enough for this, I imagine other commands will have their own special cases later. It would still be worth finding an abstraction here in my view. To me this looks like one of the main operations for adding dynamic fields to that structure, and it would eventually be reusable as more dynamic commands are visited.

Collaborator Author

The commands expected to add dynamic fields are multikv and timechart. (There are more with lower priority: chart, extract, kv, xpath, xmlkv, transpose, xyseries, pivot.)

multikv would simply overwrite existing fields.
timechart would also overwrite existing fields (it is a bit different, since it would create a new frame rather than update the existing one).

There is a chance we could reuse the current logic in the future, but I think we should generalize it when we implement those commands. I don't have a good idea of how to generalize any part of it right now. Let me know if you already have a clear picture of which part should be generalized.

for (String fieldName : staticFieldNames) {
RexNode field = context.fieldBuilder.staticField(fieldName);
RexNode appended =
context.rexBuilder.makeCall(
BuiltinFunctionName.INTERNAL_APPEND,
field,
context.rexBuilder.createItemAccess(map, fieldName));
fields.add(context.relBuilder.alias(appended, fieldName));
}

// 3. Dedupe dynamic fields with static fields.
// _MAP = MAP_REMOVE(_MAP, [static_field1, static_field2, ...])
RexNode fieldNameArray = context.rexBuilder.createStringArrayLiteral(staticFieldNames);
RexNode dedupedMap =
context.rexBuilder.makeCall(BuiltinFunctionName.MAP_REMOVE, map, fieldNameArray);
fields.add(context.relBuilder.alias(dedupedMap, DynamicFieldsConstants.DYNAMIC_FIELDS_MAP));

context.relBuilder.project(fields);

return context.relBuilder.peek();
}

// might need to remove fields in _MAP
Expand Down
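
The three numbered steps in `spathExtractAll` build Calcite expressions, so their per-row effect is not obvious from the diff alone. The following is a minimal sketch of that intended effect on plain Java maps, not the PR's implementation. The class `SpathMergeSketch`, the method `extractAll`, and the use of the literal key `"_MAP"` for the dynamic-fields map are assumptions made for illustration, and `append` is an assumed restatement of the internal `APPEND` semantics described elsewhere in this diff.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the row-level effect of spathExtractAll; not the PR's code.
class SpathMergeSketch {

  // Assumed APPEND semantics: drop nulls, flatten lists, collapse 0 or 1 results to null/scalar.
  static Object append(Object... args) {
    List<Object> out = new ArrayList<>();
    for (Object arg : args) {
      if (arg instanceof List) {
        for (Object item : (List<?>) arg) {
          if (item != null) {
            out.add(item);
          }
        }
      } else if (arg != null) {
        out.add(arg);
      }
    }
    if (out.isEmpty()) {
      return null;
    }
    return out.size() == 1 ? out.get(0) : out;
  }

  // staticRow: static fields of one row; dynamic: the existing dynamic-fields map;
  // extracted: the assumed result of JSON_EXTRACT_ALL(inField) for that row.
  static Map<String, Object> extractAll(
      Map<String, Object> staticRow, Map<String, Object> dynamic, Map<String, Object> extracted) {
    // 1. _MAP = MAP_APPEND(_MAP, extracted): merge values key by key over the union of keys.
    Set<String> keys = new LinkedHashSet<>(dynamic.keySet());
    keys.addAll(extracted.keySet());
    Map<String, Object> map = new LinkedHashMap<>();
    for (String key : keys) {
      map.put(key, append(dynamic.get(key), extracted.get(key)));
    }

    // 2. static_field = APPEND(static_field, _MAP[static_field]) for every static field.
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : staticRow.entrySet()) {
      result.put(e.getKey(), append(e.getValue(), map.get(e.getKey())));
    }

    // 3. _MAP = MAP_REMOVE(_MAP, staticFieldNames): drop keys already merged into static fields.
    map.keySet().removeAll(staticRow.keySet());
    result.put("_MAP", map);
    return result;
  }
}
```

With a static field `host = "a"`, an existing dynamic field `user = "x"`, and extracted JSON fields `host = "b"`, `user = "y"`, `code = 200`, this sketch yields `host = [a, b]` and a dynamic map holding `user = [x, y]` and `code = 200`, with `host` no longer present in the dynamic map.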
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.calcite;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
Expand All @@ -25,7 +26,9 @@
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.exception.SemanticCheckException;
import org.opensearch.sql.expression.function.BuiltinFunctionName;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.expression.function.PPLFuncImpTable;

public class ExtendedRexBuilder extends RexBuilder {

Expand Down Expand Up @@ -170,4 +173,25 @@ public RexNode castToString(RexNode node) {
RelDataType nullableStringType = getTypeFactory().createTypeWithNullability(stringType, true);
return makeCast(nullableStringType, node, true, true);
}

public RexNode createItemAccess(RexNode field, String itemName) {
return PPLFuncImpTable.INSTANCE.resolve(
this, BuiltinFunctionName.INTERNAL_ITEM, field, makeLiteral(itemName));
}

public RexNode makeCall(BuiltinFunctionName fn, RexNode... nodes) {
return PPLFuncImpTable.INSTANCE.resolve(this, fn, nodes);
}

public RexNode createStringArrayLiteral(List<String> values) {
RelDataType stringType = getTypeFactory().createSqlType(SqlTypeName.VARCHAR);
RelDataType arrayType = getTypeFactory().createArrayType(stringType, -1);

List<RexNode> elements = new ArrayList<>();
for (String value : values) {
elements.add(makeLiteral(value));
}

return makeCall(arrayType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, elements);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public enum BuiltinFunctionName {
MAP_CONCAT(FunctionName.of("map_concat"), true),
MAP_REMOVE(FunctionName.of("map_remove"), true),
MVAPPEND(FunctionName.of("mvappend")),
INTERNAL_APPEND(FunctionName.of("append"), true),
MVJOIN(FunctionName.of("mvjoin")),
FORALL(FunctionName.of("forall")),
EXISTS(FunctionName.of("exists")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,32 @@
import java.util.ArrayList;
import java.util.List;

/** Core logic for `mvappend` command to collect elements from list of args */
public class MVAppendCore {
/**
* Core logic for `mvappend` and internal `append` function to collect elements from list of args.
*/
public class AppendCore {

/**
* Collect non-null elements from `args`. If an item is a list, it will collect non-null elements
* of the list. See {@ref MVAppendFunctionImplTest} for detailed behavior.
* of the list. See {@code AppendFunctionImplTest} for detailed behavior.
*/
public static Object collectElements(Object... args) {
Collaborator

thought: Can we make this typing any more specific than Object?

Collaborator Author

We cannot since input/output could be any type.

List<Object> elements = collectElementsToList(args);

if (elements.isEmpty()) {
return null;
} else if (elements.size() == 1) {
// return the element in case of single element
return elements.get(0);
} else {
return elements;
}
}

/**
* Collect non-null elements from `args` into a list. If an item is a list, its non-null elements
* are collected individually. Returns an empty list when nothing is collected.
*/
public static List<Object> collectElements(Object... args) {
public static List<Object> collectElementsToList(Object... args) {
List<Object> elements = new ArrayList<>();

for (Object arg : args) {
Expand All @@ -28,7 +46,7 @@ public static List<Object> collectElements(Object... args) {
}
}

return elements.isEmpty() ? null : elements;
return elements;
}

private static void addListElements(List<?> list, List<Object> elements) {
Expand Down
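
The review thread above asks why `collectElements` returns `Object`. A short sketch may help show the three result shapes involved and how they differ from `mvappend`. This is an illustrative restatement under assumptions, not the PR's code: the class `AppendSemanticsSketch` and the method name `toList` are hypothetical, and flattening exactly one level of lists is assumed because the body of `addListElements` is not shown in this diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical restatement of the helpers described in this diff.
class AppendSemanticsSketch {

  // Like collectElementsToList: skip nulls, flatten lists (one level assumed), always return a list.
  static List<Object> toList(Object... args) {
    List<Object> elements = new ArrayList<>();
    for (Object arg : args) {
      if (arg instanceof List) {
        for (Object item : (List<?>) arg) {
          if (item != null) {
            elements.add(item);
          }
        }
      } else if (arg != null) {
        elements.add(arg);
      }
    }
    return elements;
  }

  // Like the internal append: null for no elements, the bare element for one, else the list.
  static Object append(Object... args) {
    List<Object> elements = toList(args);
    if (elements.isEmpty()) {
      return null;
    }
    return elements.size() == 1 ? elements.get(0) : elements;
  }

  // Like mvappend: null for no elements, otherwise always a list.
  static List<Object> mvappend(Object... args) {
    List<Object> elements = toList(args);
    return elements.isEmpty() ? null : elements;
  }
}
```

In this sketch `append(null, 1, null)` returns the bare `1`, while `mvappend(null, 1, null)` returns the one-element list `[1]`. Because `append` can return null, a scalar of any type, or a list, no return type narrower than `Object` fits all three cases.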
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import java.util.List;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
* Internal append function that collects all elements from its arguments. Returns null if there
* are no elements, the scalar value if there is a single element, and otherwise a list containing
* all the elements from the inputs.
*/
public class AppendFunctionImpl extends ImplementorUDF {

public AppendFunctionImpl() {
super(new AppendImplementor(), NullPolicy.ALL);
}

@Override
public SqlReturnTypeInference getReturnTypeInference() {
return sqlOperatorBinding -> {
RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory();

if (sqlOperatorBinding.getOperandCount() == 0) {
return typeFactory.createSqlType(SqlTypeName.NULL);
}

// Return type is ANY as it could return scalar value (in case of single item) or array
return typeFactory.createSqlType(SqlTypeName.ANY);
};
}

@Override
public UDFOperandMetadata getOperandMetadata() {
return null;
}

public static class AppendImplementor implements NotNullImplementor {
@Override
public Expression implement(
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
return Expressions.call(
Types.lookupMethod(AppendFunctionImpl.class, "append", Object[].class),
Expressions.newArrayInit(Object.class, translatedOperands));
}
}

public static Object append(Object... args) {
return AppendCore.collectElements(args);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ public Expression implement(
}

public static Object mvappend(Object... args) {
return MVAppendCore.collectElements(args);
return collectElements(args);
}

/**
* Collect non-null elements from `args`. If an item is a list, it will collect non-null elements
* of the list. See {@code MVAppendFunctionImplTest} for detailed behavior.
*/
public static List<Object> collectElements(Object... args) {
List<Object> elements = AppendCore.collectElementsToList(args);
return elements.isEmpty() ? null : elements;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
* MapAppend function that merges two maps. All the values will be converted to list for type
* consistency.
* MapAppend function that merges two maps. Values for the same key are merged into an array by
* using {@link AppendCore}.
*/
public class MapAppendFunctionImpl extends ImplementorUDF {

Expand Down Expand Up @@ -89,7 +89,7 @@ private static Map<String, Object> verifyMap(Object map) {
static Map<String, Object> mapAppendImpl(Map<String, Object> map) {
Map<String, Object> result = new HashMap<>();
for (String key : map.keySet()) {
result.put(key, MVAppendCore.collectElements(map.get(key)));
result.put(key, AppendCore.collectElements(map.get(key)));
}
return result;
}
Expand All @@ -99,7 +99,7 @@ static Map<String, Object> mapAppendImpl(
Map<String, Object> result = new HashMap<>();

for (String key : mergeKeys(firstMap, secondMap)) {
result.put(key, MVAppendCore.collectElements(firstMap.get(key), secondMap.get(key)));
result.put(key, AppendCore.collectElements(firstMap.get(key), secondMap.get(key)));
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.expression.datetime.DateTimeFunctions;
import org.opensearch.sql.expression.function.CollectionUDF.AppendFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ArrayFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ExistsFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl;
Expand Down Expand Up @@ -389,9 +390,10 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
public static final SqlOperator FORALL = new ForallFunctionImpl().toUDF("forall");
public static final SqlOperator EXISTS = new ExistsFunctionImpl().toUDF("exists");
public static final SqlOperator ARRAY = new ArrayFunctionImpl().toUDF("array");
public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append");
public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("MAP_APPEND");
public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE");
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend");
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("MVAPPEND");
public static final SqlOperator INTERNAL_APPEND = new AppendFunctionImpl().toUDF("APPEND");
public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter");
public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform");
public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.HOUR_OF_DAY;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IF;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.IFNULL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.INTERNAL_APPEND;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.INTERNAL_GROK;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.INTERNAL_ITEM;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.INTERNAL_PARSE;
Expand Down Expand Up @@ -957,6 +958,7 @@ void populate() {

registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(INTERNAL_APPEND, PPLBuiltinOperators.INTERNAL_APPEND);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.Test;

public class AppendFunctionImplTest {

@Test
public void testAppendWithNoArguments() {
Object result = AppendFunctionImpl.append();
assertNull(result);
}

@Test
public void testAppendWithSingleElement() {
Object result = AppendFunctionImpl.append(42);
assertEquals(42, result);
}

@Test
public void testAppendWithMultipleElements() {
Object result = AppendFunctionImpl.append(1, 2, 3);
assertEquals(Arrays.asList(1, 2, 3), result);
}

@Test
public void testAppendWithNullValues() {
Object result = AppendFunctionImpl.append(null, 1, null);
assertEquals(1, result);
}

@Test
public void testAppendWithAllNulls() {
Object result = AppendFunctionImpl.append(null, null);
assertNull(result);
}

@Test
public void testAppendWithArrayFlattening() {
List<Integer> array1 = Arrays.asList(1, 2);
List<Integer> array2 = Arrays.asList(3, 4);
Object result = AppendFunctionImpl.append(array1, array2);
assertEquals(Arrays.asList(1, 2, 3, 4), result);
}

@Test
public void testAppendWithMixedTypes() {
List<Integer> array = Arrays.asList(1, 2);
Object result = AppendFunctionImpl.append(array, 3, "hello");
assertEquals(Arrays.asList(1, 2, 3, "hello"), result);
}

@Test
public void testAppendWithArrayAndNulls() {
List<Integer> array = Arrays.asList(1, 2);
Object result = AppendFunctionImpl.append(null, array, null, 3);
assertEquals(Arrays.asList(1, 2, 3), result);
}

@Test
public void testAppendWithSingleNull() {
Object result = AppendFunctionImpl.append((Object) null);
assertNull(result);
}

@Test
public void testAppendWithEmptyArray() {
List<Object> emptyArray = Arrays.asList();
Object result = AppendFunctionImpl.append(emptyArray, 1);
assertEquals(1, result);
}
}