8000 add soft evict impl by oshai · Pull Request #399 · jasync-sql/jasync-sql · GitHub
[go: up one dir, main page]

Skip to content

add soft evict impl #399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ class ConnectionPool<T : ConcreteConnection>(

fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> = objectPool.giveBack(item)

fun softEvictConnections(): CompletableFuture<AsyncObjectPool<T>> = objectPool.softEvict()

/**
*
* Closes the pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ internal constructor(
return future
}

override fun softEvict(): CompletableFuture<AsyncObjectPool<T>> {
val future = CompletableFuture<Unit>()
val offered = actor.trySend(SoftEvictAll(future)).isSuccess
if (!offered) {
future.completeExceptionally(Exception("could not offer to actor"))
}
return future.map { this }
}

override fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> {
val future = CompletableFuture<Unit>()
val offered = actor.trySend(GiveBack(item, future)).isSuccess
Expand Down Expand Up @@ -166,6 +175,7 @@ private sealed class ActorObjectPoolMessage<T : PooledObject> {
}

private class Take<T : PooledObject>(val future: CompletableFuture<T>) : ActorObjectPoolMessage<T>()
private class SoftEvictAll<T : PooledObject>(val future: CompletableFuture<Unit>) : ActorObjectPoolMessage<T>()
private class GiveBack<T : PooledObject>(
val returnedItem: T,
val future: CompletableFuture<Unit>,
Expand All @@ -183,7 +193,8 @@ private class GiveBack<T : PooledObject>(
private class Created<T : PooledObject>(
val itemCreateId: Int,
val item: Try<T>,
val takeAskFuture: CompletableFuture<T>?
val takeAskFuture: CompletableFuture<T>?,
val objectHolder: ObjectHolder<CompletableFuture<out T>>
) : ActorObjectPoolMessage<T>() {
override fun toString(): String {
val id = when (item) {
Expand Down Expand Up @@ -227,6 +238,7 @@ private class ObjectPoolActor<T : PooledObject>(
when (message) {
is Take<T> -> handleTake(message)
is GiveBack<T> -> handleGiveBack(message)
is SoftEvictAll<T> -> handleSoftEvictAll(message)
is Created<T> -> handleCreated(message)
is TestPoolItems<T> -> handleTestPoolItems()
is Close<T> -> handleClose(message)
Expand All @@ -235,6 +247,19 @@ private class ObjectPoolActor<T : PooledObject>(
scheduleNewItemsIfNeeded()
}

private fun handleSoftEvictAll(message: SoftEvictAll<T>) {
evictAvailableItems()
inUseItems.values.forEach { it.markForEviction = true }
inCreateItems.entries.forEach { it.value.markForEviction = true }
logger.trace { "handleSoftEvictAll - done" }
message.future.complete(Unit)
}

private fun evictAvailableItems() {
availableItems.forEach { it.item.destroy() }
availableItems.clear()
}

private fun scheduleNewItemsIfNeeded() {
logger.trace { "scheduleNewItemsIfNeeded - $poolStatusString" }
// deal with inconsistency in case we have items but also waiting futures
Expand All @@ -247,7 +272,7 @@ private class ObjectPoolActor<T : PooledObject>(
return
}
}
// deal with inconsistency in case we have waiting futures, and but we can create new items for them
// deal with inconsistency in case we have waiting futures, but we can create new items for them
while (availableItems.isEmpty() &&
waitingQueue.isNotEmpty() &&
totalItems < configuration.maxObjects &&
Expand All @@ -273,8 +298,7 @@ private class ObjectPoolActor<T : PooledObject>(
try {
closed = true
channel.close()
availableItems.forEach { it.item.destroy() }
availableItems.clear()
evictAvailableItems()
inUseItems.forEach {
it.value.cleanedByPool = true
it.key.destroy()
Expand Down Expand Up @@ -368,10 +392,12 @@ private class ObjectPoolActor<T : PooledObject>(
logger.trace { "releasing idle item ${item.id}" }
item.destroy()
}

configuration.maxObjectTtl != null && System.currentTimeMillis() - item.creationTime > configuration.maxObjectTtl -> {
logger.trace { "releasing item past ttl ${item.id}" }
item.destroy()
}

else -> {
val test = objectFactory.test(item)
inUseItems[item] = ItemInUseHolder(item.id, isInTest = true, testFuture = test)
Expand Down Expand Up @@ -411,7 +437,7 @@ private class ObjectPoolActor<T : PooledObject>(
is Failure -> future.completeExceptionally(message.item.exception)
is Success -> {
try {
message.item.value.borrowTo(future)
message.item.value.borrowTo(future, markForEviction = message.objectHolder.markForEviction)
} catch (e: Exception) {
future.completeExceptionally(e)
}
Expand All @@ -420,11 +446,11 @@ private class ObjectPoolActor<T : PooledObject>(
}
}

private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true) {
private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true, markForEviction: Boolean = false,) {
if (validate) {
validate(this)
}
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false)
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false, markForEviction = markForEviction)
logger.trace { "borrowed: ${this.id} ; $poolStatusString" }
future.complete(this)
}
Expand All @@ -450,6 +476,11 @@ private class ObjectPoolActor<T : PooledObject>(
}
validate(message.returnedItem)
message.future.complete(Unit)
if (removed.markForEviction) {
logger.trace { "GiveBack got item ${message.returnedItem.id} marked for eviction, so destroying it" }
message.returnedItem.destroy()
return
}
if (waitingQueue.isEmpty()) {
if (availableItems.any { holder -> message.returnedItem === holder.item }) {
logger.warn { "trying to give back an item to the pool twice ${message.returnedItem.id}, will ignore that" }
Expand Down Expand Up @@ -533,10 +564,11 @@ private class ObjectPoolActor<T : PooledObject>(
val created = objectFactory.create()
val itemCreateId = createIndex
createIndex++
inCreateItems[itemCreateId] = ObjectHolder(created)
val objectHolder = ObjectHolder(created)
inCreateItems[itemCreateId] = objectHolder
logger.trace { "createObject createRequest=$itemCreateId" }
created.onComplete { tried ->
offerOrLog(Created(itemCreateId, tried, future)) {
offerOrLog(Created(itemCreateId, tried, future, objectHolder)) {
"failed to offer on created item $itemCreateId"
}
}
Expand All @@ -558,9 +590,11 @@ private open class PoolObjectHolder<T : PooledObject>(
val timeElapsed: Long get 8000 () = System.currentTimeMillis() - time
}

private class ObjectHolder<T : Any>(val item: T) {
private class ObjectHolder<T : Any>(
val item: T,
var markForEviction: Boolean = false,
) {
val time = System.currentTimeMillis()

val timeElapsed: Long get() = System.currentTimeMillis() - time
}

Expand All @@ -569,12 +603,13 @@ private data class ItemInUseHolder<T : PooledObject>(
val isInTest: Boolean,
val testFuture: CompletableFuture<T>? = null,
val time: Long = System.currentTimeMillis(),
var cleanedByPool: Boolean = false
var cleanedByPool: Boolean = false,
var markForEviction: Boolean = false,

) {
val timeElapsed: Long get() = System.currentTimeMillis() - time

@Suppress("unused", "ProtectedInFinal")
@Suppress("unused")
protected fun finalize() {
if (!cleanedByPool) {
logger.warn { "LEAK DETECTED for item $this - $timeElapsed ms since in use" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ interface AsyncObjectPool<T> {

fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>>

/**
* Mark all objects in the pool as invalid. Objects will be evicted when not in use.
*/
fun softEvict(): CompletableFuture<AsyncObjectPool<T>>

