Skip to content

Commit 0a8d4ad

Browse files
committed
add soft evict impl
Issue #371
1 parent 7279ffb commit 0a8d4ad

3 files changed

Lines changed: 53 additions & 11 deletions

File tree

db-async-common/src/main/java/com/github/jasync/sql/db/pool/ConnectionPool.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ class ConnectionPool<T : ConcreteConnection>(
131131

132132
fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> = objectPool.giveBack(item)
133133

134+
fun softEvictConnections(): CompletableFuture<AsyncObjectPool<T>> = objectPool.softEvict()
135+
134136
/**
135137
*
136138
* Closes the pool

pool-async/src/main/java/com/github/jasync/sql/db/pool/ActorBasedObjectPool.kt

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,15 @@ internal constructor(
9191
return future
9292
}
9393

94+
override fun softEvict(): CompletableFuture<AsyncObjectPool<T>> {
95+
val future = CompletableFuture<Unit>()
96+
val offered = actor.trySend(SoftEvictAll(future)).isSuccess
97+
if (!offered) {
98+
future.completeExceptionally(Exception("could not offer to actor"))
99+
}
100+
return future.map { this }
101+
}
102+
94103
override fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>> {
95104
val future = CompletableFuture<Unit>()
96105
val offered = actor.trySend(GiveBack(item, future)).isSuccess
@@ -166,6 +175,7 @@ private sealed class ActorObjectPoolMessage<T : PooledObject> {
166175
}
167176

168177
private class Take<T : PooledObject>(val future: CompletableFuture<T>) : ActorObjectPoolMessage<T>()
178+
private class SoftEvictAll<T : PooledObject>(val future: CompletableFuture<Unit>) : ActorObjectPoolMessage<T>()
169179
private class GiveBack<T : PooledObject>(
170180
val returnedItem: T,
171181
val future: CompletableFuture<Unit>,
@@ -183,7 +193,8 @@ private class GiveBack<T : PooledObject>(
183193
private class Created<T : PooledObject>(
184194
val itemCreateId: Int,
185195
val item: Try<T>,
186-
val takeAskFuture: CompletableFuture<T>?
196+
val takeAskFuture: CompletableFuture<T>?,
197+
val objectHolder: ObjectHolder<CompletableFuture<out T>>
187198
) : ActorObjectPoolMessage<T>() {
188199
override fun toString(): String {
189200
val id = when (item) {
@@ -227,6 +238,7 @@ private class ObjectPoolActor<T : PooledObject>(
227238
when (message) {
228239
is Take<T> -> handleTake(message)
229240
is GiveBack<T> -> handleGiveBack(message)
241+
is SoftEvictAll<T> -> handleSoftEvictAll(message)
230242
is Created<T> -> handleCreated(message)
231243
is TestPoolItems<T> -> handleTestPoolItems()
232244
is Close<T> -> handleClose(message)
@@ -235,6 +247,19 @@ private class ObjectPoolActor<T : PooledObject>(
235247
scheduleNewItemsIfNeeded()
236248
}
237249

250+
private fun handleSoftEvictAll(message: SoftEvictAll<T>) {
251+
evictAvailableItems()
252+
inUseItems.values.forEach { it.markForEviction = true }
253+
inCreateItems.entries.forEach { it.value.markForEviction = true }
254+
logger.trace { "handleSoftEvictAll - done" }
255+
message.future.complete(Unit)
256+
}
257+
258+
private fun evictAvailableItems() {
259+
availableItems.forEach { it.item.destroy() }
260+
availableItems.clear()
261+
}
262+
238263
private fun scheduleNewItemsIfNeeded() {
239264
logger.trace { "scheduleNewItemsIfNeeded - $poolStatusString" }
240265
// deal with inconsistency in case we have items but also waiting futures
@@ -273,8 +298,7 @@ private class ObjectPoolActor<T : PooledObject>(
273298
try {
274299
closed = true
275300
channel.close()
276-
availableItems.forEach { it.item.destroy() }
277-
availableItems.clear()
301+
evictAvailableItems()
278302
inUseItems.forEach {
279303
it.value.cleanedByPool = true
280304
it.key.destroy()
@@ -368,10 +392,12 @@ private class ObjectPoolActor<T : PooledObject>(
368392
logger.trace { "releasing idle item ${item.id}" }
369393
item.destroy()
370394
}
395+
371396
configuration.maxObjectTtl != null && System.currentTimeMillis() - item.creationTime > configuration.maxObjectTtl -> {
372397
logger.trace { "releasing item past ttl ${item.id}" }
373398
item.destroy()
374399
}
400+
375401
else -> {
376402
val test = objectFactory.test(item)
377403
inUseItems[item] = ItemInUseHolder(item.id, isInTest = true, testFuture = test)
@@ -411,7 +437,7 @@ private class ObjectPoolActor<T : PooledObject>(
411437
is Failure -> future.completeExceptionally(message.item.exception)
412438
is Success -> {
413439
try {
414-
message.item.value.borrowTo(future)
440+
message.item.value.borrowTo(future, markForEviction = message.objectHolder.markForEviction)
415441
} catch (e: Exception) {
416442
future.completeExceptionally(e)
417443
}
@@ -420,11 +446,11 @@ private class ObjectPoolActor<T : PooledObject>(
420446
}
421447
}
422448

423-
private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true) {
449+
private fun T.borrowTo(future: CompletableFuture<T>, validate: Boolean = true, markForEviction: Boolean = false,) {
424450
if (validate) {
425451
validate(this)
426452
}
427-
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false)
453+
inUseItems[this] = ItemInUseHolder(this.id, isInTest = false, markForEviction = markForEviction)
428454
logger.trace { "borrowed: ${this.id} ; $poolStatusString" }
429455
future.complete(this)
430456
}
@@ -450,6 +476,11 @@ private class ObjectPoolActor<T : PooledObject>(
450476
}
451477
validate(message.returnedItem)
452478
message.future.complete(Unit)
479+
if (removed.markForEviction) {
480+
logger.trace { "GiveBack got item ${message.returnedItem.id} marked for eviction, so destroying it" }
481+
message.returnedItem.destroy()
482+
return
483+
}
453484
if (waitingQueue.isEmpty()) {
454485
if (availableItems.any { holder -> message.returnedItem === holder.item }) {
455486
logger.warn { "trying to give back an item to the pool twice ${message.returnedItem.id}, will ignore that" }
@@ -533,10 +564,11 @@ private class ObjectPoolActor<T : PooledObject>(
533564
val created = objectFactory.create()
534565
val itemCreateId = createIndex
535566
createIndex++
536-
inCreateItems[itemCreateId] = ObjectHolder(created)
567+
val objectHolder = ObjectHolder(created)
568+
inCreateItems[itemCreateId] = objectHolder
537569
logger.trace { "createObject createRequest=$itemCreateId" }
538570
created.onComplete { tried ->
539-
offerOrLog(Created(itemCreateId, tried, future)) {
571+
offerOrLog(Created(itemCreateId, tried, future, objectHolder)) {
540572
"failed to offer on created item $itemCreateId"
541573
}
542574
}
@@ -558,9 +590,11 @@ private open class PoolObjectHolder<T : PooledObject>(
558590
val timeElapsed: Long get() = System.currentTimeMillis() - time
559591
}
560592

561-
private class ObjectHolder<T : Any>(val item: T) {
593+
private class ObjectHolder<T : Any>(
594+
val item: T,
595+
var markForEviction: Boolean = false,
596+
) {
562597
val time = System.currentTimeMillis()
563-
564598
val timeElapsed: Long get() = System.currentTimeMillis() - time
565599
}
566600

@@ -569,7 +603,8 @@ private data class ItemInUseHolder<T : PooledObject>(
569603
val isInTest: Boolean,
570604
val testFuture: CompletableFuture<T>? = null,
571605
val time: Long = System.currentTimeMillis(),
572-
var cleanedByPool: Boolean = false
606+
var cleanedByPool: Boolean = false,
607+
var markForEviction: Boolean = false,
573608

574609
) {
575610
val timeElapsed: Long get() = System.currentTimeMillis() - time

pool-async/src/main/java/com/github/jasync/sql/db/pool/AsyncObjectPool.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ interface AsyncObjectPool<T> {
4040

4141
fun giveBack(item: T): CompletableFuture<AsyncObjectPool<T>>
4242

43+
/**
44+
* Mark all objects in the pool as invalid. Objects will be evicted when not in use.
45+
*/
46+
fun softEvict(): CompletableFuture<AsyncObjectPool<T>>
47+
4348
/**
4449
*
4550
* Closes this pool and future calls to **take** will cause the Future to raise an

0 commit comments

Comments
 (0)