8000 make use of generateBody in payload and fix some bugs in VppCommTask · lethalbrains/arangodb@a1d6909 · GitHub
[go: up one dir, main page]

Skip to content

Commit a1d6909

Browse files
committed
make use of generateBody in payload and fix some bugs in VppCommTask
1 parent b3fab8c commit a1d6909

File tree

6 files changed

+83
-56
lines changed

6 files changed

+83
-56
lines changed

arangod/GeneralServer/GeneralCommTask.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,11 @@ class GeneralCommTask : public SocketTask, public RequestStatisticsAgent {
143143
void signalTask(TaskData*) override;
144144

145145
protected:
146+
// is called in a loop as long as it returns true
147+
// return false if there is not enought data to do
148+
// any more processing
146149
virtual bool processRead() = 0;
150+
147151
virtual void handleChunk(char const*, size_t) = 0;
148152

149153
protected:

arangod/GeneralServer/VppCommTask.cpp

Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -66,18 +66,30 @@ std::size_t findAndValidateVPacks(char const* vpHeaderStart,
6666
}
6767

6868
std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
69-
char* start, char* end, bool isFirstChunk, uint32_t chunk, uint64_t id,
70-
uint32_t totalMessageLength = 0) {
69+
std::vector<VPackSlice> const& slices, bool isFirstChunk, uint32_t chunk,
70+
uint64_t id, uint32_t totalMessageLength = 0) {
7171
using basics::StringBuffer;
7272
bool firstOfMany = false;
73+
74+
// if we have more than one chunk and the chunk is the first
75+
// then we are sending the first in a series. if this condition
76+
// is true we also send extra 8 bytes for the messageLength
77+
// (length of all VPackData)
7378
if (isFirstChunk && chunk > 1) {
7479
firstOfMany = true;
7580
}
7681

82+
// build chunkX -- see VelocyStream Documentaion
7783
chunk <<= 1;
7884
chunk |= isFirstChunk ? 0x1 : 0x0;
7985

80-
uint32_t dataLength = std::distance(start, end);
86+
// get the lenght of VPack data
87+
uint32_t dataLength = 0;
88+
for (auto& slice : slices) {
89+
dataLength += slice.byteSize();
90+
}
91+
92+
// calculate length of current chunk
8193
uint32_t chunkLength = dataLength;
8294
chunkLength += (sizeof(chunkLength) + sizeof(chunk) + sizeof(id));
8395
if (firstOfMany) {
@@ -87,58 +99,53 @@ std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
8799
auto buffer =
88100
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false);
89101

90-
// TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), chunkLength);
91-
// buffer->appendInteger(chunkLength);
92102
char cChunkLength[sizeof(chunkLength)];
93103
char const* cChunkLengthPtr = cChunkLength;
94104
std::memcpy(&cChunkLength, &chunkLength, sizeof(chunkLength));
95105
buffer->appendText(cChunkLengthPtr, sizeof(chunkLength));
96106

97-
// TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), chunk);
98-
// buffer->appendInteger(chunk); // chunkX //contains is first
99107
char cChunk[sizeof(chunk)];
100108
char const* cChunkPtr = cChunk;
101109
std::memcpy(&cChunk, &chunk, sizeof(chunk));
102110
buffer->appendText(cChunkPtr, sizeof(chunk));
103111

104-
// TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), id);
105-
// buffer->appendInteger(id);
106112
char cId[sizeof(id)];
107113
char const* cIdPtr = cId;
108114
std::memcpy(&cId, &id, sizeof(id));
109115
buffer->appendText(cIdPtr, sizeof(id));
110116

111117
if (firstOfMany) {
112-
// TRI_ASSERT(totalMessageLength != 0);
113-
// buffer->appendInteger(totalMessageLength);
114118
char cTotalMessageLength[sizeof(totalMessageLength)];
115119
char const* cTotalMessageLengthPtr = cTotalMessageLength;
116120
std::memcpy(&cTotalMessageLength, &totalMessageLength,
117121
sizeof(totalMessageLength));
118122
buffer->appendText(cTotalMessageLengthPtr, sizeof(totalMessageLength));
119123
}
120-
buffer->appendText(std::string(start, dataLength));
124+
125+
// append data in slices
126+
for (auto const& slice : slices) {
127+
buffer->appendText(std::string(slice.startAs<char>(), slice.byteSize()));
128+
}
121129

122130
return buffer;
123131
}
124132