/**
*
* Closes this pool and future calls to **take** will cause the Future to raise an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,41 @@ class ActorBasedObjectPoolTest {
assertThat(factory.validated).isEqualTo(listOf(result, result, result))
}

@Test
fun `softEviction - basic take-evict-return pool should be empty`() {
tested = createDefaultPool()
val result = tested.take().get()
tested.softEvict().get()
tested.giveBack(result).get()
assertThat(tested.availableItems).isEmpty()
}

@Test
fun `softEviction - minimum number of objects is maintained, but objects are replaced`() {
tested = ActorBasedObjectPool(
factory,
configuration.copy(minIdleObjects = 3),
false
)
tested.take().get()
await.untilCallTo { tested.availableItemsSize } matches { it == 3 }
val availableItems = tested.availableItems
tested.softEvict().get()
await.untilCallTo { tested.availableItemsSize } matches { it == 3 }
assertThat(tested.availableItems.toSet().intersect(availableItems.toSet())).isEmpty()
}

@Test
fun `test for objects in create eviction in case of softEviction`() {
factory.creationStuckTime = 10
tested = createDefaultPool()
val itemPromise = tested.take()
tested.softEvict().get()
val item = itemPromise.get()
tested.giveBack(item).get()
assertThat(tested.availableItemsSize).isEqualTo(0)
}

@Test
fun `take2-return2-take first not validated second is ok should be returned`() {
tested = createDefaultPool()
Expand Down
0