8000 buf-fix/arangosearch-columnstore2 (#15122) · open-bigdata/arangodb@02a72fb · GitHub
[go: up one dir, main page]

Skip to content

Commit 02a72fb

Browse files
authored
buf-fix/arangosearch-columnstore2 (arangodb#15122)
* backport from iresearch upstream * backport from iresearch upstream
1 parent d8a6f02 commit 02a72fb

18 files changed

+635
-239
lines changed

3rdParty/iresearch/core/formats/columnstore2.cpp

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,33 @@ namespace columnstore2 {
996996
// --SECTION-- column implementation
997997
// -----------------------------------------------------------------------------
998998

999+
void column::prepare(doc_id_t key) {
1000+
assert(!sealed_);
1001+
1002+
if (IRS_LIKELY(key > pend_)) {
1003+
if (addr_table_.full()) {
1004+
flush_block();
1005+
}
1006+
1007+
prev_ = pend_;
1008+
pend_ = key;
1009+
docs_writer_.push_back(key);
1010+
addr_table_.push_back(data_.stream.file_pointer());
1011+
}
1012+
}
1013+
1014+
void column::reset() {
1015+
if (addr_table_.empty()) {
1016+
return;
1017+
}
1018+
1019+
[[maybe_unused]] const bool res = docs_writer_.erase(pend_);
1020+
assert(res);
1021+
data_.stream.seek(addr_table_.back());
1022+
addr_table_.pop_back();
1023+
pend_ = prev_;
1024+
}
1025+
9991026
void column::flush_block() {
10001027
assert(!addr_table_.empty());
10011028
assert(ctx_.data_out);
@@ -1043,6 +1070,10 @@ void column::flush_block() {
10431070
fixed_length_ = false;
10441071
}
10451072

1073+
#ifdef IRESEARCH_DEBUG
1074+
block.size = data_.file.length();
1075+
#endif
1076+
10461077
if (data_.file.length()) {
10471078
block.data += data_out.file_pointer();
10481079

@@ -1082,9 +1113,7 @@ void column::finish(index_output& index_out) {
10821113
assert(ctx_.data_out);
10831114

10841115
docs_writer_.finish();
1085-
if (!addr_table_.empty()) {
1086-
flush_block();
1087-
}
1116+
flush();
10881117
docs_.stream.flush();
10891118

10901119
column_header hdr;
@@ -1115,6 +1144,18 @@ void column::finish(index_output& index_out) {
11151144
if (0 == prev_avg_) {
11161145
hdr.type = ColumnType::MASK;
11171146
} else if (ctx_.consolidation) {
1147+
#ifdef IRESEARCH_DEBUG
1148+
// ensure blocks are dense after consolidation
1149+
auto prev = std::begin(blocks_);
1150+
if (prev != std::end(blocks_)) {
1151+
auto next = std::next(prev);
1152+
1153+
for (; next != std::end(blocks_); ++next) {
1154+
assert(next->data == prev->size + prev->data);
1155+
prev = next;
1156+
}
1157+
}
1158+
#endif
11181159
hdr.type = ColumnType::DENSE_FIXED;
11191160
} else {
11201161
hdr.type = ColumnType::FIXED;
@@ -1203,6 +1244,13 @@ columnstore_writer::column_t writer::push_column(const column_info& info) {
12031244
}
12041245

12051246
const auto id = columns_.size();
1247+
1248+
// in case of consolidation we write columns one-by-one to
1249+
// ensure that blocks from different columns don't interleave
1250+
if (consolidation_ && id) {
1251+
columns_.back().flush();
1252+
}
1253+
12061254
auto& column = columns_.emplace_back(
12071255
column::context{
12081256
alloc_,

3rdParty/iresearch/core/formats/columnstore2.hpp

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ class column final : public irs::column_output {
6363
uint64_t avg;
6464
uint64_t data;
6565
uint64_t last_size;
66+
#ifdef IRESEARCH_DEBUG
67+
uint64_t size;
68+
#endif
6669
uint32_t bits;
6770
}; // column_block
6871

@@ -75,21 +78,19 @@ class column final : public irs::column_output {
7578
deflater_{std::move(deflater)} {
7679
}
7780

78-
void prepare(doc_id_t key) {
79-
if (IRS_LIKELY(key > pend_)) {
80-
if (addr_table_.full()) {
81-
flush_block();
82-
}
81+
void prepare(doc_id_t key);
8382

84-
prev_ = pend_;
85-
pend_ = key;
86-
docs_writer_.push_back(key);
87-
addr_table_.push_back(data_.stream.file_pointer());
88-
}
83+
bool empty() const noexcept {
84+
return addr_table_.empty() && !docs_count_;
8985
}
9086

91-
bool empty() const noexcept {
92-
return addr_table_.empty();
87+
void flush() {
88+
if (!addr_table_.empty()) {
89+
flush_block();
90+
#ifdef IRESEARCH_DEBUG
91+
sealed_ = true;
92+
#endif
93+
}
9394
}
9495

9596
void finish(index_output& index_out);
@@ -102,17 +103,7 @@ class column final : public irs::column_output {
102103
data_.stream.write_bytes(b, size);
103104
}
104105

105-
virtual void reset() override {
106-
if (empty()) {
107-
return;
108-
}
109-
110-
[[maybe_unused]] const bool res = docs_writer_.erase(pend_);
111-
assert(res);
112-
data_.stream.seek(addr_table_.back());
113-
addr_table_.pop_back();
114-
pend_ = prev_;
115-
}
106+
virtual void reset() override;
116107

117108
private:
118109
//////////////////////////////////////////////////////////////////////////////
@@ -180,6 +171,9 @@ class column final : public irs::column_output {
180171
doc_id_t prev_{}; // last committed doc_id_t
181172
doc_id_t pend_{}; // last pushed doc_id_t
182173
bool fixed_length_{true};
174+
#ifdef IRESEARCH_DEBUG
175+
bool sealed_{false};
176+
#endif
183177
}; // column
184178

185179
////////////////////////////////////////////////////////////////////////////////

3rdParty/iresearch/core/index/field_data.cpp

Lines changed: 4 additions & 21 deletions
179B
Original file line numberDiff line numberDiff line change
@@ -1146,10 +1146,12 @@ bool field_data::invert(token_stream& stream, doc_id_t id) {
11461146
fields_data::fields_data(
11471147
const field_features_t& field_features,
11481148
const feature_column_info_provider_t& feature_columns,
1149+
std::deque<cached_column>& cached_features,
11491150
const comparer* comparator /*= nullptr*/)
11501151
: comparator_(comparator),
11511152
field_features_(&field_features),
11521153
feature_columns_(&feature_columns),
1154+
cached_features_(&cached_features),
11531155
byte_writer_(byte_pool_.begin()),
11541156
int_writer_(int_pool_.begin()) {
11551157
}
@@ -1171,7 +1173,7 @@ field_data* fields_data::emplace(
11711173
try {
11721174
fields_.emplace_back(
11731175
name, features, *field_features_,
1174-
*feature_columns_, cached_features_, columns,
1176+
*feature_columns_, *cached_features_, columns,
11751177
byte_writer_, int_writer_,
11761178
index_features, (nullptr != comparator_));
11771179
} catch (...) {
@@ -1231,34 +1233,15 @@ void fields_data::flush(field_writer& fw, flush_state& state) {
12311233
void fields_data::reset() noexcept {
12321234
byte_writer_ = byte_pool_.begin(); // reset position pointer to start of pool
12331235
fields_.clear();
1234-
cached_features_.clear(); // FIXME(@gnusi): we loose all per-column buffers
12351236
fields_map_.clear();
12361237
int_writer_ = int_pool_.begin(); // reset position pointer to start of pool
12371238
}
12381239

1239-
void fields_data::flush_features(
1240-
columnstore_writer& writer,
1241-
const doc_map& docmap,
1242-
sorted_column::flush_buffer_t& buffer) {
1243-
for (auto& feature : cached_features_) {
1244-
assert(feature.id);
1245-
if (IRS_LIKELY(!field_limits::valid(*feature.id))) {
1246-
*feature.id = feature.stream.flush(writer, docmap, buffer);
1247-
}
1248-
}
1249-
}
1250-
12511240
size_t fields_data::memory_active() const noexcept {
12521241
return byte_writer_.pool_offset()
12531242
+ int_writer_.pool_offset() * sizeof(int_block_pool::value_type)
12541243
+ fields_map_.size() * sizeof(fields_map::value_type)
1255-
+ fields_.size() * sizeof(decltype(fields_)::value_type)
1256-
+ std::accumulate(
1257-
std::begin(cached_features_),
1258-
std::end(cached_features_),
1259-
size_t{0},
1260-
[](const auto lhs, const auto& rhs) {
1261-
return lhs + rhs.stream.memory_active(); });
1244+
+ fields_.size() * sizeof(decltype(fields_)::value_type);
12621245
}
12631246

12641247
size_t fields_data::memory_reserved() const noexcept {

3rdParty/iresearch/core/index/field_data.hpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class IRESEARCH_API fields_data: util::noncopyable {
184184
explicit fields_data(
185185
const field_features_t& field_features,
186186
const feature_column_info_provider_t& feature_columns,
187+
std::deque<cached_column>& cached_features,
187188
const comparer* comparator);
188189

189190
const comparer* comparator() const noexcept {
< 10000 div class="diff-text-inner color-fg-muted">@@ -209,18 +210,13 @@ class IRESEARCH_API fields_data: util::noncopyable {
209210
void flush(field_writer& fw, flush_state& state);
210211
void reset() noexcept;
211212

212-
void flush_features(
213-
columnstore_writer& writer,
214-
const doc_map& docmap,
215-
sorted_column::flush_buffer_t& buffer);
216-
217213
private:
218214
IRESEARCH_API_PRIVATE_VARIABLES_BEGIN
219215
const comparer* comparator_;
220216
const field_features_t* field_features_;
221217
const feature_column_info_provider_t* feature_columns_;
222218
std::deque<field_data> fields_; // pointers remain valid
223-
std::deque<cached_column> cached_features_; // pointers remain valid
219+
std::deque<cached_column>* cached_features_; // pointers remain valid
224220
fields_map fields_map_;
225221
postings_ref_t sorted_postings_;
226222
std::vector<const field_data*> sorted_fields_;

3rdParty/iresearch/core/index/segment_writer.cpp

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,17 +60,20 @@ segment_writer::stored_column::stored_column(
6060
const hashed_string_ref& name,
6161
columnstore_writer& columnstore,
6262
const column_info_provider_t& column_info,
63+
std::deque<cached_column>& cached_columns,
6364
bool cache)
6465
: name(name.c_str(), name.size()),
65-
name_hash(name.hash()),
66-
stream(column_info(name)) {
66+
name_hash(name.hash()) {
67+
auto info = column_info(name);
68+
6769
if (!cache) {
68-
auto& info = stream.info();
6970
std::tie(id, writer) = columnstore.push_column(info);
7071
} else {
71-
writer = [this](irs::doc_id_t doc)-> column_output& {
72-
this->stream.prepare(doc);
73-
return this->stream;
72+
auto& cached = cached_columns.emplace_back(&id, info);
73+
74+
writer = [stream = &cached.stream](irs::doc_id_t doc)-> column_output& {
75+
stream->prepare(doc);
76+
return *stream;
7477
};
7578
}
7679
}
@@ -112,9 +115,17 @@ size_t segment_writer::memory_active() const noexcept {
112115
const auto docs_mask_extra = (0 != (docs_mask_.size() % sizeof(bitvector::word_t)))
113116
? sizeof(bitvector::word_t) : 0;
114117

115-
const auto column_cache_active = std::accumulate(
118+
auto column_cache_active = std::accumulate(
116119
columns_.begin(), columns_.end(), size_t(0),
117120
[](size_t lhs, const stored_column& rhs) noexcept {
121+
return lhs + rhs.name.size() + sizeof(rhs);
122+
});
123+
124+
column_cache_active += std::accumulate(
125+
std::begin(cached_columns_),
126+
std::end(cached_columns_),
127+
column_cache_active,
128+
[](const auto lhs, const auto& rhs) {
118129
return lhs + rhs.stream.memory_active();
119130
});
120131

@@ -129,9 +140,14 @@ size_t segment_writer::memory_reserved() const noexcept {
129140
const auto docs_mask_extra = (0 != (docs_mask_.size() % sizeof(bitvector::word_t)))
130141
? sizeof(bitvector::word_t) : 0;
131142

132-
const auto column_cache_reserved = std::accumulate(
133-
columns_.begin(), columns_.end(), size_t(0),
134-
[](size_t lhs, const stored_column& rhs) noexcept {
143+
auto column_cache_reserved
144+
= columns_.capacity()*sizeof(decltype(columns_)::value_type);
145+
146+
column_cache_reserved += std::accumulate(
147+
std::begin(cached_columns_),
148+
std::end(cached_columns_),
149+
column_cache_reserved,
150+
[](const auto lhs, const auto& rhs) {
135151
return lhs + rhs.stream.memory_reserved();
136152
});
137153

@@ -162,7 +178,7 @@ segment_writer::segment_writer(
162178
const feature_column_info_provider_t& feature_column_info,
163179
const comparer* comparator) noexcept
164180
: sort_(column_info),
165-
fields_(field_features, feature_column_info, comparator),
181+
fields_(field_features, feature_column_info, cached_columns_, comparator),
166182
column_info_(&column_info),
167183
field_features_(&field_features),
168184
dir_(dir),
@@ -206,7 +222,7 @@ column_output& segment_writer::stream(
206222
name,
207223
[this, &name](const auto& ctor){
208224
ctor(name, *col_writer_, *column_info_,
209-
nullptr != fields_.comparator());
225+
cached_columns_, nullptr != fields_.comparator());
210226
})->writer(doc_id);
211227
}
212228

@@ -289,16 +305,14 @@ void segment_writer::flush(index_meta::index_segment_t& segment) {
289305
std::tie(docmap, sort_.id) = sort_.stream.flush(
290306
*col_writer_, doc_id_t(docs_cached()), *fields_.comparator());
291307

308+
// flush all cached columns
292309
irs::sorted_column::flush_buffer_t buffer;
293-
for (auto& column : columns_) {
294-
if (IRS_LIKELY(!field_limits::valid(column.id))) {
295-
// cached column
296-
column.id = column.stream.flush(*col_writer_, docmap, buffer);
310+
for (auto& column : cached_columns_) {
311+
if (IRS_LIKELY(!field_limits::valid(*column.id))) {
312+
*column.id = column.stream.flush(*col_writer_, docmap, buffer);
297313
}
298314
}
299315

300-
fields_.flush_features(*col_writer_, docmap, buffer);
301-
302316
meta.sort = sort_.id; // store sorted column id in segment meta
303317

304318
if (!docmap.empty()) {
@@ -344,7 +358,8 @@ void segment_writer::reset() noexcept {
344358
docs_context_.clear();
345359
docs_mask_.clear();
346360
fields_.reset();
347-
columns_.clear(); // FIXME(@gnusi): we loose all per-column buffers
361+
columns_.clear();
362+
cached_columns_.clear(); // FIXME(@gnusi): we loose all per-column buffers
348363
sort_.stream.clear();
349364

350365
if (col_writer_) {

3rdParty/iresearch/core/index/segment_writer.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,12 +272,12 @@ class IRESEARCH_API segment_writer : util::noncopyable {
272272
const hashed_string_ref& name,
273273
columnstore_writer& columnstore,
274274
const column_info_provider_t& column_info,
275+
std::deque<cached_column>& cached_columns,
275276
bool cache);
276277

277278
std::string name;
278279
size_t name_hash;
279280
columnstore_writer::values_writer_f writer;
280-
mutable irs::sorted_column stream;
281281
mutable field_id id{ field_limits::invalid() };
282282
}; // stored_column
283283

@@ -439,6 +439,7 @@ class IRESEARCH_API segment_writer : util::noncopyable {
439439
void flush_fields(const doc_map& docmap); // flushes indexed fields to directory
440440

441441
IRESEARCH_API_PRIVATE_VARIABLES_BEGIN
442+
std::deque<cached_column> cached_columns_; // pointers remain valid
442443
sorted_column sort_;
443444
update_contexts docs_context_;
444445
bitvector docs_mask_; // invalid/removed doc_ids (e.g. partially indexed due to indexing failure)

0 commit comments

Comments
 (0)
0