@@ -12,6 +12,7 @@ import (
12
12
"time"
13
13
14
14
"github.com/goto/compass/core/asset"
15
+ "github.com/goto/compass/core/user"
15
16
"github.com/goto/compass/pkg/queryexpr"
16
17
"github.com/goto/salt/log"
17
18
)
@@ -216,6 +217,19 @@ func (repo *DiscoveryRepository) softDeleteAsset(ctx context.Context, discoveryO
216
217
})
217
218
}(time .Now ())
218
219
220
+ // First get the current version
221
+ currentVersion , err := repo .getCurrentAssetVersion (ctx , softDeleteAsset .URN )
222
+ if err != nil {
223
+ return asset.DiscoveryError {
224
+ Op : "GetCurrentVersion" ,
225
+ Err : fmt .Errorf ("failed to get current version for URN %s: %w" , softDeleteAsset .URN , err ),
226
+ }
227
+ }
228
+ newVersion , err := asset .IncreaseMinorVersion (currentVersion )
229
+ if err != nil {
230
+ return err
231
+ }
232
+
219
233
// Create the update request body
220
234
body := map [string ]interface {}{
221
235
"query" : map [string ]interface {}{
@@ -228,33 +242,36 @@ func (repo *DiscoveryRepository) softDeleteAsset(ctx context.Context, discoveryO
228
242
ctx._source.is_deleted = true;
229
243
ctx._source.updated_at = params.updated_at;
230
244
ctx._source.refreshed_at = params.refreshed_at;
231
- ctx._source.updated_by = params.updated_by
245
+ ctx._source.updated_by = params.updated_by;
246
+ ctx._source.version = params.version;
232
247
` ,
233
248
"lang" : "painless" ,
234
249
"params" : map [string ]interface {}{
235
250
"updated_at" : softDeleteAsset .UpdatedAt ,
236
251
"refreshed_at" : softDeleteAsset .RefreshedAt ,
237
- "updated_by" : softDeleteAsset .UpdatedBy ,
252
+ "updated_by" : user.User {ID : softDeleteAsset .UpdatedBy },
253
+ "version" : newVersion ,
238
254
},
239
255
},
240
256
}
241
257
242
- // Convert body to JSON
243
- var buf bytes.Buffer
244
- if err := json .NewEncoder (& buf ).Encode (body ); err != nil {
258
+ buf , err := encodeBodyRequest (body )
259
+ if err != nil {
245
260
return asset.DiscoveryError {
246
261
Op : "SoftDeleteByURN" ,
247
- Err : fmt . Errorf ( "failed to encode request body: %w" , err ) ,
262
+ Err : err ,
248
263
}
249
264
}
250
265
251
266
// Execute UpdateByQuery
252
267
res , err := repo .cli .client .UpdateByQuery (
253
268
[]string {defaultSearchIndex },
254
269
repo .cli .client .UpdateByQuery .WithContext (ctx ),
255
- repo .cli .client .UpdateByQuery .WithBody (& buf ),
270
+ repo .cli .client .UpdateByQuery .WithBody (buf ),
256
271
repo .cli .client .UpdateByQuery .WithRefresh (true ),
257
272
repo .cli .client .UpdateByQuery .WithIgnoreUnavailable (true ),
273
+ repo .cli .client .UpdateByQuery .WithWaitForCompletion (true ),
274
+ repo .cli .client .UpdateByQuery .WithConflicts ("proceed" ),
258
275
)
259
276
if err != nil {
260
277
return asset.DiscoveryError {
@@ -276,6 +293,58 @@ func (repo *DiscoveryRepository) softDeleteAsset(ctx context.Context, discoveryO
276
293
return nil
277
294
}
278
295
296
+ // Helper function to get current version
297
+ func (repo * DiscoveryRepository ) getCurrentAssetVersion (ctx context.Context , urn string ) (string , error ) {
298
+ query := map [string ]interface {}{
299
+ "query" : map [string ]interface {}{
300
+ "term" : map [string ]interface {}{
301
+ "urn.keyword" : urn ,
302
+ },
303
+ },
304
+ "size" : 1 ,
305
+ "_source" : []string {"version" },
306
+ }
307
+
308
+ var buf bytes.Buffer
309
+ if err := json .NewEncoder (& buf ).Encode (query ); err != nil {
310
+ return "" , fmt .Errorf ("failed to encode query: %w" , err )
311
+ }
312
+
313
+ res , err := repo .cli .client .Search (
314
+ repo .cli .client .Search .WithContext (ctx ),
315
+ repo .cli .client .Search .WithIndex (defaultSearchIndex ),
316
+ repo .cli .client .Search .WithBody (& buf ),
317
+ )
318
+ if err != nil {
319
+ return "" , fmt .Errorf ("search failed: %w" , err )
320
+ }
321
+ defer res .Body .Close ()
322
+
323
+ if res .IsError () {
324
+ return "" , fmt .Errorf ("search error: %s" , res .String ())
325
+ }
326
+
327
+ var result struct {
328
+ Hits struct {
329
+ Hits []struct {
330
+ Source struct {
331
+ Version string `json:"version"`
332
+ } `json:"_source"`
333
+ } `json:"hits"`
334
+ } `json:"hits"`
335
+ }
336
+
337
+ if err := json .NewDecoder (res .Body ).Decode (& result ); err != nil {
338
+ return "" , fmt .Errorf ("failed to decode response: %w" , err )
339
+ }
340
+
341
+ if len (result .Hits .Hits ) == 0 {
342
+ return "" , fmt .Errorf ("asset with URN %s not found" , urn )
343
+ }
344
+
345
+ return result .Hits .Hits [0 ].Source .Version , nil
346
+ }
347
+
279
348
func (repo * DiscoveryRepository ) indexAsset (ctx context.Context , ast asset.Asset ) (err error ) {
280
349
defer func (start time.Time ) {
281
350
const op = "index"
@@ -287,7 +356,7 @@ func (repo *DiscoveryRepository) indexAsset(ctx context.Context, ast asset.Asset
287
356
})
288
357
}(time .Now ())
289
358
290
- body , err := createUpsertBody (ast )
359
+ body , err := encodeBodyRequest (ast )
291
360
if err != nil {
292
361
return asset.DiscoveryError {
293
362
Op : "EncodeAsset" ,
@@ -327,10 +396,10 @@ func (repo *DiscoveryRepository) indexAsset(ctx context.Context, ast asset.Asset
327
396
return nil
328
397
}
329
398
330
- func createUpsertBody ( ast asset. Asset ) (io.Reader , error ) {
399
+ func encodeBodyRequest ( body interface {} ) (io.Reader , error ) {
331
400
var buf bytes.Buffer
332
- if err := json .NewEncoder (& buf ).Encode (ast ); err != nil {
333
- return nil , fmt .Errorf ("encode asset : %w" , err )
401
+ if err := json .NewEncoder (& buf ).Encode (body ); err != nil {
402
+ return nil , fmt .Errorf ("encode request body : %w" , err )
334
403
}
335
404
336
405
return & buf , nil
0 commit comments