Skip to content

Commit a08cd71

Browse files
committed
wip: splitme: variable per-io memory for pgsr
Author: Reviewed-by: Discussion: https://postgr.es/m/ Backpatch:
1 parent 70a3e1b commit a08cd71

File tree

6 files changed

+166
-94
lines changed

6 files changed

+166
-94
lines changed

contrib/pg_prewarm/pg_prewarm.c

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ typedef struct prefetch
5353
} prefetch;
5454

5555
static BlockNumber
56-
prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
56+
prewarm_buffer_next(PgStreamingRead *pgsr,
57+
uintptr_t pgsr_private, void *io_private,
5758
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
5859
{
5960
prefetch *p = (prefetch *) pgsr_private;
@@ -72,7 +73,7 @@ prewarm_buffer_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
7273

7374
static PgStreamingReadNextStatus
7475
prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
75-
PgAioInProgress *aio, uintptr_t *read_private)
76+
PgAioInProgress *aio, void *read_private)
7677
{
7778
prefetch *p = (prefetch *) pgsr_private;
7879
BlockNumber blockno;
@@ -96,13 +97,13 @@ prewarm_smgr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
9697
pgaio_io_start_read_smgr(aio, p->rel->rd_smgr, p->forkNumber, blockno,
9798
pgaio_bounce_buffer_buffer(bb));
9899

99-
*read_private = (uintptr_t) bb;
100+
*(void **) read_private = bb;
100101

101102
return PGSR_NEXT_IO;
102103
}
103104

104105
static void
105-
prewarm_smgr_release(uintptr_t pgsr_private, uintptr_t read_private)
106+
prewarm_smgr_release(uintptr_t pgsr_private, void *read_private)
106107
{
107108
}
108109

@@ -279,7 +280,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
279280
p.lastblock = last_block;
280281
p.bbs = NIL;
281282

282-
pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &p,
283+
pgsr = pg_streaming_read_buffer_alloc(512, 0, (uintptr_t) &p,
283284
NULL,
284285
prewarm_buffer_next);
285286

@@ -289,7 +290,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
289290

290291
CHECK_FOR_INTERRUPTS();
291292

292-
buf = (Buffer) pg_streaming_read_get_next(pgsr);
293+
buf = (Buffer) pg_streaming_read_buffer_get_next(pgsr, NULL);
293294
if (BufferIsValid(buf))
294295
ReleaseBuffer(buf);
295296
else
@@ -298,7 +299,7 @@ pg_prewarm(PG_FUNCTION_ARGS)
298299
++blocks_done;
299300
}
300301

301-
if (BufferIsValid(pg_streaming_read_get_next(pgsr)))
302+
if (BufferIsValid(pg_streaming_read_buffer_get_next(pgsr, NULL)))
302303
elog(ERROR, "unexpected additional buffer");
303304

304305
pg_streaming_read_free(pgsr);
@@ -315,21 +316,23 @@ pg_prewarm(PG_FUNCTION_ARGS)
315316
p.lastblock = last_block;
316317
p.bbs = NIL;
317318

