@@ -66,18 +66,30 @@ std::size_t findAndValidateVPacks(char const* vpHeaderStart,
66
66
}
67
67
68
68
std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail (
69
- char * start, char * end , bool isFirstChunk, uint32_t chunk, uint64_t id ,
70
- uint32_t totalMessageLength = 0 ) {
69
+ std::vector<VPackSlice> const & slices , bool isFirstChunk, uint32_t chunk,
70
+ uint64_t id, uint32_t totalMessageLength = 0 ) {
71
71
using basics::StringBuffer;
72
72
bool firstOfMany = false ;
73
+
74
+ // if we have more than one chunk and the chunk is the first
75
+ // then we are sending the first in a series. if this condition
76
+ // is true we also send extra 8 bytes for the messageLength
77
+ // (length of all VPackData)
73
78
if (isFirstChunk && chunk > 1 ) {
74
79
firstOfMany = true ;
75
80
}
76
81
82
+ // build chunkX -- see VelocyStream Documentaion
77
83
chunk <<= 1 ;
78
84
chunk |= isFirstChunk ? 0x1 : 0x0 ;
79
85
80
- uint32_t dataLength = std::distance (start, end);
86
+ // get the lenght of VPack data
87
+ uint32_t dataLength = 0 ;
88
+ for (auto & slice : slices) {
89
+ dataLength += slice.byteSize ();
90
+ }
91
+
92
+ // calculate length of current chunk
81
93
uint32_t chunkLength = dataLength;
82
94
chunkLength += (sizeof (chunkLength) + sizeof (chunk) + sizeof (id));
83
95
if (firstOfMany) {
@@ -87,58 +99,53 @@ std::unique_ptr<basics::StringBuffer> createChunkForNetworkDetail(
87
99
auto buffer =
88
100
std::make_unique<StringBuffer>(TRI_UNKNOWN_MEM_ZONE, chunkLength, false );
89
101
90
- // TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), chunkLength);
91
- // buffer->appendInteger(chunkLength);
92
102
char cChunkLength[sizeof (chunkLength)];
93
103
char const * cChunkLengthPtr = cChunkLength;
94
104
std::memcpy (&cChunkLength, &chunkLength, sizeof (chunkLength));
95
105
buffer->appendText (cChunkLengthPtr, sizeof (chunkLength));
96
106
97
- // TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), chunk);
98
- // buffer->appendInteger(chunk); // chunkX //contains is first
99
107
char cChunk[sizeof (chunk)];
100
108
char const * cChunkPtr = cChunk;
101
109
std::memcpy (&cChunk, &chunk, sizeof (chunk));
102
110
buffer->appendText (cChunkPtr, sizeof (chunk));
103
111
104
- // TRI_AppendUInt32StringBuffer(buffer->stringBuffer(), id);
105
- // buffer->appendInteger(id);
106
112
char cId[sizeof (id)];
107
113
char const * cIdPtr = cId;
108
114
std::memcpy (&cId, &id, sizeof (id));
109
115
buffer->appendText (cIdPtr, sizeof (id));
110
116
111
117
if (firstOfMany) {
112
- // TRI_ASSERT(totalMessageLength != 0);
113
- // buffer->appendInteger(totalMessageLength);
114
118
char cTotalMessageLength[sizeof (totalMessageLength)];
115
119
char const * cTotalMessageLengthPtr = cTotalMessageLength;
116
120
std::memcpy (&cTotalMessageLength, &totalMessageLength,
117
121
sizeof (totalMessageLength));
118
122
buffer->appendText (cTotalMessageLengthPtr, sizeof (totalMessageLength));
119
123
}
120
- buffer->appendText (std::string (start, dataLength));
124
+
125
+ // append data in slices
126
+ for (auto const & slice : slices) {
127
+ buffer->appendText (std::string (slice.startAs <char >(), slice.byteSize ()));
128
+ }
121
129
122
130
return buffer;
123
131
}
124
132
125
- std::unique_ptr<basics::StringBuffer> createChunkForNetworkSingle (char * start,
126
- char * end,
127
- uint64_t id) {
128
- return createChunkForNetworkDetail (start, end, true , 1 , id, 0 /* unused*/ );
133
+ std::unique_ptr<basics::StringBuffer> createChunkForNetworkSingle (
134
+ std::vector<VPackSlice> const & slices, uint64_t id) {
135
+ return createChunkForNetworkDetail (slices, true , 1 , id, 0 /* unused*/ );
129
136
}
130
137
131
138
std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFirst (
132
- char * start, char * end , uint64_t id, uint32_t numberOfChunks,
139
+ std::vector<VPackSlice> const & slices , uint64_t id, uint32_t numberOfChunks,
133
140
uint32_t totalMessageLength) {
134
- return createChunkForNetworkDetail (start, end , true , numberOfChunks, id,
141
+ return createChunkForNetworkDetail (slices , true , numberOfChunks, id,
135
142
totalMessageLength);
136
143
}
137
144
138
145
std::unique_ptr<basics::StringBuffer> createChunkForNetworkMultiFollow (
139
- char * start, char * end , uint64_t id, uint32_t chunkNumber,
146
+ std::vector<VPackSlice> const & slices , uint64_t id, uint32_t chunkNumber,
140
147
uint32_t totalMessageLength) {
141
- return createChunkForNetworkDetail (start, end , false , chunkNumber, id, 0 );
148
+ return createChunkForNetworkDetail (slices , false , chunkNumber, id, 0 );
142
149
}
143
150
}
144
151
@@ -164,10 +171,12 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
164
171
slices.push_back (response_message._header );
165
172
166
173
// if payload != Slice()
167
- slices.push_back (response_message._payload );
174
+ if (response_message._generateBody ) {
175
+ slices.push_back (response_message._payload );
176
+ }
168
177
178
+ // calculate message length
169
179
uint32_t message_length = 0 ;
170
-
171
180
for (auto const & slice : slices) {
172
181
message_length += slice.byteSize ();
173
182
}
@@ -183,7 +192,7 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
183
192
184
193
// adds chunk header infromation and creates SingBuffer* that can be
185
194
// used with _writeBuffers
186
- auto buffer = createChunkForNetworkSingle (tmp. begin (), tmp. end () , id);
195
+ auto buffer = createChunkForNetworkSingle (slices , id);
187
196
_writeBuffers.push_back (buffer.get ());
188
197
buffer.release ();
189
198
@@ -194,7 +203,7 @@ void VppCommTask::addResponse(VppResponse* response, bool isError) {
194
203
VppCommTask::ChunkHeader VppCommTask::readChunkHeader () {
195
204
VppCommTask::ChunkHeader header;
196
205
197
- auto cursor = _readBuffer-> begin () ;
206
+ auto cursor = _processReadVariables. _readBufferCursor ;
198
207
199
208
std::memcpy (&header._chunkLength , cursor, sizeof (header._chunkLength ));
200
209
cursor += sizeof (header._chunkLength );
@@ -217,7 +226,8 @@ VppCommTask::ChunkHeader VppCommTask::readChunkHeader() {
217
226
header._messageLength = 0 ; // not needed
218
227
}
219
228
220
- header._headerLength = std::distance (_readBuffer->begin (), cursor);
229
+ header._headerLength =
230
+ std::distance (_processReadVariables._readBufferCursor , cursor);
221
231
222
232
return header;
223
233
}
@@ -260,7 +270,8 @@ bool VppCommTask::processRead() {
260
270
ChunkHeader chunkHeader = readChunkHeader ();
261
271
auto chunkEnd = chunkBegin + chunkHeader._chunkLength ;
262
272
auto vpackBegin = chunkBegin + chunkHeader._headerLength ;
263
- bool do_execute = false ;
273
+ bool doExecute = false ;
274
+ bool read_maybe_only_part_of_buffer = false ;
264
275
VPackMessage message; // filled in CASE 1 or CASE 2b
265
276
266
277
// CASE 1: message is in one chunk
@@ -275,7 +286,7 @@ bool VppCommTask::processRead() {
275
286
VPackValidator val;
276
287
val.validate (message._header .begin (), message._header .byteSize ());
277
288
278
- do_execute = true ;
289
+ doExecute = true ;
279
290
}
280
291
// CASE 2: message is in multiple chunks
281
292
auto incompleteMessageItr = _incompleteMessages.find (chunkHeader._messageID );
@@ -309,7 +320,7 @@ bool VppCommTask::processRead() {
309
320
// check buffer longer than length
310
321
311
322
// MESSAGE COMPLETE
312
- if (im._currentChunk == im._numberOfChunks ) {
323
+ if (im._currentChunk == im._numberOfChunks - 1 /* zero based counting */ ) {
313
324
std::size_t payloadOffset = findAndValidateVPacks (
314
325
reinterpret_cast <char const *>(im._buffer .data ()),
315
326
reinterpret_cast <char const *>(im._buffer .data () +
@@ -323,11 +334,13 @@ bool VppCommTask::processRead() {
323
334
_incompleteMessages.erase (incompleteMessageItr);
324
335
// check length
325
336
326
- do_execute = true ;
337
+ doExecute = true ;
327
338
}
328
339
}
329
340
330
341
// clean buffer up to length of chunk
342
+ read_maybe_only_part_of_buffer = true ;
343
+ prv._currentChunkLength = 0 ;
331
344
prv._readBufferCursor = chunkEnd;
332
345
std::size_t processedDataLen =
333
346
std::distance (_readBuffer->begin (), prv._readBufferCursor );
@@ -337,21 +350,27 @@ bool VppCommTask::processRead() {
337
350
// begin of this function
338
351
}
339
352
340
- if (!do_execute) {
341
- return false ; // we have no complete request, so we return early
353
+ if (doExecute) {
354
+ // return false; // we have no complete request, so we return early
355
+ // for now we can handle only one request at a time
356
+ // lock _request???? REVIEW (fc)
357
+ LOG (ERR) << message._header .toJson ();
358
+ _request = new VppRequest (_connectionInfo, std::move (message));
359
+ GeneralServerFeature::HANDLER_FACTORY->setRequestContext (_request);
360
+ _request->setClientTaskId (_taskId);
361
+ _protocolVersion = _request->protocolVersion ();
362
+ executeRequest (_request,
363
+ new VppResponse (GeneralResponse::ResponseCode::SERVER_ERROR,
364
+ chunkHeader._messageID ));
342
365
}
343
366
344
- // for now we can handle only one request at a time
345
- // lock _request???? REVIEW (fc)
346
- LOG (ERR) << message._header .toJson ();
347
- _request = new VppRequest (_connectionInfo, std::move (message));
348
- GeneralServerFeature::HANDLER_FACTORY->setRequestContext (_request);
349
- _request->setClientTaskId (_taskId);
350
- _protocolVersion = _request->protocolVersion ();
351
- executeRequest (_request,
352
- new VppResponse (GeneralResponse::ResponseCode::SERVER_ERROR,
353
- chunkHeader._messageID ));
354
- return true ;
367
+ if (read_maybe_only_part_of_buffer) {
368
+ if (prv._readBufferCursor == _readBuffer->end ()) {
369
+ return false ;
370
+ }
371
+ return true ;
372
+ }
373
+ return doExecute;
355
374
}
356
375
357
376
void VppCommTask::completedWriteBuffer () {
0 commit comments