@@ -30,7 +30,7 @@ import java.nio.charset.Charset
30
30
import java .util .concurrent .atomic .AtomicInteger
31
31
32
32
33
- class MySQLFrameDecoder (charset : Charset , connectionId : String ) extends ByteToMessageDecoder {
33
+ class MySQLFrameDecoder (charset : Charset , connectionId : String ) extends ByteToMessageDecoder {
34
34
35
35
private final val log = Log .getByName(s " [frame-decoder] ${connectionId}" )
36
36
private final val messagesCount = new AtomicInteger ()
@@ -48,6 +48,7 @@ class MySQLFrameDecoder(charset: Charset, connectionId : String) extends ByteToM
48
48
private [codec] var isPreparedStatementPrepare = false
49
49
private [codec] var isPreparedStatementExecute = false
50
50
private [codec] var isPreparedStatementExecuteRows = false
51
+ private [codec] var hasDoneHandshake = false
51
52
52
53
private [codec] var totalParams = 0L
53
54
private [codec] var processedParams = 0L
@@ -77,121 +78,133 @@ class MySQLFrameDecoder(charset: Charset, connectionId : String) extends ByteToM
77
78
78
79
val slice = buffer.readSlice(size)
79
80
80
- if ( log.isTraceEnabled ) {
81
+ if (log.isTraceEnabled) {
81
82
log.trace(s " Reading message type $messageType - " +
82
- s " (count= $messagesCount,size= $size,isInQuery= $isInQuery,processingColumns= $processingColumns,processingParams= $processingParams,processedColumns= $processedColumns,processedParams= $processedParams) " +
83
+ s " (count= $messagesCount,hasDoneHandshake= $hasDoneHandshake , size= $size,isInQuery= $isInQuery,processingColumns= $processingColumns,processingParams= $processingParams,processedColumns= $processedColumns,processedParams= $processedParams) " +
83
84
s " \n ${BufferDumper .dumpAsHex(slice)}} " )
84
85
}
85
86
86
87
slice.readByte()
87
88
88
- val decoder = messageType match {
89
- case ServerMessage .ServerProtocolVersion if ! isInQuery => this .handshakeDecoder
90
- case ServerMessage .Error => {
91
- this .clear
92
- this .errorDecoder
93
- }
94
- case ServerMessage .EOF => {
95
-
96
- if (this .processingParams && this .totalParams > 0 ) {
97
- this .processingParams = false
98
- if (this .totalColumns == 0 ) {
99
- ParamAndColumnProcessingFinishedDecoder
100
- } else {
101
- ParamProcessingFinishedDecoder
102
- }
103
- } else {
104
- if (this .processingColumns) {
105
- this .processingColumns = false
106
- ColumnProcessingFinishedDecoder
107
- } else {
108
-
109
- if ( this .isInQuery ) {
110
- this .clear
111
- EOFMessageDecoder
112
- } else {
113
- this .authenticationSwitchDecoder
114
- }
115
-
116
- }
117
- }
118
-
119
- }
120
- case ServerMessage .Ok => {
121
- if (this .isPreparedStatementPrepare) {
122
- this .preparedStatementPrepareDecoder
123
- } else {
124
- if (this .isPreparedStatementExecuteRows) {
125
- null
126
- } else {
127
- this .clear
128
- this .okDecoder
129
- }
89
+ if (this .hasDoneHandshake) {
90
+ this .handleCommonFlow(messageType, slice, out)
91
+ } else {
92
+ val decoder = messageType match {
93
+ case ServerMessage .Error => {
94
+ this .clear
95
+ this .errorDecoder
130
96
}
97
+ case _ => this .handshakeDecoder
131
98
}
132
- case _ => {
99
+ this .doDecoding(decoder, slice, out)
100
+ }
101
+ } else {
102
+ buffer.resetReaderIndex()
103
+ }
133
104
134
- if (this .isInQuery) {
135
- null
136
- } else {
137
- throw new ParserNotAvailableException (messageType)
138
- }
105
+ }
106
+ }
139
107
108
+ private def handleCommonFlow (messageType : Byte , slice : ByteBuf , out : java.util.List [Object ]) {
109
+ val decoder = messageType match {
110
+ case ServerMessage .Error => {
111
+ this .clear
112
+ this .errorDecoder
113
+ }
114
+ case ServerMessage .EOF => {
115
+
116
+ if (this .processingParams && this .totalParams > 0 ) {
117
+ this .processingParams = false
118
+ if (this .totalColumns == 0 ) {
119
+ ParamAndColumnProcessingFinishedDecoder
120
+ } else {
121
+ ParamProcessingFinishedDecoder
122
+ }
123
+ } else {
124
+ if (this .processingColumns) {
125
+ this .processingColumns = false
126
+ ColumnProcessingFinishedDecoder
127
+ } else {
128
+ this .clear
129
+ EOFMessageDecoder
140
130
}
141
131
}
142
132
143
- if (decoder == null ) {
144
- slice.readerIndex(slice.readerIndex() - 1 )
145
- val result = decodeQueryResult(slice)
146
-
147
- if (slice.readableBytes() != 0 ) {
148
- throw new BufferNotFullyConsumedException (slice)
149
- }
150
- if (result != null ) {
151
- out.add(result)
133
+ }
134
+ case ServerMessage .Ok => {
135
+ if (this .isPreparedStatementPrepare) {
136
+ this .preparedStatementPrepareDecoder
137
+ } else {
138
+ if (this .isPreparedStatementExecuteRows) {
139
+ null
140
+ } else {
141
+ this .clear
142
+ this .okDecoder
152
143
}
144
+ }
145
+ }
146
+ case _ => {
147
+
148
+ if (this .isInQuery) {
149
+ null
153
150
} else {
154
- val result = decoder.decode(slice)
151
+ throw new ParserNotAvailableException (messageType)
152
+ }
155
153
156
- result match {
157
- case m : PreparedStatementPrepareResponse => {
158
- this .hasReadColumnsCount = true
159
- this .totalColumns = m.columnsCount
160
- this .totalParams = m.paramsCount
161
- }
162
- case m : ParamAndColumnProcessingFinishedMessage => {
163
- this .clear
164
- }
165
- case m : ColumnProcessingFinishedMessage if this .isPreparedStatementPrepare => {
166
- this .clear
167
- }
168
- case m : ColumnProcessingFinishedMessage if this .isPreparedStatementExecute => {
169
- this .isPreparedStatementExecuteRows = true
170
- }
171
- case _ =>
172
- }
154
+ }
155
+ }
173
156
174
- if (slice.readableBytes() != 0 ) {
175
- throw new BufferNotFullyConsumedException (slice)
176
- }
157
+ doDecoding(decoder, slice, out)
158
+ }
159
+
160
+ private def doDecoding (decoder : MessageDecoder , slice : ByteBuf , out : java.util.List [Object ]) {
161
+ if (decoder == null ) {
162
+ slice.readerIndex(slice.readerIndex() - 1 )
163
+ val result = decodeQueryResult(slice)
164
+
165
+ if (slice.readableBytes() != 0 ) {
166
+ throw new BufferNotFullyConsumedException (slice)
167
+ }
168
+ if (result != null ) {
169
+ out.add(result)
170
+ }
171
+ } else {
172
+ val result = decoder.decode(slice)
173
+
174
+ result match {
175
+ case m : PreparedStatementPrepareResponse => {
176
+ this .hasReadColumnsCount = true
177
+ this .totalColumns = m.columnsCount
178
+ this .totalParams = m.paramsCount
179
+ }
180
+ case m : ParamAndColumnProcessingFinishedMessage => {
181
+ this .clear
182
+ }
183
+ case m : ColumnProcessingFinishedMessage if this .isPreparedStatementPrepare => {
184
+ this .clear
185
+ }
186
+ case m : ColumnProcessingFinishedMessage if this .isPreparedStatementExecute => {
187
+ this .isPreparedStatementExecuteRows = true
188
+ }
189
+ case _ =>
190
+ }
177
191
178
- if (result != null ) {
179
- result match {
180
- case m : PreparedStatementPrepareResponse => {
181
- out.add(result)
182
- if ( m.columnsCount == 0 && m.paramsCount == 0 ) {
183
- this .clear
184
- out.add(new ParamAndColumnProcessingFinishedMessage (new EOFMessage (0 , 0 )) )
185
- }
186
- }
187
- case _ => out.add(result)
192
+ if (slice.readableBytes() != 0 ) {
193
+ throw new BufferNotFullyConsumedException (slice)
194
+ }
195
+
196
+ if (result != null ) {
197
+ result match {
198
+ case m : PreparedStatementPrepareResponse => {
199
+ out.add(result)
200
+ if (m.columnsCount == 0 && m.paramsCount == 0 ) {
201
+ this .clear
202
+ out.add(new ParamAndColumnProcessingFinishedMessage (new EOFMessage (0 , 0 )))
188
203
}
189
204
}
205
+ case _ => out.add(result)
190
206
}
191
- } else {
192
- buffer.resetReaderIndex()
193
207
}
194
-
195
208
}
196
209
}
197
210
0 commit comments