125-
std::unique_ptr<basics::StringBuffer> createChunkForNetworkSingle(char* start,
126-
char* end,
127-
uint64_t id) {
128-
return createChunkForNetworkDetail(start, end, true, 1, id, 0 /*unused*/);
133+
std::unique_ptr<basics::StringBuffer> createChunkForNetworkSingle(
134+
std::vector<VPackSlice> const& slices, uint64_t id) {
135+
return createChunkForNetworkDetail(slices, true, 1, id, 0 /*unused*/);
129136
}
130137

131138
std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFirst(
132-
char* start, char* end, uint64_t id, uint32_t numberOfChunks,
139+
std::vector<VPackSlice> const& slices, uint64_t id, uint32_t numberOfChunks,
133140
uint32_t totalMessageLength) {
134-
return createChunkForNetworkDetail(start, end, true, numberOfChunks, id,
141+
return createChunkForNetworkDetail(slices, true, numberOfChunks, id,
135142
totalMessageLength);
136143
}
137144

138145
std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFollow(
139-
char* start, char* end, uint64_t id, uint32_t chunkNumber,
146+
std::vector<VPackSlice> const& slices, uint64_t id, uint32_t chunkNumber,
140147
uint32_t totalMessageLength) {
141-
return createChunkForNetworkDetail(start, end, false, chunkNumber, id, 0);
148+
return createChunkForNetworkDetail(slices, false, chunkNumber, id, 0);
142149
}
143150
}
144151

@@ -164,10 +171,12 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
164171
slices.push_back(response_message._header);
165172

166173
// if payload != Slice()
167-
slices.push_back(response_message._payload);
174+
if (response_message._generateBody) {
175+
slices.push_back(response_message._payload);
176+
}
168177

178+
// calculate message length
169179
uint32_t message_length = 0;
170-
171180
for (auto const& slice : slices) {
172181
message_length += slice.byteSize();
173182
}
@@ -183,7 +192,7 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
183192

184193
// adds chunk header infromation and creates SingBuffer* that can be
185194
// used with _writeBuffers
186-
auto buffer = createChunkForNetworkSingle(tmp.begin(), tmp.end(), id);
195+
auto buffer = createChunkForNetworkSingle(slices, id);
187196
_writeBuffers.push_back(buffer.get());
188197
buffer.release();
189198

@@ -194,7 +203,7 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
194203
VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
195204
VppCommTask::ChunkHeader header;
196205

197-
auto cursor = _readBuffer->begin();
206+
auto cursor = _processReadVariables._readBufferCursor;
198207

199208
std::memcpy(&header._chunkLength, cursor, sizeof(header._chunkLength));
200209
cursor += sizeof(header._chunkLength);
@@ -217,7 +226,8 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
217226
header._messageLength = 0; // not needed
218227
}
219228

220-
header._headerLength = std::distance(_readBuffer->begin(), cursor);
229+
header._headerLength =
230+
std::distance(_processReadVariables._readBufferCursor, cursor);
221231

