8000 Feature/aql item block compression by jsteemann · Pull Request #6514 · arangodb/arangodb · GitHub
[go: up one dir, main page]

Skip to content

Feature/aql item block compression #6514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 18, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
compress repeated values in AqlItemBlocks
  • Loading branch information
jsteemann committed Sep 15, 2018
commit bcfb8b9b17b09fa71dc6d696973bc5d4315497e4
320 changes: 228 additions & 92 deletions arangod/Aql/AqlItemBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,78 +92,146 @@ AqlItemBlock::AqlItemBlock(ResourceMonitor* resourceMonitor, VPackSlice const sl

VPackArrayIterator dataIterator(data);
VPackArrayIterator rawIterator(raw);

auto storeSingleValue = [this](size_t row, size_t column, VPackArrayIterator& it, std::vector<AqlValue>& madeHere) {
AqlValue a(it.value());
it.next();
try {
setValue(row, column, a); // if this throws, a is destroyed again
} catch (...) {
a.destroy();
throw;
}
madeHere.emplace_back(a);
};

enum RunType {
NoRun = 0,
EmptyRun,
NextRun,
PositionalRun
};

int64_t runLength = 0;
size_t tablePos = 0;
RunType runType = NoRun;

try {
// skip the first two records
rawIterator.next();
rawIterator.next();
int64_t emptyRun = 0;


for (RegisterId column = 0; column < _nrRegs; column++) {
for (size_t i = 0; i < _nrItems; i++) {
if (emptyRun > 0) {
emptyRun--;
} else {
VPackSlice dataEntry = dataIterator.value();
dataIterator.next();
if (!dataEntry.isNumber()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"data must contain only numbers");
}
int64_t n = dataEntry.getNumericValue<int64_t>();
if (n == 0) {
// empty, do nothing here
} else if (n == -1) {
// empty run:
VPackSlice runLength = dataIterator.value();
dataIterator.next();
TRI_ASSERT(runLength.isNumber());
emptyRun = runLength.getNumericValue<int64_t>();
emptyRun--;
} else if (n == -2) {
// a range
VPackSlice lowBound = dataIterator.value();
dataIterator.next();
VPackSlice highBound = dataIterator.value();
dataIterator.next();

int64_t low =
VelocyPackHelper::getNumericValue<int64_t>(lowBound, 0);
int64_t high =
VelocyPackHelper::getNumericValue<int64_t>(highBound, 0);
AqlValue a(low, high);
try {
setValue(i, column, a);
} catch (...) {
a.destroy();
throw;
if (runLength > 0) {
switch (runType) {
case EmptyRun:
// nothing to do
break;

case NextRun:
storeSingleValue(i, column, rawIterator, madeHere);
break;

case PositionalRun:
TRI_ASSERT(tablePos < madeHere.size());
setValue(i, column, madeHere[tablePos]);
break;

case NoRun: {
TRI_ASSERT(false);
}
} else if (n == 1) {
// a VelocyPack value
AqlValue a(rawIterator.value());
rawIterator.next();
try {
setValue(i, column, a); // if this throws, a is destroyed again
} catch (...) {
a.destroy();
throw;
}

--runLength;
if (runLength == 0) {
runType = NoRun;
tablePos = 0;
}

continue;
}

VPackSlice dataEntry = dataIterator.value();
dataIterator.next();
if (!dataEntry.isNumber()) {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"data must contain only numbers");
}

int64_t n = dataEntry.getNumericValue<int64_t>();
if (n == 0) {
// empty, do nothing here
} else if (n == 1) {
// a VelocyPack value
storeSingleValue(i, column, rawIterator, madeHere);
} else if (n == -1 || n == -3 || n == -4) {
// -1: empty run, -3: run of "next" values, -4: run of positional values
VPackSlice v = dataIterator.value();
dataIterator.next();
TRI_ASSERT(v.isNumber());
runLength = v.getNumericValue<int64_t>();
runLength--;
switch (n) {
case -1:
runType = EmptyRun;
break;

case -3:
runType = NextRun;
storeSingleValue(i, column, rawIterator, madeHere);
break;

case -4: {
runType = PositionalRun;
VPackSlice v = dataIterator.value();
dataIterator.next();
TRI_ASSERT(v.isNumber());
tablePos = v.getNumericValue<size_t>();
if (tablePos >= madeHere.size()) {
// safeguard against out-of-bounds accesses
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"found undefined data value");
}

setValue(i, column, madeHere[tablePos]);
}
madeHere.emplace_back(a);
} else if (n >= 2) {
setValue(i, column, madeHere[static_cast<size_t>(n)]);
// If this throws, all is OK, because it was already put into
// the block elsewhere.
} else {
}
} else if (n == -2) {
// a range
VPackSlice lowBound = dataIterator.value();
dataIterator.next();
VPackSlice highBound = dataIterator.value();
dataIterator.next();

int64_t low =
VelocyPackHelper::getNumericValue<int64_t>(lowBound, 0);
int64_t high =
VelocyPackHelper::getNumericValue<int64_t>(highBound, 0);
emplaceValue(i, column, low, high);
} else if (n >= 2) {
if (static_cast<size_t>(n) >= madeHere.size()) {
// safeguard against out-of-bounds accesses
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"found undefined data value");
}

setValue(i, column, madeHere[static_cast<size_t>(n)]);
// If this throws, all is OK, because it was already put into
// the block elsewhere.
} else {
THROW_ARANGO_EXCEPTION_MESSAGE(TRI_ERROR_INTERNAL,
"found invalid data encoding value");
}
}
}
} catch (...) {
destroy();
throw;
}

TRI_ASSERT(runLength == 0);
TRI_ASSERT(runType == NoRun);
}

