axiom/zk-plan.core: A batch processing framework

Author: Temporarily Removed  (temporarily@removed.com)
Date: 25 April 2018
Repository: https://github.com/temporarily/removed
Version: 0.4.1

1    Introduction

zk-plan is a tool for orchestrating the execution of parallel jobs. It is based on the notion of a plan, which is a dependency graph of Clojure functions to be executed. After a plan is created, populated and marked as ready for execution, a cluster of workers starts taking tasks from it. A worker only takes a task whose dependencies have all been met. If a task fails, it is automatically retried by another worker. Eventually, plan-completed? indicates whether all tasks in a given plan have completed successfully.
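
To make the execution model concrete, here is a small in-memory sketch that involves no Zookeeper at all. A plan is modelled as a map from a task id to its function and the ids of the tasks whose results become its arguments. This is a simplification: real zk-plan tasks also receive a DI injector, and their functions are stored as s-expressions. A task runs only once all of its argument tasks have produced results. The names example-plan, runnable and run-plan are hypothetical.

```clojure
;; Hypothetical in-memory model of a plan: each task has a function and the
;; ids of the tasks whose results become its arguments.
(def example-plan
  {:a {:fn (fn [] 1)               :args []}
   :b {:fn (fn [] 2)               :args []}
   :c {:fn (fn [x y] (+ x y))      :args [:a :b]}
   :d {:fn (fn [z] (* 10 z))       :args [:c]}})

(defn runnable
  "Ids of tasks that have not run yet and whose argument tasks all have results."
  [plan results]
  (for [[id {:keys [args]}] plan
        :when (and (not (contains? results id))
                   (every? #(contains? results %) args))]
    id))

(defn run-plan
  "Runs tasks one at a time, always picking a task whose dependencies are met.
   Returns a map from task id to result."
  [plan]
  (loop [results {}]
    (if-let [id (first (runnable plan results))]
      (let [{f :fn args :args} (plan id)]
        (recur (assoc results id (apply f (map results args)))))
      results)))
```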

2    module

The content of this library is provided as a dependency injection module. The zk-plan resource, containing the external API of this library, depends on the zookeeper resource, which is a derefable Zookeeper connection object. The zookeeper resource is itself provided by this module, and depends on zookeeper-config, which contains the coordinates of the Zookeeper server.

(let [$ (di/injector {:zookeeper (atom :zk)})] ;; :zk is a mock zookeeper connection object
  (module $)
  (di/startup $)
  (def zk-plan (di/do-with! $ [zk-plan] zk-plan)))

The zk-plan resource is a map of the functions comprising the external API of this library, each already bound to the given connection.

(def create-plan (:create-plan zk-plan))
create-plan => fn?
(def add-task (:add-task zk-plan))
add-task => fn?
(def mark-as-ready (:mark-as-ready zk-plan))
mark-as-ready => fn?
(def worker (:worker zk-plan))
worker => fn?
(def plan-completed? (:plan-completed? zk-plan))
plan-completed? => fn?

The module also spawns worker threads when the zk-plan-config resource is available. This resource is a map containing the number of threads to spawn (:num-threads) and the parent node under which plans are created (:parent). See the usage example below for how it is used.
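
A minimal sketch of how such worker threads could be managed. It assumes that each thread repeatedly calls a worker function until a shared alive atom is reset to false; this matches the alive parameter of worker described below, but the loop itself, and the names spawn-workers and stop-workers, are hypothetical.

```clojure
(defn spawn-workers
  "Hypothetical sketch: starts num-threads threads, each repeatedly calling
   (worker-fn alive) until the shared alive atom is reset to false."
  [{:keys [num-threads]} worker-fn]
  (let [alive (atom true)
        threads (doall (repeatedly num-threads
                                   #(future (while @alive (worker-fn alive)))))]
    {:alive alive :threads threads}))

(defn stop-workers
  "Signals all threads to stop, then waits for each of them to finish."
  [{:keys [alive threads]}]
  (reset! alive false)
  (run! deref threads))
```

Sharing one atom between all threads lets a single reset! stop the whole pool, while each thread finishes its current iteration before exiting.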

3    create-plan

Parameters:

  • parent: the parent node for the new plan

Returns: the path to the plan

It calls zk/create-all to create a new persistent, sequential Zookeeper node whose name starts with plan- under the parent

(create-plan ..parent..) => ..node..
 (provided
  (zk/create-all :zk ..prefix.. :persistent? true :sequential? true) => ..node..
  (str ..parent.. "/plan-") => ..prefix..)
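
Zookeeper names a sequential node by appending a monotonically increasing counter, kept by the parent node and formatted as a 10-digit zero-padded number, to the requested path. Plans therefore get names such as plan-0000000000. The following in-memory stand-in mimics only this naming behaviour. The tree layout and the names parent-of and create-sequential! are hypothetical, and unlike this sketch, real Zookeeper counters need not be consecutive.

```clojure
;; Hypothetical in-memory stand-in for Zookeeper's sequential nodes.
;; tree is an atom holding {:nodes {path data}, :counters {parent n}}.
(defn parent-of
  "Returns the parent path of a node path, e.g. \"/stress\" for \"/stress/plan-\"."
  [^String path]
  (subs path 0 (.lastIndexOf path "/")))

(defn create-sequential!
  "Creates <prefix><10-digit counter> and returns the new node's path."
  [tree prefix]
  (let [parent (parent-of prefix)
        ;; atomically bump the parent's counter and keep the value we got
        [old _] (swap-vals! tree update-in [:counters parent] (fnil inc 0))
        path (format "%s%010d" prefix (get-in old [:counters parent] 0))]
    (swap! tree assoc-in [:nodes path] nil)
    path))
```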

4    add-task

Parameters:

  • plan: the path to the plan
  • fn: the function to be executed
  • arg-tasks: a sequence of task paths, which return values are to become arguments for fn

Returns: path to the new task

It creates a sequential node under the plan

(add-task ..plan.. ..fn.. []) => ..task..
    (provided
     (zk/create :zk ..prefix.. :persistent? true :sequential? true) => ..task..
     (str ..plan.. "/task-") => ..prefix..
     (set-initial-clj-data irrelevant irrelevant irrelevant) => irrelevant
     (mark-as-ready-internal irrelevant irrelevant) => irrelevant)

It sets the task node's data to contain a serialization of fn

(add-task ..plan.. ..fn.. []) => ..task..
    (provided
     (zk/create irrelevant irrelevant :persistent? true :sequential? true) => ..task..
     (set-initial-clj-data :zk ..task.. ..fn..) => irrelevant
     (mark-as-ready-internal irrelevant irrelevant) => irrelevant)

It calls add-dependency for each arg-task

(add-task ..plan.. ..fn.. [..arg1.. ..arg2.. ..arg3..]) => irrelevant
    (provided
     (zk/create irrelevant irrelevant :persistent? true :sequential? true) => ..task..
     (set-initial-clj-data irrelevant irrelevant irrelevant) => irrelevant
     (add-dependency :zk ..arg1.. ..task..) => irrelevant
     (add-dependency :zk ..arg2.. ..task..) => irrelevant
     (add-dependency :zk ..arg3.. ..task..) => irrelevant
     (mark-as-ready-internal irrelevant irrelevant) => irrelevant)

It adds a 'ready' node once definition is complete

(add-task ..plan.. ..fn.. []) => irrelevant
    (provided
     (zk/create irrelevant irrelevant :persistent? true :sequential? true) => ..task..
     (set-initial-clj-data irrelevant irrelevant irrelevant) => irrelevant
     (mark-as-ready-internal :zk ..task..) => irrelevant)
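
The order of these steps matters: the ready node is created last, once the definition is complete, so a worker never takes a task whose dependencies are still being added. The in-memory sketch below follows that order on a map of paths. The tree layout, the per-prefix counter and all function names are hypothetical. Note that dependency links are created in the order of arg-tasks, so the serial numbers of a task's dep-* nodes follow its argument order.

```clojure
;; Hypothetical in-memory model: tree is an atom holding
;; {:nodes {path data}, :seq {prefix counter}}. Counters are kept per prefix,
;; a simplification of Zookeeper's per-parent sequential counter.
(defn create-node! [tree path data]
  (swap! tree assoc-in [:nodes path] data)
  path)

(defn create-seq! [tree prefix data]
  (let [[old _] (swap-vals! tree update-in [:seq prefix] (fnil inc 0))]
    (create-node! tree (format "%s%010d" prefix (get-in old [:seq prefix] 0)) data)))

(defn add-dependency!
  "Links from (provider) to to (dependent): a dep-* node under to, and a
   prov-* node under from whose data is the dep-* node's path."
  [tree from to]
  (let [dep (create-seq! tree (str to "/dep-") nil)]
    (create-seq! tree (str from "/prov-") dep)))

(defn add-task!
  "Creates the task node holding f-expr, links its argument tasks, and only
   then marks it ready. Returns the task path."
  [tree plan f-expr arg-tasks]
  (let [task (create-seq! tree (str plan "/task-") f-expr)]
    (doseq [arg arg-tasks]
      (add-dependency! tree arg task))
    ;; 'ready' goes last, once the task definition is complete
    (create-node! tree (str task "/ready") nil)
    task))
```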

5    mark-as-ready

Parameters:

  • task: the task to be marked as ready

Returns: nothing in particular

It creates a child node named 'ready'

(mark-as-ready "/foo/bar") => irrelevant
    (provided
     (zk/create :zk "/foo/bar/ready" :persistent? true) => true)

6    worker

Parameters:

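  • parent: the parent node of all plans
  • attributes: a map with attributes for the behavior of the worker (passed to calc-sleep-time)
  • alive: an atom that remains true as long as the worker thread should keep running
  • $: a DI injector to be used from within tasks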

Returns: nothing in particular

It does the following:

  • calls get-task-from-any-plan to get a task to work on
  • if a task is returned (we have something to do), it calls perform-task to run it
(let [alive (atom true)]
 (worker ..parent.. ..attrs.. alive ..$..) => irrelevant
 (provided
  (get-task-from-any-plan :zk ..parent..) => "/foo/bar"
  (perform-task :zk "/foo/bar" ..$..) => irrelevant
  (zk/exists irrelevant irrelevant) => nil))

If get-task-from-any-plan returns nil, we call calc-sleep-time with the number of attempts so far, to determine how long to sleep before the next retry. We retry until we get a task.

(let [alive (atom true)]
 (worker ..parent.. ..attrs.. alive ..$..) => irrelevant
 (provided
  (get-task-from-any-plan :zk ..parent..) =streams=> [nil nil "/foo/bar"]
  (calc-sleep-time ..attrs.. 0) => 1
  (calc-sleep-time ..attrs.. 1) => 2
  (perform-task irrelevant irrelevant ..$..) => irrelevant
  (zk/exists irrelevant irrelevant) => nil))

If perform-task exits abnormally before the task node has been removed, we remove the owner node from it, to allow another worker to complete the job

(let [alive (atom true)]
 (worker ..parent.. ..attrs.. alive ..$..) => (throws Exception)
 (provided
  (get-task-from-any-plan :zk ..parent..) => "/foo/bar"
  (perform-task :zk "/foo/bar" ..$..) =throws=> (Exception.)
  (zk/exists :zk "/foo/bar") => {:some "thing"}
  (zk/delete :zk "/foo/bar/owner") => irrelevant))

When get-task-from-any-plan returns nil and the thread is shut down (alive is false), we return without waiting.

(let [alive (atom false)]
 (worker ..parent.. ..attrs.. alive ..$..) => irrelevant
 (provided
  (get-task-from-any-plan :zk ..parent..) => nil))
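
The four facts above can be read as a single loop. The following is a sketch of that control flow, with the Zookeeper calls passed in as plain functions so that it runs without a server; worker-step and its ops map are hypothetical. The owner node is removed only if the task node still exists after perform-task returns or throws.

```clojure
(defn worker-step
  "Hypothetical sketch of worker's control flow. ops supplies get-task,
   perform-task, exists?, delete!, sleep! and calc-sleep-time as plain fns."
  [{:keys [get-task perform-task exists? delete! sleep! calc-sleep-time]} attrs alive]
  (loop [n 0]
    (if-let [task (get-task)]
      (try
        (perform-task task)
        (finally
          ;; If the task node still exists, perform-task did not finish the job
          ;; (e.g. it threw), so release ownership for another worker.
          (when (exists? task)
            (delete! (str task "/owner")))))
      ;; No task: back off and retry, unless we are shutting down.
      (when @alive
        (sleep! (calc-sleep-time attrs n))
        (recur (inc n))))))
```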

7    plan-completed?

Parameters:

  • plan: the path to the plan

Returns: whether the plan is completed

It returns true if the plan has no task-* children left (completed tasks are removed from the plan)

(plan-completed? ..plan..) => true
 (provided
  (zk/children :zk ..plan..) => '("foo" "bar"))

It returns false if there is at least one task-* child

(plan-completed? ..plan..) => false
 (provided
  (zk/children :zk ..plan..) => '("foo" "task-39893" "bar"))
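
The check reduces to a predicate over the plan's child names. A sketch of that pure core, assuming the children have already been fetched (the function name is hypothetical):

```clojure
(require '[clojure.string :as str])

(defn plan-completed-by-children?
  "Hypothetical pure core of plan-completed?: true when none of the plan's
   child names starts with \"task-\"."
  [children]
  (not-any? #(str/starts-with? % "task-") children))
```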

8    Usage Example

The idea of this test is to stress zk-plan by launching N parallel worker threads to execute a randomized plan with M tasks, each depending on K preceding tasks (if there are at least K of them).

(def N 10) ; the number of workers
(def M 100) ; the number of tasks
(def K 10) ; the number of dependencies per task

The tasks work against a map of M atoms, one per task. Each atom counts the workers currently working on its task.

(def worker-counters (into {} (map (fn [i] [i (atom 0)]) (range M))))

Each task also reports that it has completed its work by adding its ordinal number to this set.

(def workers-completed (atom #{}))

Each task begins by incrementing its atom, then sleeps for a while, and finally decrements the atom. After incrementing, it checks that the value is 1, that is, that no other worker is working on the same task. It also compares its arguments against the expected vector, and returns its own number. The function below creates the task function for task i; the plan stores a call to it as an s-expression.

(defn stress-task-func [i expected]
  (fn [$ & args]
    (println i)
    (let [my-atom (worker-counters i)]
      (try
        ;; swap! returns the new value, so we check exactly the value we produced
        (let [n (swap! my-atom inc)]
          (when (not= n 1)
            (throw (Exception. (str "Bad counter value: " n)))))
        (when (not= (vec args) (vec expected))
          (throw (Exception. (str "Bad arguments.  Expected: " (vec expected) "  Actual: " (vec args)))))
        (Thread/sleep 100)
        (swap! workers-completed conj i)
        (finally
          (swap! my-atom dec)))
      i)))

Next, we build the plan. The first K tasks take no arguments. Each of the other M-K tasks takes K arguments: the results of K distinct tasks randomly selected from the range [0, i).

(defn build-stress-plan [{:keys [create-plan
                                 add-task
                                 mark-as-ready]} parent]
  (let [plan (create-plan parent)]
    (loop [tasks {}
           i 0]
      (if (< i M)
        (let [next-task (if (< i K)
                          (add-task plan `(stress-task-func ~i nil) [])
                          ;; else
                          (let [selected (take K (shuffle (range i)))]
                            (add-task plan `(stress-task-func ~i ~(vec selected)) (map tasks selected))))]
          (recur (assoc tasks i next-task) (inc i)))))
    (mark-as-ready plan)
    plan))

Putting this all together:

  • Using dependency injection:
    • Gain access to a Zookeeper client and the zk-plan functions
    • Spawn N worker threads
  • Clear the parent node /stress, if it exists
  • (Re)create the parent
  • Build the plan
  • Wait until the plan is complete
  • Report any task that was not completed
  • Shut down
  • Go home happy.
:integ ; This is an integration test
(let [$ (di/injector {:zookeeper-config {:url "127.0.0.1:2181"}
                      :zk-plan-config {:num-threads N
                                       :parent "/stress"}})]
  (module $)
  (di/startup $)
  (di/do-with! $ [zookeeper
                  zk-plan]
    (let [{:keys [plan-completed?]} zk-plan
          parent "/stress"]
      (zk/delete-all @zookeeper parent)
      (zk/create @zookeeper parent :persistent? true)
      (let [plan (build-stress-plan zk-plan parent)]
        (loop []
          (when-not (plan-completed? plan)
            (Thread/sleep 100)
            (recur)))
        (doseq [m (range M)]
          (when-not (contains? @workers-completed m)
            (println "Task " m " was not completed"))))))
  (di/shutdown $))

9    Under the Hood

9.1    get-task

Parameters:

  • zk: the Zookeeper connection object
  • plan: the path to the plan

Returns: path to the task

It returns nil if the plan is empty

(get-task ..zk.. ..plan..) => nil
 (provided
  (zk/children ..zk.. ..plan..) => nil)

It returns a task if the task is marked ready and has neither dep-* nor owner children

(get-task ..zk.. "/foo") => "/foo/task-1234"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/children ..zk.. "/foo/task-1234") => '("task-2345" "ready" "quux")
  (take-ownership ..zk.. "/foo/task-1234") => true)

It does not return tasks that have dep-* children

(get-task ..zk.. "/foo") => nil
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/children ..zk.. "/foo/task-1234") => '("task-2345" "ready" "quux" "dep-0001"))

It does not return tasks that have owner nodes

(get-task ..zk.. "/foo") => nil
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/children ..zk.. "/foo/task-1234") => '("task-2345" "quux" "ready" "owner"))

It does not take tasks that are not marked ready

(get-task ..zk.. "/foo") => nil
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/children ..zk.. "/foo/task-1234") => '("task-2345" "quux"))

It takes ownership over the task by adding an 'owner' node

(get-task ..zk.. "/foo") => "/foo/task-1234"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/children ..zk.. "/foo/task-1234") => '("ready")
  (take-ownership ..zk.. "/foo/task-1234") => true)

It moves to the next one if it is unable to take ownership

(get-task ..zk.. "/foo") => "/foo/task-2345"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345")
  (zk/children ..zk.. "/foo/task-1234") => '("ready")
  (take-ownership ..zk.. "/foo/task-1234") => false
  (zk/children ..zk.. "/foo/task-2345") => '("ready")
  (take-ownership ..zk.. "/foo/task-2345") => true)

It examines tasks lazily, and stops looking at children once it has taken ownership of a task

(get-task ..zk.. "/foo") => "/foo/task-1234"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345" "bat")
  (zk/children ..zk.. irrelevant) => '("ready") :times 1
  (take-ownership ..zk.. "/foo/task-1234") => true)

If a task is removed before we get the chance to examine it (zk/children returns false), we move on to the next task

(get-task ..zk.. "/foo") => "/foo/task-2345"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345")
  (zk/children ..zk.. "/foo/task-1234") => false
  (zk/children ..zk.. "/foo/task-2345") => '("ready")
  (take-ownership ..zk.. "/foo/task-2345") => true)

If it comes across an empty task (one with no children at all), it removes it and moves on

(get-task ..zk.. "/foo") => "/foo/task-2345"
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345")
  (zk/children ..zk.. "/foo/task-1234") => nil
  (zk/delete ..zk.. "/foo/task-1234") => irrelevant
  (zk/children ..zk.. "/foo/task-2345") => '("ready")
  (take-ownership ..zk.. "/foo/task-2345") => true)

It ignores children of the plan which are not of the form task-*

(get-task ..zk.. "/foo") => nil
 (provided
  (zk/children ..zk.. "/foo") => '("bar" "baz" "ready"))
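
Taken together, these facts describe a lazy search over the plan's task-* children. The sketch below replaces zk/children, take-ownership and zk/delete with functions in an ops map; available? and get-task-sketch are hypothetical names. As in the facts above, a children result of false means the node is gone, and nil means it has no children.

```clojure
(require '[clojure.string :as str])

(defn available?
  "True if a task's children show it is ready, unowned and has no pending deps."
  [children]
  (boolean (and (some #{"ready"} children)
                (not-any? #(or (= % "owner") (str/starts-with? % "dep-")) children))))

(defn get-task-sketch
  "Hypothetical sketch of get-task. children returns child names, nil when a
   node has none, or false when it no longer exists; take! tries to create the
   owner node; delete! removes a node."
  [{:keys [children take! delete!]} plan]
  (->> (or (children plan) ())
       (filter #(str/starts-with? % "task-"))
       (map #(str plan "/" %))
       ;; some stops at the first task we manage to take
       (some (fn [task]
               (let [cs (children task)]
                 (cond
                   (false? cs) nil                      ; removed meanwhile: skip it
                   (empty? cs) (do (delete! task) nil)  ; empty task: clean up, skip
                   (and (available? cs) (take! task)) task
                   :else nil))))))
```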

9.2    perform-task

Parameters:

  • zk: the Zookeeper connection object
  • task: path to the task to perform
  • $: a DI injector to be passed to the task

Returns: Nothing in particular

If the task has a 'result' child and no 'prov-*' children, this means the task completed successfully, and the result has been distributed to all dependent tasks (if any). In such a case we remove the task.

(perform-task ..zk.. "/foo/task-1234" ..$..) => irrelevant
 (provided
  (zk/children ..zk.. "/foo/task-1234") => '("result")
  (get-clj-data irrelevant irrelevant) => 123
  (zk/delete-all ..zk.. "/foo/task-1234") => irrelevant)

If prov-* children exist, it reads the result and distributes it to the tasks depending on this task (through the corresponding dep-* nodes), and then removes the task

(perform-task ..zk.. "/foo/task-1234" ..$..) => irrelevant
 (provided
  (zk/children ..zk.. "/foo/task-1234") => '("result" "prov-00000" "prov-0001")
  (get-clj-data ..zk.. "/foo/task-1234/result") => 3.1415
  (propagate-result ..zk.. "/foo/task-1234/prov-00000" 3.1415) => irrelevant
  (propagate-result ..zk.. "/foo/task-1234/prov-0001" 3.1415) => irrelevant
  (zk/delete-all irrelevant irrelevant) => irrelevant)

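If the task does not have a result, we need to calculate it ourselves. We call execute-function to get the result, and store it in a new 'result' child, so that a worker picking up this task after a failure does not need to recompute it. The task is then removed.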

(perform-task ..zk.. "/foo/task-1234" ..$..) => irrelevant
 (provided
  (zk/children ..zk.. "/foo/task-1234") => '()
  (execute-function ..zk.. "/foo/task-1234" ..$..) => 1234.5
  ;; It should create a result child node and store the result in it
  (zk/create ..zk.. "/foo/task-1234/result" :persistent? true) => true
  (set-initial-clj-data ..zk.. "/foo/task-1234/result" 1234.5) => irrelevant
  (zk/delete-all irrelevant irrelevant) => irrelevant)

Sometimes, due to race conditions, a task that a worker grabs has already been handled by another worker, and the task node may already be gone when we try to list its children (zk/children returns false). In such a case, perform-task returns without doing anything.

(perform-task ..zk.. "/foo/task-1234" ..$..) => irrelevant
 (provided
  (zk/children ..zk.. "/foo/task-1234") => false)

If execute-function returns :task-does-not-exist, we do not write the result and return.

(perform-task ..zk.. "/foo/task-1234" ..$..) => irrelevant
 (provided
  (zk/children ..zk.. "/foo/task-1234") => '()
  (execute-function ..zk.. "/foo/task-1234" ..$..) => :task-does-not-exist)
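
The cases above combine as follows. In this sketch (perform-task-sketch and its ops map are hypothetical), a stored result is reused rather than recomputed, every prov-* child receives the result, and the task is removed last.

```clojure
(require '[clojure.string :as str])

(defn perform-task-sketch
  "Hypothetical sketch of perform-task. ops supplies children, get-data,
   execute (returns a value or :task-does-not-exist), store-result!,
   propagate! and delete-all! as plain functions."
  [{:keys [children get-data execute store-result! propagate! delete-all!]} task]
  (let [cs (children task)]
    (when-not (false? cs)                            ; task already gone: do nothing
      (let [has-result? (some #{"result"} cs)
            result (if has-result?
                     (get-data (str task "/result"))  ; computed by an earlier attempt
                     (execute task))]
        (when-not (= result :task-does-not-exist)
          (when-not has-result?
            (store-result! task result))
          (doseq [prov (filter #(str/starts-with? % "prov-") cs)]
            (propagate! (str task "/" prov) result))
          (delete-all! task))))))
```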

9.3    set-initial-clj-data

Parameters:

  • zk: the Zookeeper connection object
  • node: the node
  • data: the Clojure s-expression to be stored

Returns: Nothing in particular

It calls zk/set-data to update the data

(set-initial-clj-data ..zk.. ..node.. ..data..) => irrelevant
    (provided
     (pr-str ..data..) => ..str..
     (to-bytes ..str..) => ..bytes..
     (zk/set-data ..zk.. ..node.. ..bytes.. irrelevant) => irrelevant)

It passes 0 as the expected version, which is the version of a newly created node.

(set-initial-clj-data ..zk.. ..node.. ..data..) => irrelevant
    (provided
     (zk/set-data irrelevant irrelevant irrelevant 0) => irrelevant)
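
A minimal sketch of the serialization round trip. It assumes that to-bytes encodes the string as UTF-8 and that the reading side (get-clj-data) uses read-string; serialize and deserialize are hypothetical names. Version 0 works for the first zk/set-data call because a newly created Zookeeper node starts at data version 0.

```clojure
;; Hypothetical helpers mirroring pr-str -> to-bytes, plus the inverse that a
;; reader such as get-clj-data would need. UTF-8 is an assumption.
(defn to-bytes [^String s]
  (.getBytes s "UTF-8"))

(defn serialize [data]
  (to-bytes (pr-str data)))

(defn deserialize [^bytes bs]
  (read-string (String. bs "UTF-8")))
```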

9.4    add-dependency

Parameters:

  • zk: the Zookeeper connection object
  • from: path of the task that provides the dependency
  • to: path of the task that depends on 'from'

Returns: nothing in particular

It adds sequential children to both the 'from' and the 'to' tasks

(add-dependency ..zk.. "/path/from" "/path/to") => irrelevant
    (provided
     (zk/create ..zk.. "/path/from/prov-" :persistent? true :sequential? true) => irrelevant
     (zk/create ..zk.. "/path/to/dep-" :persistent? true :sequential? true) => irrelevant
     (set-initial-clj-data irrelevant irrelevant irrelevant) => irrelevant)

It sets the data of the prov child to be the path to the corresponding dep child

(add-dependency ..zk.. "/path/from" "/path/to") => irrelevant
    (provided
     (zk/create ..zk.. "/path/from/prov-" :persistent? true :sequential? true) => ..from-link..
     (zk/create ..zk.. "/path/to/dep-" :persistent? true :sequential? true) => ..to-link..
     (set-initial-clj-data ..zk.. ..from-link.. ..to-link..) => irrelevant)

9.5    take-ownership

Parameters:

  • zk: the Zookeeper connection object
  • task: the task to take ownership over

Returns: whether or not we managed to take ownership

It tries to add an ephemeral 'owner' node to the task, and returns whether it was successful

(take-ownership ..zk.. "/foo/task-1234") => ..result..
    (provided
     (zk/create ..zk.. "/foo/task-1234/owner" :persistent? false) => ..result..)
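
Creating a node that already exists fails, so at most one worker can create the owner node. Because the node is ephemeral, it also disappears when its owner's session ends, which makes the task available again. The sketch below imitates only the create-if-absent part, using an atom that holds a set of paths. take-ownership! is a hypothetical stand-in, and ephemerality is not modelled.

```clojure
(defn take-ownership!
  "Hypothetical stand-in: nodes is an atom holding a set of existing paths.
   Returns true only for the caller that actually added the owner node."
  [nodes task]
  (let [owner (str task "/owner")
        [before _] (swap-vals! nodes conj owner)]
    (not (contains? before owner))))
```

Comparing against the value before the swap is what makes the test-and-set atomic: concurrent callers may all add the same path, but only one of them sees it missing beforehand.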

9.6    execute-function

Parameters:

  • zk: the Zookeeper connection object
  • task: path to the task node containing arguments for the function
  • $: an injector to be passed to the task function

Returns: the return value from the task's function

It reads the function definition from the content of the task node and evaluates it. If the task has no arg-* children, the function is called with the injector as its only argument.

(def $ (di/injector))
(di/startup $)
(defn foo [$])
(execute-function ..zk.. ..task.. $) => 4
 (provided
  (get-clj-data ..zk.. ..task..) => 'foo
  (foo $) => 4
  (zk/children ..zk.. ..task..) => '("foo" "task-1234"))

It passes the values of the task's arg-* children to the function, after the injector and in the order of their serial numbers

(execute-function ..zk.. "/foo/task-1234" $) => [1 2 3]
 (provided
  (get-clj-data ..zk.. "/foo/task-1234") => '(fn [$ & args] args)
  (zk/children ..zk.. "/foo/task-1234") => '("arg-00001" "arg-00002" "arg-00000")
  (get-clj-data ..zk.. "/foo/task-1234/arg-00000") => 1
  (get-clj-data ..zk.. "/foo/task-1234/arg-00001") => 2
  (get-clj-data ..zk.. "/foo/task-1234/arg-00002") => 3)

If the node is removed before we manage to get the arguments, we return :task-does-not-exist without doing anything.

(execute-function ..zk.. "/foo/task-1234" $) => :task-does-not-exist
 (provided
  (get-clj-data ..zk.. "/foo/task-1234") => '(fn [$ & args] args)
  (zk/children ..zk.. "/foo/task-1234") => false)
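
A sketch of the steps above, assuming get-data returns already deserialized values (execute-function-sketch and its ops map are hypothetical). Sorting the arg-* names gives numeric order only because the serial numbers are zero-padded to a fixed width.

```clojure
(require '[clojure.string :as str])

(defn execute-function-sketch
  "Hypothetical sketch of execute-function. children returns child names or
   false if the task is gone; get-data returns the deserialized data of a node."
  [{:keys [children get-data]} task $]
  (let [f-expr (get-data task)
        cs (children task)]
    (if (false? cs)
      :task-does-not-exist
      (let [arg-paths (->> cs
                           (filter #(str/starts-with? % "arg-"))
                           sort                  ; zero-padded serials sort numerically
                           (map #(str task "/" %)))]
        ;; evaluate the stored s-expression and call it with $ first
        (apply (eval f-expr) $ (map get-data arg-paths))))))
```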

9.7    propagate-result

Parameters:

  • zk: the Zookeeper connection object
  • prov: path to the prov-* node to propagate
  • value: the value to be propagated

Returns: nothing in particular

It does the following:

  • reads the path of the dep-* node from the prov-* node
  • creates an arg-* node in the same task, with the same serial number as the dep-* node
  • sets the value of the arg-* node to value
  • removes the dep-* node
(propagate-result ..zk.. ..prov.. ..value..) => irrelevant
 (provided
  (get-clj-data ..zk.. ..prov..) => "/foo/task-1234/dep-01472"
  (zk/create ..zk.. "/foo/task-1234/arg-01472" :persistent? true) => true
  (set-initial-clj-data ..zk.. "/foo/task-1234/arg-01472" ..value..) => irrelevant
  (zk/delete ..zk.. "/foo/task-1234/dep-01472") => irrelevant)
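
The mapping from dep-* to arg-* is a path rewrite that keeps the serial number, which is what preserves the argument order. The sketch below (arg-path-for and propagate-result-sketch are hypothetical names) creates the arg-* node before deleting the dep-* node, following the order listed above; presumably this ensures the dependent task never looks runnable while one of its arguments is still missing.

```clojure
(require '[clojure.string :as str])

(defn arg-path-for
  "Maps .../dep-<serial> to .../arg-<serial> in the same task."
  [dep-path]
  (str/replace dep-path #"/dep-(\d+)$" "/arg-$1"))

(defn propagate-result-sketch
  "Hypothetical sketch: get-data reads the prov-* node, create! makes a node
   holding a value, delete! removes a node."
  [{:keys [get-data create! delete!]} prov value]
  (let [dep (get-data prov)]
    (create! (arg-path-for dep) value) ; the argument appears first...
    (delete! dep)))                    ; ...then the dependency is released
```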

9.8    get-task-from-any-plan

Parameters:

  • zk: the Zookeeper connection object
  • parent: path to the parent of all plans

Returns: path to a task, if one is found, or nil if not

It starts by getting the list of children (plans). If this list is empty, it returns nil

(get-task-from-any-plan ..zk.. ..parent..) => nil
 (provided
  (zk/children ..zk.. ..parent..) => nil)

For each plan, we check that it is ready (has a 'ready' child) and then call get-task on it

(get-task-from-any-plan ..zk.. "/foo") => ..task..
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234")
  (zk/exists ..zk.. "/foo/task-1234/ready") => {:some-key "value"}
  (get-task ..zk.. "/foo/task-1234") => ..task..)

If a plan is not ready, it should be skipped

(get-task-from-any-plan ..zk.. "/foo") => ..task..
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345")
  (zk/exists ..zk.. "/foo/task-1234/ready") => nil
  (zk/exists ..zk.. "/foo/task-2345/ready") => {:some-key "value"}
  (get-task ..zk.. "/foo/task-2345") => ..task..)

If get-task does not return a task (e.g., the plan has no available tasks), we move on to the next plan. This is done lazily, so that plans after the first one yielding a task are not queried.

(get-task-from-any-plan ..zk.. "/foo") => ..task..
 (provided
  (zk/children ..zk.. "/foo") => '("task-1234" "task-2345" "quux")
  (zk/exists ..zk.. "/foo/task-1234/ready") => {:some-key "value"}
  (get-task ..zk.. "/foo/task-1234") => nil
  (zk/exists ..zk.. "/foo/task-2345/ready") => {:some-key "value"}
  (get-task ..zk.. "/foo/task-2345") => ..task..)

If the parent does not exist, nil should be returned.

(get-task-from-any-plan ..zk.. "/foo") => nil
 (provided
  (zk/children ..zk.. "/foo") => false)
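
A sketch of this search, with the Zookeeper calls passed in as functions; get-task-from-any-plan-sketch and its ops map are hypothetical. Using some over the plan list gives the required laziness: plans after the first one that yields a task are never examined.

```clojure
(defn get-task-from-any-plan-sketch
  "Hypothetical sketch: children returns plan names (nil if there are none,
   false if the parent is gone), ready? checks a plan's ready node, and
   get-task tries to take a task from one plan."
  [{:keys [children ready? get-task]} parent]
  (let [plans (children parent)]
    (when plans                          ; nil or false: nothing to do
      (some (fn [plan]
              (let [path (str parent "/" plan)]
                (when (ready? path)
                  (get-task path))))
            plans))))
```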

9.9    calc-sleep-time

Parameters:

  • attrs: a map of attributes based on which we calculate the sleep time, including:
    • :initial - the value to be returned for count 0 (default: 100ms)
    • :increase - the increase factor, by which the value gets multiplied each time (default: 1.5)
    • :max - the maximum sleep time (default: 10 seconds)
  • count: the number of times we have already waited since we last got a task

Returns: the number of milliseconds to sleep

For count = 0, it returns :initial

(calc-sleep-time {:initial 1234} 0) => 1234

:initial defaults to 100

(calc-sleep-time {} 0) => 100

For count > 0, the :initial value is multiplied by :increase to the power of count

(calc-sleep-time {:increase 2 :initial 1} 8) => 256

:increase defaults to 1.5

(calc-sleep-time {} 1) => 150

The value is capped by :max

(calc-sleep-time {:max 300} 20) => 300

:max defaults to 10000

(calc-sleep-time {} 20) => 10000

:max is applied at every step, so that the function does not overflow even with high count values

(calc-sleep-time {:max 1000} 100) => 1000
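
One way to satisfy all of the facts above, offered as a sketch rather than the library's implementation: multiply step by step and stop as soon as the cap is reached, which keeps large counts from overflowing. Truncating the result to a whole number of milliseconds is an assumption, and calc-sleep-time-sketch is a hypothetical name.

```clojure
(defn calc-sleep-time-sketch
  "Sketch: initial * increase^n, capped at :max. The cap is checked at every
   step, so large n stops early instead of overflowing."
  [{:keys [initial increase], max-ms :max,
    :or {initial 100, increase 1.5, max-ms 10000}} n]
  (loop [t initial
         k n]
    (if (or (zero? k) (>= t max-ms))
      (long (min t max-ms))   ; truncate to whole milliseconds
      (recur (* t increase) (dec k)))))
```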