222232
return header;
223233
}
@@ -260,7 +270,8 @@ bool VppCommTask::processRead() {
260270
ChunkHeader chunkHeader = readChunkHeader();
261271
auto chunkEnd = chunkBegin + chunkHeader._chunkLength;
262272
auto vpackBegin = chunkBegin + chunkHeader._headerLength;
263-
bool do_execute = false;
273+
bool doExecute = false;
274+
bool read_maybe_only_part_of_buffer = false;
264275
VPackMessage message; // filled in CASE 1 or CASE 2b
265276

266277
// CASE 1: message is in one chunk
@@ -275,7 +286,7 @@ bool VppCommTask::processRead() {
275286
VPackValidator val;
276287
val.validate(message._header.begin(), message._header.byteSize());
277288

278-
do_execute = true;
289+
doExecute = true;
279290
}
280291
// CASE 2: message is in multiple chunks
281292
auto incompleteMessageItr = _incompleteMessages.find(chunkHeader._messageID);
@@ -309,7 +320,7 @@ bool VppCommTask::processRead() {
309320
// check buffer longer than length
310321

311322
// MESSAGE COMPLETE
312-
if (im._currentChunk == im._numberOfChunks) {
323+
if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */) {
313324
std::size_t payloadOffset = findAndValidateVPacks(
314325
reinterpret_cast<char const*>(im._buffer.data()),
315326
reinterpret_cast<char const*>(im._buffer.data() +
@@ -323,11 +334,13 @@ bool VppCommTask::processRead() {
323334
_incompleteMessages.erase(incompleteMessageItr);
324335
// check length
325336

326-
do_execute = true;
337+
doExecute = true;
327338
}
328339
}
329340

330341
// clean buffer up to length of chunk
342+
read_maybe_only_part_of_buffer = true;
343+
prv._currentChunkLength = 0;
331344
prv._readBufferCursor = chunkEnd;
332345
std::size_t processedDataLen =
333346
std::distance(_readBuffer->begin(), prv._readBufferCursor);
@@ -337,21 +350,27 @@ bool VppCommTask::processRead() {
337350
// begin of this function
338351
}
339352

340-
if (!do_execute) {
341-
return false; // we have no complete request, so we return early
353+
if (doExecute) {
354+
// return false; // we have no complete request, so we return early
355+
// for now we can handle only one request at a time
356+
// lock _request???? REVIEW (fc)
357+
LOG(ERR) << message._header.toJson();
358+
_request = new VppRequest(_connectionInfo, std::move(message));
359+
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request);
360+
_request->setClientTaskId(_taskId);
361+
_protocolVersion = _request->protocolVersion();
362+
executeRequest(_request,
363+
new VppResponse(GeneralResponse::ResponseCode::SERVER_ERROR,
364+
chunkHeader._messageID));
342365
}
343366

344-
// for now we can handle only one request at a time
345-
// lock _request???? REVIEW (fc)
346-
LOG(ERR) << message._header.toJson();
347-
_request = new VppRequest(_connectionInfo, std::move(message));
348-
GeneralServerFeature::HANDLER_FACTORY->setRequestContext(_request);
349-
_request->setClientTaskId(_taskId);
350-
_protocolVersion = _request->protocolVersion();
351-
executeRequest(_request,
352-
new VppResponse(GeneralResponse::ResponseCode::SERVER_ERROR,
353-
chunkHeader._messageID));
354-
return true;
367+
if (read_maybe_only_part_of_buffer) {
368+
if (prv._readBufferCursor == _readBuffer->end()) {
369+
return false;
370+
}
371+
return true;
372+
}
373+
return doExecute;
355374
}
356375

357376
void VppCommTask::completedWriteBuffer() {

arangod/GeneralServer/VppCommTask.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class VppCommTask : public GeneralCommTask {
7575
: _length(length),
7676
_buffer(_length),
7777
_numberOfChunks(numberOfChunks),
78-
_currentChunk(1UL) {}
78+
_currentChunk(0) {}
7979
uint32_t _length; // length of total message in bytes
8080
VPackBuffer<uint8_t> _buffer;
8181
std::size_t _numberOfChunks;

lib/Rest/VppMessage.h

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,28 @@ namespace arangodb {
3434
namespace rest {
3535

3636
struct VPackMessage {
37-
VPackMessage() : _buffer(), _header(), _payload(), _id() {}
37+
VPackMessage()
38+
: _buffer(), _header(), _payload(), _id(), _generateBody(true) {}
3839
VPackMessage(VPackBuffer<uint8_t>&& buff, VPackSlice head, VPackSlice pay,
39-
uint64_t id)
40-
: _buffer(std::move(buff)), _header(head), _payload(pay), _id(id) {}
40+
uint64_t id, bool generateBody = true)
41+
: _buffer(std::move(buff)),
42+
_header(head),
43+
_payload(pay),
44+
_id(id),
45+
_generateBody(generateBody) {}
4146
VPackMessage(VPackMessage&& other) = default;
4247

4348
VPackBuffer<uint8_t> _buffer;
4449
VPackSlice _header;
4550
VPackSlice _payload;
4651
uint64_t _id;
52+
bool _generateBody;
4753
};
4854

4955
struct VPackMessageNoOwnBuffer : VPackMessage {
50-
VPackMessageNoOwnBuffer(VPackSlice head, VPackSlice pay, uint64_t id)
51-
: VPackMessage(VPackBuffer<uint8_t>(), head, pay, id) {}
56+
VPackMessageNoOwnBuffer(VPackSlice head, VPackSlice pay, uint64_t id,
57+
bool generateBody = true)
58+
: VPackMessage(VPackBuffer<uint8_t>(), head, pay, id, generateBody) {}
5259
};
5360
}
5461
}

lib/Rest/VppResponse.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,16 +56,14 @@ void VppResponse::reset(ResponseCode code) {
5656
_headers.clear();
5757
_connectionType = CONNECTION_KEEP_ALIVE;
5858
_contentType = ContentType::TEXT;
59+
_generateBody = false; // payload has to be set
5960
}
6061

6162
void VppResponse::setPayload(ContentType contentType,
6263
arangodb::velocypack::Slice const& slice,
6364
bool generateBody, VPackOptions const& options) {
6465
if (generateBody) {
65-
// addPayload(slice);
66-
if (slice.isEmptyObject()) {
67-
throw std::logic_error("payload should be empty!!");
68-
}
66+
_generateBody = true;
6967
_payload.append(slice.startAs<char>(),
7068
std::distance(slice.begin(), slice.end()));
7169
}
@@ -79,12 +77,10 @@ VPackMessageNoOwnBuffer VppResponse::prepareForNetwork() {
7977
builder.add(
8078
"responseCode",
8179
VPackValue(static_cast<int>(meta::underlyingValue(_responseCode))));
82-
// for (auto const& item : _headers) {
83-
// builder.add(item.first, VPackValue(item.second));
84-
//}
8580
builder.close();
8681
_header = builder.steal();
8782
return VPackMessageNoOwnBuffer(VPackSlice(_header->data()),
88-
VPackSlice(_payload.data()), _messageID);
83+
VPackSlice(_payload.data()), _messageID,
84+
_generateBody);
8985
}
9086
// void VppResponse::writeHeader(basics::StringBuffer*) {}

lib/Rest/VppResponse.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ class VppResponse : public GeneralResponse {
8282
_header; // generated form _headers when prepared for network
8383
VPackBuffer<uint8_t> _payload;
8484
uint64_t _messageID;
85+
bool _generateBody; // this must be true if payload should be send
8586
};
8687
}
8788

0 commit comments

Comments
 (0)
0