axiom-clj.core: An Integration of All Dependencies

Author: Temporarily Removed  (temporarily@removed.com)
Date: 25 April 2018
Repository: https://github.com/temporarily/removed
Version: 0.4.1

1    Introduction

This library is the integration point between all the components that make up the Axiom application platform. With the help of our dependency-injection library, we manage to decouple most libraries from one another. This is especially important (and true) for libraries providing resources based on external dependencies, such as Zookeeper or DynamoDB: decoupling them allows us to replace these dependencies with others without modifying other parts of the code.

However, at some point our choice of dependencies must be made concrete, and this library is that point. All it provides is a main function that loads a configuration file and starts an injector based on it; the choice of modules used to initialize the injector is made here. After calling di/startup to start all services, the main function enters an infinite Thread/sleep loop. It also uses .addShutdownHook on the JVM's Runtime to register di/shutdown, so that the services are shut down when the process is interrupted or terminated from the outside.
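The start-up and shutdown pattern described above can be sketched in plain Clojure. This is not the library's actual main function: start-system and stop-system are hypothetical stand-ins for di/startup and di/shutdown, the system is modelled as an atom, and loading the configuration file is omitted.

```clojure
;; Hypothetical stand-ins for di/startup and di/shutdown, which in Axiom
;; operate on an injector built from the loaded configuration.
(defn start-system [system] (swap! system assoc :running? true))
(defn stop-system [system] (swap! system assoc :running? false))

(defn register-shutdown-hook!
  "Registers f to run when the JVM shuts down (normal exit, SIGINT or
  SIGTERM; shutdown hooks do not run on SIGKILL). Returns the hook thread."
  [f]
  (let [hook (Thread. ^Runnable f)]
    (.addShutdownHook (Runtime/getRuntime) hook)
    hook))

(defn -main [& _args]
  (let [system (atom {})]
    (start-system system)
    (register-shutdown-hook! #(stop-system system))
    ;; Keep the main thread alive; in the real system the services
    ;; do their work on their own threads.
    (loop []
      (Thread/sleep 1000)
      (recur))))
```

Registering the hook means the infinite loop never needs an exit condition: stopping the process is what triggers an orderly shutdown.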

The rest of this document consists of integration tests for different parts of Axiom.

2    Information Tier Integration Test

In this section we build an integration test that exercises Axiom's entire data-processing pipeline, including data migration and a topology.

2.1    Overview

We base our test on the tweetlog-clj project, which introduces a simple Twitter-like application. We stream a sequence of facts to Axiom, and at some point during this stream we introduce the tweetlog application, triggering a migration. We then test that the derived facts created by the application's rule take into account all facts, both those emitted before and those emitted after the app was introduced. This demonstrates how the migration process complements the topology, and that no data is lost in the process.
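The property being tested can be illustrated with a toy model that ignores Axiom's actual machinery and all of its concurrency. Assume, for illustration only, a rule timeline(u, text) :- follows(u, v), tweeted(v, text), with facts encoded as vectors. Facts emitted before the app appears are processed in one batch (standing in for the migration), and each later fact is joined only against the facts already stored (standing in for the topology). The fact encoding and all function names here are hypothetical.

```clojure
;; Toy fact encoding (hypothetical): [:follows user followee], [:tweeted author text].
(defn derive-all
  "Batch evaluation of timeline(u, text) :- follows(u, v), tweeted(v, text)."
  [facts]
  (set (for [[k1 u v] facts
             :when (= k1 :follows)
             [k2 w text] facts
             :when (and (= k2 :tweeted) (= v w))]
         [u text])))

(defn derive-new
  "Derived facts contributed by one new fact, joined against stored facts only."
  [stored [kind a b]]
  (set (case kind
         :follows (for [[k w text] stored
                        :when (and (= k :tweeted) (= w b))]
                    [a text])
         :tweeted (for [[k u v] stored
                        :when (and (= k :follows) (= v a))]
                    [u b]))))

(defn migrate-then-stream
  "Processes the first n facts in batch ('migration'), then the remaining
  facts one at a time ('topology'), accumulating all derived facts."
  [facts n]
  (let [[before after] (split-at n facts)]
    (loop [stored (vec before)
           derived (derive-all before)
           remaining after]
      (if-let [[f & more] (seq remaining)]
        (recur (conj stored f)
               (into derived (derive-new stored f))
               more)
        derived))))
```

For every split point n, the result equals (derive-all facts): a pair of matching facts is either handled by the batch, or by whichever of the two arrives later, at which point the other is already stored.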

2.2    Initialization

We use the following configuration:

(def config
  {:zookeeper-config {:url "127.0.0.1:2181"}
   :zk-plan-config {:num-threads 5
                    :parent "/my-plans"}
   ;; The random access key presumably gives each run a fresh database: if the
   ;; endpoint is DynamoDB Local, it keeps separate data per access key
   ;; (unless started with -sharedDb).
   :dynamodb-config {:access-key (str "AXIOM" (rand-int 1000000))
                     :secret-key "XXYY"
                     :endpoint "http://localhost:8006"}
   :num-database-retriever-threads 1
   :dynamodb-default-throughput {:read 1 :write 1}
   :dynamodb-event-storage-num-threads 3
   :rabbitmq-config {:username "guest"
                     :password "guest"
                     :vhost "/"
                     :host "localhost"
                     :port 5672}
   :migration-config {:number-of-shards 3
                      :plan-prefix "/my-plans"
                      :clone-location "/tmp"
                      :clone-depth 10}
   ;; S3 settings are read from environment variables.
   :s3-config {:bucket-name (System/getenv "PERMACODE_S3_BUCKET")
               :access-key (System/getenv "AWS_ACCESS_KEY")
               :secret-key (System/getenv "AWS_SECRET_KEY")}
   :local-storm-cluster true
   :fact-spout {:include [:rabbitmq-config]}
   :store-bolt {:include [:dynamodb-event-storage-num-threads
                          :dynamodb-default-throughput
                          :dynamodb-config]}
   :output-bolt {:include [:rabbitmq-config]}
   :initial-link-bolt {:include [:s3-config]}
   :link-bolt {:include [:s3-config
                         :dynamodb-config
                         :dynamodb-default-throughput
                         :num-database-retriever-threads]}})

The injector function creates an injector based on the given config and calls the module functions of all the dependencies.

(def $ (injector config))
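To make the injector's role concrete, here is a toy model of dependency resolution. It is not the API of Axiom's di library: each hypothetical module declares the resources it needs and the single resource it provides, and the model keeps applying modules until none can make progress. Starting from the configuration map, resources therefore appear in dependency order.

```clojure
(defn resolve-resources
  "Toy injector: starting from the config map, repeatedly applies every module
  whose needed resources are all available and whose own resource is still
  missing, until no module can make progress."
  [config modules]
  (loop [resources config]
    (let [ready (for [{:keys [provides needs f]} modules
                      :when (and (not (contains? resources provides))
                                 (every? #(contains? resources %) needs))]
                  [provides (f (select-keys resources needs))])]
      (if (seq ready)
        (recur (into resources ready))
        resources))))

;; Hypothetical modules, loosely named after resources used in this document.
(def toy-modules
  [{:provides :zookeeper
    :needs [:zookeeper-config]
    :f (fn [{:keys [zookeeper-config]}] {:connected-to (:url zookeeper-config)})}
   {:provides :zk-plan
    :needs [:zookeeper :zk-plan-config]
    :f (fn [{:keys [zk-plan-config]}] {:parent (:parent zk-plan-config)})}])
```

Here :zk-plan is only provided in the second round, after :zookeeper exists; a module whose configuration is missing is simply never applied.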

Now we can start up the system.

(fact :integ
  (di/startup $))

We need to make sure no state from previous runs is left in Zookeeper, so we clear the trees that store it.

(fact :integ
  (di/do-with! $ [zookeeper]
               ;; Remove /perms and /my-plans together with everything under
               ;; them, then recreate each as an empty persistent node.
               (doseq [path ["/perms" "/my-plans"]]
                 (when (zk/exists @zookeeper path)
                   (zk/delete-all @zookeeper path))
                 (zk/create @zookeeper path :persistent? true))))
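ZooKeeper refuses to delete a node that still has children, which is why the test uses zk/delete-all to remove a whole subtree. The clear-and-recreate step can be modelled in memory; representing the tree as a set of path strings is an assumption made for illustration only, and these functions are not part of zookeeper-clj.

```clojure
(require '[clojure.string :as str])

(defn delete-all
  "Removes path and all of its descendants from a set of node paths."
  [nodes path]
  (set (remove #(or (= % path)
                    (str/starts-with? % (str path "/")))
               nodes)))

(defn reset-tree
  "Deletes the subtree at path if present, then recreates path as an empty node."
  [nodes path]
  (conj (delete-all nodes path) path))
```

Note the trailing "/" in the prefix check: a sibling such as /permsx is not a descendant of /perms and must survive.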

2.3    Input Data

For the purpose of this test we consider a Twitter-like app for numbers. Our "users" are therefore the numbers from 10 (inclusive) to 100 (exclusive). Numbers follow the numbers they literally follow: each number "follows" the ten numbers that precede it.

We do this in two phases. In the first phase each number follows the five numbers immediately preceding it, and in the second phase it follows the five numbers preceding those. This is done in a thread that waits 300 milliseconds after each publication.

(fact :integ
  (def following-thread
    (di/do-with! $ [publish]
                 (async/thread
                   (let [ts (atom 1000000)
                         ;; offs 0: the five numbers just before u1;
                         ;; offs 5: the five numbers before those.
                         phase (fn [offs]
                                 (doseq [u1 (range 10 100)
                                         u2 (range (- u1 5 offs) (- u1 offs))]
                                   (publish
                                    {:kind :fact
                                     :name "tweetlog/follows"
                                     :key (str u1)
                                     :data [(str u2)]
                                     :ts (swap! ts inc)
                                     :change 1
                                     :readers #{}
                                     :writers #{(str u1)}})
                                   (Thread/sleep 300)))]
                     (phase 0)
                     (phase 5))))))
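The follow relation produced by the two phases can be checked without running the pipeline. The sketch below uses hypothetical helper names; followees mirrors the inner range expression of the thread above. It also computes how many follows facts are published and roughly how long the thread runs at 300 ms per publication, ignoring the time publish itself takes.

```clojure
(defn followees
  "Numbers that user u starts following in a phase: offs 0 yields the five
  numbers immediately preceding u, offs 5 the five before those."
  [u offs]
  (range (- u 5 offs) (- u offs)))

(def users (range 10 100))

;; After both phases, each user should follow exactly the ten preceding numbers.
(def all-followees
  (into {} (for [u users]
             [u (concat (followees u 0) (followees u 5))])))

;; 90 users * 2 phases * 5 followees, each followed by a 300 ms sleep.
(def num-follow-facts (* (count users) 2 5))
(def follow-thread-seconds (/ (* num-follow-facts 300) 1000))
```

This gives 900 follows facts and roughly 270 seconds of publishing, which is far longer than the tweet thread.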

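Each number from 0 to 99 makes two tweets, "Hello" and "Goodbye". Numbers 0 to 9 are included because numbers 10 to 19 follow them. The thread waits 200 milliseconds after each tweet.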

(fact :integ
  (def tweet-thread
    (di/do-with! $ [publish]
                 (async/thread
                   (let [ts (atom 1000000)]
                     ;; Numbers 0 to 9 tweet too, since numbers 10 to 19 follow them.
                     (doseq [u (range 100)
                             msg ["Hello" "Goodbye"]]
                       (publish
                        {:kind :fact
                         :name "tweetlog/tweeted"
                         :key (str u)
                         :data [(str msg " from " u)]
                         :ts (swap! ts inc)
                         :change 1
                         :readers #{}
                         :writers #{(str u)}})
                       (Thread/sleep 200)))))))
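Separating fact generation from publishing and timing makes the stream easy to inspect. The following sketch uses a hypothetical helper, tweet-facts, which is not part of the original test; it builds the same sequence of fact maps that tweet-thread publishes, with timestamps starting at 1000001 as produced by (swap! ts inc), but without calling publish or sleeping.

```clojure
(defn tweet-facts
  "Pure version of the tweet stream: two tweets for each number in [0, 100),
  with timestamps counting up from 1000001."
  []
  (map-indexed
   (fn [i [u msg]]
     {:kind :fact
      :name "tweetlog/tweeted"
      :key (str u)
      :data [(str msg " from " u)]
      :ts (+ 1000001 i)
      :change 1
      :readers #{}
      :writers #{(str u)}})
   (for [u (range 100)
         msg ["Hello" "Goodbye"]]
     [u msg])))
```

A thread body could then be reduced to (doseq [f (tweet-facts)] (publish f) (Thread/sleep 200)), and the facts themselves can be checked in ordinary unit tests.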

2.4    Introducing the App

To start a migration and a subsequent topology, we introduce an axiom/app-version fact carrying the version (Git commit) of our code. We do this after a 10-second delay, intended to allow some (but not all) of the facts to be stored before the app is introduced.

(fact :integ
  (def app-thread
    (di/do-with! $ [publish]
                 (async/thread
                   ;; Let some of the facts be stored before the app appears.
                   (Thread/sleep (* 10 1000))
                   (publish {:kind :fact
                             :name "axiom/app-version"
                             :key "https://github.com/brosenan/tweetlog-clj.git"
                             :data ["d3a8c6c5b946279186f857381e751801a657f70c"]
                             :ts 1000
                             :change 1
                             :writers #{}
                             :readers #{}})))))
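A rough calculation shows why a 10-second delay leaves facts on both sides of the app's introduction. The helper below is hypothetical and ignores the time publish takes, so the numbers are estimates: after 10 seconds, tweet-thread has published about 50 of its 200 tweets and following-thread about 33 of its 900 follows facts.

```clojure
(defn approx-published
  "Rough count of facts a thread has published after elapsed-ms when it
  sleeps period-ms after each publication (publish latency ignored)."
  [elapsed-ms period-ms total]
  (min total (quot elapsed-ms period-ms)))

(def app-delay-ms (* 10 1000))

(approx-published app-delay-ms 200 200) ;; tweet-thread: 50
(approx-published app-delay-ms 300 900) ;; following-thread: 33
```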

2.5    Testing the Output

If all works as expected, each number from 10 (inclusive) to 100 (exclusive) should have exactly 20 (= 10 followees * 2 tweets) tweets in its timeline. To check this, we walk through all the numbers in this range and query each one's timeline. If we get fewer than 20 tweets, we sleep for a second and try again, giving the processing time to complete for this user. If the number exceeds 20, the test fails.

(fact :integ
  (di/do-with! $ [database-chan]
               (doseq [i (range 10 100)]
                 (loop []
                   ;; Ask the database for the followee-tweets facts keyed by user i.
                   ;; Results arrive on chan; async/reduce waits until chan is closed.
                   (let [chan (async/chan)
                         ev {:kind :fact
                             :name "perm.QmdLhmeiaJTMPdv7oT7mUsztdtjHq7f16Dw6nkR6JhxswP/followee-tweets"
                             :key (str i)}]
                     (async/>!! database-chan [ev chan])
                     (let [timeline (->> chan
                                         (async/reduce conj #{})
                                         async/<!!)]
                       (cond (< (count timeline) 20)
                             (do
                               (Thread/sleep 1000)
                               (recur))
                             :else
                             (do
                               (count timeline) => 20))))))))
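The expected count of 20 can be derived independently of the pipeline, and the retry loop can be bounded. The sketch below assumes, for illustration, that a timeline is identified by the texts of its tweets; expected-timeline and poll-until are hypothetical helpers, not part of the original test. The test above retries indefinitely, whereas a bounded poll would let a stuck pipeline fail instead of hanging.

```clojure
(defn expected-timeline
  "Tweet texts user u should see: both tweets of each of the ten numbers
  preceding u."
  [u]
  (set (for [v (range (- u 10) u)
             msg ["Hello" "Goodbye"]]
         (str msg " from " v))))

(defn poll-until
  "Calls (query) up to max-attempts times, sleeping wait-ms between attempts,
  until (done? result) holds. Returns the last result either way."
  [query done? max-attempts wait-ms]
  (loop [attempt 1]
    (let [result (query)]
      (if (or (done? result) (>= attempt max-attempts))
        result
        (do (Thread/sleep (long wait-ms))
            (recur (inc attempt)))))))
```

In the test, query would send the request to database-chan and collect the results from chan, done? would check for at least 20 results, and the final count would then be compared against 20.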

2.6    Shutting Down

Finally, we wait for all threads to complete and shut down the system.

(fact :integ
  (async/<!! following-thread)
  (async/<!! tweet-thread)
  (async/<!! app-thread)
  (di/shutdown $))
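Waiting with async/<!! blocks forever if one of the threads never finishes. If the threads were started with future instead of async/thread (an assumption; the test itself uses async/thread), the timeout arity of deref gives a bounded wait. A minimal sketch, with await-all as a hypothetical helper:

```clojure
(defn await-all
  "Waits for each future in futs in turn, giving each up to timeout-ms from
  the moment we start waiting for it. Returns a vector of results, with
  ::timeout in place of any result that did not arrive in time."
  [futs timeout-ms]
  (mapv #(deref % timeout-ms ::timeout) futs))
```

Because the futures are awaited one after another, the total wait can reach the number of futures times timeout-ms; the caller can then decide whether a ::timeout should fail the test before shutting the system down.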