Medusa 0.1 – a supervised thread-pool for Clojure futures

Clojure comes with two kinds of thread-pools – a bounded thread-pool for CPU-bound operations, and one for IO-bound operations that grows as needed. The bounded thread-pool is used every time an action is sent to an agent via the send function. The unbounded thread-pool is used (for instance) every time an action is sent to an agent using the send-off function. Futures also run on this unbounded thread-pool.

Sometimes, however, you might need a third option. This is the case where you don’t want an unbounded pool of threads that grows so much that the system runs out of resources trying to juggle the sheer number of threads. This might happen (say) if you were using send-off to handle incoming requests for IO-bound operations. Under normal circumstances, such a system might perform in an acceptable manner. If the request load were to spike, however, you could quickly create a larger-than-manageable number of threads.

What you need in such a case is a separate thread-pool for IO operations – one that has more threads than the thread-pool for CPU-bound operations, but is still bounded, so that it only grows to a certain size and any further requests get queued. Luckily, Clojure allows you to seamlessly use underlying Java libraries.
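As a rough illustration of that idea, here is a minimal sketch of a bounded pool built directly on java.util.concurrent. The names io-pool and submit-io are made up for this example, and the sizing of three threads per core is just an assumption for an IO-heavy load.

```clojure
(import '(java.util.concurrent Executors ExecutorService TimeUnit))

;; Assumption: three threads per core is a reasonable size for IO-bound work.
(def pool-size (* 3 (.availableProcessors (Runtime/getRuntime))))

;; A fixed-size pool: at most pool-size tasks run at once,
;; and any further tasks wait in the pool's internal queue.
(def ^ExecutorService io-pool (Executors/newFixedThreadPool pool-size))

(defn submit-io
  "Queues the no-arg function f on the bounded pool (hypothetical helper)."
  [f]
  (.submit io-pool ^Runnable f))

(def completed (atom 0))

;; Submit more tasks than there are threads; the extras are queued, not dropped.
(dotimes [_ 100]
  (submit-io #(swap! completed inc)))

;; Stop accepting new work, then wait for the queued tasks to drain.
(.shutdown io-pool)
(.awaitTermination io-pool 10 TimeUnit/SECONDS)
(println "completed" @completed "tasks on" pool-size "threads")
```

Nothing here supervises the tasks, so a task that never returns would hold on to its thread forever.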

Medusa is a bounded, supervised thread-pool. A supervisor function runs alongside the thread-pool and it monitors the running tasks. If they take more than a specified amount of time, they are evicted. If the thread-pool is fully occupied, Medusa will queue all further tasks submitted and will run each task as soon as a thread becomes available. The Medusa thread-pool size is thrice the number of cores available to the JVM. In future versions, this number will be configurable.

Here it is in action –

(use 'org.rathore.amit.medusa.core)

(start-supervisor)

(defn new-task [id sleep-seconds]
  (println (System/currentTimeMillis) "| Starting task" id "will sleep for" sleep-seconds)
  (Thread/sleep (* 1000 sleep-seconds))
  (println (System/currentTimeMillis) "| Done task" id))

(defn run-tasks [n]
  (println "Will submit" n "jobs")
  (dotimes [i n]
    (medusa-future i #(new-task i (* 5 (inc i))))))

(run-tasks 20)

The output is –

Will submit 20 jobs
1276068494442 | Starting task 0 will sleep for 5
1276068494448 | Starting task 1 will sleep for 10
1276068494449 | Starting task 2 will sleep for 15
1276068494449 | Starting task 3 will sleep for 20
1276068494451 | Starting task 4 will sleep for 25
1276068494451 | Starting task 5 will sleep for 30
1276068499447 | Done task 0
1276068499448 | Starting task 6 will sleep for 35
1276068504448 | Done task 1
1276068504448 | Starting task 7 will sleep for 40
1276068509448 | Done task 2
1276068509448 | Starting task 8 will sleep for 45
1276068514448 | Done task 3
1276068514449 | Starting task 9 will sleep for 50
1276068519450 | Starting task 10 will sleep for 55
1276068523547 | Starting task 11 will sleep for 60
1276068523548 | Starting task 13 will sleep for 70
1276068523547 | Starting task 12 will sleep for 65
1276068523548 | Starting task 14 will sleep for 75
1276068523548 | Starting task 15 will sleep for 80
1276068523549 | Starting task 16 will sleep for 85
1276068533547 | Starting task 17 will sleep for 90
1276068533547 | Starting task 18 will sleep for 95
1276068543547 | Starting task 19 will sleep for 100

Notice that the first few tasks complete, since the pre-emption time is 20 seconds. The rest of the tasks get pre-empted out of the thread-pool by the supervisor since they take too long (simulated above by the sleeps). Since all the later tasks have been coded to take more than 20 seconds, they will all get pre-empted. The Medusa thread-pool is then ready for more tasks. This pre-emption is what allows the other tasks to start, as can be seen by looking at the timestamps of the log messages. This fulfills the requirement that we have a bounded-threadpool with supervised pre-emption of tasks that take too long.
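To make the idea of supervised pre-emption concrete, here is a minimal sketch of one way it could be done with plain java.util.concurrent classes. It is not Medusa's actual implementation; the names tasks, supervised-submit and evict-overdue! are hypothetical, and the timings are shortened so the example finishes quickly.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService Future FutureTask))

(def ^ExecutorService pool (Executors/newFixedThreadPool 2))

;; id -> {:fut FutureTask, :started epoch-millis (nil while still queued)}
(def tasks (atom {}))

(defn supervised-submit
  "Registers the no-arg function f under id, then hands it to the pool."
  [id f]
  (let [work (fn []
               ;; record when the task actually starts running
               (swap! tasks assoc-in [id :started] (System/currentTimeMillis))
               (try
                 (f)
                 (finally (swap! tasks dissoc id))))
        fut  (FutureTask. ^Callable work)]
    ;; register before executing, so the supervisor can always see the task
    (swap! tasks assoc id {:fut fut :started nil})
    (.execute pool fut)
    fut))

(defn evict-overdue!
  "Cancels (interrupts) every running task older than limit-ms."
  [limit-ms]
  (let [now (System/currentTimeMillis)]
    (doseq [[_id {:keys [fut started]}] @tasks
            :when (and started (> (- now started) limit-ms))]
      (.cancel ^Future fut true))))

(def quick (supervised-submit :quick #(Thread/sleep 50)))
(def slow  (supervised-submit :slow  #(Thread/sleep 5000)))

(Thread/sleep 500)
(evict-overdue! 200)   ; a real supervisor would call this on a schedule

(println "slow cancelled?" (.isCancelled slow)
         "| quick cancelled?" (.isCancelled quick))
(.shutdown pool)
```

Cancelling with interruption only frees the thread if the task responds to interrupts (as Thread/sleep does); a task spinning in a tight loop would need a different approach.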

Here’s the thread-usage when the program starts, and the supervisor has started:

[Image: thread-pool when the program starts]

Here’s the thread-usage when the tasks complete:

[Image: thread-pool when the tasks complete]

The semantics are still not of the standard Clojure futures – currently, Medusa “futures” only handle side-effects. A next step would be to give them the same future semantics so that they return the result of their computation – that will come in the next version.
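For a sense of what value-returning semantics could look like on a bounded pool, here is a minimal sketch using java.util.concurrent directly. It says nothing about how Medusa will do it; pooled-future is a hypothetical name.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService Future))

(def ^ExecutorService pool (Executors/newFixedThreadPool 2))

(defn pooled-future
  "Runs the no-arg function f on the bounded pool and returns a
  java.util.concurrent.Future holding its result (hypothetical helper)."
  [f]
  ;; Clojure functions are both Runnable and Callable; the hint selects
  ;; the Callable overload so that the return value is kept.
  (.submit pool ^Callable f))

(def answer (pooled-future #(+ 40 2)))

;; .get blocks until the computation is done, then returns its value.
(println (.get ^Future answer))
(.shutdown pool)
```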

The project is hosted on github, as usual – http://github.com/amitrathore/medusa.

conjure – simple mocking and stubbing for Clojure unit-tests

Siva and I were pairing on a unit-test that involved writing something to HBase. When Siva said that mocking the call to the save-to-hbase function would make testing easier (a simple thing using JMock, he said), I decided to write a quick mocking utility for Clojure.

Then later, we realized that we wanted to go one step further. The row-id that was used as the key to the object in HBase was generated using system-time. That meant that even if we wanted to confirm that the object was indeed saved, we had no way of knowing what the row-id was. One solution to such a problem is to inject the row-id in (instead of being tightly coupled to the function that generated the row-id). Instead, I wrote a stubbing utility that makes this arbitrarily easy to do.

So here they are – mocking and stubbing – packaged up as the conjure project on github.

The set up

Imagine we had the following functions –

(defn xx [a b]
  10)

(defn yy [z]
  20)

(defn fn-under-test []
  (xx 1 2)
  (yy  "blah"))

(defn another-fn-under-test []
  (+ (xx nil nil) (yy nil)))

Also imagine that we had to test fn-under-test and another-fn-under-test, and we didn’t want to have to deal with the xx or yy functions. Maybe they’re horrible functions that open connections to computers running Windoze or something, I dunno.

Mocking

Here’s how we might mock them out –

(deftest test-basic-mocking
  (mocking [xx yy]
    (fn-under-test))
  (verify-call-times-for xx 1)
  (verify-call-times-for yy 1)
  (verify-first-call-args-for xx 1 2)
  (verify-first-call-args-for yy "blah"))

Pretty straightforward, eh? You just use the mocking macro, specifying all the functions that need to be mocked out. Then, within the scope of mocking, you call your functions that need to be tested. The calls to the specified functions will get mocked out (they won’t occur), and you can then use things like verify-call-times-for and verify-first-call-args-for to ensure things worked as expected.

Stubbing

As mentioned in the intro to this post, sometimes your tests need to specify values to be returned by the functions being mocked out. That’s where stubbing comes in. Here’s how it works –

(deftest test-basic-stubbing
  (is (= (another-fn-under-test) 30))
  (stubbing [xx 1 yy 2]
    (is (= (another-fn-under-test) 3))))

So that’s it! Pretty simple. Note how within the scope of stubbing, xx returns 1 and yy returns 2. Now, for the implementation.

Implementation

The code is almost embarrassingly straight-forward. Take a look –

(ns org.rathore.amit.conjure.core
  (:use clojure.test))

(def call-times (atom {}))

(defn stub-fn [function-name return-value]
  (swap! call-times assoc function-name [])
  (fn [& args]
    (swap! call-times update-in [function-name] conj args)
    return-value))

(defn mock-fn [function-name]
  (stub-fn function-name nil))

(defn verify-call-times-for [fn-name number]
  (is (= number (count (@call-times fn-name)))))

(defn verify-first-call-args-for [fn-name & args]
  (is (= args (first (@call-times fn-name)))))

(defn verify-nth-call-args-for [n fn-name & args]
  (is (= args (nth (@call-times fn-name) (dec n)))))

(defn clear-calls []
  (reset! call-times {}))

(defmacro mocking [fn-names & body]
  (let [mocks (map #(list 'mock-fn %) fn-names)]
    `(binding [~@(interleave fn-names mocks)]
       ~@body)))

(defmacro stubbing [stub-forms & body]
  (let [stub-pairs (partition 2 stub-forms)
        fn-names (map first stub-pairs)
        stubs (map #(list 'stub-fn (first %) (last %)) stub-pairs)]
    `(binding [~@(interleave fn-names stubs)]
       ~@body)))

It’s just an hour or so of work, so it’s probably rough, and certainly doesn’t support more complex features of other mocking/stubbing libraries. But I thought the simplicity was enjoyable.
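One caveat for readers on newer versions of the language: since Clojure 1.3, binding only works on vars declared ^:dynamic, so the macros above would need the functions under test to be marked that way. A minimal sketch of the same recording-stub idea using with-redefs from clojure.core is below. This swaps in a different technique, and the names calls and recording-stub are hypothetical, not part of conjure.

```clojure
;; key -> vector of argument lists, one per recorded call
(def calls (atom {}))

(defn recording-stub
  "Returns a fn that records its arguments under k and returns ret."
  [k ret]
  (fn [& args]
    (swap! calls update-in [k] (fnil conj []) args)
    ret))

(defn xx [a b] 10)
(defn yy [z] 20)

(defn another-fn-under-test []
  (+ (xx nil nil) (yy nil)))

;; Within with-redefs, xx and yy are temporarily replaced by the stubs.
(def stubbed-result
  (with-redefs [xx (recording-stub :xx 1)
                yy (recording-stub :yy 2)]
    (another-fn-under-test)))

(println "stubbed:" stubbed-result "| real:" (another-fn-under-test))
(println "xx was called" (count (@calls :xx)) "time(s)")
```

Unlike binding, with-redefs changes the root value of the var for all threads while its body runs, so it is best kept to tests that do not run in parallel.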

frumiOS – a simple object-system for Clojure

I’ve nearly stopped blogging, because all my spare time goes into writing Clojure in Action. But I was a bit bored this weekend, and wrote this little library that can be used to write traditional looking Object-Oriented (TM) code in Clojure.

Why would you do that, when you can use a rifle-oriented programming style instead? Think of it like using the rifle as a club… On the other hand, the implementation makes plenty of use of closures and macros, so it is probably a rifle-oriented program 🙂

The implementation is hosted on github, in a project called frumios. And if you really want to see it now, it follows below.

(ns org.rathore.amit.frumios.core)
 
(declare new-object find-method) 
 
(defn new-class [class-name parent methods]
  (let [klass ((comp resolve symbol name) class-name)]
    (fn [command & args]
      (cond
        (= :parent command) parent
        (= :name command) klass
        (= :method-names command) (keys methods)
        (= :methods command) methods
        (= :new command) (new-object klass)
        (= :method command)
        (let [[method-name] args]
          (find-method method-name methods parent))
        :else (throw (RuntimeException. (str "Unknown message: " command)))))))
 
(def OBJECT (new-class :org.rathore.amit.frumios.core/OBJECT nil {}))
(def this)
 
(defn new-object [klass]
  (let [state (ref {})]
    (fn thiz [command & args]
      (cond
        (= :class command) klass
        (= :set! command) (let [[k v] args]
                            (dosync (alter state assoc k v))
                            nil)
        (= :get command) (let [[key] args]
                           (state key))
        :else (let [method (klass :method command)]
                (if method
                  (binding [this thiz]
                    (apply method args))))))))
 
(defn find-method [method-name instance-methods parent-class]
  (let [method (instance-methods method-name)]
    (or method
        (if-not (= #'org.rathore.amit.frumios.core/OBJECT parent-class)
          (find-method method-name (parent-class :methods) (parent-class :parent))))))

(defn parent-class-spec [sexprs]
  (let [extends-spec (filter #(= :extends (first %)) sexprs)
        extends (first extends-spec)]
    (if (empty? extends)
      'org.rathore.amit.frumios.core/OBJECT
      (do
        (if-not (= 1 (count extends-spec))
          (throw (RuntimeException. "defclass only accepts a single extends clause")))
        (if-not (= 2 (count extends))
          (throw (RuntimeException. "the extends clause only accepts a single parent class")))
        (last extends)))))

(defn method-spec [sexpr]
  (let [name (keyword (second sexpr))
        remaining (next sexpr)]
    {name (conj remaining 'fn)}))
 
(defn method-specs [sexprs]
  (let [method-spec? #(= 'method (first %))
        ;; only the (method ...) forms; the (:extends ...) clause is skipped
        specs (filter method-spec? sexprs)]
    (apply merge (map method-spec specs))))
 
(defmacro defclass [class-name & specs]
  (let [parent-class-symbol (parent-class-spec specs)
        this-class-name (keyword class-name)
        fns (method-specs specs)]
    `(def ~class-name
       (new-class ~this-class-name (var ~parent-class-symbol) ~(or fns {})))))

Now for some examples –

(ns frumios-spec)

(use 'org.rathore.amit.frumios.core)

(defclass animal
  (method sound []
    "grr")

  (method say-something []
    (str (this :sound) ", I say!"))

  (method move []
    "going!"))

(defclass cat
  (:extends animal)

  (method sound []
    "meow"))

There, that defines a simple class hierarchy. Let’s examine these classes –

frumios-spec> (cat :parent)
#'frumios-spec/animal

frumios-spec> (animal :parent)
#'org.rathore.amit.frumios.core/OBJECT

frumios-spec> (animal :method-names)
(:move :say-something :sound)

frumios-spec> (cat :method-names)
(:sound)

Now, let’s define a couple of instances –

(def a (animal :new))
(def c (cat :new))

What can we do with these instances? Let’s explore –

frumios-spec> (c :class)
#'frumios-spec/cat

frumios-spec> (c :set! :name "Mr. Muggles")
nil

frumios-spec> (c :get :name)
"Mr. Muggles"

That’s the basic stuff, how about calling methods?

frumios-spec> (a :move)
"going!"

frumios-spec> (a :sound)
"grr"

frumios-spec> (c :sound)
"meow"

Notice how cat overrides the sound method. OK, how about a method that calls another method? It calls for the this keyword. Here it is in action –

frumios-spec> (a :say-something)
"grr, I say!"

frumios-spec> (c :say-something)
"meow, I say!"

Notice how in the second call, (this :sound) resolved itself to the overridden sound method in the cat class. That’s subtype polymorphism, common to languages such as Java and Ruby. We could use it to implement something like the template pattern. We can do fairly arbitrary things with frumiOS –

(defclass person
  (method greet [visitor]
    (println "Hi" visitor ", I'm here!"))

  (method dob []
    (str "I was born on " (this :get :birth-date)))

  (method age []
    2)

  (method experience [years]
    (str years " years"))

  (method bio []
    (let [msg (str (this :dob) ", and have " (this :experience (this :age)) " of experience.")]
      (println msg))))

Let’s play with it –

frumios-spec> (def kyle (person :new))
#'frumios-spec/kyle

frumios-spec> (kyle :greet "rob")
Hi rob , I'm here!
nil

The bio method makes two calls using the this construct, one nested inside the other. It works as expected –

frumios-spec> (kyle :set! :birth-date "1977-01-01")
nil

frumios-spec> (kyle :bio)
I was born on 1977-01-01, and have 2 years of experience.
nil

So there it is. I’m sure it doesn’t do lots of stuff a real object-system does. But at 70 lines of Clojure code, you can’t expect a whole lot more. Silly as this is, I had fun writing it!

Clojure, the REPL and test-driven development

I’ve been using Clojure for nearly a year now, and something strange has been happening… I still think unit-testing is extremely important, but for some reason I don’t seem to be writing the same number of tests any more. I’m ashamed to say it, but there it is. And it gets stranger – this new lower test count doesn’t seem to matter.

It seems to me that my Clojure code works right the first time more often than my Ruby or Java code ever did. And I seem to find fewer defects in the Clojure code over time, too.

This is not just a fanboy speaking, though I am a huge fan of Clojure. I think the reason I’m observing this is an important characteristic of the language. Instead of just talking about it, let me first walk you through an example.

This is something I had to do recently – we wanted to build a kind of reverse index for an HBase table. The row ids of this table are time-stamps. The idea was that this “reverse index” would allow us to answer the question of what the first time-stamp for a given day was. In other words, we needed to convert a list of time-stamps into a lookup of day vs. the first time-stamp of that day. Eg.

Input:

["112323123" "1231231231" "123123123" "1231231123" ....]

Output:

{"2009-07-01" "123123123"
 "2009-07-02" "123131213"
 "2009-07-03" "123123122"}


(Note: I plucked the numbers out of the air, they aren’t accurate. But the idea is that the input is a long stream of timestamps, and possibly hundreds could correspond to each day.)

So I get started… thinking to myself – I know how to convert a timestamp to a day. From there, it’s easy to write a function that returns a hash containing the day vs. timestamp (since I already had a function day-for-time-stamp, it was easy) –


(defn day-vs-timestamp [time-stamp]
  {(day-for-time-stamp time-stamp) time-stamp})

So now, all I have to do is map the above function across the input. This gives me a list of hash-maps, each with one key-value pair. To ensure that I’m doing this in order of oldest first, I sort the input as well. Inside of a let form, all of this looks like –


(let [all-pairs (map day-vs-timestamp (sort input-list))]

Now, I have this list of hashes, each with one key (the day) and one corresponding value (the time-stamp itself). I want to combine these into one single hash-map which would be the final answer. But I have to deal with the issue of duplicate keys – when I find a duplicate key, I want to keep the first value associated with the key since it would be the oldest.

Clojure has a merge-with function which does just this – it accepts a function of 2 arguments (the two values found for a duplicate key), and the value that function returns is used in the merged hash-map.


(apply merge-with #(first [%1 %2]) all-pairs)

That’s basically it.
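To see the duplicate-key rule in isolation, here is a small sketch with made-up data. The first argument passed to the merge function is the value already in the result, so returning it keeps the earliest entry.

```clojure
;; Made-up single-entry maps, already in oldest-first order.
(def all-pairs
  [{"2009-07-01" "100"}
   {"2009-07-01" "200"}
   {"2009-07-02" "300"}])

;; On a duplicate key, keep the value that was seen first.
(def merged (apply merge-with #(first [%1 %2]) all-pairs))

(println merged)
```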

Combining everything –


(defn day-vs-timestamp [time-stamp]
  {(day-for-time-stamp time-stamp) time-stamp})

(defn lookup-table [input-timestamps]
  (let [all-pairs (map day-vs-timestamp (sort input-timestamps))]
    (apply merge-with #(first [%1 %2]) all-pairs)))
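For anyone who wants to paste this into a REPL, here is a self-contained sketch. The day-for-time-stamp shown here is a hypothetical stand-in (the original is not shown in this post); it assumes each time-stamp is a string of epoch milliseconds and that days are counted in UTC.

```clojure
(import '(java.text SimpleDateFormat)
        '(java.util Date TimeZone))

;; Hypothetical stand-in: the time-stamp is assumed to be epoch milliseconds as a string.
(defn day-for-time-stamp [time-stamp]
  (let [fmt (doto (SimpleDateFormat. "yyyy-MM-dd")
              (.setTimeZone (TimeZone/getTimeZone "UTC")))]
    (.format fmt (Date. (Long/parseLong time-stamp)))))

(defn day-vs-timestamp [time-stamp]
  {(day-for-time-stamp time-stamp) time-stamp})

(defn lookup-table [input-timestamps]
  (let [all-pairs (map day-vs-timestamp (sort input-timestamps))]
    ;; on a duplicate day, keep the value seen first, i.e. the oldest
    (apply merge-with (fn [older _newer] older) all-pairs)))

;; Two stamps on 2009-07-01 (midnight and 01:00 UTC) and one on 2009-07-02,
;; deliberately given out of order.
(def table
  (lookup-table ["1246492800000" "1246410000000" "1246406400000"]))

(println table)
```

Note that sort compares the strings lexicographically, which matches numeric order only when all the time-stamps have the same number of digits.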

When I write code like this – I often ask myself, what exactly should I test? I end up writing a few happy path tests that prove my code works. And then a couple of tests that test border cases and negative paths. And I sometimes do it test first.

But the REPL has spoilt me. What I used TDD for when coding with Ruby (and still do), I often do at the REPL. I build tiny functions that work – these are often single lines of code. Then I combine these into other functions, often no more than two lines of code each, sometimes three. And it all just works – leaving me wondering what to cover with tests.

The main reason I still write tests is for regression – if something breaks in the future, I catch it quickly. However, the other thing – the test *driven* design aspect of TDD – has been somewhat replaced by the REPL. And it’s very much more dynamic than a set of static tests. It really brings out the rapid in rapid application development – especially when combined with Emacs and SLIME.

One main difference with Clojure vs. Ruby (say) is that Clojure is functional (I use very little of Clojure’s constructs for state). And in the functional world, I just don’t have to worry about state (obviously), and this tremendously simplifies code. I think in terms of map, filter, reduce, some, every, merge, etc. and the actual logic is in tiny functions used from within these other higher level constructs. The idea of first-class functions is also key – I can build up the business logic by writing small functions that do a tiny thing each – and combine them using higher-order functions.
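As a small illustration of that style, here is a sketch with made-up data: two tiny functions, each doing one thing, combined with filter, map and reduce. The order maps and the function names are invented for this example.

```clojure
;; Made-up data for the example.
(def orders [{:id 1 :amount 40 :paid? true}
             {:id 2 :amount 25 :paid? false}
             {:id 3 :amount 60 :paid? true}])

;; Tiny single-purpose functions...
(defn paid? [order] (:paid? order))
(defn amount [order] (:amount order))

;; ...combined using higher-order functions.
(defn total-paid [orders]
  (reduce + (map amount (filter paid? orders))))

(println (total-paid orders))
```

Each piece can be tried at the REPL on its own before being combined, which is the workflow described above.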

This is one reason why we’re so productive with Clojure. We’ve moved to Clojure for 90% of our work. That said, we still use Ruby for parts of our code-base, and it’s still my favorite imperative language 🙂

Startup logbook – v0.2 – distributed Clojure system in production

This past weekend, we pushed another major release into production. We’ve been working on several things and have made a few pushes since the last time I wrote about this – but this release has a bunch of interesting Clojure related stuff.

Long-running processes

The main thing of note is that the majority of our back-end is now written in Clojure. You might recall that our online-merchant customers send us a lot of data, and we run a ton of analytics on that data. Our initial plans involved Ruby, but as we started using Clojure, it turned out that it is very well suited for this job as well (long running, message-driven processes that crunch numbers).

The raw data sits in HBase, and every night a “master” process starts up which kicks off the processing of the previous day’s worth of data. The job of this master is only to coordinate the work (it doesn’t actually do any real work). It does this by breaking work into chunks and dispatching messages, each of which assigns work to whichever worker process picks it up. The master is single-threaded for simplicity, but failure-tolerant – it checkpoints everything in a local MySQL database, and if it crashes, it is automatically re-spawned and recovers from where it left off.
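The checkpoint-and-resume idea can be sketched in a few lines. This is only an illustration under simplifying assumptions, not the production code: an atom stands in for the MySQL checkpoint table, the dispatch function stands in for sending a message to the workers, and all names are hypothetical.

```clojure
;; Stand-in for the MySQL checkpoint table: indices of chunks already dispatched.
(def checkpoint (atom #{}))

(defn run-master
  "Splits work-items into chunks and dispatches each chunk not yet checkpointed."
  [work-items chunk-size dispatch!]
  (doseq [[idx chunk] (map-indexed vector (partition-all chunk-size work-items))
          :when (not (contains? @checkpoint idx))]
    (dispatch! chunk)
    (swap! checkpoint conj idx)))

;; Stand-in for the message queue: just remember what was sent.
(def sent (atom []))
(defn record-dispatch! [chunk] (swap! sent conj (vec chunk)))

;; First run: simulate a crash after the first chunk has gone out.
(try
  (run-master (range 10) 4
              (fn [chunk]
                (when (seq @sent)
                  (throw (RuntimeException. "simulated crash")))
                (record-dispatch! chunk)))
  (catch RuntimeException _ nil))

;; "Re-spawned" master: skips the checkpointed chunk and continues with the rest.
(run-master (range 10) 4 record-dispatch!)

(println @sent)
```

In this sketch a crash between dispatching a chunk and recording the checkpoint would cause that chunk to be dispatched twice, so the workers would need to tolerate duplicates.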

[Diagram: clojure-in-production-v0.2.png]

An elastic cloud of worker processes runs in anticipation of the master handing out this work. The worker processes use the MySQL database to keep track of their progress as well. The rest is rather domain-specific. We use intermediate representations of the raw data, which are also stored in HBase, before finally storing the summarized version again in HBase.

Swarmiji

We use an in-house distributed-programming framework called Swarmiji to make such distributed programs very easy to write and run. Swarmiji implements a flavor of staged event-driven architecture (SEDA) to allow server processes to exhibit scalable, predictable throughput. This is especially true in the face of over-load, which we can certainly expect in our environment.

The reason I wrote this framework was that I wanted to create distributed, parallel programs which exploited large numbers of machines (like in a data-center) – without being limited by Clojure’s in-JVM-threads-based model. So each worker process in Swarmiji gets deployed as a shared-nothing JVM process.

I will write up a post introducing Swarmiji in the next few weeks – once it’s a bit more battle-tested, and I’ve added a few more features (mainly around process management).

Capjure: a simple HBase persistence layer

Or how to get some free help putting stuff into HBase. For Clojure programmers.

OK, so maybe this one can even be called simplistic. Still, it works for my needs right now – so I thought it might help others. I wrote about my use-case in a previous post about HBase schema design – and this is the little helper utility I mentioned towards the end of it.

How to use it

There are two vars that need appropriate bindings for capjure to work – *hbase-master* and *primary-keys-config*

*hbase-master*

This one is obvious. It must be bound to an hbase name-node or a master server. For example, I might bind it to hbase.test-site.net:60000. Watch out for the fact that Amazon’s EC2 instances need lots of configuration to open such ports to the world. This has no real relevance to this post, just thought I’d mention it.

*primary-keys-config*

This one is a bit more involved – and I’m sure I’ve done a bad job of simplifying this usage pattern. Still, let’s consider the example from the previous post. When you have an array of several hashes as a value in your JSON object that is being persisted (e.g. for the :cars key) –

  :cars => [
    {:make => 'honda', :model => 'fit', :license => 'ah12001'},
    {:make => 'toyota', :model => 'yaris', :license => 'xb34544'}],

it will be converted into

{
  "cars_model:ah12001" => "fit",
  "cars_make:ah12001" => "honda",
  "cars_model:xb34544" => "yaris",
  "cars_make:xb34544" => "toyota"
}

To make this happen, capjure needs to know what to use as a primary-key. Or something like that 🙂 Here, we have decided upon the :license attribute of each hash. Capjure then removes that property from the child-hashes being saved, and sticks the value into the key part of the flattened data-structure as shown above.

This is accomplished by –

(def encoders (config-keys
  (config-for :cars :license (fn [car-map]
                               (car-map :license)))))

Similarly, other primary-keys can be configured. And because the actual value used is the value returned by the function defined (as above), it can be as complex as needed. For example, if the values have spaces, you can encode them using some scheme.

A similar configuration is needed for this process to be reversed when reading out of HBase –

(def decoders (config-keys
  (config-for :cars :license (fn [value]
                               value))))

In this case, we just use an identity function because the reverse mapping is straight-forward (in other words, we didn’t do anything fancy during the previous flattening operation). What happens is that a key-value pair is added to each object being re-hydrated: the key is the one specified (:license), and the value is whatever the function returns.

Similarly, other configuration parameters can be added for other sub-objects that have primary-keys.
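To make the flattening and re-hydration concrete, here is a minimal sketch of the transformation for the :cars example. It is not capjure's code: flatten-children and hydrate-children are hypothetical helpers, written only to show how an encoder and a decoder function of the shapes above could be applied.

```clojure
(require '[clojure.string :as string])

(defn flatten-children
  "Flattens child maps under parent-key. The pk-attr entry is removed from
  each child and the result of (encode-fn child) goes into the key instead."
  [parent-key pk-attr encode-fn children]
  (into {}
        (for [child children
              [attr value] (dissoc child pk-attr)]
          [(str (name parent-key) "_" (name attr) ":" (encode-fn child)) value])))

(defn hydrate-children
  "Reverses flatten-children: groups entries by primary key and puts
  (decode-fn pk) back under pk-attr."
  [parent-key pk-attr decode-fn flat]
  (let [prefix (str (name parent-key) "_")]
    (->> flat
         (filter (fn [[k _]] (.startsWith ^String k prefix)))
         (map (fn [[k v]]
                (let [[attr pk] (string/split (subs k (count prefix)) #":" 2)]
                  [pk (keyword attr) v])))
         (group-by first)
         (map (fn [[pk triples]]
                (into {pk-attr (decode-fn pk)}
                      (map (fn [[_ attr v]] [attr v]) triples)))))))

(def cars [{:make "honda" :model "fit" :license "ah12001"}
           {:make "toyota" :model "yaris" :license "xb34544"}])

;; Encoder takes the child map; decoder takes the stored key value.
(def flat (flatten-children :cars :license (fn [car-map] (car-map :license)) cars))
(def restored (hydrate-children :cars :license (fn [value] value) flat))

(println flat)
(println restored)
```

The order of the restored children is not guaranteed in this sketch, since they come back out of a map.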

Together, the encoders and decoders form the *primary-keys-config*. Thus, if you do the following –

 
(def keys-config {:encode encoders :decode decoders})

then keys-config should be used as the value that *primary-keys-config* gets bound to.
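For readers new to this pattern, here is a minimal sketch of how such bindings behave. The two vars are declared locally as stand-ins so the example runs on its own (in real use they come from capjure), and current-settings is a hypothetical helper. The ^:dynamic marker is required on Clojure 1.3 and later.

```clojure
;; Local stand-ins for capjure's vars, so the sketch is self-contained.
(def ^:dynamic *hbase-master* nil)
(def ^:dynamic *primary-keys-config* nil)

(defn current-settings
  "Reads whatever the vars are bound to at the time of the call."
  []
  {:master *hbase-master* :config *primary-keys-config*})

;; Placeholder config; the real one holds the encoders and decoders.
(def keys-config {:encode {} :decode {}})

;; Inside binding, every function called on this thread sees the new values.
(def inside
  (binding [*hbase-master* "hbase.test-site.net:60000"
            *primary-keys-config* keys-config]
    (current-settings)))

;; Outside, the vars are back to their root values.
(def outside (current-settings))

(println inside)
(println outside)
```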

Methods of interest

Once this is done, objects can be pushed into and out of HBase quite trivially –

 
(binding [*hbase-master* "hbase.test-site.net:60000" *primary-keys-config* keys-config]
  (capjure-insert some-json-object "hbase_table_name" "some-row-id"))

and –

(binding [*hbase-master* "hbase.test-site.net:60000" *primary-keys-config* keys-config]
  (read-as-hydrated "hbase_table_name" "some-row-id"))

Other convenience methods

Capjure provides other convenience methods like –

 
row-exists? [hbase-table-name row-id-string]
cell-value-as-string [row column-name]
read-all-versions-as-strings [hbase-table-name row-id-string number-of-versions column-family-as-string]
read-cell [hbase-table-name row-id column-name]
rowcount [hbase-table-name & columns]
delete-all [hbase-table-name & row-ids-as-strings]

and others. Everything is built on top of the HBase client API. Thanks to Dan Larkin for clojure-json.

Limitations

I’m no expert in persistence systems – and I’m sure this one has plenty of issues. The main limitation is that the object that capjure can persist can only be so deep. Specifically, the object should be a hash that contains symbols (or strings) as keys, and the values can either be strings (or other primitives), arrays of such primitives, a hash with one level of key-values, or an array of hashes that are one level deep.

Feedback welcome

Please contact me if you have suggestions and stuff. Again, the code is on github.