/// @brief destroy the block, used in the destructor and elsewhere
Expand Down Expand Up @@ -496,18 +564,27 @@ AqlItemBlock* AqlItemBlock::concatenate(ResourceMonitor* resourceMonitor,
/// from the first column (top to bottom) and going right.
/// Each entry found is encoded in the following way:
/// 0 means a single empty entry
/// -1 followed by a positive integer N (encoded as number)
/// means a run of that many empty entries
/// -2 followed by two numbers LOW and HIGH means a range
/// and LOW and HIGH are the boundaries (inclusive)
/// 1 means a JSON entry at the "next" position in "raw"
/// the "next" position starts with 2 and is increased
/// by one for every 1 found in data
/// integer values >= 2 mean a JSON entry, in this
/// case the "raw" list contains an entry in the
/// corresponding position
/// -1 followed by a positive integer N (encoded as number)
/// means a run of that many empty entries. this is a
/// compression for multiple "0" entries
/// -2 followed by two numbers LOW and HIGH means a range
/// and LOW and HIGH are the boundaries (inclusive)
/// -3 followed by a positive integer N (encoded as number)
/// means a run of that many JSON entries which can
/// be found at the "next" position in "raw". this is
/// a compression for multiple "1" entries
/// -4 followed by a positive integer N (encoded as number)
/// and followed by a positive integer P (encoded as number)
/// means a run of that many JSON entries which can
/// be found in the "raw" list at the position P
/// 1 means a JSON entry at the "next" position in "raw"
/// the "next" position starts with 2 and is increased
/// by one for every 1 found in data
/// integer values >= 2 mean a JSON entry, in this
/// case the "raw" list contains an entry in the
/// corresponding position
/// "raw": List of actual values, positions 0 and 1 are always null
/// such that actual indices start at 2
/// such that actual indices start at 2
void AqlItemBlock::toVelocyPack(transaction::Methods* trx,
VPackBuilder& result) const {
VPackOptions options(VPackOptions::Defaults);
Expand All @@ -519,57 +596,116 @@ void AqlItemBlock::toVelocyPack(transaction::Methods* trx,
// Two nulls in the beginning such that indices start with 2
raw.add(VPackValue(VPackValueType::Null));
raw.add(VPackValue(VPackValueType::Null));

std::unordered_map<AqlValue, size_t> table; // remember duplicates

result.add("nrItems", VPackValue(_nrItems));
result.add("nrRegs", VPackValue(_nrRegs));
result.add("error", VPackValue(false));
// Backwards compatbility 3.3
result.add("exhausted", VPackValue(false));
result.add("data", VPackValue(VPackValueType::Array));

size_t emptyCount = 0; // here we count runs of empty AqlValues
enum State {
Empty, // saw an empty value
Range, // saw a range value
Next, // saw a previously unknown value
Positional, // saw a value previously encountered
};

auto commitEmpties = [&result, &emptyCount]() { // this commits an empty run to the result
if (emptyCount > 0) {
if (emptyCount == 1) {
result.add(VPackValue(0));
} else {
result.add(VPackValue(-1));
result.add(VPackValue(emptyCount));
}
emptyCount = 0;
std::unordered_map<AqlValue, size_t> table; // remember duplicates
size_t lastTablePos = 0;
State lastState = Positional;

State currentState = Positional;
size_t runLength = 0;
size_t tablePos = 0;

result.add("data", VPackValue(VPackValueType::Array));

// write out data buffered for repeated "empty" or "next" values
auto writeBuffered = [](State lastState, size_t lastTablePos, VPackBuilder& result, size_t runLength) {
if (lastState == Range) {
return;
}

if (lastState == Positional) {
if (lastTablePos >= 2) {
if (runLength == 1) {
result.add(VPackValue(lastTablePos));
} else {
result.add(VPackValue(-4));
result.add(VPackValue(runLength));
result.add(VPackValue(lastTablePos));
}
}
} else {
TRI_ASSERT(lastState == Empty || lastState == Next);
if (runLength == 1) {
// saw exactly one value
result.add(VPackValue(lastState == Empty ? 0 : 1));
} else {
// saw multiple values
result.add(VPackValue(lastState == Empty ? -1 : -3));
result.add(VPackValue(runLength));

}}
};

size_t pos = 2; // write position in raw
for (RegisterId column = 0; column < _nrRegs; column++) {
for (size_t i = 0; i < _nrItems; i++) {
AqlValue const& a(_data[i * _nrRegs + column]);

// determine current state
if (a.isEmpty()) {
emptyCount++;
currentState = Empty;
} else if (a.isRange()) {
currentState = Range;
} else {
commitEmpties();
if (a.isRange()) {
result.add(VPackValue(-2));
result.add(VPackValue(a.range()->_low));
result.add(VPackValue(a.range()->_high));
} else {
auto it = table.find(a);
auto it = table.find(a);

if (it == table.end()) {
a.toVelocyPack(trx, raw, false);
result.add(VPackValue(1));
table.emplace(a, pos++);
} else {
result.add(VPackValue(it->second));
if (it == table.end()) {
currentState = Next;
a.toVelocyPack(trx, raw, false);
table.emplace(a, pos++);
} else {
currentState = Positional;
tablePos = it->second;
TRI_ASSERT(tablePos >= 2);
if (lastState != Positional) {
lastTablePos = tablePos;
}
}
}

// handle state change
if (currentState != lastState ||
(currentState == Positional && tablePos != lastTablePos)) {
// write out remaining buffered data in case of a state change
writeBuffered(lastState, lastTablePos, result, runLength);

lastTablePos = 0;
lastState = currentState;
runLength = 0;
}

switch (currentState) {
case Empty:
case Next:
case Positional:
++runLength;
lastTablePos = tablePos;
break;

case Range:
result.add(VPackValue(-2));
result.add(VPackValue(a.range()->_low));
result.add(VPackValue(a.range()->_high));
break;
}
}
}
commitEmpties();

// write out any remaining buffered data
writeBuffered(lastState, lastTablePos, result, runLength);

result.close(); // closes "data"

Expand Down
Loading
0