Skip to content

Commit 24d5742

Browse files
Support mvzip eval function (#4805) (#4961)
* support mvzip eval function # Conflicts: # core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java # core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java # integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java # ppl/src/main/antlr/OpenSearchPPLLexer.g4 # ppl/src/main/antlr/OpenSearchPPLParser.g4 # ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java # Conflicts: # core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java # docs/user/ppl/functions/collection.rst # ppl/src/main/antlr/OpenSearchPPLParser.g4 # ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLArrayFunctionTest.java * add anonymizer test * fix UT * updates * update to use List.class * Update md doc * Addressing comments from coderabbit --------- (cherry picked from commit 52a691a) Signed-off-by: Kai Huang <[email protected]> Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 74dbdb6 commit 24d5742

File tree

10 files changed

+457
-1
lines changed

10 files changed

+457
-1
lines changed

core/src/main/java/org/opensearch/sql/expression/function/BuiltinFunctionName.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public enum BuiltinFunctionName {
7575
MVAPPEND(FunctionName.of("mvappend")),
7676
MVJOIN(FunctionName.of("mvjoin")),
7777
MVINDEX(FunctionName.of("mvindex")),
78+
MVZIP(FunctionName.of("mvzip")),
7879
SPLIT(FunctionName.of("split")),
7980
MVDEDUP(FunctionName.of("mvdedup")),
8081
FORALL(FunctionName.of("forall")),
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.expression.function.CollectionUDF;
7+
8+
import static org.apache.calcite.sql.type.SqlTypeUtil.createArrayType;
9+
10+
import java.util.ArrayList;
11+
import java.util.List;
12+
import java.util.Objects;
13+
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
14+
import org.apache.calcite.adapter.enumerable.NullPolicy;
15+
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
16+
import org.apache.calcite.linq4j.tree.Expression;
17+
import org.apache.calcite.linq4j.tree.Expressions;
18+
import org.apache.calcite.linq4j.tree.Types;
19+
import org.apache.calcite.rel.type.RelDataType;
20+
import org.apache.calcite.rel.type.RelDataTypeFactory;
21+
import org.apache.calcite.rex.RexCall;
22+
import org.apache.calcite.sql.type.CompositeOperandTypeChecker;
23+
import org.apache.calcite.sql.type.OperandTypes;
24+
import org.apache.calcite.sql.type.SqlReturnTypeInference;
25+
import org.apache.calcite.sql.type.SqlTypeFamily;
26+
import org.apache.calcite.sql.type.SqlTypeName;
27+
import org.opensearch.sql.expression.function.ImplementorUDF;
28+
import org.opensearch.sql.expression.function.UDFOperandMetadata;
29+
30+
/**
31+
* MVZip function that combines two multivalue fields pairwise with a delimiter.
32+
*
33+
* <p>This function zips together two arrays by combining the first value of left with the first
34+
* value of right, the second with the second, and so on, up to the length of the shorter array.
35+
*
36+
* <p>Behavior:
37+
*
38+
* <ul>
39+
* <li>Returns null if either left or right is null
40+
* <li>Returns an empty array if one or both arrays are empty
41+
* <li>Stops at the length of the shorter array (like Python's zip)
42+
* <li>Uses the provided delimiter to join values (default: ",")
43+
* </ul>
44+
*/
45+
public class MVZipFunctionImpl extends ImplementorUDF {
46+
47+
/**
 * Creates the mvzip UDF, delegating code generation to {@link MVZipImplementor} and registering
 * it with {@link NullPolicy#ANY}.
 */
public MVZipFunctionImpl() {
48+
// Use ANY: return null if any argument is null
49+
super(new MVZipImplementor(), NullPolicy.ANY);
50+
}
51+
52+
@Override
53+
public SqlReturnTypeInference getReturnTypeInference() {
54+
return sqlOperatorBinding -> {
55+
RelDataTypeFactory typeFactory = sqlOperatorBinding.getTypeFactory();
56+
57+
// mvzip returns an array of VARCHAR (strings)
58+
RelDataType elementType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
59+
return createArrayType(
60+
typeFactory, typeFactory.createTypeWithNullability(elementType, true), true);
61+
};
62+
}
63+
64+
@Override
65+
public UDFOperandMetadata getOperandMetadata() {
66+
// First two arguments must be arrays, optional STRING delimiter
67+
return UDFOperandMetadata.wrap(
68+
(CompositeOperandTypeChecker)
69+
OperandTypes.family(SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY)
70+
.or(
71+
OperandTypes.family(
72+
SqlTypeFamily.ARRAY, SqlTypeFamily.ARRAY, SqlTypeFamily.CHARACTER)));
73+
}
74+
75+
public static class MVZipImplementor implements NotNullImplementor {
76+
@Override
77+
public Expression implement(
78+
RexToLixTranslator translator, RexCall call, List<Expression> translatedOperands) {
79+
// Handle both 2-argument (with default delimiter) and 3-argument cases
80+
if (translatedOperands.size() == 2) {
81+
// mvzip(left, right) - use default delimiter ","
82+
return Expressions.call(
83+
Types.lookupMethod(
84+
MVZipFunctionImpl.class, "mvzip", List.class, List.class, String.class),
85+
translatedOperands.get(0),
86+
translatedOperands.get(1),
87+
Expressions.constant(","));
88+
} else if (translatedOperands.size() == 3) {
89+
// mvzip(left, right, delimiter)
90+
return Expressions.call(
91+
Types.lookupMethod(
92+
MVZipFunctionImpl.class, "mvzip", List.class, List.class, String.class),
93+
translatedOperands.get(0),
94+
translatedOperands.get(1),
95+
translatedOperands.get(2));
96+
} else {
97+
throw new IllegalArgumentException(
98+
"mvzip expects 2 or 3 arguments, got " + translatedOperands.size());
99+
}
100+
}
101+
}
102+
103+
/**
104+
* Combines two multivalue arrays pairwise with a delimiter.
105+
*
106+
* @param left The left multivalue array
107+
* @param right The right multivalue array
108+
* @param delimiter The delimiter to use for joining values (default: ",")
109+
* @return A list of combined values, empty list if either array is empty, or null if either input
110+
* is null
111+
*/
112+
public static List<Object> mvzip(List<?> left, List<?> right, String delimiter) {
113+
if (left == null || right == null) {
114+
return null;
115+
}
116+
117+
List<Object> result = new ArrayList<>();
118+
int minLength = Math.min(left.size(), right.size());
119+
120+
for (int i = 0; i < minLength; i++) {
121+
String combined =
122+
Objects.toString(left.get(i), "") + delimiter + Objects.toString(right.get(i), "");
123+
result.add(combined);
124+
}
125+
126+
return result;
127+
}
128+
}

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.sql.expression.function.CollectionUDF.FilterFunctionImpl;
4848
import org.opensearch.sql.expression.function.CollectionUDF.ForallFunctionImpl;
4949
import org.opensearch.sql.expression.function.CollectionUDF.MVAppendFunctionImpl;
50+
import org.opensearch.sql.expression.function.CollectionUDF.MVZipFunctionImpl;
5051
import org.opensearch.sql.expression.function.CollectionUDF.MapAppendFunctionImpl;
5152
import org.opensearch.sql.expression.function.CollectionUDF.MapRemoveFunctionImpl;
5253
import org.opensearch.sql.expression.function.CollectionUDF.ReduceFunctionImpl;
@@ -391,6 +392,7 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
391392
public static final SqlOperator MAP_APPEND = new MapAppendFunctionImpl().toUDF("map_append");
392393
public static final SqlOperator MAP_REMOVE = new MapRemoveFunctionImpl().toUDF("MAP_REMOVE");
393394
public static final SqlOperator MVAPPEND = new MVAppendFunctionImpl().toUDF("mvappend");
395+
public static final SqlOperator MVZIP = new MVZipFunctionImpl().toUDF("mvzip");
394396
public static final SqlOperator FILTER = new FilterFunctionImpl().toUDF("filter");
395397
public static final SqlOperator TRANSFORM = new TransformFunctionImpl().toUDF("transform");
396398
public static final SqlOperator REDUCE = new ReduceFunctionImpl().toUDF("reduce");

core/src/main/java/org/opensearch/sql/expression/function/PPLFuncImpTable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@
154154
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVDEDUP;
155155
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVINDEX;
156156
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVJOIN;
157+
import static org.opensearch.sql.expression.function.BuiltinFunctionName.MVZIP;
157158
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOT;
158159
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOTEQUAL;
159160
import static org.opensearch.sql.expression.function.BuiltinFunctionName.NOW;
@@ -1042,6 +1043,7 @@ void populate() {
10421043
registerOperator(ARRAY, PPLBuiltinOperators.ARRAY);
10431044
registerOperator(MVAPPEND, PPLBuiltinOperators.MVAPPEND);
10441045
registerOperator(MVDEDUP, SqlLibraryOperators.ARRAY_DISTINCT);
1046+
registerOperator(MVZIP, PPLBuiltinOperators.MVZIP);
10451047
registerOperator(MAP_APPEND, PPLBuiltinOperators.MAP_APPEND);
10461048
registerOperator(MAP_CONCAT, SqlLibraryOperators.MAP_CONCAT);
10471049
registerOperator(MAP_REMOVE, PPLBuiltinOperators.MAP_REMOVE);

docs/user/ppl/functions/collection.md

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,4 +724,109 @@ fetched rows / total rows = 1/1
724724
| [alex,celestino,claudia] |
725725
+--------------------------+
726726
```
727-
727+
728+
## MVZIP
729+
730+
### Description
731+
732+
Usage: mvzip(mv_left, mv_right, [delim]) combines the values in two multivalue arrays by pairing corresponding elements and joining each pair into a string. The optional delim argument is the string placed between the two values of each pair. The behavior is similar to Python's built-in zip function.
733+
734+
The values are stitched together combining the first value of mv_left with the first value of mv_right, then the second with the second, and so on. Each pair is concatenated into a string using the delimiter. The function stops at the length of the shorter array.
735+
736+
The delimiter is optional. When specified, it must be enclosed in quotation marks. The default delimiter is a comma ( , ).
737+
738+
Returns null if either input is null. Returns an empty array if either input array is empty.
739+
740+
Argument type: mv_left: ARRAY, mv_right: ARRAY, delim: STRING (optional)
741+
Return type: ARRAY of STRING
742+
Example
743+
744+
```ppl
745+
source=people
746+
| eval hosts = array('host1', 'host2'), ports = array('80', '443'), nserver = mvzip(hosts, ports, ':')
747+
| fields nserver
748+
| head 1
749+
```
750+
751+
Expected output:
752+
753+
```text
754+
fetched rows / total rows = 1/1
755+
+----------------------+
756+
| nserver |
757+
|----------------------|
758+
| [host1:80,host2:443] |
759+
+----------------------+
760+
```
761+
762+
```ppl
763+
source=people
764+
| eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), result = mvzip(arr1, arr2, '|')
765+
| fields result
766+
| head 1
767+
```
768+
769+
Expected output:
770+
771+
```text
772+
fetched rows / total rows = 1/1
773+
+---------------+
774+
| result |
775+
|---------------|
776+
| [a|x,b|y,c|z] |
777+
+---------------+
778+
```
779+
780+
```ppl
781+
source=people
782+
| eval arr1 = array('1', '2', '3'), arr2 = array('a', 'b'), result = mvzip(arr1, arr2, '-')
783+
| fields result
784+
| head 1
785+
```
786+
787+
Expected output:
788+
789+
```text
790+
fetched rows / total rows = 1/1
791+
+-----------+
792+
| result |
793+
|-----------|
794+
| [1-a,2-b] |
795+
+-----------+
796+
```
797+
798+
```ppl
799+
source=people
800+
| eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), arr3 = array('1', '2', '3'), result = mvzip(mvzip(arr1, arr2, '-'), arr3, ':')
801+
| fields result
802+
| head 1
803+
```
804+
805+
Expected output:
806+
807+
```text
808+
fetched rows / total rows = 1/1
809+
+---------------------+
810+
| result |
811+
|---------------------|
812+
| [a-x:1,b-y:2,c-z:3] |
813+
+---------------------+
814+
```
815+
816+
```ppl
817+
source=people
818+
| eval arr1 = array('a', 'b'), arr2 = array(), result = mvzip(arr1, arr2)
819+
| fields result
820+
| head 1
821+
```
822+
823+
Expected output:
824+
825+
```text
826+
fetched rows / total rows = 1/1
827+
+--------+
828+
| result |
829+
|--------|
830+
| [] |
831+
+--------+
832+
```

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteArrayFunctionIT.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,75 @@ public void testMvindexRangeSingleElement() throws IOException {
490490
verifyDataRows(actual, rows(List.of(3)));
491491
}
492492

493+
@Test
494+
public void testMvzipBasic() throws IOException {
495+
// Basic example from spec: eval nserver=mvzip(hosts,ports)
496+
JSONObject actual =
497+
executeQuery(
498+
String.format(
499+
"source=%s | eval hosts = array('host1', 'host2'), ports = array(80, 443), nserver"
500+
+ " = mvzip(hosts, ports) | head 1 | fields nserver",
501+
TEST_INDEX_BANK));
502+
503+
verifySchema(actual, schema("nserver", "array"));
504+
verifyDataRows(actual, rows(List.of("host1,80", "host2,443")));
505+
}
506+
507+
@Test
508+
public void testMvzipWithCustomDelimiter() throws IOException {
509+
JSONObject actual =
510+
executeQuery(
511+
String.format(
512+
"source=%s | eval arr1 = array('a', 'b', 'c'), arr2 = array('x', 'y', 'z'), result"
513+
+ " = mvzip(arr1, arr2, '|') | head 1 | fields result",
514+
TEST_INDEX_BANK));
515+
516+
verifySchema(actual, schema("result", "array"));
517+
verifyDataRows(actual, rows(List.of("a|x", "b|y", "c|z")));
518+
}
519+
520+
@Test
521+
public void testMvzipNested() throws IOException {
522+
// Example from spec: mvzip(mvzip(field1,field2,"|"),field3,"|")
523+
JSONObject actual =
524+
executeQuery(
525+
String.format(
526+
"source=%s | eval field1 = array('a', 'b'), field2 = array('c', 'd'), field3 ="
527+
+ " array('e', 'f'), result = mvzip(mvzip(field1, field2, '|'), field3, '|') |"
528+
+ " head 1 | fields result",
529+
TEST_INDEX_BANK));
530+
531+
verifySchema(actual, schema("result", "array"));
532+
verifyDataRows(actual, rows(List.of("a|c|e", "b|d|f")));
533+
}
534+
535+
@Test
536+
public void testMvzipWithEmptyArray() throws IOException {
537+
// When one array is empty, result should be empty array (not null)
538+
JSONObject actual =
539+
executeQuery(
540+
String.format(
541+
"source=%s | eval result = mvzip(array(), array('a', 'b')) | head 1 | fields"
542+
+ " result",
543+
TEST_INDEX_BANK));
544+
545+
verifySchema(actual, schema("result", "array"));
546+
verifyDataRows(actual, rows(List.of()));
547+
}
548+
549+
@Test
550+
public void testMvzipWithBothEmptyArrays() throws IOException {
551+
// When both arrays are empty, result should be empty array (not null)
552+
JSONObject actual =
553+
executeQuery(
554+
String.format(
555+
"source=%s | eval result = mvzip(array(), array()) | head 1 | fields result",
556+
TEST_INDEX_BANK));
557+
558+
verifySchema(actual, schema("result", "array"));
559+
verifyDataRows(actual, rows(List.of()));
560+
}
561+
493562
@Test
494563
public void testMvdedupWithDuplicates() throws IOException {
495564
JSONObject actual =

ppl/src/main/antlr/OpenSearchPPLLexer.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ ARRAY_LENGTH: 'ARRAY_LENGTH';
447447
MVAPPEND: 'MVAPPEND';
448448
MVJOIN: 'MVJOIN';
449449
MVINDEX: 'MVINDEX';
450+
MVZIP: 'MVZIP';
450451
MVDEDUP: 'MVDEDUP';
451452
SPLIT: 'SPLIT';
452453
FORALL: 'FORALL';

ppl/src/main/antlr/OpenSearchPPLParser.g4

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,7 @@ collectionFunctionName
11041104
| MVJOIN
11051105
| MVINDEX
11061106
| MVDEDUP
1107+
| MVZIP
11071108
| SPLIT
11081109
| FORALL
11091110
| EXISTS

0 commit comments

Comments
 (0)