@@ -800,6 +800,33 @@ def quantile(
800800 else :
801801 return df if is_dataframe else df .transpose ().iloc [0 ]
802802
803+ def unique (self , query_compiler : "QueryCompiler" ) -> pd .Series :
804+
805+ query_params , _ = self ._resolve_tasks (query_compiler )
806+ body = Query (query_params .query )
807+
808+ fields = query_compiler ._mappings .all_source_fields ()
809+ assert len (fields ) == 1 # Unique is only for eland.Series
810+ field = fields [0 ]
811+ bucket_key = f"unique_{ field .column } "
812+
813+ body .composite_agg_bucket_terms (
814+ name = bucket_key ,
815+ field = field .aggregatable_es_field_name ,
816+ )
817+
818+ # Composite aggregation
819+ body .composite_agg_start (size = DEFAULT_PAGINATION_SIZE , name = "unique_buckets" )
820+
821+ unique_buckets : List [Any ] = sum (
822+ self .bucket_generator (query_compiler , body , agg_name = "unique_buckets" ), [] # type: ignore
823+ )
824+
825+ return np .array (
826+ [bucket ["key" ][bucket_key ] for bucket in unique_buckets ],
827+ dtype = field .pd_dtype ,
828+ )
829+
803830 def aggs_groupby (
804831 self ,
805832 query_compiler : "QueryCompiler" ,
@@ -920,7 +947,9 @@ def aggs_groupby(
920947 size = DEFAULT_PAGINATION_SIZE , name = "groupby_buckets" , dropna = dropna
921948 )
922949
923- for buckets in self .bucket_generator (query_compiler , body ):
950+ for buckets in self .bucket_generator (
951+ query_compiler , body , agg_name = "groupby_buckets"
952+ ):
924953 # We recieve response row-wise
925954 for bucket in buckets :
926955 # groupby columns are added to result same way they are returned
@@ -984,7 +1013,7 @@ def aggs_groupby(
9841013
9851014 @staticmethod
9861015 def bucket_generator (
987- query_compiler : "QueryCompiler" , body : "Query"
1016+ query_compiler : "QueryCompiler" , body : "Query" , agg_name : str
9881017 ) -> Generator [Sequence [Dict [str , Any ]], None , Sequence [Dict [str , Any ]]]:
9891018 """
9901019 This can be used for all groupby operations.
@@ -1015,7 +1044,7 @@ def bucket_generator(
10151044 )
10161045
10171046 # Pagination Logic
1018- composite_buckets : Dict [str , Any ] = res ["aggregations" ]["groupby_buckets" ]
1047+ composite_buckets : Dict [str , Any ] = res ["aggregations" ][agg_name ]
10191048
10201049 after_key : Optional [Dict [str , Any ]] = composite_buckets .get (
10211050 "after_key" , None
@@ -1028,7 +1057,7 @@ def bucket_generator(
10281057 yield buckets
10291058
10301059 body .composite_agg_after_key (
1031- name = "groupby_buckets" ,
1060+ name = agg_name ,
10321061 after_key = after_key ,
10331062 )
10341063 else :
0 commit comments