77package org .gridsuite .study .server .elasticsearch ;
88
99import co .elastic .clients .elasticsearch ._types .FieldValue ;
10+ import co .elastic .clients .elasticsearch ._types .aggregations .*;
11+ import co .elastic .clients .elasticsearch ._types .aggregations .Aggregation ;
1012import co .elastic .clients .elasticsearch ._types .query_dsl .*;
1113import com .powsybl .iidm .network .VariantManagerConstants ;
1214import org .apache .commons .lang3 .StringUtils ;
15+ import org .apache .commons .lang3 .tuple .Pair ;
16+ import org .gridsuite .study .server .dto .BasicEquipmentInfos ;
1317import org .gridsuite .study .server .dto .EquipmentInfos ;
1418import org .gridsuite .study .server .dto .TombstonedEquipmentInfos ;
19+ import org .slf4j .Logger ;
20+ import org .slf4j .LoggerFactory ;
1521import org .springframework .data .domain .PageRequest ;
16- import org .springframework .data .elasticsearch .client .elc .NativeQuery ;
17- import org .springframework .data .elasticsearch .client .elc .NativeQueryBuilder ;
18- import org .springframework .data .elasticsearch .client .elc .Queries ;
22+ import org .springframework .data .elasticsearch .client .elc .*;
1923import org .springframework .data .elasticsearch .core .ElasticsearchOperations ;
2024import org .springframework .data .elasticsearch .core .SearchHit ;
25+ import org .springframework .data .elasticsearch .core .SearchHits ;
2126import org .springframework .lang .NonNull ;
2227import org .springframework .stereotype .Service ;
2328
@@ -41,6 +46,8 @@ public enum FieldSelector {
4146
4247 private static final int PAGE_MAX_SIZE = 400 ;
4348
49+ private static final int COMPOSITE_AGGREGATION_BATCH_SIZE = 1000 ;
50+
4451 public static final Map <String , Integer > EQUIPMENT_TYPE_SCORES = Map .ofEntries (
4552 entry ("SUBSTATION" , 15 ),
4653 entry ("VOLTAGE_LEVEL" , 14 ),
@@ -72,6 +79,8 @@ public enum FieldSelector {
7279
7380 private final ElasticsearchOperations elasticsearchOperations ;
7481
82+ private static final Logger LOGGER = LoggerFactory .getLogger (EquipmentInfosService .class );
83+
7584 public EquipmentInfosService (EquipmentInfosRepository equipmentInfosRepository , TombstonedEquipmentInfosRepository tombstonedEquipmentInfosRepository , ElasticsearchOperations elasticsearchOperations ) {
7685 this .equipmentInfosRepository = equipmentInfosRepository ;
7786 this .tombstonedEquipmentInfosRepository = tombstonedEquipmentInfosRepository ;
@@ -105,6 +114,101 @@ public long getEquipmentInfosCount() {
105114 return equipmentInfosRepository .count ();
106115 }
107116
117+ private CompositeAggregation buildCompositeAggregation (String field , Map <String , FieldValue > afterKey ) {
118+ List <Map <String , CompositeAggregationSource >> sources = List .of (
119+ Map .of (field , CompositeAggregationSource .of (s -> s .terms (t -> t .field (field + ".keyword" )))
120+ )
121+ );
122+
123+ CompositeAggregation .Builder compositeAggregationBuilder = new CompositeAggregation .Builder ()
124+ .size (COMPOSITE_AGGREGATION_BATCH_SIZE )
125+ .sources (sources );
126+
127+ if (afterKey != null ) {
128+ compositeAggregationBuilder .after (afterKey );
129+ }
130+
131+ return compositeAggregationBuilder .build ();
132+ }
133+
134+ /**
135+ * Constructs a NativeQuery with a composite aggregation.
136+ *
137+ * @param compositeName The name of the composite aggregation.
138+ * @param compositeAggregation The composite aggregation configuration.
139+ * @return A NativeQuery object configured with the specified composite aggregation.
140+ */
141+ private NativeQuery buildCompositeAggregationQuery (String compositeName , CompositeAggregation compositeAggregation ) {
142+ Aggregation aggregation = Aggregation .of (a -> a .composite (compositeAggregation ));
143+
144+ return new NativeQueryBuilder ()
145+ .withAggregation (compositeName , aggregation )
146+ .build ();
147+ }
148+
149+ /**
150+ * This method is used to extract the results of a composite aggregation from Elasticsearch search hits.
151+ *
152+ * @param searchHits The search hits returned from an Elasticsearch query.
153+ * @param compositeName The name of the composite aggregation.
154+ * @return A Pair consisting of two elements:
155+ * The left element of the Pair is a list of maps, where each map represents a bucket's key. Each bucket is a result of the composite aggregation.
156+ * The right element of the Pair is the afterKey map, which is used for pagination in Elasticsearch.
157+ * If there are no more pages, the afterKey will be null.
158+ * @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html">Elasticsearch Composite Aggregation Documentation</a>
159+ */
160+ private Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> extractCompositeAggregationResults (SearchHits <EquipmentInfos > searchHits , String compositeName ) {
161+ ElasticsearchAggregations aggregations = (ElasticsearchAggregations ) searchHits .getAggregations ();
162+
163+ List <Map <String , FieldValue >> results = new ArrayList <>();
164+ if (aggregations != null ) {
165+ Map <String , ElasticsearchAggregation > aggregationList = aggregations .aggregationsAsMap ();
166+ if (!aggregationList .isEmpty ()) {
167+ Aggregate aggregate = aggregationList .get (compositeName ).aggregation ().getAggregate ();
168+ if (aggregate .isComposite () && aggregate .composite () != null ) {
169+ for (CompositeBucket bucket : aggregate .composite ().buckets ().array ()) {
170+ Map <String , FieldValue > key = bucket .key ();
171+ results .add (key );
172+ }
173+ return Pair .of (results , aggregate .composite ().afterKey ());
174+ }
175+ }
176+ }
177+ return Pair .of (results , null );
178+ }
179+
180+ public List <UUID > getEquipmentInfosDistinctNetworkUuids () {
181+ List <UUID > networkUuids = new ArrayList <>();
182+ Map <String , FieldValue > afterKey = null ;
183+ String compositeName = "composite_agg" ;
184+ String networkUuidField = BasicEquipmentInfos .Fields .networkUuid ;
185+
186+ do {
187+ CompositeAggregation compositeAggregation = buildCompositeAggregation (networkUuidField , afterKey );
188+ NativeQuery query = buildCompositeAggregationQuery (compositeName , compositeAggregation );
189+
190+ SearchHits <EquipmentInfos > searchHits = elasticsearchOperations .search (query , EquipmentInfos .class );
191+ Pair <List <Map <String , FieldValue >>, Map <String , FieldValue >> searchResults = extractCompositeAggregationResults (searchHits , compositeName );
192+
193+ searchResults .getLeft ().stream ()
194+ .map (result -> result .get (networkUuidField ))
195+ .filter (Objects ::nonNull )
196+ .map (FieldValue ::stringValue )
197+ .map (UUID ::fromString )
198+ .forEach (networkUuids ::add );
199+
200+ afterKey = searchResults .getRight ();
201+ } while (afterKey != null && !afterKey .isEmpty ());
202+
203+ return networkUuids ;
204+ }
205+
206+ public List <UUID > getOrphanEquipmentInfosNetworkUuids (List <UUID > networkUuidsInDatabase ) {
207+ List <UUID > networkUuids = getEquipmentInfosDistinctNetworkUuids ();
208+ networkUuids .removeAll (networkUuidsInDatabase );
209+ return networkUuids ;
210+ }
211+
108212 public long getTombstonedEquipmentInfosCount () {
109213 return tombstonedEquipmentInfosRepository .count ();
110214 }
0 commit comments