8000 Fix connections leaking on high load. (#290) · jasync-sql/jasync-sql@936e26d · GitHub
[go: up one dir, main page]

Skip to content

Commit 936e26d

Browse files
authored
Fix connections leaking on high load. (#290)
Moves the responsibility of connection create timeout completely to the connection. By doing so, allows us to close the underlying Netty Channel if the creation times out. Also: * Lower connectionTimeout in the unit tests * Fix unit tests
1 parent 598aee5 commit 936e26d

File tree

10 files changed

+121
-29
lines changed

10 files changed

+121
-29
lines changed

db-async-common/src/main/java/com/github/jasync/sql/db/ConnectionPoolConfiguration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ data class ConnectionPoolConfiguration @JvmOverloads constructor(
114114
maxObjectTtl = maxConnectionTtl,
115115
maxQueueSize = 8000 maxPendingQueries,
116116
validationInterval = connectionValidationInterval,
117-
createTimeout = connectionCreateTimeout,
117+
createTimeout = connectionCreateTimeout * 2,
118118
testTimeout = connectionTestTimeout,
119119
queryTimeout = queryTimeout,
120120
coroutineDispatcher = coroutineDispatcher

db-async-common/src/test/java/com/github/jasync/sql/db/ConnectionPoolConfigurationTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class ConnectionPoolConfigurationTest {
4848
assertThat(configuration.connectionValidationInterval).isEqualTo(13)
4949
assertThat(configuration.poolConfiguration.validationInterval).isEqualTo(13)
5050
assertThat(configuration.connectionCreateTimeout).isEqualTo(14)
51-
assertThat(configuration.poolConfiguration.createTimeout).isEqualTo(14)
51+
assertThat(configuration.poolConfiguration.createTimeout).isEqualTo(28)
5252
assertThat(configuration.connectionTestTimeout).isEqualTo(15)
5353
assertThat(configuration.poolConfiguration.testTimeout).isEqualTo(15)
5454
assertThat(configuration.poolConfiguration.queryTimeout).isEqualTo(16)

mysql-async/src/main/java/com/github/jasync/sql/db/mysql/MySQLConnection.kt

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import com.github.jasync.sql.db.util.success
4242
import com.github.jasync.sql.db.util.toCompletableFuture
4343
import io.netty.channel.ChannelHandlerContext
4444
import io.netty.handler.ssl.SslHandler
45+
import java.time.Duration
4546
import java.util.Optional
4647
import java.util.concurrent.CompletableFuture
4748
import java.util.concurrent.atomic.AtomicLong
@@ -53,7 +54,8 @@ private val logger = KotlinLogging.logger {}
5354
@Suppress("CanBeParameter")
5455
class MySQLConnection @JvmOverloads constructor(
5556
configuration: Configuration,
56-
charsetMapper: CharsetMapper = CharsetMapper.Instance
57+
charsetMapper: CharsetMapper = CharsetMapper.Instance,
58+
withDelegate: (delegate: MySQLHandlerDelegate) -> MySQLHandlerDelegate = { delegate -> delegate }
5759
) : ConcreteConnectionBase(configuration), MySQLHandlerDelegate, Connection, TimeoutScheduler {
5860

5961
companion object {
@@ -76,7 +78,7 @@ class MySQLConnection @JvmOverloads constructor(
7678
private val connectionHandler = MySQLConnectionHandler(
7779
configuration,
7880
charsetMapper,
79-
this,
81+
withDelegate(this),
8082
configuration.eventLoopGroup,
8183
configuration.executionContext,
8284
connectionId
@@ -101,8 +103,10 @@ class MySQLConnection @JvmOverloads constructor(
101103

102104
fun isAutoCommit(): Boolean = (serverStatus and StatusFlags.AUTO_COMMIT) != 0
103105

104-
private val timeoutSchedulerImpl =
105-
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onTimeout)
106+
private val queryTimeoutSchedulerImpl =
107+
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onQueryTimeout)
108+
private val createTimeoutSchedulerImpl =
109+
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onCreateTimeout)
106110

107111
private var channelClosed = false
108112
private var reportErrorAfterChannelClosed = false
@@ -113,6 +117,7 @@ class MySQLConnection @JvmOverloads constructor(
113117
override fun lastException(): Throwable? = this.lastException
114118

115119
override fun connect(): CompletableFuture<MySQLConnection> {
120+
createTimeoutSchedulerImpl.addTimeout(this.connectionPromise, Duration.ofMillis(configuration.connectionTimeout.toLong()), connectionId)
116121
this.connectionHandler.connect().onFailureAsync(configuration.executionContext) { e ->
117122
this.connectionPromise.failed(e)
118123
}
@@ -142,7 +147,10 @@ class MySQLConnection @JvmOverloads constructor(
142147
}
143148
}
144149
}
145-
is Failure -> this.disconnectionPromise.complete(ty1)
150+
is Failure -> {
151+
this.connectionHandler.closeChannel()
152+
this.disconnectionPromise.complete(ty1)
153+
}
146154
}
147155
}
148156
}
@@ -166,7 +174,7 @@ class MySQLConnection @JvmOverloads constructor(
166174
}
167175
}
168176

169-
override fun isTimeout(): Boolean = timeoutSchedulerImpl.isTimeout()
177+
override fun isTimeout(): Boolean = queryTimeoutSchedulerImpl.isTimeout()
170178

171179
override fun connected(ctx: ChannelHandlerContext) {
172180
logger.debug { "$connectionId Connected to ${ctx.channel().remoteAddress()}" }
@@ -336,7 +344,7 @@ class MySQLConnection @JvmOverloads constructor(
336344
this.setQueryPromise(promise)
337345
this.checkStoredProcedureCall(query)
338346
this.connectionHandler.sendQuery(query)
339-
timeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
347+
queryTimeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
340348
return promise
341349
}
342350

@@ -377,7 +385,11 @@ class MySQLConnection @JvmOverloads constructor(
377385

378386
override fun disconnect(): CompletableFuture<Connection> = this.close()
379387

380-
private fun onTimeout() {
388+
private fun onQueryTimeout() {
389+
disconnect()
390+
}
391+
392+
private fun onCreateTimeout() {
381393
disconnect()
382394
}
383395

@@ -397,7 +409,7 @@ class MySQLConnection @JvmOverloads constructor(
397409
this.setQueryPromise(promise)
398410
this.checkStoredProcedureCall(params.query)
399411
this.connectionHandler.sendPreparedStatement(params.query, params.values)
400-
timeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
412+
queryTimeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
401413
val closedPromise = this.releaseIfNeeded(params.release, promise, params.query)
402414
return closedPromise
403415
}

mysql-async/src/test/java/com/github/jasync/sql/db/mysql/ConnectionPoolConfigurationSpec.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,7 @@ class ConnectionPoolConfigurationSpec : ConnectionHelper() {
6565
"jdbc:mysql://$host:$port/$database?user=$username&password=$password"
6666
}
6767

68-
val connection = MySQLConnectionBuilder.createConnectionPool(connectionUri) {
69-
connectionCreateTimeout = 1
70-
}
71-
assertThat(connection.configuration.connectionCreateTimeout).isEqualTo(1)
68+
val connection = MySQLConnectionBuilder.createConnectionPool(connectionUri)
7269
try {
7370
return fn(connection)
7471
} finally {

mysql-async/src/test/java/com/github/jasync/sql/db/mysql/MySQLConnectionSpec.kt

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,56 @@
11
package com.github.jasync.sql.db.mysql
22

33
import com.github.jasync.sql.db.Configuration
4+
import com.github.jasync.sql.db.Connection
5+
import com.github.jasync.sql.db.mysql.codec.MySQLHandlerDelegate
6+
import com.github.jasync.sql.db.mysql.message.server.OkMessage
7+
import java.util.concurrent.CompletableFuture
8+
import java.util.concurrent.ExecutionException
49
import java.util.concurrent.TimeUnit
10+
import java.util.concurrent.TimeoutException
511
import kotlin.test.assertEquals
612
import org.assertj.core.api.Assertions
713
import org.junit.Test
814

915
class MySQLConnectionSpec : ConnectionHelper() {
1016

17+
@Test
18+
fun `connect should return with timeout exception after create timeout`() {
19+
20+
class MySQLSlowConnectionDelegate(
21+
private val delegate: MySQLHandlerDelegate,
22+
private val onOkSlowdownInMillis: Int
23+
) : MySQLHandlerDelegate by delegate {
24+
override fun onOk(message: OkMessage) {
25+
Thread.sleep(onOkSlowdownInMillis.toLong())
26+
delegate.onOk(message)
27+
}
28+
}
29+
30+
val configuration = Configuration(
31+
"mysql_async",
32+
"localhost",
33+
port = ContainerHelper.getPort(),
34+
password = "root",
35+
database = "mysql_async_tests",
36+
connectionTimeout = 10
37+
)
38+
39+
val connection: CompletableFuture<out Connection> = MySQLConnection(
40+
configuration,
41+
withDelegate = { delegate ->
42+
MySQLSlowConnectionDelegate(
43+
delegate,
44+
configuration.connectionTimeout * 2
45+
)
46+
}
47+
).connect()
48+
49+
verifyException(ExecutionException::class.java, TimeoutException::class.java) {
50+
awaitFuture(connection)
51+
}
52+
}
53+
1154
@Test
1255
fun `connect to a MySQL instance with a password`() {
1356
val configuration = Configuration(

pool-async/src/main/java/com/github/jasync/sql/db/pool/PoolConfiguration.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ data class PoolConfiguration @JvmOverloads constructor(
2424
val maxIdle: Long,
2525
val maxQueueSize: Int,
2626
val validationInterval: Long = 5000,
27-
val createTimeout: Long = 5000,
27+
val createTimeout: Long = 10000, // It is suggested to set this to sql.db.Configuration.connectionTimeout * 2
2828
val testTimeout: Long = 5000,
2929
val queryTimeout: Long? = null,
3030
val coroutineDispatcher: CoroutineDispatcher = Dispatchers.Default,

postgresql-async/src/main/java/com/github/jasync/sql/db/postgresql/PostgreSQLConnection.kt

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import com.ongres.scram.client.ScramClient
6161
import com.ongres.scram.client.ScramSession
6262
import com.ongres.scram.common.exception.ScramException
6363
import com.ongres.scram.common.stringprep.StringPreparations
64+
import java.time.Duration
6465
import java.util.Collections
6566
import java.util.Optional
6667
import java.util.concurrent.CompletableFuture
@@ -74,7 +75,8 @@ private val logger = KotlinLogging.logger {}
7475
class PostgreSQLConnection @JvmOverloads constructor(
7576
configuration: Configuration = DEFAULT,
7677
val encoderRegistry: ColumnEncoderRegistry = PostgreSQLColumnEncoderRegistry.Instance,
77-
val decoderRegistry: ColumnDecoderRegistry = PostgreSQLColumnDecoderRegistry.Instance
78+
val decoderRegistry: ColumnDecoderRegistry = PostgreSQLColumnDecoderRegistry.Instance,
79+
withDelegate: (delegate: PostgreSQLConnectionDelegate) -> PostgreSQLConnectionDelegate = { delegate -> delegate }
7880
) : ConcreteConnectionBase(configuration), PostgreSQLConnectionDelegate, Connection, TimeoutScheduler {
7981

8082
companion object {
@@ -85,7 +87,7 @@ class PostgreSQLConnection @JvmOverloads constructor(
8587
private val connectionHandler = PostgreSQLConnectionHandler(
8688
configuration,
8789
encoderRegistry,
88-
this,
90+
withDelegate(this),
8991
configuration.eventLoopGroup,
9092
configuration.executionContext
9193
)
@@ -99,8 +101,10 @@ class PostgreSQLConnection @JvmOverloads constructor(
99101
private var authenticated = false
100102

101103
private val connectionFuture = CompletableFuture<PostgreSQLConnection>()
102-
private val timeoutSchedulerImpl =
103-
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onTimeout)
104+
private val queryTimeoutSchedulerImpl =
105+
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onQueryTimeout)
106+
private val createTimeoutSchedulerImpl =
107+
TimeoutSchedulerImpl(configuration.executionContext, configuration.eventLoopGroup, this::onCreateTimeout)
104108

105109
private var recentError = false
106110
private val queryPromiseReference = AtomicReference<Optional<CompletableFuture<QueryResult>>>(Optional.empty())
@@ -120,6 +124,7 @@ class PostgreSQLConnection @JvmOverloads constructor(
120124
fun isReadyForQuery(): Boolean = !this.queryPromise().isPresent
121125

122126
override fun connect(): CompletableFuture<PostgreSQLConnection> {
127+
createTimeoutSchedulerImpl.addTimeout(this.connectionFuture, Duration.ofMillis(configuration.connectionTimeout.toLong()), connectionId)
123128
this.connectionHandler.connect().onFailureAsync(configuration.executionContext) { e ->
124129
this.connectionFuture.failed(e)
125130
}
@@ -138,13 +143,17 @@ class PostgreSQLConnection @JvmOverloads constructor(
138143
override fun disconnect(): CompletableFuture<Connection> =
139144
this.connectionHandler.disconnect().toCompletableFuture().mapAsync(configuration.executionContext) { c -> this }
140145

141-
private fun onTimeout() {
146+
private fun onQueryTimeout() {
147+
disconnect()
148+
}
149+
150+
private fun onCreateTimeout() {
142151
disconnect()
143152
}
144153

145154
override fun isConnected(): Boolean = this.connectionHandler.isConnected()
146155

147-
override fun isTimeout(): Boolean = timeoutSchedulerImpl.isTimeout()
156+
override fun isTimeout(): Boolean = queryTimeoutSchedulerImpl.isTimeout()
148157

149158
override fun isQuerying(): Boolean = queryPromise().isPresent
150159

@@ -161,7 +170,7 @@ class PostgreSQLConnection @JvmOverloads constructor(
161170
this.setQueryPromise(promise)
162171

163172
write(QueryMessage(query))
164-
timeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
173+
queryTimeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
165174
return promise
166175
}
167176

@@ -201,7 +210,7 @@ class PostgreSQLConnection @JvmOverloads constructor(
201210
)
202211
}
203212
)
204-
timeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
213+
queryTimeoutSchedulerImpl.addTimeout(promise, configuration.queryTimeout, connectionId)
205214
val closedPromise = this.releaseIfNeeded(params.release, promise, params.query)
206215
return closedPromise
207216
}

postgresql-async/src/main/java/com/github/jasync/sql/db/postgresql/codec/PostgreSQLConnectionHandler.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,10 @@ class PostgreSQLConnectionHandler(
107107
}
108108
}
109109
}
110-
is Failure -> this.disconnectionPromise.failed(ty1.exception)
110+
is Failure -> {
111+
this.currentContext!!.channel().close()
112+
this.disconnectionPromise.failed(ty1.exception)
113+
}
111114
}
112115
}
113116
}

postgresql-async/src/test/java/com/github/aysnc/sql/db/integration/PostgreSQLConnectionSpec.kt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import com.github.jasync.sql.db.interceptor.MdcQueryInterceptorSupplier
1313
import com.github.jasync.sql.db.interceptor.QueryInterceptor
1414
import com.github.jasync.sql.db.invoke
1515
import com.github.jasync.sql.db.postgresql.PostgreSQLConnection
16+
import com.github.jasync.sql.db.postgresql.codec.PostgreSQLConnectionDelegate
1617
import com.github.jasync.sql.db.postgresql.exceptions.QueryMustNotBeNullOrEmptyException
1718
import com.github.jasync.sql.db.util.ExecutorServiceUtils
1819
import com.github.jasync.sql.db.util.flatMapAsync
@@ -25,6 +26,7 @@ import java.time.LocalDateTime
2526
import java.util.concurrent.CompletableFuture
2627
import java.util.concurrent.ExecutionException
2728
import java.util.concurrent.TimeUnit
29+
import java.util.concurrent.TimeoutException
2830
import java.util.function.Supplier
2931
import org.assertj.core.api.Assertions.assertThat
3032
import org.junit.Test
@@ -124,6 +126,35 @@ class PostgreSQLConnectionSpec : DatabaseTestHelper() {
124126
" insert into prepared_statement_test (name) values ('John Doe') returning id"
125127
val preparedStatementSelect = "select * from prepared_statement_test"
126128

129+
@Test
130+
fun `connect should return with timeout exception after create timeout`() {
131+
132+
class PostgreSQLSlowConnectionDelegate(
133+
private val delegate: PostgreSQLConnectionDelegate,
134+
private val onReadyForQuerySlowdownInMillis: Int
135+
) : PostgreSQLConnectionDelegate by delegate {
136+
override fun onReadyForQuery() {
137+
Thread.sleep(onReadyForQuerySlowdownInMillis.toLong())
138+
delegate.onReadyForQuery()
139+
}
140+
}
141+
142+
val configuration = defaultConfiguration.copy(connectionTimeout = 10)
143+
val connection: CompletableFuture<out Connection> = PostgreSQLConnection(
144+
configuration,
145+
withDelegate = { delegate ->
146+
PostgreSQLSlowConnectionDelegate(
147+
delegate,
148+
configuration.connectionTimeout * 2
149+
)
150+
}
151+
).connect()
152+
153+
verifyException(ExecutionException::class.java, TimeoutException::class.java) {
154+
awaitFuture(connection)
155+
}
156+
}
157+
127158
@Test
128159
fun `handler should connect to the database`() {
129160

postgresql-async/src/test/java/com/github/aysnc/sql/db/integration/PostgreSQLPoolConfigurationSpec.kt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,7 @@ class PostgreSQLPoolConfigurationSpec : DatabaseTestHelper() {
8989
"jdbc:postgresql://$host:$port/$database?user=$username&password=$password"
9090
}
9191

92-
val connection = PostgreSQLConnectionBuilder.createConnectionPool(connectionUri) {
93-
connectionCreateTimeout = 1
94-
}
95-
assertThat(connection.configuration.connectionCreateTimeout).isEqualTo(1)
92+
val connection = PostgreSQLConnectionBuilder.createConnectionPool(connectionUri)
9693
try {
9794
return fn(connection)
9895
} finally {

0 commit comments

Comments
 (0)
0