migrator is a microservice that listens to events describing new rules being published, and initiates data migration using zk-plan.
Consider, for example, the following definitions, originally introduced in our cloudlog documentation.
(clg/defrule timeline [user author tweet]
  [:test/follows user author] (clg/by-anyone)
  [:test/tweeted author tweet] (clg/by-anyone))

(clg/defrule trending [tweet]
  [:test/influencer influencer] (clg/by-anyone)
  [timeline influencer tweet] (clg/by-anyone))

(def stop-words #{"a" "is" "to" "the"})
(clg/defrule index-docs [word id]
  [:test/doc id text] (clg/by-anyone)
  (for [word (clojure.string/split text #"[,!.? ]+")])
  (let [word (clojure.string/lower-case word)])
  (when-not (contains? stop-words word)))

(clg/defclause multi-keyword-search
  [:test/multi-keyword-search keywords -> text]
  (let [keywords (map clojure.string/lower-case keywords)
        first-kw (first keywords)])
  [index-docs first-kw id]
  [:test/doc id text] (clg/by-anyone)
  (let [lc-text (clojure.string/lower-case text)])
  (when (every? #(clojure.string/includes? lc-text %)
                (rest keywords))))
If we introduce these rules when :test/follows, :test/tweeted and :test/influencer facts already exist, a migration process is needed to derive, from the facts that are already stored, the timeline and trending results these rules imply.
To allow this to happen, we need to go through all the existing :test/follows facts first and create intermediate rule tuples based on them. Then we need to go through all the :test/tweeted facts and match them against the tuples we generated in the previous step, to know to which timelines each tweet needs to go. Then, after all timeline entries are calculated, we can go through all influencers and create the trending data.
Going through all the existing facts with a certain name can be a lengthy process. To speed things up, we split each such scan into shards that are processed in parallel (the :number-of-shards configuration parameter below controls this), as sketched below.
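For illustration only, a shard-membership test could look like the following sketch. The hash-mod scheme here is an assumption; the actual partitioning is internal to the database-scanner resource.
(defn in-shard? [key shard shards]
  ;; Hypothetical sketch: an event belongs to a shard if its key
  ;; hashes to that shard's number.
  (= (mod (hash key) shards) shard))
Scanning shard 2 of 6 would then process only the keys for which (in-shard? key 2 6) holds.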
When developers push a new version of their code, they publish an :axiom/app-version event stating the clone URL and the git commit hash for that new version. The origin of such an operation could be a handler for a GitHub webhook or something similar.
This service uses git to clone the specified version into the local file system. It relies on the following resources:
- sh: a shell in which to run git commands.
- migration-config: specifies :clone-location, the directory where all clones are performed, and :clone-depth, the depth to use when cloning.
- declare-service and assign-service: register this function to handle the :axiom/app-version event.
(def cmds (transient []))
(def staticdir (atom nil))
(let [decl (transient {})
assign (transient {})
$ (di/injector {:sh (fn [& args]
(conj! cmds args)
{:exit 0
:out "some output"
:err ""})
:migration-config {:clone-location "/my/clone/location"
:clone-depth 12}
:declare-service (fn [q partial]
(assoc! decl q partial))
:assign-service (fn [q func]
(assoc! assign q func))
:deploy-dir (fn [ver dir publish]
(publish {:ver ver
:dir dir}))})]
(module $)
(di/startup $)
((persistent! decl) "migrator.core/push-handler") => {:kind :fact
:name "axiom/app-version"}
(def push-handler ((persistent! assign) "migrator.core/push-handler")))
push-handler => fn?
The push-handler function responds to such events by calling git clone and git checkout to get the specified version of the specified repo inside the given directory. Then permacode.publish/hash-all and hash-static-files are called on the local repo, and finally the directory is removed.
(def published (transient []))
(push-handler {:kind :fact
:name "axiom/app-version"
:key "https://example.com/some/repo"
:data ["ABCD1234"]}
(fn [ev]
(conj! published ev))) => nil
(provided
(rand-int 1000000000) => 12345)
(persistent! cmds) => [["git" "clone" "--depth" "12" "https://example.com/some/repo" "/my/clone/location/repo12345"]
["git" "checkout" "ABCD1234" :dir "/my/clone/location/repo12345"]
["rm" "-rf" "/my/clone/location/repo12345"]]
The handler then calls deploy-dir, which publishes an :axiom/perm-versions event for which the key is the new version and the data consists of two maps. Since we mocked deploy-dir above, we only record the version and directory it was given:
(persistent! published) => [{:ver "ABCD1234"
:dir "/my/clone/location/repo12345"}]
perm-tracker registers to :axiom/perm-versions and tracks the quantity of each permacode module by summing the :change field of the events. It depends on the resources zookeeper-counter-add, declare-service and assign-service, which we will mock.
(def mock-counters (transient {"/perms/perm.ABCD123" 2}))
(def calls (transient []))
(let [$ (di/injector {:zookeeper-counter-add (fn [path change]
(let [old (mock-counters path 0)
new (+ old change)]
(assoc! mock-counters path new)
new))
:declare-service (fn [key reg] (conj! calls [:declare-service key reg]))
:assign-service (fn [key func] (conj! calls [:assign-service key func]))})]
(module $)
(di/startup $)
(def calls (persistent! calls))
(first calls) => [:declare-service "migrator.core/perm-tracker" {:kind :fact
:name "axiom/perm-versions"}])
The function perm-tracker is the second argument given to assign-service.
(let [call (second calls)]
(take 2 call) => [:assign-service "migrator.core/perm-tracker"]
(def perm-tracker (call 2)))
The perm-tracker service function is given an :axiom/perm-versions event and a publish function.
(perm-tracker {:kind :fact
:name "axiom/perm-versions"
:key "ABCD1234"
:data [{'foo 'perm.ABCD123} {}]
:change 3}
(fn publish [ev]
(throw (Exception. "This should not be called")))) => nil
It calls zookeeper-counter-add to increment the counter corresponding to each permacode module.
(mock-counters "/perms/perm.ABCD123") => 5
If one or more perms go from 0 to a positive count, an :axiom/perms-exist event with :change = 1 is published.
(perm-tracker {:kind :fact
:name "axiom/perm-versions"
:key "ABCD1234"
:data [{'foo 'perm.ABCD123
'bar 'perm.EFGH456} {}]
:change 3} ..pub..) => nil
(provided
(..pub.. {:kind :fact
:name "axiom/perms-exist"
:key "ABCD1234"
:data [#{'perm.EFGH456}]
:change 1}) => irrelevant)
This, of course, only happens when the change is positive.
(perm-tracker {:kind :fact
:name "axiom/perm-versions"
:key "ABCD1234"
:data [{'foobar 'perm.FOOBAR} {}]
:change -3} (fn publish [ev]
(throw (Exception. "This should not be called")))) => nil
If the aggregated value of a perm goes down to 0, an :axiom/perms-exist event with :change = -1 is published.
(perm-tracker {:kind :fact
:name "axiom/perm-versions"
:key "ABCD1234"
:data [{'foo 'perm.ABCD123
'bar 'perm.EFGH456} {}]
:change -3} ..pub..) => nil
(provided
(..pub.. {:kind :fact
:name "axiom/perms-exist"
:key "ABCD1234"
:data [#{'perm.EFGH456}]
:change -1}) => irrelevant)
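Putting the four cases together, here is a minimal sketch of the zero-crossing logic these tests imply. It is a sketch only, not the actual implementation:
(defn perm-tracker-sketch [zookeeper-counter-add]
  (fn [{:keys [key data change]} publish]
    (let [[perm-map _static] data
          crossed (set (for [[_ perm] perm-map
                             :let [new (zookeeper-counter-add
                                         (str "/perms/" perm) change)
                                   old (- new change)]
                             ;; up-crossing: 0 -> positive; down-crossing: positive -> 0
                             :when (or (and (pos? change) (zero? old))
                                       (and (neg? change) (zero? new)))]
                         perm))]
      (when (seq crossed)
        (publish {:kind :fact
                  :name "axiom/perms-exist"
                  :key key
                  :data [crossed]
                  :change (if (pos? change) 1 -1)}))
      nil)))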
When the perm-tracker finds out that new permacode modules have been introduced for the first time, a migration process needs to take place. This process goes over all the existing facts that interact with any rules defined in these modules, and creates the derived facts and intermediate rule tuples that result from this interaction.
rule-migrator is the service function responsible for this. It depends on zk-plan, which it uses to create the migration plan. We will mock this module with functions that record their own operation, so that we can later inspect the plan that was created.
(def calls (transient []))
(def last-task (atom 0))
(def mock-zk-plan
{:create-plan (fn [parent]
(when-not (= permval/*hasher* [:my-hasher])
(throw (Exception. "A custom hasher needs to be bound")))
(conj! calls [:create-plan parent])
:plan-node)
:add-task (fn [plan func args]
(conj! calls [:add-task plan func args])
(swap! last-task inc)
@last-task)
:mark-as-ready (fn [node]
(conj! calls [:mark-as-ready node]))})
With this mock, calls will contain a list of the calls that were made. create-plan always returns :plan-node, and add-task returns an ordinal number.
Since rule-migrator is a service, we mock declare-service and assign-service to get the actual function. We also provide it with a migration-config resource containing the :number-of-shards parameter, which determines how much parallelism we wish to have. The hasher resource is used to retrieve the content of the permacode modules.
(def published (atom []))
(let [calls-chan (async/chan 10)
decl (transient {})
assign (transient {})
$ (di/injector {:declare-service (fn [key reg]
(assoc! decl key reg))
:assign-service (fn [key func]
(assoc! assign key func))
:zk-plan mock-zk-plan
:migration-config {:number-of-shards 3
:plan-prefix "/my-plans"}
:hasher [:my-hasher]})]
(module $)
(di/startup $)
((persistent! decl) "migrator.core/rule-migrator")
=> {:kind :fact
:name "axiom/perms-exist"}
(def migrate-rules ((persistent! assign) "migrator.core/rule-migrator")))
Now, if we call the migration function migrate-rules on such an event, it will create a migration plan. First, it will extract the rules out of the given permacode modules by calling extract-version-rules. Then it will sort them according to their dependencies by calling sort-rules. Finally, it will create a migration plan that covers all rule functions in these modules, to be migrated one by one, in topological order.
The plan includes, for each rule, singleton fact-declarer tasks that start collecting events to be processed by the rule once the migration is complete, a first phase of initial-migrators to process link 0 of the rule, and link-migrator phases to process the rest of the links.
(migrate-rules {:kind :fact
:name "axiom/perms-exist"
:key "ABCD1234"
:data [#{'perm.ABC1234 'perm.DEF5678}]
:writers #{:some-writers}
:change 1}) => nil
(provided
(extract-version-rules 'perm.ABC1234) => [timeline]
(extract-version-rules 'perm.DEF5678) => [trending]
(clg/sort-rules [trending timeline]) => [timeline trending])
(persistent! calls)
=> [[:create-plan "/my-plans"]
[:add-task :plan-node `(fact-declarer 'migrator.core-test/timeline 0) []] ;; => 1
[:add-task :plan-node `(initial-migrator 'migrator.core-test/timeline 0 3) [1]] ;; => 2
[:add-task :plan-node `(initial-migrator 'migrator.core-test/timeline 1 3) [1]] ;; => 3
[:add-task :plan-node `(initial-migrator 'migrator.core-test/timeline 2 3) [1]] ;; => 4
[:add-task :plan-node `(fact-declarer 'migrator.core-test/timeline 1) [2 3 4]] ;; => 5
[:add-task :plan-node `(link-migrator 'migrator.core-test/timeline 1 0 3) [5]] ;; => 6
[:add-task :plan-node `(link-migrator 'migrator.core-test/timeline 1 1 3) [5]] ;; => 7
[:add-task :plan-node `(link-migrator 'migrator.core-test/timeline 1 2 3) [5]] ;; => 8
[:add-task :plan-node `(migration-end-notifier 'migrator.core-test/timeline
#{:some-writers}) [6 7 8]] ;; => 9
[:add-task :plan-node `(fact-declarer 'migrator.core-test/trending 0) [9]] ;; => 10
[:add-task :plan-node `(initial-migrator 'migrator.core-test/trending 0 3) [10]] ;; => 11
[:add-task :plan-node `(initial-migrator 'migrator.core-test/trending 1 3) [10]] ;; => 12
[:add-task :plan-node `(initial-migrator 'migrator.core-test/trending 2 3) [10]] ;; => 13
[:add-task :plan-node `(fact-declarer 'migrator.core-test/trending 1) [11 12 13]] ;; => 14
[:add-task :plan-node `(link-migrator 'migrator.core-test/trending 1 0 3) [14]] ;; => 15
[:add-task :plan-node `(link-migrator 'migrator.core-test/trending 1 1 3) [14]] ;; => 16
[:add-task :plan-node `(link-migrator 'migrator.core-test/trending 1 2 3) [14]] ;; => 17
[:add-task :plan-node `(migration-end-notifier 'migrator.core-test/trending
#{:some-writers}) [15 16 17]] ;; => 18
[:mark-as-ready :plan-node]]
Clauses, unlike rules, do not require migration. When a new clause is introduced we need to declare its input queues (for the query and all contributing facts), and then we need to initiate a topology by publishing an axiom/rule-ready event.
clause-migrator is a microservice that registers to axiom/perms-exist events. It depends on publish (e.g., this) for publishing the axiom/rule-ready events, on hasher to retrieve the clauses' definitions, and on declare-service and assign-service for registering itself. declare-service is also used for declaring the input queues for each clause's topology.
(def declared-queues (atom {}))
(def published (atom []))
(let [decl (atom {})
assign (transient {})
$ (di/injector
{:publish (partial swap! published conj)
:hasher [:hash :unhash]
:declare-service (fn [key part]
(swap! declared-queues assoc key part))
:assign-service (fn [key func]
(assoc! assign key func))})]
(module $)
(di/startup $)
(@declared-queues "migrator.core/clause-migrator") => {:kind :fact
:name "axiom/perms-exist"}
(def clause-migrator ((persistent! assign) "migrator.core/clause-migrator")))
In response to an axiom/perms-exist event with a positive :change value, clause-migrator looks up all the clauses in the new perms.
(with-redefs-fn {#'extract-version-clauses
(fn [perm]
(when-not (= permval/*hasher* [:hash :unhash])
(throw (Exception. "Hasher not set in call to extract-version-clauses")))
(cond (= perm 'perm.ABC1234)
[multi-keyword-search]
(= perm 'perm.DEF5678)
[]
:else
(throw (Exception. "Unexpected perm value"))))}
#(clause-migrator {:kind :fact
:name "axiom/perms-exist"
:key "ABCD1234"
:data [#{'perm.ABC1234 'perm.DEF5678}]
:writers #{:some-writers}
:change 1})) => nil
Then it declares a queue associated with the facts that feed each link of each clause:
(loop [link multi-keyword-search
n 0]
(when link
(@declared-queues (str "fact-for-rule/migrator.core-test/multi-keyword-search!" n))
=> {:kind :fact
:name (["test/multi-keyword-search?"
"migrator.core-test/index-docs"
"test/doc"] n)}
(recur (-> link meta :continuation) (inc n))))
Finally, it publishes an axiom/rule-ready event for each clause.
@published => [{:kind :fact
:name "axiom/rule-ready"
:key 0
:data ['migrator.core-test/multi-keyword-search]}]
In this example we will migrate the rules provided in our tweetlog example.
First, we need to provide configuration for connecting to Zookeeper, RabbitMQ, DynamoDB (local) and S3 (for which we provide coordinates using environment variables).
(def config
{:zookeeper-config {:url "127.0.0.1:2181"}
:zk-plan-config {:num-threads 5
:parent "/my-plans"}
:dynamodb-config {:access-key "FOO"
:secret-key "BAR"
:endpoint "http://localhost:8006"}
:num-database-retriever-threads 1
:dynamodb-default-throughput {:read 1 :write 1}
:dynamodb-event-storage-num-threads 3
:rabbitmq-config {:username "guest"
:password "guest"
:vhost "/"
:host "localhost"
:port 5672}
:migration-config {:number-of-shards 3
:plan-prefix "/my-plans"
:clone-location "/tmp"
:clone-depth 10}
:s3-config {:bucket-name (System/getenv "PERMACODE_S3_BUCKET")
:access-key (System/getenv "AWS_ACCESS_KEY")
:secret-key (System/getenv "AWS_SECRET_KEY")}})
We now create an injector based on the config, and inject dependencies to the migrator and its dependencies.
:integ
(def $ (di/injector config))
(module $)
(rms/module $)
(zkp/module $)
(dyn/module $)
(s3/module $)
(di/startup $)
Let's create the root elements we need in Zookeeper:
:integ
(di/do-with! $ [zookeeper]
(when (zk/exists @zookeeper "/perms")
(zk/delete-all @zookeeper "/perms"))
(zk/create @zookeeper "/perms" :persistent? true)
(when (zk/exists @zookeeper "/my-plans")
(zk/delete-all @zookeeper "/my-plans"))
(zk/create @zookeeper "/my-plans" :persistent? true))
The next step would be to generate test data. We will start with tweets:
:integ
(def users ["alice" "bob" "charlie"])
(def time (atom 1000))
(di/do-with! $ [publish]
(doseq [greeting ["hello" "hi" "howdy"]
greeted ["world" "clojure" "axiom"]
user users]
(publish {:kind :fact
:name "tweetlog/tweeted"
:key user
:data [(str greeting " " greeted " from " user)]
:ts @time
:change 1
:writers #{user}
:readers #{}})
(swap! time inc)))
Now let's create a full-factorial following matrix (everyone follows everyone else).
:integ
(di/do-with! $ [publish]
(doseq [u1 users
u2 users]
(when-not (= u1 u2)
(publish {:kind :fact
:name "tweetlog/follows"
:key u1
:data [u2]
:ts @time
:change 1
:writers #{u1}
:readers #{}}))
(swap! time inc))
:not-nil)
Let's give the microservices a few seconds to store all the facts.
:integ
(Thread/sleep 5000)
To know when our migration is complete we need to listen to axiom/rule-ready events. The following service function listens to such events, and for the rule followee-tweets it closes a channel to indicate that it is done.
:integ
(def done (async/chan))
(di/do-with! $ [serve]
(serve (fn [ev]
(when (= (-> ev :data first name) "followee-tweets")
(async/close! done)))
{:kind :fact
:name "axiom/rule-ready"}))
To kick off the migration, we need to publish an axiom/app-version event with a version of the example application.
:integ
(di/do-with! $ [publish]
(publish {:kind :fact
:name "axiom/app-version"
:key "https://github.com/brosenan/tweetlog-clj.git"
:data ["d3a8c6c5b946279186f857381e751801a657f70c"]
:ts 1000
:change 1
:writers #{}
:readers #{}}))
So now we wait for the migration to complete.
:integ
(async/<!! done)
After the migration, all facts have been processed by the rules. This means that Alice's timeline should contain 18 tweets.
:integ
(di/do-with! $ [database-chan]
(let [chan-out (async/chan 30)]
(async/>!! database-chan [{:kind :fact
:name "perm.QmdLhmeiaJTMPdv7oT7mUsztdtjHq7f16Dw6nkR6JhxswP/followee-tweets"
:key "alice"} chan-out])
(loop [res 0]
(let [ev (async/<!! chan-out)]
(cond (nil? ev)
res
:else
(recur (inc res))))))) => 18
Finally, we shut down the injector to stop the workers.
:integ
(di/shutdown $)
zookeeper-counter-add depends on the zookeeper resource, and uses it to implement a global atomic counter.
(let [$ (di/injector {:zookeeper (atom :zk)})]
(module $)
(di/startup $)
(def zookeeper-counter-add (di/do-with! $ [zookeeper-counter-add] zookeeper-counter-add)))
zookeeper-counter-add takes a path to a counter and a number to be added. If a node corresponding to the given path does not exist, its value is assumed to be 0, and the node is therefore created with the given number. The new value, which in this case is the given number, is returned.
(zookeeper-counter-add "/rules/foobar" 3) => 3
(provided
(zk/exists :zk "/rules/foobar") => nil
(zk/create-all :zk "/rules/foobar" :persistent? true) => "/rules/foobar"
(zkp/set-initial-clj-data :zk "/rules/foobar" 3) => irrelevant)
If a node exists, its value is updated to add the given number.
(zookeeper-counter-add "/rules/foobar" 3) => 5
(provided
(zk/exists :zk "/rules/foobar") => {:some :values
:version 7}
(zkp/get-clj-data :zk "/rules/foobar") => 2
(zkp/to-bytes "5") => ..bin..
(zk/set-data :zk "/rules/foobar" ..bin.. 7) => irrelevant)
zookeeper-counter-add takes an extra retries parameter, which defaults to 3. If zk/set-data throws an exception for any reason, the update is retried. This accounts for the possibility of concurrent updates.
(zookeeper-counter-add "/rules/foobar" 3) => 6
(provided
(zk/exists :zk "/rules/foobar") =streams=> [{:some :values
:version 7}
{:some :values
:version 8}]
(zkp/get-clj-data :zk "/rules/foobar") =streams=> [2 3]
(zkp/to-bytes "5") => ..bin1..
(zk/set-data :zk "/rules/foobar" ..bin1.. 7) =throws=> (Exception. "boo")
(zkp/to-bytes "6") => ..bin2..
(zk/set-data :zk "/rules/foobar" ..bin2.. 8) => irrelevant)
When the retries are exhausted, the function throws.
(zookeeper-counter-add "/rules/foobar" 3) => (throws "boo")
(provided
(zk/exists :zk "/rules/foobar") =streams=> [{:some :values
:version 7}
{:some :values
:version 8}
{:some :values
:version 9}]
(zkp/get-clj-data :zk "/rules/foobar") =streams=> [2 3 4]
(zkp/to-bytes irrelevant) => irrelevant
(zk/set-data :zk "/rules/foobar" irrelevant irrelevant) =throws=> (Exception. "boo"))
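The behaviors above can be summarized in the following sketch. It is consistent with the tests but is not necessarily the actual implementation (the real resource closes over the injected zookeeper):
(defn zookeeper-counter-add-sketch
  ([zk path change] (zookeeper-counter-add-sketch zk path change 3))
  ([zk path change retries]
   (if-let [{:keys [version]} (zk/exists zk path)]
     (let [new (+ (zkp/get-clj-data zk path) change)]
       (try
         (zk/set-data zk path (zkp/to-bytes (str new)) version)
         new
         (catch Exception e
           ;; optimistic concurrency: re-read and retry
           (if (> retries 1)
             (zookeeper-counter-add-sketch zk path change (dec retries))
             (throw e)))))
     (do ;; no such node: create it with the given value
       (zk/create-all zk path :persistent? true)
       (zkp/set-initial-clj-data zk path change)
       change))))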
fact-declarer is used to declare the service that will accept events related to a certain fact. We call it before we start migrating based on that fact, so that new events related to the fact that arrive during the migration process accumulate in the queue. Once the migration is done, functions assigned to the queue will receive these events as well as new ones, so no data is lost.
fact-declarer is a generator function that takes the name of the rule and the link number (as a unique identifier).
(def decl-my-fact (fact-declarer 'perm.ABC123/my-rule 1))
The returned fact declarer is intended to be used in a zk-plan, as a task function. As such, it needs to accept one or more arguments. It ignores all but the first one, which is an injector ($) passed to it directly by zk-plan.
The injector must be able to resolve the resources declare-service (e.g., the one implemented for RabbitMQ) and hasher (e.g., the one implemented for S3). We will mock them here.
(let [calls (async/chan 10)
$ (di/injector {:hasher [:my-hasher]
:declare-service (fn [key reg]
(when-not (= permval/*hasher* [:my-hasher])
(throw (Exception.
"A custom hasher needs to be bound")))
(async/>!! calls [:declare-service key reg]))})]
(decl-my-fact $ :some :args :that :are :ignored) => nil
(provided
(perm/eval-symbol 'perm.ABC123/my-rule) => timeline)
;; Assert the calls
(async/alts!! [calls
(async/timeout 1000)])
=> [[:declare-service "fact-for-rule/perm.ABC123/my-rule!1" {:kind :fact
:name "test/tweeted"}] calls])
initial-migrator creates a migration function for link 0 of a rule. Link 0 is special in that it depends only on a fact and creates rule tuples based on fact tuples; other links also depend on rule tuples from previous links.
initial-migrator takes the following arguments:
- rule: the name of a rule (as a symbol).
- shard: the shard number.
- shards: the total number of shards being used.
It returns a closure (function) that operates from within a zk-plan.
(def my-migrator (initial-migrator 'perm.ABC/my-rule 2 6))
The migration process requires a database-scanner (e.g., this) to scan a given shard of the given table (fact). We mock this function by providing :test/follows facts for Alice, who follows Bob, Charlie and Dave.
(defn mock-scanner [name shard shards chan]
(when-not (= [name shard shards] ["test/follows" 2 6])
(throw (Exception. (str "Unexpected args in mock-scanner: " [name shard shards]))))
(when-not (= permval/*hasher* [:my-hasher])
(throw (Exception. "A custom hasher needs to be bound")))
(doseq [followee ["bob" "charlie" "dave"]]
(async/>!! chan {:kind :fact
:name name
:key "alice"
:data [followee]
:change 1
:ts 1}))
(async/close! chan))
To store the resulting tuples it depends on a database-event-storage-chan (e.g., this), a channel to which all generated events need to be sent.
(def rule-tuple-chan (async/chan 10))
As a zk-plan task function, the first argument of the returned closure (my-migrator in our case) is expected to be an injector ($), and all other arguments should be ignored. Once called, it uses permacode.core/eval-symbol to get the rule function; evaluation leverages the hasher resource. It then evaluates this function on each result coming from the database-scanner.
(let [$ (di/injector {:hasher [:my-hasher]
:database-scanner mock-scanner
:database-event-storage-chan rule-tuple-chan})]
(my-migrator $ :ignored) => nil
(provided
(perm/eval-symbol 'perm.ABC/my-rule) => timeline))
The migrator function should push rule events to database-event-storage-chan. In our case, there should be one per fact.
(defn read-event []
(let [resp (async/alts!! [rule-tuple-chan
(async/timeout 1000)])]
(when-not (= (second resp) rule-tuple-chan)
(throw (Exception. "Opration timed out")))
(async/close! (second (first resp))) ;; ack
(first (first resp))))
(read-event) => {:change 1
:data ["alice" "bob"]
:key "bob"
:kind :rule
:name "migrator.core-test/timeline!0"
:readers nil
:writers #{"migrator.core-test"}
:ts 1}
(read-event) => {:change 1
:data ["alice" "charlie"]
:key "charlie"
:kind :rule
:name "migrator.core-test/timeline!0"
:readers nil
:writers #{"migrator.core-test"}
:ts 1}
(read-event) => {:change 1
:data ["alice" "dave"]
:key "dave"
:kind :rule
:name "migrator.core-test/timeline!0"
:readers nil
:writers #{"migrator.core-test"}
:ts 1}
For links other than 0, migration requires applying a matcher. A matcher takes one event (a fact event in our case) and matches it against all complementary events (rule events in our case). We need to provide a database-chan to allow the matcher to cross-reference facts and rules.
The link-migrator function takes the following arguments:
- rule: the rule to be migrated.
- link: the link number within the rule (> 0).
- shard: the shard number to be processed by this node.
- shards: the overall number of shards.
It returns a closure to be processed as a zk-plan task.
(def my-link-migrator (link-migrator 'perm.ABC/my-rule 1 3 17))
For the migration process we will need a database-scanner that will provide fact events. We will mock one to produce :test/tweeted facts, stating that Bob, Charlie and Dave all tweeted 'hello'.
(defn mock-scanner [name shard shards chan]
(when-not (= [name shard shards] ["test/tweeted" 3 17])
(throw (Exception. (str "Unexpected args in mock-scanner: " [name shard shards]))))
(when-not (= permval/*hasher* [:my-hasher])
(throw (Exception. "A custom hasher needs to be bound")))
(doseq [user ["bob" "charlie" "dave"]]
(async/>!! chan {:kind :fact
:name name
:key user
:data ["hello"]
:change 1
:ts 1}))
(async/close! chan))
We also need to provide a database-chan (e.g., this), which in our case will answer that, regardless of who made the tweet, both Alice and Eve are followers. These events are similar to the ones we got from the initial-migrator.
(def mock-db-chan (async/chan 10))
(async/go
(loop []
(let [[query out-chan] (async/<! mock-db-chan)]
(when-not (nil? query)
;; The query is for rule tuples of link 0
(when-not (= (:name query) "migrator.core-test/timeline!0")
(throw (Exception. (str "Wrong fact in query: " (:name query)))))
(async/>!! out-chan {:change 1
:data ["alice" (:key query)]
:key (:key query)
:kind :rule
:name "migrator.core-test/timeline!0"
:readers nil
:writers #{:some-writer}
:ts 1})
(async/>!! out-chan {:change 1
:data ["eve" (:key query)]
:key (:key query)
:kind :rule
:name "migrator.core-test/timeline!0"
:readers nil
:writers #{:some-writer}
:ts 1})
(async/close! out-chan)
(recur)))))
Once more, we will need a database-event-storage-chan (e.g., this) to store the events we get.
(def mock-store-chan (async/chan 10))
The closure we got from link-migrator (my-link-migrator in our case) takes an injector as its first argument and ignores all others. It calls permacode.core/eval-symbol to get the rule, and then creates a matcher based on it.
(let [$ (di/injector {:database-scanner mock-scanner
:database-chan mock-db-chan
:database-event-storage-chan mock-store-chan
:hasher [:my-hasher]})]
(my-link-migrator $ :ignored) => nil
(provided
(perm/eval-symbol 'perm.ABC/my-rule) => timeline))
The migration function reads all fact events from the scanner. For each event it consults the database-chan for matching rule tuples, and the resulting events (timeline facts in our case) are pushed to the database-event-storage-chan.
(defn read-event []
(let [resp (async/alts!! [mock-store-chan
(async/timeout 1000)])]
(when-not (= (second resp) mock-store-chan)
(throw (Exception. "Operation timed out")))
(async/close! (second (first resp))) ;; ack
(first (first resp))))
;; For Bob
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "eve"
:data ["bob" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "alice"
:data ["bob" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "eve"
:data ["charlie" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "alice"
:data ["charlie" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "eve"
:data ["dave" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
(read-event) => {:kind :fact
:name "migrator.core-test/timeline"
:key "alice"
:data ["dave" "hello"]
:change 1
:readers nil
:writers #{:some-writer}
:ts 1}
For good citizenship, let's close the mock database-chan and allow the service we started to shut down.
(async/close! mock-db-chan)
rule-migrator finishes when a plan is created and ready to be executed. However, the actual migration operation only starts at that point. We therefore need a way to tell other parts of Axiom that the rule has been migrated, once migration is complete. For this purpose, the plan includes a final piece: migration-end-notifier, which publishes an axiom/rule-ready event holding the rule identifier in its :data.
(def my-end-notifier (migration-end-notifier 'perm.ABC123/my-rule #{:some-writer}))
The returned closure (my-end-notifier) takes an injector and any number of other parameters. From the injector it takes the publish resource to publish the desired event.
(def events (transient []))
(let [$ (di/injector {:publish (fn [ev] (conj! events ev))})]
(my-end-notifier $ :some :other :params))
Once called, it should publish an axiom/rule-ready event (we ignore the :ts field, which changes with time).
(-> events
persistent!
first
(dissoc :ts)) => {:kind :fact
:name "axiom/rule-ready"
:key 0
:data ['perm.ABC123/my-rule]
:change 1
:writers #{:some-writer}}
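A minimal sketch of this closure follows; the :ts value, taken here as the current time in milliseconds, is an assumption:
(defn migration-end-notifier-sketch [rule writers]
  (fn [$ & _]
    (di/do-with! $ [publish]
      (publish {:kind :fact
                :name "axiom/rule-ready"
                :key 0
                :data [rule]
                :ts (System/currentTimeMillis) ;; assumed timestamp source
                :change 1
                :writers writers}))))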
extract-version-rules takes a symbol representing a permacode module (a version of the application's code) and extracts the rule functions it defines.
It returns only rule functions, identified by having a :source-fact meta field.
(let [foo (with-meta (fn []) {:source-fact ["foo" 1]})
bar (with-meta (fn []) {:source-fact ["bar" 1]})
baz (fn [])]
(extract-version-rules 'perm.1234ABC) => [foo
bar]
(provided
(perm/module-publics 'perm.1234ABC) => {'foo foo
'bar bar
'baz baz} ; baz will not be returned
))
It excludes clauses, for which the :source-fact name ends with a question mark (?).
(let [foo (with-meta (fn []) {:source-fact ["foo" 1]})
bar (with-meta (fn []) {:source-fact ["bar?" 1]})]
(extract-version-rules 'perm.1234ABC) => [foo]
(provided
(perm/module-publics 'perm.1234ABC) => {'foo foo
'bar bar} ; bar will not be returned
))
Like extract-version-rules, extract-version-clauses takes a symbol representing a permacode namespace; it returns a collection of the clauses defined in that namespace.
(let [foo (with-meta (fn []) {:source-fact ["foo" 1]}) ;; not a clause
bar (with-meta (fn []) {:source-fact ["bar?" 1]})
baz (fn [])] ;; not a clause
(extract-version-clauses 'perm.1234ABC) => [bar]
(provided
(perm/module-publics 'perm.1234ABC) => {'foo foo
'bar bar
'baz baz}))
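Both extractors can be sketched as filters over perm/module-publics. This is a sketch consistent with the tests above, not necessarily the actual implementation:
(defn- clause-fn? [f]
  ;; a clause's :source-fact name ends with a question mark
  (some-> f meta :source-fact first (clojure.string/ends-with? "?")))

(defn extract-version-rules-sketch [module]
  (for [[_ f] (perm/module-publics module)
        :when (and (-> f meta :source-fact)
                   (not (clause-fn? f)))]
    f))

(defn extract-version-clauses-sketch [module]
  (for [[_ f] (perm/module-publics module)
        :when (clause-fn? f)]
    f))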
When a new version is pushed, we wish to scan everything under its /resources/public directory (if it exists), and store all these files in a way that allows the gateway tier to retrieve them.
We use the hasher resource (e.g., the one provided for S3) to store the content of the files. This way, if a file is not modified, we store only one copy of it.
The resource hash-static-file depends on a hasher. It is a function that takes a path to a file in the local file system, reads its content, and hashes it.
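Before demonstrating it, here is a minimal sketch, assuming the hasher is a [hash unhash] pair whose hash function accepts a byte array:
(defn hash-static-file-sketch [[hash _unhash] file]
  (let [out (java.io.ByteArrayOutputStream.)]
    (with-open [in (io/input-stream file)]
      (io/copy in out)) ;; read the whole file as bytes
    (hash (.toByteArray out))))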
To demonstrate how it works we create a temporary file:
(with-open [f (clojure.java.io/writer "/tmp/foo.txt")]
(doseq [i (range 1000)]
(.write f (str "This is line number " i "\n"))))
Now, when we call hash-static-file, it will use the hash part of the hasher to store the content of the file, given as a byte array.
(let [content (atom nil)
hash (fn [c]
(reset! content c)
"some-hash")
unhash (fn [h]
(throw (Exception. "unhash should not be called")))
$ (di/injector {:hasher [hash unhash]})]
(module $)
(di/startup $)
(di/do-with! $ [hash-static-file]
(hash-static-file (io/file "/tmp/foo.txt")) => "some-hash")
(class @content) => (class (byte-array 1))
(-> @content
String.
(str/split #"\n")
(get 745)) => "This is line number 745")
To recursively store all files under a given directory (the /resources/public directory in the repo), the function hash-static-files applies hash-static-file to each file in the subtree under the given root. It returns a map from relative paths to hash values.
To demonstrate this we will create a directory under /tmp and populate it with three files: a.html, b.css and c.js.
(let [dir (io/file "/tmp/mock-static")]
(.mkdirs dir)
(with-open [a (io/writer (io/file dir "a.html"))]
(.write a "Some html..."))
(with-open [b (io/writer (io/file dir "b.css"))]
(.write b "Some CSS..."))
(with-open [c (io/writer (io/file dir "c.js"))]
(.write c "Some JavaScript...")))
Calling hash-static-files on /tmp/mock-static will return a map containing entries for /a.html, /b.css and /c.js.
(let [$ (di/injector {:hash-static-file
(fn [f]
(str "hash-for-" (.getPath f)))})]
(module $)
(di/startup $)
(di/do-with! $ [hash-static-files]
(hash-static-files (io/file "/tmp/mock-static"))
=> {"/a.html" "hash-for-/tmp/mock-static/a.html"
"/b.css" "hash-for-/tmp/mock-static/b.css"
"/c.js" "hash-for-/tmp/mock-static/c.js"}))
deploy-dir is a function that takes a base directory and publishes an axiom/perm-versions event containing all the permacode source files and all the static content in this directory. It is a DI resource based on hasher, hash-static-files and publish.
(def staticdir (atom nil))
(let [$ (di/injector {:hasher [:my-hasher]
:hash-static-files (fn [root]
(reset! staticdir root)
{"/a.html" "hash1"
"/b.css" "hash2"
"/c.js" "hash3"})})]
(module $)
(di/startup $)
(di/do-with! $ [deploy-dir]
(def deploy-dir deploy-dir)))
This function takes a version identifier, a root directory and a publish function, and does the following:
- It calls permacode.publish/hash-all on the src sub-directory, with the hasher bound as the *hasher*, to get a source-code map.
- It calls hash-static-files on the resources/public sub-directory to get a map of the static files.
(def published (atom []))
(deploy-dir "some-ver" "/path/to/deploy" (partial swap! published conj)) => nil
(provided
(permacode.publish/hash-all [:my-hasher] (io/file "/path/to/deploy" "src")) => {'foo 'perm.AAA
'bar 'perm.BBB}
(io/file "/path/to/deploy" "resources/public") => ..static..)
@staticdir => ..static..
Then it uses the given publish function to publish an axiom/perm-versions event containing both maps.
@published => [{:kind :fact
:name "axiom/perm-versions"
:key "some-ver"
:data [{'foo 'perm.AAA
'bar 'perm.BBB}
{"/a.html" "hash1"
"/b.css" "hash2"
"/c.js" "hash3"}]}]