Medusa 0.1 – a supervised thread-pool for Clojure futures

Clojure comes with two kinds of thread-pools – a bounded thread-pool for CPU-bound operations, and one for IO-bound operations that grows as needed. The bounded thread-pool is used every time an action is sent to an agent via the send function. The unbounded thread-pool is used (for instance) every time an action is sent to an agent using the send-off function. Futures also run on this unbounded thread-pool.
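As a small illustration of the two built-in pools, the sketch below sends one action with send, one with send-off, and runs one future. The agents and the names counter, log and answer are hypothetical, chosen only for this example.

```clojure
;; Hypothetical agents, used only for illustration.
(def counter (agent 0))
(def log (agent []))

;; send: the action runs on the bounded pool meant for CPU-bound work.
(send counter inc)

;; send-off: the action runs on the unbounded pool meant for blocking work.
(send-off log (fn [entries]
                (Thread/sleep 100)          ; stands in for blocking IO
                (conj entries :written)))

;; future: the body also runs on the unbounded pool.
(def answer (future (+ 1 2)))

;; Block until both agents have processed the actions sent so far.
(await counter log)

(println @counter @log @answer)             ; prints: 1 [:written] 3

;; Let the JVM exit promptly when this is run as a script.
(shutdown-agents)
```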

Sometimes, however, you might need a third option. This is the case where you don’t want an unbounded pool of threads that grows so much that the system runs out of resources trying to juggle the sheer number of threads. This might happen (say) if you were using send-off to handle incoming requests for IO-bound operations. Under normal circumstances, such a system might perform in an acceptable manner. If the request load were to spike, however, you could quickly create a larger-than-manageable number of threads.

What you need in such a case is a separate thread-pool for IO operations – one that has more threads than the pool for CPU-bound operations, but is still bounded, so that it grows only to a certain size and any further requests get queued. Luckily, Clojure allows you to seamlessly use underlying Java libraries.
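One way to get such a pool from the Java libraries is a fixed-size executor from java.util.concurrent. The following is a minimal sketch, not Medusa's code; the pool size of 4 and the names io-pool-size, io-pool, completed and submit-io-task are assumptions made for the example.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; Hypothetical pool size, chosen only for this example.
(def io-pool-size 4)

;; A fixed-size pool: at most io-pool-size tasks run at once, and further
;; submissions wait in the executor's internal queue.
(def ^ExecutorService io-pool (Executors/newFixedThreadPool io-pool-size))

(def completed (atom 0))

(defn submit-io-task
  "Hands a no-argument function to io-pool. Clojure functions implement
  Runnable, so they can be passed to the executor directly."
  [f]
  (.execute io-pool ^Runnable f))

;; Submit more tasks than there are threads; the extra ones are queued.
(dotimes [i 10]
  (submit-io-task (fn []
                    (Thread/sleep 50)       ; stands in for blocking IO
                    (swap! completed inc))))

;; Stop accepting new tasks, then wait for the queued ones to finish.
(.shutdown io-pool)
(.awaitTermination io-pool 10 TimeUnit/SECONDS)
(println "Completed tasks:" @completed)     ; prints: Completed tasks: 10
```

A plain fixed-size pool bounds the number of threads, but it does nothing about tasks that run for too long.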

Medusa is a bounded, supervised thread-pool. A supervisor function runs alongside the thread-pool and monitors the running tasks. Any task that takes longer than a specified amount of time is evicted. If the thread-pool is fully occupied, Medusa will queue all further tasks submitted and will run each task as soon as a thread becomes available. The Medusa thread-pool size is thrice the number of cores available to the JVM. In future versions, this number will be configurable.
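To make the idea of supervision concrete, here is a rough sketch of how a supervisor could evict overdue tasks from a fixed-size pool. It is not Medusa's implementation: the names submit-supervised, evict-overdue! and start-supervisor-sketch, the 200 ms limit, and the 50 ms polling interval are all assumptions made for illustration. Only the pool size (three times the available cores) comes from the description above.

```clojure
(import '(java.util.concurrent Executors ExecutorService Future))

;; Pool size: three times the cores available to the JVM, as described above.
(def pool-size (* 3 (.availableProcessors (Runtime/getRuntime))))
(def ^ExecutorService pool (Executors/newFixedThreadPool pool-size))

;; Hypothetical time limit, kept short so the demo finishes quickly.
(def max-task-millis 200)

;; id -> {:started millis, :handle promise-of-Future} for tasks now running.
(def running (atom {}))

(defn submit-supervised
  "Runs f on the pool and records its start time so it can be supervised."
  [id f]
  (let [handle (promise)
        task   (fn []
                 (swap! running assoc id {:started (System/currentTimeMillis)
                                          :handle  handle})
                 (try (f)
                      (finally (swap! running dissoc id))))]
    (deliver handle (.submit pool ^Runnable task))
    @handle))

(defn evict-overdue!
  "Cancels (with interruption) every running task that is over the limit."
  []
  (let [now (System/currentTimeMillis)]
    (doseq [[id {:keys [started handle]}] @running]
      (when (> (- now started) max-task-millis)
        (println "Evicting task" id)
        (.cancel ^Future @handle true)))))

(defn start-supervisor-sketch
  "Polls the running tasks from a daemon thread."
  []
  (doto (Thread. ^Runnable (fn [] (while true
                                    (evict-overdue!)
                                    (Thread/sleep 50))))
    (.setDaemon true)
    (.start)))

;; Demo: one task finishes within the limit, the other is evicted.
(start-supervisor-sketch)
(def finished (atom #{}))
(submit-supervised :quick #(do (Thread/sleep 50)   (swap! finished conj :quick)))
(submit-supervised :slow  #(do (Thread/sleep 2000) (swap! finished conj :slow)))
(Thread/sleep 1000)
(println "Finished:" @finished)             ; prints: Finished: #{:quick}
(.shutdownNow pool)
```

Cancelling with interruption only stops tasks that respond to interrupts, such as the sleeps used here; a task stuck in a tight loop would need a different mechanism.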

Here is Medusa in action:

(use 'org.rathore.amit.medusa.core)

(start-supervisor)

(defn new-task [id sleep-seconds]
  (println (System/currentTimeMillis) "| Starting task" id "will sleep for" sleep-seconds)
  (Thread/sleep (* 1000 sleep-seconds))
  (println (System/currentTimeMillis) "| Done task" id))

(defn run-tasks [n]
  (println "Will submit" n "jobs")
  (dotimes [i n]
    (medusa-future i #(new-task i (* 5 (inc i))))))

(run-tasks 20)

The output is:

Will submit 20 jobs
1276068494442 | Starting task 0 will sleep for 5
1276068494448 | Starting task 1 will sleep for 10
1276068494449 | Starting task 2 will sleep for 15
1276068494449 | Starting task 3 will sleep for 20
1276068494451 | Starting task 4 will sleep for 25
1276068494451 | Starting task 5 will sleep for 30
1276068499447 | Done task 0
1276068499448 | Starting task 6 will sleep for 35
1276068504448 | Done task 1
1276068504448 | Starting task 7 will sleep for 40
1276068509448 | Done task 2
1276068509448 | Starting task 8 will sleep for 45
1276068514448 | Done task 3
1276068514449 | Starting task 9 will sleep for 50
1276068519450 | Starting task 10 will sleep for 55
1276068523547 | Starting task 11 will sleep for 60
1276068523548 | Starting task 13 will sleep for 70
1276068523547 | Starting task 12 will sleep for 65
1276068523548 | Starting task 14 will sleep for 75
1276068523548 | Starting task 15 will sleep for 80
1276068523549 | Starting task 16 will sleep for 85
1276068533547 | Starting task 17 will sleep for 90
1276068533547 | Starting task 18 will sleep for 95
1276068543547 | Starting task 19 will sleep for 100

Notice that only tasks 0 through 3 print a “Done” line: they sleep for 20 seconds or less, and the pre-emption time is 20 seconds. Every later task has been coded to take longer than that (simulated above by the sleeps), so the supervisor pre-empts it out of the thread-pool. The Medusa thread-pool is then ready for more tasks. This pre-emption is what allows the remaining tasks to start, as can be seen by looking at the timestamps of the log messages. This fulfills the requirement of a bounded thread-pool with supervised pre-emption of tasks that take too long.

Here’s the thread-usage when the program starts, and the supervisor has started:

[Figure: Thread-pool when the program starts]

Here’s the thread-usage when the tasks complete:

[Figure: Thread-pool when the tasks complete]

Medusa “futures” do not yet have the semantics of standard Clojure futures – currently, they only handle side-effects. A next step would be to give them the same future semantics, so that they return the result of their computation – that will come in the next version.
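For comparison, a value-returning task on a bounded pool can be sketched with the standard Java executor API. This is only an illustration of the intended semantics, not the planned Medusa API; the names result-pool, bounded-future and total, and the pool size of 4, are hypothetical.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService TimeUnit))

;; Hypothetical bounded pool, sized arbitrarily for this example.
(def ^ExecutorService result-pool (Executors/newFixedThreadPool 4))

(defn bounded-future
  "Hypothetical helper: runs the no-argument function f on result-pool and
  returns a java.util.concurrent.Future holding its result."
  [f]
  ;; Clojure functions implement Callable, so submit can return their value.
  (.submit result-pool ^Callable f))

(def total (bounded-future #(reduce + (range 1 101))))

;; Wait up to 5 seconds for the result instead of blocking forever.
(println (.get total 5 TimeUnit/SECONDS))   ; prints: 5050

(.shutdown result-pool)
```

The timed get also gives the caller a way to stop waiting on a slow task.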

The project is hosted on GitHub, as usual – http://github.com/amitrathore/medusa.

3 thoughts on “Medusa 0.1 – a supervised thread-pool for Clojure futures”

  3. Can you help me convert this into a fixed-size pool with send-off?

    This is my definition:

    (defn call-initiateWirelessPostDashMultithread
      [^com.att.myatt.login.multithread.WirelessPostdashService wirelessPostdashService
       ^javax.servlet.http.HttpSession session
       ^java.util.HashMap invokedCtnList]
      (let [a (ref (agent nil))]
        (send-off (deref a)
                  (fn [_] (.initiateWirelessPostDashMultithread wirelessPostdashService session invokedCtnList)))))
