Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public enum BuiltinFunctionName {
MVAPPEND(FunctionName.of("mvappend")),
MVJOIN(FunctionName.of("mvjoin")),
MVINDEX(FunctionName.of("mvindex")),
MVZIP(FunctionName.of("mvzip")),
SPLIT(FunctionName.of("split")),
MVDEDUP(FunctionName.of("mvdedup")),
FORALL(FunctionName.of("forall")),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.expression.function.CollectionUDF;

import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.linq4j.tree.Types;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.type.CompositeOperandTypeChecker;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.SqlReturnTypeInference;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.opensearch.sql.expression.function.ImplementorUDF;
import org.opensearch.sql.expression.function.UDFOperandMetadata;

/**
 * MVZip function that combines two multivalue fields pairwise with a delimiter.
 *
 * <p>This function zips together two arrays by combining the first value of left with the first
 * value of right, the second with the second, and so on, up to the length of the shorter array.
 *
 * <p>Behavior:
 *
 * <ul>
 *   <li>Returns null if any argument (left, right or delimiter) is null
 *   <li>Returns an empty array if one or both arrays are empty
 *   <li>Stops at the length of the shorter array (like Python's zip)
 *   <li>Uses the provided delimiter to join values (default: ",")
 *   <li>A null element inside either array contributes an empty string to its pair
 * </ul>
 */
public class MVZipFunctionImpl extends ImplementorUDF {

  /** Delimiter applied when mvzip is invoked with only the two array arguments. */
  private static final String DEFAULT_DELIMITER = ",";

  /** Name of the static runtime method the generated expression calls. */
  private static final String RUNTIME_METHOD_NAME = "mvzip";

  public MVZipFunctionImpl() {
    // Use ANY: return null if any argument is null
    super(new MVZipImplementor(), NullPolicy.ANY);
  }

  /**
   * Declares the result type of mvzip.
   *
   * @return an inference that always yields a nullable array of nullable VARCHAR elements,
   *     regardless of the element types of the input arrays
   */
  @Override
  public SqlReturnTypeInference getReturnTypeInference() {
    return sqlOperatorBinding -> {
      RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory();

      // mvzip returns an array of VARCHAR (strings)
      RelDataType elementType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
      return createArrayType(
          typeFactory, typeFactory.createTypeWithNullability(elementType, true), true);
    };
  }

  /**
   * Declares the accepted operand shapes.
   *
   * @return metadata accepting either (ARRAY, ARRAY) or (ARRAY, ARRAY, CHARACTER)
   */
  @Override
  public UDFOperandMetadata getOperandMetadata() {
    // First two arguments must be arrays, optional STRING delimiter
    return UDFOperandMetadata.wrap(
        (CompositeOperandTypeChecker)
            OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY)
                .or(
                    OperandTypes.family(
                        SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER)));
  }

  /** Translates an mvzip call into a call to {@link #mvzip(List, List, String)}. */
  public static class MVZipImplementor implements NotNullImplementor {

    /**
     * Builds the expression that invokes the runtime {@code mvzip} method.
     *
     * @param translator the translator supplied by the caller (not used here)
     * @param call the call being implemented (not used here)
     * @param translatedOperands the already-translated operands; two arrays optionally followed
     *     by a delimiter
     * @return a static call expression to {@code mvzip(List, List, String)}
     * @throws IllegalArgumentException if the operand count is neither 2 nor 3
     */
    @Override
    public Expression implement(
        RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
      int operandCount = translatedOperands.size();
      if (operandCount != 2 && operandCount != 3) {
        throw new IllegalArgumentException(
            "mvzip expects 2 or 3 arguments, got " + operandCount);
      }

      // mvzip(left, right) falls back to the default delimiter; mvzip(left, right, delimiter)
      // forwards the caller's delimiter. Both forms target the same three-argument method.
      Expression delimiter =
          operandCount == 3
              ? translatedOperands.get(2)
              : Expressions.constant(DEFAULT_DELIMITER);

      return Expressions.call(
          Types.lookupMethod(
              MVZipFunctionImpl.class, RUNTIME_METHOD_NAME, List.class, List.class, String.class),
          translatedOperands.get(0),
          translatedOperands.get(1),
          delimiter);
    }
  }

  /**
   * Combines two multivalue arrays pairwise with a delimiter.
   *
   * <p>Element {@code i} of the result is {@code left[i] + delimiter + right[i]}, where each
   * element is converted with {@link Objects#toString(Object, String)} and a null element becomes
   * the empty string.
   *
   * @param left The left multivalue array
   * @param right The right multivalue array
   * @param delimiter The delimiter to use for joining values
   * @return A list of combined values whose size is the size of the shorter input, an empty list
   *     if either array is empty, or null if any argument (including the delimiter) is null
   */
  public static List<Object> mvzip(List<?> left, List<?> right, String delimiter) {
    // Mirror the operator's NullPolicy.ANY for direct callers: a null delimiter must not be
    // concatenated as the literal text "null".
    if (left == null || right == null || delimiter == null) {
      return null;
    }

    int pairCount = Math.min(left.size(), right.size());
    List<Object> result = new ArrayList<>(pairCount);

    for (int i = 0; i < pairCount; i++) {
      result.add(
          Objects.toString(left.get(i), "") + delimiter + Objects.toString(right.get(i), ""));
    }

    return result;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MVZipFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MapAppendFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.MapRemoveFunctionImpl;
import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl;
Expand Down Expand Up @@ -392,6 +393,7 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append");
public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE");
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend");
public static final SqlOperator MVZIP = new MVZipFunctionImpl().toUDF("mvzip");
public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter");
public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform");
public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVZIP;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
Expand Down Expand Up @@ -1035,6 +1036,7 @@ void populate() {
registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT);
registerOperator(MVZIP, PPLBuiltinOperators.MVZIP);
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);
Expand Down
107 changes: 106 additions & 1 deletion docs/user/ppl/functions/collection.md
Original file line number Diff line number Diff line change
Expand Up @@ -724,4 +724,109 @@ fetched rows / total rows = 1/1
| [alex,celestino,claudia] |
+--------------------------+
```


## MVZIP

### Description

Usage: mvzip(mv_left, mv_right, [delim]) combines the values in two multivalue arrays by pairing corresponding elements and joining each pair into a string. The optional delim argument specifies the delimiter placed between the two values of each pair. This is similar to Python's zip function.

The values are stitched together combining the first value of mv_left with the first value of mv_right, then the second with the second, and so on. Each pair is concatenated into a string using the delimiter. The function stops at the length of the shorter array.

The delimiter is optional. When specified, it must be enclosed in quotation marks. The default delimiter is a comma ( , ).

Returns null if either input is null. Returns an empty array if either input array is empty.

Argument type: mv_left: ARRAY, mv_right: ARRAY, delim: STRING (optional)
Return type: ARRAY of STRING
Example

```ppl
source=people
| eval hosts = array('host1', 'host2'), ports = array('80', '443'), nserver = mvzip(hosts, ports, ':')
| fields nserver
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+----------------------+
| nserver |
|----------------------|
| [host1:80,host2:443] |
+----------------------+
```

```ppl
source=people
| eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), result = mvzip(arr1, arr2, '|')
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+---------------+
| result |
|---------------|
| [a|x,b|y,c|z] |
+---------------+
```

```ppl
source=people
| eval arr1 = array('1', '2', '3'), arr2 = array('a', 'b'), result = mvzip(arr1, arr2, '-')
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+-----------+
| result |
|-----------|
| [1-a,2-b] |
+-----------+
```

```ppl
source=people
| eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), arr3 = array('1', '2', '3'), result = mvzip(mvzip(arr1, arr2, '-'), arr3, ':')
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+---------------------+
| result |
|---------------------|
| [a-x:1,b-y:2,c-z:3] |
+---------------------+
```

```ppl
source=people
| eval arr1 = array('a', 'b'), arr2 = array(), result = mvzip(arr1, arr2)
| fields result
| head 1
```

Expected output:

```text
fetched rows / total rows = 1/1
+--------+
| result |
|--------|
| [] |
+--------+
```
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,75 @@ public void testMvindexRangeSingleElement() throws IOException {
verifyDataRows(actual, rows(List.of(3)));
}

@Test
public void testMvzipBasic() throws IOException {
  // Two-argument form from the spec, eval nserver=mvzip(hosts,ports): no delimiter is passed,
  // and the expected rows show each pair joined with ",".
  String query =
      String.format(
          "source=%s | eval hosts = array('host1', 'host2'), ports = array(80, 443),"
              + " nserver = mvzip(hosts, ports) | head 1 | fields nserver",
          TEST_INDEX_BANK);

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("nserver", "array"));
  verifyDataRows(response, rows(List.of("host1,80", "host2,443")));
}

@Test
public void testMvzipWithCustomDelimiter() throws IOException {
  // Three-argument form: the caller-supplied '|' separates the two values of each pair.
  String query =
      String.format(
          "source=%s | eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'),"
              + " result = mvzip(arr1, arr2, '|') | head 1 | fields result",
          TEST_INDEX_BANK);

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of("a|x", "b|y", "c|z")));
}

@Test
public void testMvzipNested() throws IOException {
  // Nested example from the spec, mvzip(mvzip(field1,field2,"|"),field3,"|"): the inner result
  // is used as the left array of the outer call.
  String query =
      String.format(
          "source=%s | eval field1 = array('a', 'b'), field2 = array('c', 'd'),"
              + " field3 = array('e', 'f'),"
              + " result = mvzip(mvzip(field1, field2, '|'), field3, '|')"
              + " | head 1 | fields result",
          TEST_INDEX_BANK);

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of("a|c|e", "b|d|f")));
}

@Test
public void testMvzipWithEmptyArray() throws IOException {
  // One empty input array must produce an empty array, not null.
  String query =
      String.format(
          "source=%s | eval result = mvzip(array(), array('a', 'b'))"
              + " | head 1 | fields result",
          TEST_INDEX_BANK);

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of()));
}

@Test
public void testMvzipWithBothEmptyArrays() throws IOException {
  // Two empty input arrays must also produce an empty array, not null.
  String query =
      String.format(
          "source=%s | eval result = mvzip(array(), array()) | head 1 | fields result",
          TEST_INDEX_BANK);

  JSONObject response = executeQuery(query);

  verifySchema(response, schema("result", "array"));
  verifyDataRows(response, rows(List.of()));
}

@Test
public void testMvdedupWithDuplicates() throws IOException {
JSONObject actual =
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH';
MVAPPEND: 'MVAPPEND';
MVJOIN: 'MVJOIN';
MVINDEX: 'MVINDEX';
MVZIP: 'MVZIP';
MVDEDUP: 'MVDEDUP';
SPLIT: 'SPLIT';
FORALL: 'FORALL';
Expand Down
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -1125,6 +1125,7 @@ collectionFunctionName
| MVJOIN
| MVINDEX
| MVDEDUP
| MVZIP
| SPLIT
| FORALL
| EXISTS
Expand Down
Loading
Loading