@@ -91,6 +91,15 @@ internal constructor(
9191 return future
9292 }
9393
94+ override fun softEvict (): CompletableFuture <AsyncObjectPool <T >> {
95+ val future = CompletableFuture <Unit >()
96+ val offered = actor.trySend(SoftEvictAll (future)).isSuccess
97+ if (! offered) {
98+ future.completeExceptionally(Exception (" could not offer to actor" ))
99+ }
100+ return future.map { this }
101+ }
102+
94103 override fun giveBack (item : T ): CompletableFuture <AsyncObjectPool <T >> {
95104 val future = CompletableFuture <Unit >()
96105 val offered = actor.trySend(GiveBack (item, future)).isSuccess
@@ -166,6 +175,7 @@ private sealed class ActorObjectPoolMessage<T : PooledObject> {
166175}
167176
168177private class Take <T : PooledObject >(val future : CompletableFuture <T >) : ActorObjectPoolMessage<T>()
178+ private class SoftEvictAll <T : PooledObject >(val future : CompletableFuture <Unit >) : ActorObjectPoolMessage<T>()
169179private class GiveBack <T : PooledObject >(
170180 val returnedItem : T ,
171181 val future : CompletableFuture <Unit >,
@@ -183,7 +193,8 @@ private class GiveBack<T : PooledObject>(
183193private class Created <T : PooledObject >(
184194 val itemCreateId : Int ,
185195 val item : Try <T >,
186- val takeAskFuture : CompletableFuture <T >?
196+ val takeAskFuture : CompletableFuture <T >? ,
197+ val objectHolder : ObjectHolder <CompletableFuture <out T >>
187198) : ActorObjectPoolMessage<T>() {
188199 override fun toString (): String {
189200 val id = when (item) {
@@ -227,6 +238,7 @@ private class ObjectPoolActor<T : PooledObject>(
227238 when (message) {
228239 is Take <T > -> handleTake(message)
229240 is GiveBack <T > -> handleGiveBack(message)
241+ is SoftEvictAll <T > -> handleSoftEvictAll(message)
230242 is Created <T > -> handleCreated(message)
231243 is TestPoolItems <T > -> handleTestPoolItems()
232244 is Close <T > -> handleClose(message)
@@ -235,6 +247,19 @@ private class ObjectPoolActor<T : PooledObject>(
235247 scheduleNewItemsIfNeeded()
236248 }
237249
250+ private fun handleSoftEvictAll (message : SoftEvictAll <T >) {
251+ evictAvailableItems()
252+ inUseItems.values.forEach { it.markForEviction = true }
253+ inCreateItems.entries.forEach { it.value.markForEviction = true }
254+ logger.trace { " handleSoftEvictAll - done" }
255+ message.future.complete(Unit )
256+ }
257+
258+ private fun evictAvailableItems () {
259+ availableItems.forEach { it.item.destroy() }
260+ availableItems.clear()
261+ }
262+
238263 private fun scheduleNewItemsIfNeeded () {
239264 logger.trace { " scheduleNewItemsIfNeeded - $poolStatusString " }
240265 // deal with inconsistency in case we have items but also waiting futures
@@ -273,8 +298,7 @@ private class ObjectPoolActor<T : PooledObject>(
273298 try {
274299 closed = true
275300 channel.close()
276- availableItems.forEach { it.item.destroy() }
277- availableItems.clear()
301+ evictAvailableItems()
278302 inUseItems.forEach {
279303 it.value.cleanedByPool = true
280304 it.key.destroy()
@@ -368,10 +392,12 @@ private class ObjectPoolActor<T : PooledObject>(
368392 logger.trace { " releasing idle item ${item.id} " }
369393 item.destroy()
370394 }
395+
371396 configuration.maxObjectTtl != null && System .currentTimeMillis() - item.creationTime > configuration.maxObjectTtl -> {
372397 logger.trace { " releasing item past ttl ${item.id} " }
373398 item.destroy()
374399 }
400+
375401 else -> {
376402 val test = objectFactory.test(item)
377403 inUseItems[item] = ItemInUseHolder (item.id, isInTest = true , testFuture = test)
@@ -411,7 +437,7 @@ private class ObjectPoolActor<T : PooledObject>(
411437 is Failure -> future.completeExceptionally(message.item.exception)
412438 is Success -> {
413439 try {
414- message.item.value.borrowTo(future)
440+ message.item.value.borrowTo(future, markForEviction = message.objectHolder.markForEviction )
415441 } catch (e: Exception ) {
416442 future.completeExceptionally(e)
417443 }
@@ -420,11 +446,11 @@ private class ObjectPoolActor<T : PooledObject>(
420446 }
421447 }
422448
423- private fun T.borrowTo (future : CompletableFuture <T >, validate : Boolean = true) {
449+ private fun T.borrowTo (future : CompletableFuture <T >, validate : Boolean = true, markForEviction : Boolean = false, ) {
424450 if (validate) {
425451 validate(this )
426452 }
427- inUseItems[this ] = ItemInUseHolder (this .id, isInTest = false )
453+ inUseItems[this ] = ItemInUseHolder (this .id, isInTest = false , markForEviction = markForEviction )
428454 logger.trace { " borrowed: ${this .id} ; $poolStatusString " }
429455 future.complete(this )
430456 }
@@ -450,6 +476,11 @@ private class ObjectPoolActor<T : PooledObject>(
450476 }
451477 validate(message.returnedItem)
452478 message.future.complete(Unit )
479+ if (removed.markForEviction) {
480+ logger.trace { " GiveBack got item ${message.returnedItem.id} marked for eviction, so destroying it" }
481+ message.returnedItem.destroy()
482+ return
483+ }
453484 if (waitingQueue.isEmpty()) {
454485 if (availableItems.any { holder -> message.returnedItem == = holder.item }) {
455486 logger.warn { " trying to give back an item to the pool twice ${message.returnedItem.id} , will ignore that" }
@@ -533,10 +564,11 @@ private class ObjectPoolActor<T : PooledObject>(
533564 val created = objectFactory.create()
534565 val itemCreateId = createIndex
535566 createIndex++
536- inCreateItems[itemCreateId] = ObjectHolder (created)
567+ val objectHolder = ObjectHolder (created)
568+ inCreateItems[itemCreateId] = objectHolder
537569 logger.trace { " createObject createRequest=$itemCreateId " }
538570 created.onComplete { tried ->
539- offerOrLog(Created (itemCreateId, tried, future)) {
571+ offerOrLog(Created (itemCreateId, tried, future, objectHolder )) {
540572 " failed to offer on created item $itemCreateId "
541573 }
542574 }
@@ -558,9 +590,11 @@ private open class PoolObjectHolder<T : PooledObject>(
558590 val timeElapsed: Long get() = System .currentTimeMillis() - time
559591}
560592
561- private class ObjectHolder <T : Any >(val item : T ) {
593+ private class ObjectHolder <T : Any >(
594+ val item : T ,
595+ var markForEviction : Boolean = false ,
596+ ) {
562597 val time = System .currentTimeMillis()
563-
564598 val timeElapsed: Long get() = System .currentTimeMillis() - time
565599}
566600
@@ -569,7 +603,8 @@ private data class ItemInUseHolder<T : PooledObject>(
569603 val isInTest : Boolean ,
570604 val testFuture : CompletableFuture <T >? = null ,
571605 val time : Long = System .currentTimeMillis(),
572- var cleanedByPool : Boolean = false
606+ var cleanedByPool : Boolean = false ,
607+ var markForEviction : Boolean = false ,
573608
574609) {
575610 val timeElapsed: Long get() = System .currentTimeMillis() - time
0 commit comments