File tree 2 files changed +17
-11
lines changed
2 files changed +17
-11
lines changed Original file line number Diff line number Diff line change @@ -224,28 +224,24 @@ def load_events_since_marked_position(mark)
224
224
# aggregate root type can be specified (as a string) to only yield aggregate ids of the indicated type.
225
225
def event_streams_enumerator ( aggregate_type : nil , group_size : 100 )
226
226
Enumerator . new do |yielder |
227
- last_events_partition_key = ''
228
227
last_aggregate_id = nil
229
228
loop do
230
229
aggregate_rows = ActiveRecord ::Base . connection . exec_query (
231
- 'SELECT events_partition_key, aggregate_id
232
- FROM aggregates
230
+ 'SELECT aggregate_id
231
+ FROM aggregates
233
232
WHERE ($1::text IS NULL OR aggregate_type_id = (SELECT id FROM aggregate_types WHERE type = $1))
234
- AND ((events_partition_key >= $3 AND $4::uuid IS NULL)
235
- OR (events_partition_key, aggregate_id) > ($3, $4))
236
- ORDER BY 1, 2
233
+ AND ($3::uuid IS NULL OR aggregate_id > $3)
234
+ ORDER BY 1
237
235
LIMIT $2' ,
238
236
'aggregates_to_update' ,
239
237
[
240
238
aggregate_type ,
241
239
group_size ,
242
- last_events_partition_key ,
243
240
last_aggregate_id ,
244
241
] ,
245
242
) . to_a
246
243
break if aggregate_rows . empty?
247
244
248
- last_events_partition_key = aggregate_rows . last [ 'events_partition_key' ]
249
245
last_aggregate_id = aggregate_rows . last [ 'aggregate_id' ]
250
246
251
247
yielder << aggregate_rows . map { |x | x [ 'aggregate_id' ] }
Original file line number Diff line number Diff line change @@ -407,8 +407,8 @@ class MyAggregate < Sequent::Core::AggregateRoot
407
407
end
408
408
let ( :ordered_aggregate_ids ) do
409
409
event_streams
410
- . sort_by { |s | [ s . events_partition_key , s . aggregate_id ] }
411
410
. map ( &:aggregate_id )
411
+ . sort
412
412
end
413
413
414
414
let ( :group_size ) { 100 }
@@ -435,7 +435,12 @@ class MyAggregate < Sequent::Core::AggregateRoot
435
435
it 'finds all event streams of a specific type' do
436
436
subject = event_store . event_streams_enumerator ( aggregate_type : 'MyAggregate0' , group_size :)
437
437
aggregate_ids = subject . next
438
- expect ( aggregate_ids ) . to eq ( ordered_aggregate_ids [ 0 ...10 ] )
438
+ expect ( aggregate_ids ) . to eq (
439
+ event_streams
440
+ . select { |s | s . aggregate_type == 'MyAggregate0' }
441
+ . map ( &:aggregate_id )
442
+ . sort ,
443
+ )
439
444
expect { subject . next } . to raise_error ( StopIteration )
440
445
end
441
446
end
@@ -455,7 +460,12 @@ class MyAggregate < Sequent::Core::AggregateRoot
455
460
it 'finds all event streams of a specific type' do
456
461
subject = event_store . event_streams_enumerator ( aggregate_type : 'MyAggregate1' , group_size :)
457
462
aggregate_ids = subject . next
458
- expect ( aggregate_ids ) . to eq ( ordered_aggregate_ids [ 10 ..] )
463
+ expect ( aggregate_ids ) . to eq (
464
+ event_streams
465
+ . select { |s | s . aggregate_type == 'MyAggregate1' }
466
+ . map ( &:aggregate_id )
467
+ . sort ,
468
+ )
459
469
expect { subject . next } . to raise_error ( StopIteration )
460
470
end
461
471
end
You can’t perform that action at this time.
0 commit comments