Skip to content

Commit 234b3de

Browse files
committed
Refactor DbContext
Fix DbTable::upsertRow bug Fix merge remaining wr segment bug Fix Trb upsert log bug
1 parent bfc5f73 commit 234b3de

11 files changed

+570
-396
lines changed

src/terark/db/db_context.cpp

+5-3
Original file line numberDiff line numberDiff line change
@@ -212,9 +212,11 @@ IndexIterator* DbContext::getIndexIterNoLock(size_t segIdx, size_t indexId) {
212212
void DbContext::debugCheckUnique(fstring row, size_t uniqueIndexId) {
213213
assert(this->segArrayUpdateSeq == m_tab->m_segArrayUpdateSeq);
214214
const Schema& indexSchema = m_tab->getIndexSchema(uniqueIndexId);
215-
m_tab->m_schema->m_rowSchema->parseRow(row, &cols1);
216-
indexSchema.selectParent(cols1, &key1);
217-
indexSearchExactNoLock(uniqueIndexId, key1, &exactMatchRecIdvec);
215+
auto cols1 = cols.get();
216+
auto key1 = bufs.get();
217+
m_tab->m_schema->m_rowSchema->parseRow(row, cols1.get());
218+
indexSchema.selectParent(*cols1, key1.get());
219+
indexSearchExactNoLock(uniqueIndexId, *key1, &exactMatchRecIdvec);
218220
assert(exactMatchRecIdvec.size() <= 1);
219221
}
220222

src/terark/db/db_context.hpp

+51-11
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,55 @@ namespace terark { namespace db {
1212
typedef boost::intrusive_ptr<class DbTable> DbTablePtr;
1313
typedef boost::intrusive_ptr<class StoreIterator> StoreIteratorPtr;
1414

15+
template<class T, class P>
16+
void DbContextObjCacheFreeExcessMem(T*) {
17+
}
18+
template<class P>
19+
void DbContextObjCacheFreeExcessMem(valvec<byte>* obj) {
20+
static size_t constexpr limit_size = 1024 * 1204;
21+
if (obj->size() > limit_size) {
22+
obj->clear();
23+
}
24+
}
25+
26+
template<class T>
27+
class DbContextObjCache {
28+
private:
29+
struct Wrapper {
30+
DbContextObjCache* owner;
31+
T x;
32+
};
33+
struct CacheItem {
34+
CacheItem(Wrapper *o) : obj(o) {}
35+
CacheItem(CacheItem const &) = delete;
36+
CacheItem(CacheItem &&o) : obj(o.obj) {
37+
o.obj = nullptr;
38+
}
39+
Wrapper* obj;
40+
41+
T* operator->() const { return &obj->x; }
42+
T& operator*() const { return obj->x; }
43+
T* get() const { return &obj->x; }
44+
45+
~CacheItem() {
46+
if (obj) {
47+
DbContextObjCacheFreeExcessMem<T, void>(&obj->x);
48+
obj->owner->pool.emplace_back(obj);
49+
}
50+
}
51+
};
52+
valvec<Wrapper*> pool;
53+
public:
54+
CacheItem get() {
55+
if (pool.empty()) {
56+
return CacheItem(new Wrapper{this});
57+
}
58+
else {
59+
return CacheItem(pool.pop_val());
60+
}
61+
}
62+
};
63+
1564
class TERARK_DB_DLL DbContextLink : public RefCounter {
1665
friend class DbTable;
1766
protected:
@@ -105,18 +154,9 @@ class TERARK_DB_DLL DbContext : public DbContextLink {
105154
valvec<llong> m_rowNumVec; // copy of DbTable::m_rowNumVec
106155
llong m_mySnapshotVersion;
107156
std::string errMsg;
108-
valvec<byte> buf1;
109-
valvec<byte> buf2;
110-
valvec<byte> row1;
111-
valvec<byte> row2;
112-
valvec<byte> key1;
113-
valvec<byte> key2;
114-
valvec<byte> userBuf; // TerarkDB will not use userBuf
115-
valvec<byte> trbBuf;
157+
DbContextObjCache<valvec<byte>> bufs;
158+
DbContextObjCache<ColumnVec> cols;
116159
valvec<uint32_t> offsets;
117-
ColumnVec cols1;
118-
ColumnVec cols2;
119-
ColumnVec trbCols;
120160
valvec<llong> exactMatchRecIdvec;
121161
boost::intrusive_ptr<RefCounter> trbLog;
122162
size_t regexMatchMemLimit;

src/terark/db/db_segment.cpp

+76-57
Original file line numberDiff line numberDiff line change
@@ -440,48 +440,51 @@ const {
440440
void
441441
ColgroupSegment::getValueByPhysicId(size_t id, valvec<byte>* val, DbContext* ctx)
442442
const {
443+
auto cols1 = ctx->cols.get();
444+
auto cols2 = ctx->cols.get();
445+
auto buf1 = ctx->bufs.get();
443446
val->risk_set_size(0);
444-
ctx->buf1.risk_set_size(0);
445-
ctx->cols1.erase_all();
447+
cols1->erase_all();
448+
buf1->risk_set_size(0);
446449

447450
// getValueAppend to ctx->buf1
448451
const size_t colgroupNum = m_colgroups.size();
449452
for (size_t i = 0; i < colgroupNum; ++i) {
450453
const Schema& iSchema = m_schema->getColgroupSchema(i);
451454
if (iSchema.m_keepCols.has_any1()) {
452-
size_t oldsize = ctx->buf1.size();
453-
m_colgroups[i]->getValueAppend(id, &ctx->buf1, ctx);
454-
iSchema.parseRowAppend(ctx->buf1, oldsize, &ctx->cols1);
455+
size_t oldsize = buf1->size();
456+
m_colgroups[i]->getValueAppend(id, buf1.get(), ctx);
457+
iSchema.parseRowAppend(*buf1, oldsize, cols1.get());
455458
}
456459
else {
457-
ctx->cols1.grow(iSchema.columnNum());
460+
cols1->grow(iSchema.columnNum());
458461
}
459462
}
460-
assert(ctx->cols1.size() == m_schema->m_colgroupSchemaSet->m_flattenColumnNum);
463+
assert(cols1->size() == m_schema->m_colgroupSchemaSet->m_flattenColumnNum);
461464

462465
// combine columns to ctx->cols2
463466
size_t baseColumnId = 0;
464-
ctx->cols2.m_base = ctx->cols1.m_base;
465-
ctx->cols2.m_cols.resize_fill(m_schema->columnNum());
467+
cols2->m_base = cols1->m_base;
468+
cols2->m_cols.resize_fill(m_schema->columnNum());
466469
for (size_t i = 0; i < colgroupNum; ++i) {
467470
const Schema& iSchema = m_schema->getColgroupSchema(i);
468471
for (size_t j = 0; j < iSchema.columnNum(); ++j) {
469472
if (iSchema.m_keepCols[j]) {
470473
size_t parentColId = iSchema.parentColumnId(j);
471-
ctx->cols2.m_cols[parentColId] = ctx->cols1.m_cols[baseColumnId + j];
474+
cols2->m_cols[parentColId] = cols1->m_cols[baseColumnId + j];
472475
}
473476
}
474477
baseColumnId += iSchema.columnNum();
475478
}
476479

477480
#if !defined(NDEBUG)
478-
for (size_t i = 0; i < ctx->cols2.size(); ++i) {
479-
assert(ctx->cols2.m_cols[i].isValid());
481+
for (size_t i = 0; i < cols2->size(); ++i) {
482+
assert(cols2->m_cols[i].isValid());
480483
}
481484
#endif
482485

483486
// combine to val
484-
m_schema->m_rowSchema->combineRow(ctx->cols2, val);
487+
m_schema->m_rowSchema->combineRow(*cols2, val);
485488
}
486489

487490
void
@@ -610,22 +613,24 @@ ColgroupSegment::selectColumnsByPhysicId(llong physicId,
610613
valvec<byte>* colsData, DbContext* ctx)
611614
const {
612615
assert(physicId >= 0);
616+
auto cols = ctx->cols.get();
617+
auto buf = ctx->bufs.get();
613618
colsData->erase_all();
614-
ctx->buf1.erase_all();
619+
buf->erase_all();
615620
ctx->offsets.resize_fill(m_colgroups.size(), UINT32_MAX);
616621
auto offsets = ctx->offsets.data();
617622
for(size_t i = 0; i < colsNum; ++i) {
618623
assert(colsId[i] < m_schema->m_rowSchema->columnNum());
619624
auto cp = m_schema->m_colproject[colsId[i]];
620625
size_t colgroupId = cp.colgroupId;
621-
size_t oldsize = ctx->buf1.size();
626+
size_t oldsize = buf->size();
622627
const Schema& schema = m_schema->getColgroupSchema(colgroupId);
623628
if (offsets[colgroupId] == UINT32_MAX) {
624-
offsets[colgroupId] = ctx->cols1.size();
625-
m_colgroups[colgroupId]->getValueAppend(physicId, &ctx->buf1, ctx);
626-
schema.parseRowAppend(ctx->buf1, oldsize, &ctx->cols1);
629+
offsets[colgroupId] = cols->size();
630+
m_colgroups[colgroupId]->getValueAppend(physicId, buf.get(), ctx);
631+
schema.parseRowAppend(*buf, oldsize, cols.get());
627632
}
628-
fstring d = ctx->cols1[offsets[colgroupId] + cp.subColumnId];
633+
fstring d = (*cols)[offsets[colgroupId] + cp.subColumnId];
629634
if (i < colsNum-1)
630635
schema.projectToNorm(d, cp.subColumnId, colsData);
631636
else
@@ -671,10 +676,12 @@ const {
671676
m_colgroups[colgroupId]->getValue(physicId, colsData, ctx);
672677
}
673678
else {
674-
m_colgroups[colgroupId]->getValue(physicId, &ctx->buf1, ctx);
675-
schema.parseRow(ctx->buf1, &ctx->cols1);
679+
auto cols = ctx->cols.get();
680+
auto buf = ctx->bufs.get();
681+
m_colgroups[colgroupId]->getValue(physicId, buf.get(), ctx);
682+
schema.parseRow(*buf, cols.get());
676683
colsData->erase_all();
677-
colsData->append(ctx->cols1[cp.subColumnId]);
684+
colsData->append((*cols)[cp.subColumnId]);
678685
}
679686
}
680687

@@ -1907,23 +1914,24 @@ WritableStore* WritableSegment::getWritableStore() { return this; }
19071914
void
19081915
PlainWritableSegment::getValueAppend(llong recId, valvec<byte>* val, DbContext* ctx)
19091916
const {
1910-
assert(&ctx->buf1 != val);
1911-
assert(&ctx->buf2 != val);
19121917
if (m_schema->m_updatableColgroups.empty()) {
19131918
// m_wrtStore->getValueAppend(recId, val, ctx);
19141919
this->getWrtStoreData(recId, val, ctx);
19151920
}
19161921
else {
1917-
ctx->buf1.erase_all();
1918-
ctx->cols1.erase_all();
1922+
auto cols1 = ctx->cols.get();
1923+
auto cols2 = ctx->cols.get();
1924+
auto buf1 = ctx->bufs.get();
1925+
cols1->erase_all();
1926+
buf1->erase_all();
19191927
// m_wrtStore->getValueAppend(recId, &ctx->buf1, ctx);
1920-
this->getWrtStoreData(recId, &ctx->buf1, ctx);
1928+
this->getWrtStoreData(recId, buf1.get(), ctx);
19211929
const size_t ProtectCnt = 100;
1922-
SpinRwLock lock;
1930+
SpinRwLock lock;
19231931
if (!m_isFreezed && m_isDel.unused() < ProtectCnt) {
19241932
lock.acquire(m_segMutex, false);
19251933
}
1926-
this->getCombineAppend(recId, val, ctx->buf1, ctx->cols1, ctx->cols2);
1934+
this->getCombineAppend(recId, val, *buf1, *cols1, *cols2);
19271935
}
19281936
}
19291937

@@ -2006,7 +2014,8 @@ WritableSegment::indexSearchExactAppend(size_t mySegIdx, size_t indexId,
20062014
assert(!m_hasLockFreePointSearch);
20072015
IndexIterator* iter = ctx->getIndexIterNoLock(mySegIdx, indexId);
20082016
llong recId = -1;
2009-
int cmp = iter->seekLowerBound(key, &recId, &ctx->key2);
2017+
auto key2 = ctx->bufs.get();
2018+
int cmp = iter->seekLowerBound(key, &recId, key2.get());
20102019
if (cmp == 0) {
20112020
// now IndexIterator::m_isUniqueInSchema is just for this quick check
20122021
// faster than m_schema->getIndexSchema(indexId).m_isUnique
@@ -2028,7 +2037,7 @@ WritableSegment::indexSearchExactAppend(size_t mySegIdx, size_t indexId,
20282037
size_t oldsize = recIdvec->size();
20292038
do {
20302039
recIdvec->push_back(recId);
2031-
} while (iter->increment(&recId, &ctx->key2) && key == ctx->key2);
2040+
} while (iter->increment(&recId, key2.get()) && key == *key2);
20322041
size_t i = oldsize, j = oldsize;
20332042
size_t n = recIdvec->size();
20342043
llong* p = recIdvec->data();
@@ -2110,18 +2119,20 @@ void PlainWritableSegment::selectColumnsByWhole(llong recId,
21102119
const {
21112120
assert(m_schema->m_updatableColgroups.empty());
21122121
colsData->erase_all();
2113-
// this->getValue(recId, &ctx->buf1, ctx);
2114-
this->getWrtStoreData(recId, &ctx->buf1, ctx);
2122+
auto cols = ctx->cols.get();
2123+
auto buf = ctx->bufs.get();
2124+
// this->getValue(recId, buf.get(), ctx);
2125+
this->getWrtStoreData(recId, buf.get(), ctx);
21152126
const Schema& schema = *m_schema->m_rowSchema;
2116-
schema.parseRow(ctx->buf1, &ctx->cols1);
2117-
assert(ctx->cols1.size() == schema.columnNum());
2127+
schema.parseRow(*buf, cols.get());
2128+
assert(cols->size() == schema.columnNum());
21182129
for(size_t i = 0; i < colsNum; ++i) {
21192130
size_t columnId = colsId[i];
21202131
assert(columnId < schema.columnNum());
21212132
if (i < colsNum-1)
2122-
schema.projectToNorm(ctx->cols1[columnId], columnId, colsData);
2133+
schema.projectToNorm((*cols)[columnId], columnId, colsData);
21232134
else
2124-
schema.projectToLast(ctx->cols1[columnId], columnId, colsData);
2135+
schema.projectToLast((*cols)[columnId], columnId, colsData);
21252136
}
21262137
}
21272138

@@ -2132,7 +2143,8 @@ const {
21322143
colsData->erase_all();
21332144
const SchemaConfig& sconf = *m_schema;
21342145
const Schema& rowSchema = *sconf.m_rowSchema;
2135-
ctx->cols1.erase_all();
2146+
auto cols1 = ctx->cols.get();
2147+
cols1->erase_all();
21362148
for(size_t i = 0; i < colsNum; ++i) {
21372149
size_t columnId = colsIdvec[i];
21382150
assert(columnId < rowSchema.columnNum());
@@ -2153,14 +2165,15 @@ const {
21532165
}
21542166
else {
21552167
schema = sconf.m_wrtSchema.get();
2156-
if (ctx->cols1.empty()) {
2168+
if (cols1->empty()) {
2169+
auto buf1 = ctx->bufs.get();
21572170
// m_wrtStore->getValue(recId, &ctx->buf1, ctx);
2158-
this->getWrtStoreData(recId, &ctx->buf1, ctx);
2159-
schema->parseRow(ctx->buf1, &ctx->cols1);
2171+
this->getWrtStoreData(recId, buf1.get(), ctx);
2172+
schema->parseRow(*buf1, cols1.get());
21602173
}
21612174
size_t subColumnId = sconf.m_rowSchemaColToWrtCol[columnId];
21622175
assert(subColumnId < sconf.m_wrtSchema->columnNum());
2163-
fstring coldata = ctx->cols1[subColumnId];
2176+
fstring coldata = (*cols1)[subColumnId];
21642177
if (i < colsNum-1)
21652178
rowSchema.projectToNorm(coldata, columnId, colsData);
21662179
else
@@ -2188,21 +2201,23 @@ const {
21882201
store->getValue(recId, colsData, ctx);
21892202
}
21902203
else {
2204+
auto cols = ctx->cols.get();
2205+
auto buf = ctx->bufs.get();
21912206
const Schema& wrtSchema = *m_schema->m_wrtSchema;
21922207
// m_wrtStore->getValue(recId, &ctx->buf1, ctx);
2193-
this->getWrtStoreData(recId, &ctx->buf1, ctx);
2194-
wrtSchema.parseRow(ctx->buf1, &ctx->cols1);
2195-
assert(ctx->cols1.size() == wrtSchema.columnNum());
2208+
this->getWrtStoreData(recId, buf.get(), ctx);
2209+
wrtSchema.parseRow(*buf, cols.get());
2210+
assert(cols->size() == wrtSchema.columnNum());
21962211
colsData->erase_all();
21972212
if (m_schema->m_updatableColgroups.empty()) {
21982213
assert(m_schema->m_wrtSchema == m_schema->m_rowSchema);
21992214
assert(m_schema->m_rowSchemaColToWrtCol.empty());
2200-
wrtSchema.projectToLast(ctx->cols1[columnId], columnId, colsData);
2215+
wrtSchema.projectToLast((*cols)[columnId], columnId, colsData);
22012216
}
22022217
else {
22032218
size_t wrtColumnId = m_schema->m_rowSchemaColToWrtCol[columnId];
22042219
assert(wrtColumnId < wrtSchema.columnNum());
2205-
wrtSchema.projectToLast(ctx->cols1[wrtColumnId], columnId, colsData);
2220+
wrtSchema.projectToLast((*cols)[wrtColumnId], columnId, colsData);
22062221
}
22072222
}
22082223
}
@@ -2402,15 +2417,17 @@ llong PlainWritableSegment::append(fstring row, DbContext* ctx) {
24022417
return store->append(row, ctx);
24032418
}
24042419
else {
2405-
sconf.m_rowSchema->parseRow(row, &ctx->cols1);
2406-
sconf.m_wrtSchema->selectParent(ctx->cols1, &ctx->buf1);
2407-
llong id1 = store->append(ctx->buf1, ctx);
2420+
auto cols = ctx->cols.get();
2421+
auto buf = ctx->bufs.get();
2422+
sconf.m_rowSchema->parseRow(row, cols.get());
2423+
sconf.m_wrtSchema->selectParent(*cols, buf.get());
2424+
llong id1 = store->append(*buf, ctx);
24082425
for (size_t colgroupId : sconf.m_updatableColgroups) {
24092426
store = m_colgroups[colgroupId]->getAppendableStore();
24102427
assert(nullptr != store);
24112428
const Schema& schema = sconf.getColgroupSchema(colgroupId);
2412-
schema.selectParent(ctx->cols1, &ctx->buf1);
2413-
llong id2 = store->append(ctx->buf1, ctx);
2429+
schema.selectParent(*cols, buf.get());
2430+
llong id2 = store->append(*buf, ctx);
24142431
TERARK_RT_assert(id1 == id2, std::logic_error);
24152432
}
24162433
return id1;
@@ -2426,15 +2443,17 @@ void PlainWritableSegment::update(llong id, fstring row, DbContext* ctx) {
24262443
store->update(id, row, ctx);
24272444
}
24282445
else {
2429-
sconf.m_rowSchema->parseRow(row, &ctx->cols1);
2430-
sconf.m_wrtSchema->selectParent(ctx->cols1, &ctx->buf1);
2431-
store->update(id, ctx->buf1, ctx);
2446+
auto cols = ctx->cols.get();
2447+
auto buf = ctx->bufs.get();
2448+
sconf.m_rowSchema->parseRow(row, cols.get());
2449+
sconf.m_wrtSchema->selectParent(*cols, buf.get());
2450+
store->update(id, *buf, ctx);
24322451
for (size_t colgroupId : sconf.m_updatableColgroups) {
24332452
store = m_colgroups[colgroupId]->getUpdatableStore();
24342453
assert(nullptr != store);
24352454
const Schema& schema = sconf.getColgroupSchema(colgroupId);
2436-
schema.selectParent(ctx->cols1, &ctx->buf1);
2437-
store->update(id, ctx->buf1, ctx);
2455+
schema.selectParent(*cols, buf.get());
2456+
store->update(id, *buf, ctx);
24382457
}
24392458
}
24402459
}

0 commit comments

Comments
 (0)