Clojure comes with two kinds of thread-pools – a bounded thread-pool for CPU-bound operations, and one for IO-bound operations that grows as needed. The bounded thread-pool is used every time an action is sent to an agent via the send function. The unbounded thread-pool is used (for instance) every time an action is sent to an agent using the send-off function. Futures also run on this unbounded thread-pool.
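To make the difference concrete, here is a minimal sketch of the three built-in options described above. The agent names and the actions are made up for illustration, and the sleeps merely stand in for slow IO.

```clojure
;; Illustrative only: these agents and actions are hypothetical.
(def counter (agent 0))      ; state updated by a quick, CPU-bound action
(def log-lines (agent []))   ; state updated by a blocking action

;; send runs the action on the bounded thread-pool.
(send counter + 10)

;; send-off runs the action on the unbounded thread-pool, so it may block.
(send-off log-lines
          (fn [lines]
            (Thread/sleep 100) ; stand-in for slow IO
            (conj lines "request handled")))

;; A future also runs on the unbounded thread-pool; deref blocks for its result.
(def answer (future (Thread/sleep 100) (* 6 7)))

;; Wait for the actions queued on both agents before reading their state.
(await counter log-lines)

(println @counter @log-lines @answer)

;; Let the JVM exit promptly once the script is done.
(shutdown-agents)
```

Nothing here limits how many send-off actions or futures can be in flight at once, which is the gap the rest of this post is about.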
Sometimes, however, you might need a third option. This is the case where you don’t want an unbounded pool of threads that grows so much that the system runs out of resources trying to juggle the sheer number of threads. This might happen (say) if you were using send-off to handle incoming requests for IO-bound operations. Under normal circumstances, such a system might perform in an acceptable manner. If the request load were to spike, however, you could quickly create a larger-than-manageable number of threads.
What you need in such a case is a separate thread-pool for IO operations – one that has more threads than the thread-pool for CPU-bound operations, but is still bounded, so that it grows only to a certain size and any further requests are queued. Luckily, Clojure allows you to seamlessly use underlying Java libraries.
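As a rough sketch of that idea using only java.util.concurrent, the following creates a fixed-size pool and submits more tasks than it has threads. The pool size of 8, the task count, and the names io-pool, active, peak and simulated-io-task are assumptions chosen for illustration; this is not Medusa's code.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; Hypothetical sizing: 8 threads for blocking work. A fixed thread-pool
;; keeps extra tasks in a queue instead of creating more threads.
(def ^ExecutorService io-pool (Executors/newFixedThreadPool 8))

(def active (atom 0)) ; tasks running right now
(def peak (atom 0))   ; highest value that active has reached

(defn simulated-io-task
  "Pretends to do blocking IO while tracking how many tasks overlap."
  []
  (let [n (swap! active inc)]
    (swap! peak max n)
    (Thread/sleep 50)
    (swap! active dec)))

;; Submit far more tasks than there are threads; the surplus waits in the queue.
(dotimes [_ 40]
  (.execute io-pool ^Runnable simulated-io-task))

;; Stop accepting new work and wait for the queued tasks to drain.
(.shutdown io-pool)
(.awaitTermination io-pool 10 TimeUnit/SECONDS)

(println "peak concurrent tasks:" @peak)
```

However many tasks are submitted, the peak never exceeds the pool size, so a spike in load lengthens the queue rather than the thread count.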
Medusa is a bounded, supervised thread-pool. A supervisor function runs alongside the thread-pool and monitors the running tasks. If a task takes longer than a specified amount of time, it is evicted. If the thread-pool is fully occupied, Medusa queues all further tasks submitted and runs each one as soon as a thread becomes available. The Medusa thread-pool size is thrice the number of cores available to the JVM. In future versions, this number will be configurable.
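The supervision idea can be sketched with plain java.util.concurrent classes. This is not Medusa's implementation, only one way such a design could look; the names running, submit-supervised, evict-slow-tasks! and start-supervisor!, and the timings in the demo, are all hypothetical.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future
                               ScheduledExecutorService TimeUnit))

;; Pool size: three times the number of cores available to the JVM.
(def pool-size (* 3 (.availableProcessors (Runtime/getRuntime))))
(def ^ExecutorService pool (Executors/newFixedThreadPool pool-size))

;; id -> {:future f, :started-at millis} for tasks currently on a thread.
(def running (atom {}))

(defn submit-supervised
  "Hypothetical helper: runs thunk on the pool and records when it starts."
  [id thunk]
  (let [fut-promise (promise)
        task (fn []
               ;; Register only once the task is really running, so that time
               ;; spent waiting in the queue does not count against it.
               (swap! running assoc id {:future @fut-promise
                                        :started-at (System/currentTimeMillis)})
               (try
                 (thunk)
                 (finally (swap! running dissoc id))))
        fut (.submit pool ^Runnable task)]
    (deliver fut-promise fut)
    fut))

(defn evict-slow-tasks!
  "Cancels (interrupts) every running task older than max-millis."
  [max-millis]
  (let [now (System/currentTimeMillis)]
    (doseq [[_ {fut :future, started-at :started-at}] @running
            :when (> (- now started-at) max-millis)]
      (.cancel ^Future fut true))))

(defn start-supervisor!
  "Checks the running tasks every period-millis on a separate thread."
  [max-millis period-millis]
  (let [^ScheduledExecutorService scheduler
        (Executors/newSingleThreadScheduledExecutor)]
    (.scheduleAtFixedRate scheduler
                          ^Runnable (fn [] (evict-slow-tasks! max-millis))
                          0 (long period-millis) TimeUnit/MILLISECONDS)
    scheduler))

;; Demo: tasks may run for at most 300 ms, checked every 50 ms.
(def supervisor (start-supervisor! 300 50))
(submit-supervised :fast #(Thread/sleep 100))
(def slow (submit-supervised :slow #(Thread/sleep 5000)))

(Thread/sleep 1000)
(println "slow task cancelled?" (.isCancelled ^Future slow))

(.shutdownNow ^ScheduledExecutorService supervisor)
(.shutdownNow pool)
```

Cancelling by interruption only works for tasks that respond to interrupts, such as the sleeps used here; a task stuck in a tight loop would need a different eviction strategy.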
Here it is in action -
(use 'org.rathore.amit.medusa.core)

(start-supervisor)

(defn new-task [id sleep-seconds]
  (println (System/currentTimeMillis) "| Starting task" id "will sleep for" sleep-seconds)
  (Thread/sleep (* 1000 sleep-seconds))
  (println (System/currentTimeMillis) "| Done task" id))

(defn run-tasks [n]
  (println "Will submit" n "jobs")
  (dotimes [i n]
    (medusa-future i #(new-task i (* 5 (inc i))))))

(run-tasks 20)
The output is -
Will submit 20 jobs
1276068494442 | Starting task 0 will sleep for 5
1276068494448 | Starting task 1 will sleep for 10
1276068494449 | Starting task 2 will sleep for 15
1276068494449 | Starting task 3 will sleep for 20
1276068494451 | Starting task 4 will sleep for 25
1276068494451 | Starting task 5 will sleep for 30
1276068499447 | Done task 0
1276068499448 | Starting task 6 will sleep for 35
1276068504448 | Done task 1
1276068504448 | Starting task 7 will sleep for 40
1276068509448 | Done task 2
1276068509448 | Starting task 8 will sleep for 45
1276068514448 | Done task 3
1276068514449 | Starting task 9 will sleep for 50
1276068519450 | Starting task 10 will sleep for 55
1276068523547 | Starting task 11 will sleep for 60
1276068523548 | Starting task 13 will sleep for 70
1276068523547 | Starting task 12 will sleep for 65
1276068523548 | Starting task 14 will sleep for 75
1276068523548 | Starting task 15 will sleep for 80
1276068523549 | Starting task 16 will sleep for 85
1276068533547 | Starting task 17 will sleep for 90
1276068533547 | Starting task 18 will sleep for 95
1276068543547 | Starting task 19 will sleep for 100
Notice that only the first few tasks (tasks 0 through 3 in the output above) complete, since the pre-emption time is 20 seconds. All the later tasks have been coded to take more than 20 seconds (simulated above by the sleeps), so the supervisor pre-empts them out of the thread-pool. This pre-emption frees up threads, which is what allows the remaining tasks to start, as can be seen by looking at the timestamps of the log messages. This fulfills the requirement of a bounded thread-pool with supervised pre-emption of tasks that take too long.
[Image: thread usage when the program starts and the supervisor has started]
[Image: thread usage when the tasks complete]
Medusa “futures” do not yet have the semantics of standard Clojure futures – currently, they only handle side-effects. A next step would be to give them the same future semantics, so that they return the result of their computation – that will come in the next version.
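For a sense of what result-returning semantics on a bounded pool could look like, here is a small sketch built on java.util.concurrent. It is not how Medusa does or will do it; the helper bounded-future-call and the pool size of 4 are assumptions made for illustration.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

;; Hypothetical bounded pool of 4 threads.
(def ^ExecutorService pool (Executors/newFixedThreadPool 4))

(defn bounded-future-call
  "Hypothetical helper: runs the no-argument function f on the bounded pool
  and returns a java.util.concurrent.Future that holds f's return value."
  [f]
  ;; Clojure functions implement Callable, so submit captures the result.
  (.submit pool ^Callable f))

(def total (bounded-future-call #(reduce + (range 1000))))

;; .get blocks until the task has finished, then returns its value.
(println (.get ^Future total))

(.shutdown pool)
```

The caller gets a handle back immediately and blocks only when it asks for the value, which is the behaviour that standard Clojure futures provide on the unbounded pool.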