diff --git a/src/net/torvald/terrarum/concurrent/ThreadParallel.kt b/src/net/torvald/terrarum/concurrent/ThreadParallel.kt index c6bc08c6a..756c42dfa 100644 --- a/src/net/torvald/terrarum/concurrent/ThreadParallel.kt +++ b/src/net/torvald/terrarum/concurrent/ThreadParallel.kt @@ -1,14 +1,19 @@ package net.torvald.terrarum.concurrent import net.torvald.terrarum.Terrarum +import kotlin.collections.ArrayList + +typealias RunnableFun = () -> Unit +/** Int: index of the processing core */ +typealias ThreadableFun = (Int) -> Unit /** * Created by minjaesong on 2016-05-25. */ object ThreadParallel { - val threads = Terrarum.THREADS // modify this to your taste + val threadCount = Terrarum.THREADS // modify this to your taste - private val pool: Array = Array(threads, { null }) + private val pool: Array = Array(threadCount, { null }) /** * Map Runnable object to certain index of the thread pool. @@ -23,7 +28,7 @@ object ThreadParallel { /** * @param runFunc A function that takes an int input (the index), and returns nothing */ - fun map(index: Int, prefix: String, runFunc: (Int) -> Unit) { + fun map(index: Int, prefix: String, runFunc: ThreadableFun) { val runnable = object : Runnable { override fun run() { runFunc(index) @@ -57,10 +62,60 @@ object ThreadParallel { } } +/** + * A thread pool that will hold the execution until all the tasks are completed. + * + * Tasks are not guaranteed to be done orderly; but the first task in the list will be executed first. + */ +object BlockingThreadPool { + val threadCount = Terrarum.THREADS // modify this to your taste + private val pool: Array = Array(threadCount, { null }) + private var tasks: List = ArrayList() + private var allTasksDone = false + private var dispatchedTasks = 0 + private var threadPrefix = "" + + fun map(prefix: String, tasks: List) = setTasks(tasks, prefix) + fun setTasks(tasks: List, prefix: String) { + this.tasks = tasks + dispatchedTasks = 0 + threadPrefix = prefix + } + + fun dequeueTask(): RunnableFun { + dispatchedTasks += 1 + return tasks[dispatchedTasks - 1] + } + + fun startAllWaitForDie() { + while (!allTasksDone) { + // marble rolling down the slanted channel-track of threads, if a channel is empty (a task assigned + // to the thread is dead) the marble will roll into the channel, and the marble is a task #MarbleMachineX + for (i in 0 until threadCount) { + // but unlike the marble machine, marble don't actually roll down, we can just pick up any number + // of marbles and put it into an empty channel whenever we encounter one + + // SO WHAT WE DO is first fill any empty channels: + if (pool[i] == null || pool[i]!!.state == Thread.State.TERMINATED) { + pool[i] = Thread(dequeueTask().makeRunnable(), "$threadPrefix-$dispatchedTasks") // thread name index is one-based + pool[i]!!.start() + } + + // then, sleep this very thread, wake if any of the thread in the pool is terminated, + // and GOTO loop_start; if we don't sleep, this function will be busy-waiting + + + } + } + } + + private fun RunnableFun.makeRunnable() = Runnable { this.invoke() } +} + object ParallelUtils { fun Iterable.parallelMap(transform: (T) -> R): List { - val tasks = this.sliceEvenly(ThreadParallel.threads) - val destination = Array(ThreadParallel.threads) { ArrayList() } + val tasks = this.sliceEvenly(ThreadParallel.threadCount) + val destination = Array(ThreadParallel.threadCount) { ArrayList() } tasks.forEachIndexed { index, list -> ThreadParallel.map(index, "ParallelUtils.parallelMap@${this.javaClass.canonicalName}") { for (item in list)