8000 Implement pause/resume and row streaming test · mysqljs/mysql@5e51c94 · GitHub
[go: up one dir, main page]

Skip to content

Commit 5e51c94

Browse files
committed
Implement pause/resume and row streaming test
1 parent 0d0251e commit 5e51c94

File tree

4 files changed

+88
-0
lines changed

4 files changed

+88
-0
lines changed

lib/Connection.js

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ Connection.prototype.destroy = function() {
6262
this._protocol.destroy();
6363
};
6464

65+
Connection.prototype.pause = function() {
66+
this._socket.pause();
67+
this._protocol.pause();
68+
};
69+
70+
Connection.prototype.resume = function() {
71+
this._socket.resume();
72+
this._protocol.resume();
73+
};
74+
6575
Connection.prototype.escape = function(value) {
6676
return SqlString.escape(value);
6777
};

lib/protocol/Parser.js

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,17 @@ function Parser(options) {
1111
this._packetHeader = null;
1212
this._packetParser = options.packetParser || function() {};
1313
this._encoding = 'utf-8';
14+
this._paused = false;
1415
}
1516

1617
Parser.prototype.write = function(buffer) {
1718
this.append(buffer);
1819

1920
while (true) {
21+
if (this._paused) {
22+
return;
23+
}
24+
2025
if (!this._packetHeader) {
2126
if (this._bytesRemaining() < 4) {
2227
break;
@@ -63,6 +68,16 @@ Parser.prototype.append = function(newBuffer) {
6368
this._offset = 0;
6469
};
6570

71+
Parser.prototype.pause = function() {
72+
this._paused = true;
73+
};
74+
75+
Parser.prototype.resume = function() {
76+
this._paused = false;
77+
// A little hacky, but does the trick of resuming the parser
78+
this.write(new Buffer(0));
79+
};
80+
6681
Parser.prototype.peak = function() {
6782
return this._buffer[this._offset];
6883
};

lib/protocol/Protocol.js

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ Protocol.prototype.end = function() {
5858
}
5959
};
6060

61+
Protocol.prototype.pause = function() {
62+
this._parser.pause();
63+
};
64+
65+
Protocol.prototype.resume = function() {
66+
this._parser.resume();
67+
};
68+
6169
Protocol.prototype._enqueue = function(sequence) {
6270
var cannotEnqueue = (this._quitSequence || this._destroyed);
6371
if (cannotEnqueue) {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
var common = require('../common');
2+
var connection = common.createConnection();
3+
var assert = require('assert');
4+
5+
common.useTestDb(connection);
6+
7+
var table = 'stream_test';
8+
connection.query([
9+
'CREATE TEMPORARY TABLE `' + table + '` (',
10+
'`id` int(11) unsigned NOT NULL AUTO_INCREMENT,',
11+
'`title` varchar(255),',
12+
'PRIMARY KEY (`id`)',
13+
') ENGINE=InnoDB DEFAULT CHARSET=utf8'
14+
].join('\n'));
15+
16+
var rowCount = 10;
17+
for (var i = 1; i <= rowCount; i++) {
18+
var row = {
19+
id: i,
20+
title: 'Row #' + i,
21+
};
22+
23+
connection.query('INSERT INTO ' + table + ' SET ?', row);
24+
}
25+
26+
var paused = false;
27+
var query = connection.query('SELECT * FROM ' + table);
28+
29+
var hadEnd = 0;
30+
var rows = [];
31+
query
32+
.on('row', function(row) {
33+
// Make sure we never receive a row while being paused
34+
assert.equal(paused, false);
35+
36+
paused = true;
37+
connection.pause();
38+
39+
setTimeout(function() {
40+
paused = false;
41+
connection.resume();
42+
43+
rows.push(row);
44+
}, 10);
45+
})
46+
.on('end', function() {
47+
hadEnd = true;
48+
});
49+
50+
connection.end();
51+
52+
process.on('exit', function() {
53+
assert.equal(rows.length, 10);
54+
assert.equal(hadEnd, true);
55+
});

0 commit comments

Comments
 (0)
0