File tree Expand file tree Collapse file tree 4 files changed +88
-0
lines changed Expand file tree Collapse file tree 4 files changed +88
-0
lines changed Original file line number Diff line number Diff line change @@ -62,6 +62,16 @@ Connection.prototype.destroy = function() {
62
62
this . _protocol . destroy ( ) ;
63
63
} ;
64
64
65
+ Connection . prototype . pause = function ( ) {
66
+ this . _socket . pause ( ) ;
67
+ this . _protocol . pause ( ) ;
68
+ } ;
69
+
70
+ Connection . prototype . resume = function ( ) {
71
+ this . _socket . resume ( ) ;
72
+ this . _protocol . resume ( ) ;
73
+ } ;
74
+
65
75
Connection . prototype . escape = function ( value ) {
66
76
return SqlString . escape ( value ) ;
67
77
} ;
Original file line number Diff line number Diff line change @@ -11,12 +11,17 @@ function Parser(options) {
11
11
this . _packetHeader = null ;
12
12
this . _packetParser = options . packetParser || function ( ) { } ;
13
13
this . _encoding = 'utf-8' ;
14
+ this . _paused = false ;
14
15
}
15
16
16
17
Parser . prototype . write = function ( buffer ) {
17
18
this . append ( buffer ) ;
18
19
19
20
while ( true ) {
21
+ if ( this . _paused ) {
22
+ return ;
23
+ }
24
+
20
25
if ( ! this . _packetHeader ) {
21
26
if ( this . _bytesRemaining ( ) < 4 ) {
22
27
break ;
@@ -63,6 +68,16 @@ Parser.prototype.append = function(newBuffer) {
63
68
this . _offset = 0 ;
64
69
} ;
65
70
71
+ Parser . prototype . pause = function ( ) {
72
+ this . _paused = true ;
73
+ } ;
74
+
75
+ Parser . prototype . resume = function ( ) {
76
+ this . _paused = false ;
77
+ // A little hacky, but does the trick of resuming the parser
78
+ this . write ( new Buffer ( 0 ) ) ;
79
+ } ;
80
+
66
81
Parser . prototype . peak = function ( ) {
67
82
return this . _buffer [ this . _offset ] ;
68
83
} ;
Original file line number Diff line number Diff line change @@ -58,6 +58,14 @@ Protocol.prototype.end = function() {
58
58
}
59
59
} ;
60
60
61
+ Protocol . prototype . pause = function ( ) {
62
+ this . _parser . pause ( ) ;
63
+ } ;
64
+
65
+ Protocol . prototype . resume = function ( ) {
66
+ this . _parser . resume ( ) ;
67
+ } ;
68
+
61
69
Protocol . prototype . _enqueue = function ( sequence ) {
62
70
var cannotEnqueue = ( this . _quitSequence || this . _destroyed ) ;
63
71
if ( cannotEnqueue ) {
Original file line number Diff line number Diff line change
1
+ var common = require ( '../common' ) ;
2
+ var connection = common . createConnection ( ) ;
3
+ var assert = require ( 'assert' ) ;
4
+
5
+ common . useTestDb ( connection ) ;
6
+
7
+ var table = 'stream_test' ;
8
+ connection . query ( [
9
+ 'CREATE TEMPORARY TABLE `' + table + '` (' ,
10
+ '`id` int(11) unsigned NOT NULL AUTO_INCREMENT,' ,
11
+ '`title` varchar(255),' ,
12
+ 'PRIMARY KEY (`id`)' ,
13
+ ') ENGINE=InnoDB DEFAULT CHARSET=utf8'
14
+ ] . join ( '\n' ) ) ;
15
+
16
+ var rowCount = 10 ;
17
+ for ( var i = 1 ; i <= rowCount ; i ++ ) {
18
+ var row = {
19
+ id : i ,
20
+ title : 'Row #' + i ,
21
+ } ;
22
+
23
+ connection . query ( 'INSERT INTO ' + table + ' SET ?' , row ) ;
24
+ }
25
+
26
+ var paused = false ;
27
+ var query = connection . query ( 'SELECT * FROM ' + table ) ;
28
+
29
+ var hadEnd = 0 ;
30
+ var rows = [ ] ;
31
+ query
32
+ . on ( 'row' , function ( row ) {
33
+ // Make sure we never receive a row while being paused
34
+ assert . equal ( paused , false ) ;
35
+
36
+ paused = true ;
37
+ connection . pause ( ) ;
38
+
39
+ setTimeout ( function ( ) {
40
+ paused = false ;
41
+ connection . resume ( ) ;
42
+
43
+ rows . push ( row ) ;
44
+ } , 10 ) ;
45
+ } )
46
+ . on ( 'end' , function ( ) {
47
+ hadEnd = true ;
48
+ } ) ;
49
+
50
+ connection . end ( ) ;
51
+
52
+ process . on ( 'exit' , function ( ) {
53
+ assert . equal ( rows . length , 10 ) ;
54
+ assert . equal ( hadEnd , true ) ;
55
+ } ) ;
You can’t perform that action at this time.
0 commit comments