From b83ed71b2360d0824719c26f8ddf64ecd6f98683 Mon Sep 17 00:00:00 2001 From: oshai Date: Thu, 15 Jun 2023 22:09:35 +0300 Subject: [PATCH 1/2] add soft evict impl Issue #371 --- .../jasync/sql/db/pool/ConnectionPool.kt | 2 + .../sql/db/pool/ActorBasedObjectPool.kt | 57 +++++++++++++++---- .../jasync/sql/db/pool/AsyncObjectPool.kt | 5 ++ 3 files changed, 53 insertions(+), 11 deletions(-) diff --git a/db-async-common/src/main/java/com/github/jasync/sql/db/pool/ConnectionPool.kt b/db-async-common/src/main/java/com/github/jasync/sql/db/pool/ConnectionPool.kt index 77d24b44f..378c3a0ef 100644 --- a/db-async-common/src/main/java/com/github/jasync/sql/db/pool/ConnectionPool.kt +++ b/db-async-common/src/main/java/com/github/jasync/sql/db/pool/ConnectionPool.kt @@ -131,6 +131,8 @@ class ConnectionPool( fun giveBack(item: T): CompletableFuture> = objectPool.giveBack(item) + fun softEvictConnections(): CompletableFuture> = objectPool.softEvict() + /** * * Closes the pool diff --git a/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt b/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt index 971ddd51f..c652e3d0e 100644 --- a/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt +++ b/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt @@ -91,6 +91,15 @@ internal constructor( return future } + override fun softEvict(): CompletableFuture> { + val future = CompletableFuture() + val offered = actor.trySend(SoftEvictAll(future)).isSuccess + if (!offered) { + future.completeExceptionally(Exception("could not offer to actor")) + } + return future.map { this } + } + override fun giveBack(item: T): CompletableFuture> { val future = CompletableFuture() val offered = actor.trySend(GiveBack(item, future)).isSuccess @@ -166,6 +175,7 @@ private sealed class ActorObjectPoolMessage { } private class Take(val future: CompletableFuture) : ActorObjectPoolMessage() +private class SoftEvictAll(val future: CompletableFuture) : ActorObjectPoolMessage() private class GiveBack( val returnedItem: T, val future: CompletableFuture, @@ -183,7 +193,8 @@ private class GiveBack( private class Created( val itemCreateId: Int, val item: Try, - val takeAskFuture: CompletableFuture? + val takeAskFuture: CompletableFuture?, + val objectHolder: ObjectHolder> ) : ActorObjectPoolMessage() { override fun toString(): String { val id = when (item) { @@ -227,6 +238,7 @@ private class ObjectPoolActor( when (message) { is Take -> handleTake(message) is GiveBack -> handleGiveBack(message) + is SoftEvictAll -> handleSoftEvictAll(message) is Created -> handleCreated(message) is TestPoolItems -> handleTestPoolItems() is Close -> handleClose(message) @@ -235,6 +247,19 @@ private class ObjectPoolActor( scheduleNewItemsIfNeeded() } + private fun handleSoftEvictAll(message: SoftEvictAll) { + evictAvailableItems() + inUseItems.values.forEach { it.markForEviction = true } + inCreateItems.entries.forEach { it.value.markForEviction = true } + logger.trace { "handleSoftEvictAll - done" } + message.future.complete(Unit) + } + + private fun evictAvailableItems() { + availableItems.forEach { it.item.destroy() } + availableItems.clear() + } + private fun scheduleNewItemsIfNeeded() { logger.trace { "scheduleNewItemsIfNeeded - $poolStatusString" } // deal with inconsistency in case we have items but also waiting futures @@ -273,8 +298,7 @@ private class ObjectPoolActor( try { closed = true channel.close() - availableItems.forEach { it.item.destroy() } - availableItems.clear() + evictAvailableItems() inUseItems.forEach { it.value.cleanedByPool = true it.key.destroy() @@ -368,10 +392,12 @@ private class ObjectPoolActor( logger.trace { "releasing idle item ${item.id}" } item.destroy() } + configuration.maxObjectTtl != null && System.currentTimeMillis() - item.creationTime > configuration.maxObjectTtl -> { logger.trace { "releasing item past ttl ${item.id}" } item.destroy() } + else -> { val test = objectFactory.test(item) inUseItems[item] = ItemInUseHolder(item.id, isInTest = true, testFuture = test) @@ -411,7 +437,7 @@ private class ObjectPoolActor( is Failure -> future.completeExceptionally(message.item.exception) is Success -> { try { - message.item.value.borrowTo(future) + message.item.value.borrowTo(future, markForEviction = message.objectHolder.markForEviction) } catch (e: Exception) { future.completeExceptionally(e) } @@ -420,11 +446,11 @@ private class ObjectPoolActor( } } - private fun T.borrowTo(future: CompletableFuture, validate: Boolean = true) { + private fun T.borrowTo(future: CompletableFuture, validate: Boolean = true, markForEviction: Boolean = false,) { if (validate) { validate(this) } - inUseItems[this] = ItemInUseHolder(this.id, isInTest = false) + inUseItems[this] = ItemInUseHolder(this.id, isInTest = false, markForEviction = markForEviction) logger.trace { "borrowed: ${this.id} ; $poolStatusString" } future.complete(this) } @@ -450,6 +476,11 @@ private class ObjectPoolActor( } validate(message.returnedItem) message.future.complete(Unit) + if (removed.markForEviction) { + logger.trace { "GiveBack got item ${message.returnedItem.id} marked for eviction, so destroying it" } + message.returnedItem.destroy() + return + } if (waitingQueue.isEmpty()) { if (availableItems.any { holder -> message.returnedItem === holder.item }) { logger.warn { "trying to give back an item to the pool twice ${message.returnedItem.id}, will ignore that" } @@ -533,10 +564,11 @@ private class ObjectPoolActor( val created = objectFactory.create() val itemCreateId = createIndex createIndex++ - inCreateItems[itemCreateId] = ObjectHolder(created) + val objectHolder = ObjectHolder(created) + inCreateItems[itemCreateId] = objectHolder logger.trace { "createObject createRequest=$itemCreateId" } created.onComplete { tried -> - offerOrLog(Created(itemCreateId, tried, future)) { + offerOrLog(Created(itemCreateId, tried, future, objectHolder)) { "failed to offer on created item $itemCreateId" } } @@ -558,9 +590,11 @@ private open class PoolObjectHolder( val timeElapsed: Long get() = System.currentTimeMillis() - time } -private class ObjectHolder(val item: T) { +private class ObjectHolder( + val item: T, + var markForEviction: Boolean = false, +) { val time = System.currentTimeMillis() - val timeElapsed: Long get() = System.currentTimeMillis() - time } @@ -569,7 +603,8 @@ private data class ItemInUseHolder( val isInTest: Boolean, val testFuture: CompletableFuture? = null, val time: Long = System.currentTimeMillis(), - var cleanedByPool: Boolean = false + var cleanedByPool: Boolean = false, + var markForEviction: Boolean = false, ) { val timeElapsed: Long get() = System.currentTimeMillis() - time diff --git a/pool-async/src/main/java/com/github/jasync/sql/db/pool/AsyncObjectPool.kt b/pool-async/src/main/java/com/github/jasync/sql/db/pool/AsyncObjectPool.kt index cd1510ff5..e62ca2e26 100644 --- a/pool-async/src/main/java/com/github/jasync/sql/db/pool/AsyncObjectPool.kt +++ b/pool-async/src/main/java/com/github/jasync/sql/db/pool/AsyncObjectPool.kt @@ -40,6 +40,11 @@ interface AsyncObjectPool { fun giveBack(item: T): CompletableFuture> + /** + * Mark all objects in the pool as invalid. Objects will be evicted when not in use. + */ + fun softEvict(): CompletableFuture> + /** * * Closes this pool and future calls to **take** will cause the Future to raise an From 246c57033c2e1c1c36e0245066a5e1b8bff97571 Mon Sep 17 00:00:00 2001 From: oshai Date: Fri, 16 Jun 2023 00:17:12 +0300 Subject: [PATCH 2/2] add tests for soft evict --- .../sql/db/pool/ActorBasedObjectPool.kt | 4 +-- .../sql/db/pool/ActorBasedObjectPoolTest.kt | 35 +++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt b/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt index c652e3d0e..a9f0e893c 100644 --- a/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt +++ b/pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt @@ -272,7 +272,7 @@ private class ObjectPoolActor( return } } - // deal with inconsistency in case we have waiting futures, and but we can create new items for them + // deal with inconsistency in case we have waiting futures, but we can create new items for them while (availableItems.isEmpty() && waitingQueue.isNotEmpty() && totalItems < configuration.maxObjects && @@ -609,7 +609,7 @@ private data class ItemInUseHolder( ) { val timeElapsed: Long get() = System.currentTimeMillis() - time - @Suppress("unused", "ProtectedInFinal") + @Suppress("unused") protected fun finalize() { if (!cleanedByPool) { logger.warn { "LEAK DETECTED for item $this - $timeElapsed ms since in use" } diff --git a/pool-async/src/test/java/com/github/jasync/sql/db/pool/ActorBasedObjectPoolTest.kt b/pool-async/src/test/java/com/github/jasync/sql/db/pool/ActorBasedObjectPoolTest.kt index d3671d751..c44d1d271 100644 --- a/pool-async/src/test/java/com/github/jasync/sql/db/pool/ActorBasedObjectPoolTest.kt +++ b/pool-async/src/test/java/com/github/jasync/sql/db/pool/ActorBasedObjectPoolTest.kt @@ -150,6 +150,41 @@ class ActorBasedObjectPoolTest { assertThat(factory.validated).isEqualTo(listOf(result, result, result)) } + @Test + fun `softEviction - basic take-evict-return pool should be empty`() { + tested = createDefaultPool() + val result = tested.take().get() + tested.softEvict().get() + tested.giveBack(result).get() + assertThat(tested.availableItems).isEmpty() + } + + @Test + fun `softEviction - minimum number of objects is maintained, but objects are replaced`() { + tested = ActorBasedObjectPool( + factory, + configuration.copy(minIdleObjects = 3), + false + ) + tested.take().get() + await.untilCallTo { tested.availableItemsSize } matches { it == 3 } + val availableItems = tested.availableItems + tested.softEvict().get() + await.untilCallTo { tested.availableItemsSize } matches { it == 3 } + assertThat(tested.availableItems.toSet().intersect(availableItems.toSet())).isEmpty() + } + + @Test + fun `test for objects in create eviction in case of softEviction`() { + factory.creationStuckTime = 10 + tested = createDefaultPool() + val itemPromise = tested.take() + tested.softEvict().get() + val item = itemPromise.get() + tested.giveBack(item).get() + assertThat(tested.availableItemsSize).isEqualTo(0) + } + @Test fun `take2-return2-take first not validated second is ok should be returned`() { tested = createDefaultPool()