-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathConsolidation.cpp
More file actions
136 lines (111 loc) · 4.55 KB
/
Copy pathConsolidation.cpp
File metadata and controls
136 lines (111 loc) · 4.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <algorithm>
#include "Consolidation.h"
namespace IndexUpdate {
// Integer division of `a` by `share`, rounded up to the next whole number.
// `share` must be non-zero (it is used as a divisor).
inline size_t _rfdiv(size_t a, size_t share) {
    const size_t wholeParts = a / share;
    const bool hasRemainder = (a % share) != 0;
    return hasRemainder ? wholeParts + 1 : wholeParts;
}
// Accumulates the cost (as ConsolidationStats) of repeatedly merging the
// smallest segments of a set until a single segment remains.
//
// The segment sizes live in the caller's vector, which is reorganised into a
// min-heap by the constructor and consumed by operator(); after the call it
// holds at most one element (the size of the merged result).
class KWaySegmentConsolidator {
    std::vector<size_t> &segHeap;      // caller-owned sizes, kept as a min-heap
    const size_t memSizeInPostings;    // memory buffer size, in postings
    size_t lastSize;                   // size of the segment treated as already in memory; 0 when none
    ConsolidationStats cost;           // running total for the current operator() call

    // Removes and returns the smallest segment size. The heap must be non-empty.
    size_t pop() {
        std::pop_heap(segHeap.begin(), segHeap.end(), std::greater<size_t>());
        auto smallest = segHeap.back();
        segHeap.pop_back();
        return smallest;
    }

    // Inserts a segment size into the min-heap.
    void push(size_t s) {
        segHeap.push_back(s);
        std::push_heap(segHeap.begin(), segHeap.end(), std::greater<size_t>());
    }

    // Half of the memory buffer; used as the read budget of a k-way round and
    // as the write share of a 2-way round.
    inline size_t writePShare() const { return (memSizeInPostings / 2); }

    // One k-way round: merges as many of the smallest segments as fit into
    // half of the buffer. Requires at least two segments in the heap.
    // Returns false (leaving the heap and lastSize as they were) when even the
    // two smallest segments do not fit; returns true after a merge.
    bool roundKWay() {
        const size_t readBudget = writePShare();

        auto smallest1 = pop();
        auto smallest2 = pop();

        size_t reads = smallest1 + smallest2;
        size_t writes = reads;
        unsigned totalReadSeeks = 2;

        // If one of the two is the in-memory segment it costs no read/seek.
        if (lastSize && (lastSize == smallest1 || lastSize == smallest2)) {
            --totalReadSeeks;
            reads -= lastSize;
            lastSize = 0;
        }

        // Can't handle even the two smallest: roll back.
        if (reads > readBudget) {
            push(smallest2);
            push(smallest1);
            if (writes != reads) // the in-memory segment was consumed above; restore it
                lastSize = writes - reads;
            return false;
        }

        while (!segHeap.empty()) {
            size_t seg = pop();
            // Guarded with `lastSize &&` like the other two match sites, so a
            // zero-sized segment is not mistaken for the in-memory one once
            // lastSize has been reset to 0.
            if (lastSize && seg == lastSize) {
                lastSize = 0;
            }
            else {
                if (reads + seg > readBudget) { // too much for this round
                    push(seg);
                    break;
                }
                reads += seg;
                ++totalReadSeeks;
            }
            writes += seg;
        }

        push(writes);
        // Arguments as used here: reads, read seeks, writes, write seeks
        // (NOTE(review): confirm against the ConsolidationStats constructor).
        cost += ConsolidationStats(reads, totalReadSeeks, writes, 1);
        return true;
    }

    // One 2-way round: merges the two smallest segments, counting one seek per
    // share-sized chunk read or written. Requires at least two segments.
    void round2Way() {
        // Clamp both shares to at least 1 so that tiny buffers (fewer than 4
        // postings) cannot cause a division by zero in _rfdiv.
        const size_t writeShare = std::max<size_t>(writePShare(), 1);
        const size_t readShare = std::max<size_t>(writeShare / 2, 1);

        auto smallest1 = pop();
        auto smallest2 = pop();

        size_t readSeeks = 0;
        readSeeks += _rfdiv(smallest1, readShare);
        readSeeks += _rfdiv(smallest2, readShare);

        size_t readSeg = smallest1 + smallest2;
        size_t writeSeg = smallest1 + smallest2;
        size_t writeSeeks = _rfdiv(writeSeg, writeShare);

        // Fix seeks and reads if one of the two is the in-memory segment.
        if (lastSize && (lastSize == smallest1 || lastSize == smallest2)) {
            readSeg -= lastSize;
            readSeeks -= _rfdiv(lastSize, readShare);
            lastSize = 0; // this can happen only once per consolidation!
        }

        push(writeSeg);
        cost += ConsolidationStats(readSeg, readSeeks, writeSeg, writeSeeks);
    }

public:
    // segments          - segment sizes; the vector is modified in place.
    // lastInMem         - 0 or 1; when 1 the last element of `segments` is
    //                     treated as already in memory.
    // memBufferPostings - memory buffer size in postings.
    // An empty `segments` is accepted and yields no in-memory segment.
    KWaySegmentConsolidator(std::vector<size_t> &segments,
                            size_t lastInMem = 1 /*0 or 1*/, size_t memBufferPostings = 1ULL<<26)
            : segHeap(segments), memSizeInPostings(memBufferPostings),
              lastSize(segments.empty() ? 0 : segments.back() * lastInMem) {
        assert(lastInMem < 2);
        // lastSize is captured above, before make_heap reorders the vector.
        std::make_heap(segHeap.begin(), segHeap.end(), std::greater<size_t>());
    }

    // Runs the consolidation and returns the accumulated cost.
    ConsolidationStats operator()() {
        cost = ConsolidationStats();
        // Attempt k-way consolidation of all the segments that fit completely into memory.
        while (segHeap.size() > 1 && roundKWay())
            continue;
        // Then always consolidate the two smallest until one segment is left.
        while (segHeap.size() > 1)
            round2Way();
        return cost;
    }
};
// Replaces the tail segments[offset .. end) with a single entry holding the
// sum of the removed sizes, and returns the cost computed by
// KWaySegmentConsolidator (with its default arguments) for merging that tail.
//
// Preconditions (checked with assert): segments has more than one element and
// offset does not exceed segments.size().
// When the tail is empty (offset == segments.size()) a 0 entry is appended and
// a default-constructed ConsolidationStats is returned.
ConsolidationStats consolidateSegments(std::vector<uint64_t>& segments, unsigned offset) {
    assert(segments.size()>1);
    assert(offset <= segments.size());

    const auto tailBegin = segments.begin() + offset;

    // The consolidator takes std::vector<size_t>&; copy into that exact type so
    // the call also compiles where uint64_t and size_t are distinct types.
    std::vector<size_t> consolidants(tailBegin, segments.end());
    const uint64_t writtenPostings = std::accumulate(tailBegin, segments.end(), uint64_t{0});

    segments.erase(tailBegin, segments.end());
    segments.push_back(writtenPostings);

    // Nothing to merge: do not hand an empty vector to the consolidator.
    if (consolidants.empty())
        return ConsolidationStats();

    return KWaySegmentConsolidator(consolidants)();
}
}