8000 1. fix connection leak by returning an error on inactive channel insi… · olksdr/postgresql-async@edabfa9 · GitHub
[go: up one dir, main page]

Skip to content

Commit edabfa9

Browse files
committed
1. fix connection leak by returning an error on inactive channel inside the future
2. catch exceptions on user function in the object pool 3. add request timeout for queries
1 parent 29c2362 commit edabfa9

File tree

4 files changed

+53
-25
lines changed

4 files changed

+53
-25
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/Configuration.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,10 @@
1717
package com.github.mauricio.async.db
1818

1919
import java.nio.charset.Charset
20-
import scala.Predef._
21-
import scala.{None, Option, Int}
20+
21+
import io.netty.buffer.{AbstractByteBufAllocator, PooledByteBufAllocator}
2222
import io.netty.util.CharsetUtil
23-
import io.netty.buffer.AbstractByteBufAllocator
24-
import io.netty.buffer.PooledByteBufAllocator
23+
2524
import scala.concurrent.duration._
2625

2726
object Configuration {
@@ -55,5 +54,5 @@ case class Configuration(username: String,
5554
maximumMessageSize: Int = 16777216,
5655
allocator: AbstractByteBufAllocator = PooledByteBufAllocator.DEFAULT,
5756
connectTimeout: Duration = 5.seconds,
58-
testTimeout: Duration = 5.seconds
59-
)
57+
testTimeout: Duration = 5.seconds,
58+
requestTimeout: Duration = 5.seconds)

db-async-common/src/main/scala/com/github/mauricio/async/db/pool/AsyncObjectPool.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,22 @@ trait AsyncObjectPool[T] {
7373
def use[A](f: (T) => Future[A])(implicit executionContext: ExecutionContext): Future[A] =
7474
take.flatMap { item =>
7575
val p = Promise[A]()
76-
f(item).onComplete { r =>
77-
giveBack(item).onComplete { _ =>
78-
p.complete(r)
76+
try {
77+
f(item).onComplete { r =>
78+
giveBack(item).onComplete { _ =>
79+
p.complete(r)
80+
}
7981
}
82+
} catch {
83+
// calling f might throw exception.
84+
// in that case the item will be removed from the pool if identified as invalid by the factory.
85+
// the error returned to the user is the original error thrown by f.
86+
case error: Throwable =>
87+
giveBack(item).onComplete { _ =>
88+
p.failure(error)
89+
}
8090
}
91+
8192
p.future
8293
}
8394

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@
1616

1717
package com.github.mauricio.async.db.mysql
1818

19+
import java.util.concurrent.TimeoutException
20+
import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
21+
1922
import com.github.mauricio.async.db._
2023
import com.github.mauricio.async.db.exceptions._
21-
import com.github.mauricio.async.db.mysql.codec.{MySQLHandlerDelegate, MySQLConnectionHandler}
24+
import com.github.mauricio.async.db.mysql.codec.{MySQLConnectionHandler, MySQLHandlerDelegate}
2225
import com.github.mauricio.async.db.mysql.exceptions.MySQLException
2326
import com.github.mauricio.async.db.mysql.message.client._
2427
import com.github.mauricio.async.db.mysql.message.server._
2528
import com.github.mauricio.async.db.mysql.util.CharsetMapper
2629
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
2730
import com.github.mauricio.async.db.util._
28-
import java.util.concurrent.atomic.{AtomicLong,AtomicReference}
29-
import scala.concurrent.{ExecutionContext, Promise, Future}
30-
import io.netty.channel.{EventLoopGroup, ChannelHandlerContext}
31-
import scala.util.Failure
32-
import scala.Some
33-
import scala.util.Success
31+
import io.netty.channel.{ChannelHandlerContext, EventLoopGroup}
32+
33+
import scala.concurrent.{ExecutionContext, Future, Promise}
34+
import scala.util.{Failure, Success}
3435

3536
object MySQLConnection {
3637
final val Counter = new AtomicLong()
@@ -185,18 +186,24 @@ class MySQLConnection(
185186

186187
def sendQuery(query: String): Future[QueryResult] = {
187188
this.validateIsReadyForQuery()
188-
val promise = Promise[QueryResult]
189+
val promise = Promise[QueryResult]()
189190
this.setQueryPromise(promise)
190191
this.connectionHandler.write(new QueryMessage(query))
192+
addTimeout(promise)
193+
191194
promise.future
192195
}
193196

194-
private def failQueryPromise(t: Throwable) {
197+
private def addTimeout(promise: Promise[QueryResult]): Unit = {
198+
this.connectionHandler.schedule(
199+
promise.tryFailure(new TimeoutException(s"response took too long to return(${configuration.requestTimeout})")),
200+
configuration.requestTimeout)
201+
}
195202

203+
private def failQueryPromise(t: Throwable) {
196204
this.clearQueryPromise.foreach {
197205
_.tryFailure(t)
198206
}
199-
200207
}
201208

202209
private def succeedQueryPromise(queryResult: QueryResult) {
@@ -234,9 +241,11 @@ class MySQLConnection(
234241
if ( values.length != totalParameters ) {
235242
throw new InsufficientParametersException(totalParameters, values)
236243
}
237-
val promise = Promise[QueryResult]
244+
val promise = Promise[QueryResult]()
238245
this.setQueryPromise(promise)
239246
this.connectionHandler.sendPreparedStatement(query, values)
247+
addTimeout(promise)
248+
240249
promise.future
241250
}
242251

mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/codec/MySQLConnectionHandler.scala

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.github.mauricio.async.db.mysql.codec
1818

1919
import java.net.InetSocketAddress
2020
import java.nio.ByteBuffer
21+
import java.util.concurrent.TimeUnit
2122

2223
import com.github.mauricio.async.db.Configuration
2324
import com.github.mauricio.async.db.exceptions.DatabaseException
@@ -37,6 +38,7 @@ import io.netty.handler.codec.CodecException
3738
import scala.annotation.switch
3839
import scala.collection.mutable.{ArrayBuffer, HashMap}
3940
import scala.concurrent._
41+
import scala.concurrent.duration.Duration
4042

4143
class MySQLConnectionHandler(
4244
configuration: Configuration,
@@ -319,17 +321,18 @@ class MySQLConnectionHandler(
319321
}
320322

321323
private def writeAndHandleError( message : Any ) : ChannelFuture = {
322-
323324
if ( this.currentContext.channel().isActive ) {
324-
val future = this.currentContext.writeAndFlush(message)
325+
val res = this.currentContext.writeAndFlush(message)
325326

326-
future.onFailure {
327+
res.onFailure {
327328
case e : Throwable => handleException(e)
328329
}
329330

330-
future
331+
res
331332
} else {
332-
throw new DatabaseException("This channel is not active and can't take messages")
333+
val error = new DatabaseException("This channel is not active and can't take messages")
334+
handleException(error)
335+
this.currentContext.channel().newFailedFuture(error)
333336
}
334337
}
335338

@@ -351,4 +354,10 @@ class MySQLConnectionHandler(
351354
}
352355
}
353356

357+
def schedule(block: => Unit, duration: Duration): Unit = {
358+
this.currentContext.channel().eventLoop().schedule(new Runnable {
359+
override def run(): Unit = block
360+
}, duration.toMillis, TimeUnit.MILLISECONDS)
361+
}
362+
354363
}

0 commit comments

Comments
 (0)
0