318-
pgsr = pg_streaming_read_alloc(512, (uintptr_t) &p,
319+
pgsr = pg_streaming_read_alloc(512,
320+
sizeof(void *),
321+
(uintptr_t) &p,
319322
prewarm_smgr_next,
320323
prewarm_smgr_release);
321324

322325
for (block = first_block; block <= last_block; ++block)
323326
{
324-
PgAioBounceBuffer *bb;
327+
PgAioBounceBuffer **bb;
325328

326329
CHECK_FOR_INTERRUPTS();
327330

328-
bb = (PgAioBounceBuffer *) pg_streaming_read_get_next(pgsr);
331+
bb = (PgAioBounceBuffer **) pg_streaming_read_get_next(pgsr);
329332
if (bb == NULL)
330333
elog(ERROR, "prefetch ended early");
331334

332-
p.bbs = lappend(p.bbs, (void *) bb);
335+
p.bbs = lappend(p.bbs, (void *) *bb);
333336

334337
++blocks_done;
335338
}

src/backend/access/heap/heapam.c

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
227227
*/
228228

229229
static BlockNumber
230-
heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private,
230+
heap_pgsr_next_single(PgStreamingRead *pgsr,
231+
uintptr_t pgsr_private, void *io_private,
231232
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
232233
{
233234
HeapScanDesc scan = (HeapScanDesc) pgsr_private;
@@ -268,7 +269,8 @@ heap_pgsr_next_single(PgStreamingRead *pgsr, uintptr_t pgsr_private,
268269
}
269270

270271
static BlockNumber
271-
heap_pgsr_next_parallel(PgStreamingRead *pgsr, uintptr_t pgsr_private,
272+
heap_pgsr_next_parallel(PgStreamingRead *pgsr,
273+
uintptr_t pgsr_private, void *io_private,
272274
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
273275
{
274276
HeapScanDesc scan = (HeapScanDesc) pgsr_private;
@@ -294,7 +296,8 @@ heap_pgsr_single_alloc(HeapScanDesc scan)
294296
{
295297
int iodepth = Max(Min(512, NBuffers / 128), 1);
296298

297-
return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan,
299+
return pg_streaming_read_buffer_alloc(iodepth, 0,
300+
(uintptr_t) scan,
298301
scan->rs_strategy,
299302
heap_pgsr_next_single);
300303
}
@@ -304,7 +307,8 @@ heap_pgsr_parallel_alloc(HeapScanDesc scan)
304307
{
305308
int iodepth = Max(Min(512, NBuffers / 128), 1);
306309

307-
return pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) scan,
310+
return pg_streaming_read_buffer_alloc(iodepth, 0,
311+
(uintptr_t) scan,
308312
scan->rs_strategy,
309313
heap_pgsr_next_parallel);
310314
}
@@ -624,7 +628,7 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir, Buffer *pgsr_buf)
624628
/* FIXME: Integrate more neatly */
625629
if (scan->pgsr)
626630
{
627-
*pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
631+
*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
628632
if (*pgsr_buf == InvalidBuffer)
629633
return InvalidBlockNumber;
630634
return BufferGetBlockNumber(*pgsr_buf);
@@ -784,7 +788,7 @@ heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir
784788
block = InvalidBlockNumber;
785789
}
786790
#endif
787-
*pgsr_buf = pg_streaming_read_get_next(scan->pgsr);
791+
*pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL);
788792
if (*pgsr_buf == InvalidBuffer)
789793
return InvalidBlockNumber;
790794
Assert(scan->rs_base.rs_parallel ||

src/backend/access/heap/vacuumlazy.c

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,8 @@ heap_vacuum_rel(Relation rel, VacuumParams *params,
799799
}
800800

801801
static BlockNumber
802-
vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
802+
vacuum_scan_pgsr_next(PgStreamingRead *pgsr,
803+
uintptr_t pgsr_private, void *io_private,
803804
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
804805
{
805806
LVRelState *vacrel = (LVRelState *) pgsr_private;
@@ -847,15 +848,6 @@ vacuum_scan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
847848
return InvalidBlockNumber;
848849
}
849850

850-
static void
851-
vacuum_pgsr_release(uintptr_t pgsr_private, uintptr_t read_private)
852-
{
853-
Buffer buf = (Buffer) read_private;
854-
855-
Assert(BufferIsValid(buf));
856-
ReleaseBuffer(buf);
857-
}
858-
859851
/*
860852
* lazy_scan_heap() -- workhorse function for VACUUM
861853
*
@@ -909,7 +901,8 @@ lazy_scan_heap(LVRelState *vacrel)
909901
{
910902
int iodepth = Max(Min(128, NBuffers / 128), 1);
911903

912-
pgsr = pg_streaming_read_buffer_alloc(iodepth, (uintptr_t) vacrel,
904+
pgsr = pg_streaming_read_buffer_alloc(iodepth, 0,
905+
(uintptr_t) vacrel,
913906
vacrel->bstrategy,
914907
vacuum_scan_pgsr_next);
915908
}
@@ -936,7 +929,7 @@ lazy_scan_heap(LVRelState *vacrel)
936929
bool all_visible_according_to_vm = false;
937930
LVPagePruneState prunestate;
938931

939-
buf = pg_streaming_read_get_next(pgsr);
932+
buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
940933
if (!BufferIsValid(buf))
941934
break;
942935

@@ -2505,7 +2498,6 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
25052498
typedef struct VacuumHeapBlockState
25062499
{
25072500
BlockNumber blkno;
2508-
Buffer buffer;
25092501
int start_tupindex;
25102502
int end_tupindex;
25112503
} VacuumHeapBlockState;
@@ -2518,19 +2510,19 @@ typedef struct VacuumHeapState
25182510
int next_tupindex;
25192511
} VacuumHeapState;
25202512

2521-
static PgStreamingReadNextStatus
2522-
vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
2523-
PgAioInProgress *aio, uintptr_t *read_private)
2513+
static BlockNumber
2514+
vacuum_heap_pgsr_next(PgStreamingRead *pgsr,
2515+
uintptr_t pgsr_private,
2516+
void *io_private,
2517+
struct RelationData **rel, ForkNumber *forkNum, ReadBufferMode *mode)
25242518
{
25252519
VacuumHeapState *vhs = (VacuumHeapState *) pgsr_private;
25262520
VacDeadItems *dead_items = vhs->vacrel->dead_items;
2527-
VacuumHeapBlockState *bs;
2528-
bool already_valid;
2521+
VacuumHeapBlockState *bs = io_private;
25292522

25302523
if (vhs->next_tupindex == dead_items->num_items)
2531-
return PGSR_NEXT_END;
2524+
return InvalidBlockNumber;
25322525

2533-
bs = palloc0(sizeof(*bs));
25342526
bs->blkno = ItemPointerGetBlockNumber(&dead_items->items[vhs->next_tupindex]);
25352527
bs->start_tupindex = vhs->next_tupindex;
25362528
bs->end_tupindex = vhs->next_tupindex;
@@ -2544,15 +2536,10 @@ vacuum_heap_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
25442536
bs->end_tupindex = vhs->next_tupindex;
25452537
}
25462538

2547-
bs->buffer = ReadBufferAsync(vhs->relation, MAIN_FORKNUM, bs->blkno,
2548-
RBM_NORMAL, vhs->vacrel->bstrategy, &already_valid,
2549-
&aio);
2550-
*read_private = (uintptr_t) bs;
2551-
2552-
if (already_valid)
2553-
return PGSR_NEXT_NO_IO;
2554-
else
2555-
return PGSR_NEXT_IO;
2539+
*rel = vhs->relation;
2540+
*forkNum = MAIN_FORKNUM;
2541+
*mode = RBM_NORMAL;
2542+
return bs->blkno;
25562543
}
25572544

25582545
/*
@@ -2600,16 +2587,20 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
26002587
vhs.vacrel = vacrel;
26012588
vhs.last_block = InvalidBlockNumber;
26022589
vhs.next_tupindex = 0;
2603-
pgsr = pg_streaming_read_alloc(512, (uintptr_t) &vhs,
2604-
vacuum_heap_pgsr_next,
2605-
vacuum_pgsr_release);
2590+
pgsr = pg_streaming_read_buffer_alloc(512,
2591+
sizeof(VacuumHeapBlockState),
2592+
(uintptr_t) &vhs,
2593+
vacrel->bstrategy,
2594+
vacuum_heap_pgsr_next);
26062595
while (true)
26072596
{
2608-
VacuumHeapBlockState *bs = (VacuumHeapBlockState *) pg_streaming_read_get_next(pgsr);
2597+
VacuumHeapBlockState *bs;
26092598
Page page;
26102599
Size freespace;
2600+
Buffer buffer;
26112601

2612-
if (bs == NULL)
2602+
buffer = pg_streaming_read_buffer_get_next(pgsr, (void **) &bs);
2603+
if (!BufferIsValid(buffer))
26132604
break;
26142605

26152606
Assert(bs->start_tupindex == index);
@@ -2623,15 +2614,15 @@ lazy_vacuum_heap_rel(LVRelState *vacrel)
26232614
visibilitymap_pin(vacrel->rel, bs->blkno, &vmbuffer);
26242615

26252616
/* We need a non-cleanup exclusive lock to mark dead_items unused */
2626-
LockBuffer(bs->buffer, BUFFER_LOCK_EXCLUSIVE);
2627-
index = lazy_vacuum_heap_page(vacrel, bs->blkno, bs->buffer, index,
2617+
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
2618+
index = lazy_vacuum_heap_page(vacrel, bs->blkno, buffer, index,
26282619
vmbuffer);
26292620

26302621
/* Now that we've compacted the page, record its available space */
2631-
page = BufferGetPage(bs->buffer);
2622+
page = BufferGetPage(buffer);
26322623
freespace = PageGetHeapFreeSpace(page);
26332624

2634-
UnlockReleaseBuffer(bs->buffer);
2625+
UnlockReleaseBuffer(buffer);
26352626
RecordPageWithFreeSpace(vacrel->rel, bs->blkno, freespace);
26362627
vacuumed_pages++;
26372628

src/backend/access/nbtree/nbtree.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,7 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
897897

898898
static BlockNumber
899899
btvacuumscan_pgsr_next(PgStreamingRead *pgsr, uintptr_t pgsr_private,
900+
void *io_private,
900901
Relation *rel, ForkNumber *fork, ReadBufferMode *mode)
901902
{
902903
BTVacState *vstate = (BTVacState *) pgsr_private;
@@ -1030,14 +1031,15 @@ btvacuumscan(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
10301031
vstate.nextblock = scanblkno;
10311032
vstate.num_pages = num_pages;
10321033

1033-
pgsr = pg_streaming_read_buffer_alloc(512, (uintptr_t) &vstate,
1034+
pgsr = pg_streaming_read_buffer_alloc(512, 0,
1035+
(uintptr_t) &vstate,
10341036
vstate.info->strategy,
10351037
btvacuumscan_pgsr_next);
10361038

10371039
/* Iterate over pages, then loop back to recheck length */
10381040
for (; scanblkno < num_pages; scanblkno++)
10391041
{
1040-
Buffer buf = pg_streaming_read_get_next(pgsr);
1042+
Buffer buf = pg_streaming_read_buffer_get_next(pgsr, NULL);
10411043

10421044
Assert(BufferIsValid(buf));
10431045
Assert(BufferGetBlockNumber(buf) == scanblkno);

0 commit comments

Comments
 (0)