new thread pooling strategy and test program WIP

This commit is contained in:
Minjae Song
2018-12-14 22:53:25 +09:00
parent 6f49dcff4b
commit 513c5a17eb
8 changed files with 231 additions and 32 deletions

View File

@@ -1,6 +1,10 @@
package net.torvald.terrarum.concurrent
import net.torvald.terrarum.Terrarum
import net.torvald.terrarum.lock
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.locks.ReentrantReadWriteLock
import kotlin.collections.ArrayList
typealias RunnableFun = () -> Unit
@@ -57,7 +61,7 @@ object ThreadParallel {
* Primitive locking
*/
fun allFinished(): Boolean {
pool.forEach { if (it?.state != Thread.State.TERMINATED) return false }
pool.forEach { if (it != null && it.state != Thread.State.TERMINATED) return false }
return true
}
}
@@ -71,24 +75,30 @@ object BlockingThreadPool {
val threadCount = Terrarum.THREADS // modify this to your taste
private val pool: Array<Thread?> = Array(threadCount, { null })
private var tasks: List<RunnableFun> = ArrayList<RunnableFun>()
private var allTasksDone = false
private var dispatchedTasks = 0
@Volatile private var dispatchedTasks = 0
private var threadPrefix = ""
/** @return false on failure (likely the previous jobs not finished), true on success */
fun map(prefix: String, tasks: List<RunnableFun>) = setTasks(tasks, prefix)
fun setTasks(tasks: List<RunnableFun>, prefix: String) {
/** @return false on failure (likely the previous jobs not finished), true on success */
fun setTasks(tasks: List<RunnableFun>, prefix: String): Boolean {
if (!allFinished())
return false
this.tasks = tasks
dispatchedTasks = 0
threadPrefix = prefix
return true
}
fun dequeueTask(): RunnableFun {
private fun dequeueTask(): RunnableFun {
dispatchedTasks += 1
return tasks[dispatchedTasks - 1]
}
fun startAllWaitForDie() {
while (!allTasksDone) {
while (dispatchedTasks <= tasks.lastIndex) {
// marble rolling down the slanted channel-track of threads, if a channel is empty (a task assigned
// to the thread is dead) the marble will roll into the channel, and the marble is a task #MarbleMachineX
for (i in 0 until threadCount) {
@@ -96,7 +106,8 @@ object BlockingThreadPool {
// of marbles and put it into an empty channel whenever we encounter one
// SO WHAT WE DO is first fill any empty channels:
if (pool[i] == null || pool[i]!!.state == Thread.State.TERMINATED) {
if (dispatchedTasks <= tasks.lastIndex && // because cache invalidation damnit
(pool[i] == null || pool[i]!!.state == Thread.State.TERMINATED)) {
pool[i] = Thread(dequeueTask().makeRunnable(), "$threadPrefix-$dispatchedTasks") // thread name index is one-based
pool[i]!!.start()
}
@@ -109,6 +120,11 @@ object BlockingThreadPool {
}
}
fun allFinished(): Boolean {
pool.forEach { if (it != null && it.state != Thread.State.TERMINATED) return false }
return true
}
private fun RunnableFun.makeRunnable() = Runnable { this.invoke() }
}