rabbit-microservices
is a library that supports the definition of microservices. It provides the serve
function as a resource. In turn, the serve
function registers given functions to a subset of the events published through AMQP. It is provided a partial event, and calls the given function on any event that matches the given partial event.
The event structure we refer to here is the one discussed in cloudlog-events.
serve
is a dependency-injection resource which depends on two functions:
We will consider the following service function for the examples below:
(defn my-service [ev])
It takes a service function and a partial event, and does the following:
declare-service
to declare it.assign-service
.(let [calls (transient [])
$ (di/injector {:declare-service (fn [key reg] (conj! calls [:declare-service key reg]))
:assign-service (fn [key func] (conj! calls [:assign-service key func]))})]
(module $)
(di/startup $)
(di/do-with! $ [serve]
(serve my-service {:partial :event}) => nil
(persistent! calls)
=> [[:declare-service "rabbit-microservices.core-test/my-service" {:partial :event}]
[:assign-service "rabbit-microservices.core-test/my-service" my-service]]))
declare-service
depends on an rabbitmq-service
resource (which itself depends on rabbitmq-config
).
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(def declare-service (di/do-with! $ [declare-service] declare-service)))
declare-service
declares a queue and binds it to the facts
exchange to receive events matching the given partial event.
(declare-service "foobar" {:kind :fact
:name "foo/bar"}) => nil
(provided
(lq/declare :some-chan "foobar" {:durable true
:auto-delete false}) => irrelevant
(lq/bind :some-chan "foobar"
facts-exch {:routing-key "f.17cdeaefa5cc6022481c824e15a47a7726f593dd.#"})
=> irrelevant)
Sometimes we wish services to exist only temporarily. We set them up for a particular reason, and when that reason no longer exists we have no use for this service. declare-volatile-service
creates a non-durable AMQP queue, so it is removed after a broker restart.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(di/do-with! $ [declare-volatile-service]
(declare-volatile-service "foobar" {:kind :fact
:name "foo/bar"}) => nil
(provided
(lq/declare :some-chan "foobar" {:durable false}) => irrelevant
(lq/bind :some-chan "foobar"
facts-exch {:routing-key "f.17cdeaefa5cc6022481c824e15a47a7726f593dd.#"})
=> irrelevant)))
assign-service
depends on an rabbitmq-service
.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(def assign-service (di/do-with! $ [assign-service] assign-service)))
assign-service
takes a key that was previously declared and a service function, and subscribes to this queue using a function that wraps the given function.
(assign-service "foobar" my-service) => nil
(provided
(lc/subscribe :some-chan "foobar" irrelevant {:auto-ack true}) => irrelevant)
Auto acknowledgement is disabled when the provided function has three parameters. The third of which is expected to be bound to an explicit ack
function.
(defn my-other-service [event publish ack])
(assign-service "foobar" my-other-service) => nil
(provided
(lc/subscribe :some-chan "foobar" irrelevant {:auto-ack false}) => irrelevant)
The lc/subscribe
function is given a closure returned by handle-event. Please refer to it for more information.
publish
is a service that allows its users to publish events from outside the context of a service. It depends on rabbitmq-service
for publishing, and time
for filling in the :ts
field if one is not specified.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}
:time (constantly 12345)})]
(module $)
(di/startup $)
(def publish (di/do-with! $ [publish] publish)))
publish
fills in missing details if they are not provided in the original event.
(publish {:some :event}) => nil
(provided
(nippy/freeze {:some :event
:ts 12345 ;; Returned by `time`.
:change 1
:writers #{}
:readers #{}}) => ..bin..
(event-routing-key {:some :event}) => ..routing-key..
(lb/publish :some-chan facts-exch ..routing-key.. ..bin.. {}) => irrelevant)
If these fields exist in the event, their original values are retained.
(publish {:some :event
:ts 34567
:change -1
:writers #{"foo"}
:readers #{"bar"}}) => nil
(provided
(nippy/freeze {:some :event
:ts 34567
:change -1
:writers #{"foo"}
:readers #{"bar"}}) => ..bin..
(event-routing-key {:some :event
:ts 34567
:change -1
:writers #{"foo"}
:readers #{"bar"}}) => ..routing-key..
(lb/publish :some-chan facts-exch ..routing-key.. ..bin.. {}) => irrelevant)
The most efficient and conveinent way to receive messages from a queue is in push mode, where the AMQP broker has control. In this approach (as implemented by assign-service), we implement a function that is called whenever a new event arrives.
However, sometimes we need to work the other way. We need to have control, and consume messages off a queue when we decide to do so.
An example for such a situation is where we respond to a user request. In such a case, without the user request we have nothing to do with the data, so it does not make sense to have a service function handling all messages. When a user request comes in, we wish to read all messages currently on the queue and return them to the user. This is exactly what the poll-events
function does.
poll-events
is a DI resource which is based on rabbitmq-service
.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(di/do-with! $ [poll-events]
(def poll-events poll-events)))
When called, it calls lb/get to read events off the given queue. It reads all events until the queue is empty. Events are auto-acknowledged.
(poll-events ..queue..) => [..ev1.. ..ev2..]
(provided
(lb/get :some-chan ..queue.. true) =streams=> [[..metadata.. ..payload1..]
[..metadata.. ..payload2..]
nil]
(nippy/thaw ..payload1..) => ..ev1..
(nippy/thaw ..payload2..) => ..ev2..)
An event-bridge
is a bidirectional bridge between events flowing globally through Axiom (as AMQP messages), and local events flowing through core.async
channels. It is intended to connect Axiom (the server) to an external client.
It takes a pair of channels named c2s
(client to server) and s2c
(server to client). It does the following:
:fact
events flowing on the c2s
channel.:reg
events by registering to :fact
events they refer to, ands2c
channel.event-bridge
is a DI resource with the following dependencies:
c2s
.s2c
channel.database-chan
(e.g., this), to retrieve any existing events, if so requested during registration.c2s
channel closes.(def calls (async/chan 10))
(defn get-call []
(let [[call chan] (async/alts!! [calls
(async/timeout 1000)])]
(when (not= chan calls)
(throw (Exception. "Timed out waiting to read call")))
call))
(def database-chan (async/chan 10))
(let [$ (di/injector {:publish
(fn [ev] (async/>!! calls [:publish ev]))
:declare-private-queue
(fn [] (async/>!! calls [:declare-private-queue])
"some-random-queue")
:assign-service
(fn [queue func] (async/>!! calls [:assign-service queue func]))
:database-chan
database-chan
:register-events-to-queue
(fn [queue reg] (async/>!! calls [:register-events-to-queue queue reg]))
:delete-queue
(fn [queue] (async/>!! calls [:delete-queue queue]))})]
(module $)
(di/startup $)
(di/do-with! $ [event-bridge]
(def event-bridge event-bridge)))
To create a session, event-bridge
is called with a pair [c2s s2c]
of channels.
(def chan-pair [(async/chan 100) (async/chan 100)])
(event-bridge chan-pair) => nil
The first thing event-bridge
does is declare a private queue, and assigns a handler to it.
(get-call) => [:declare-private-queue]
(let [[assign queue func] (get-call)]
assign => :assign-service
queue => "some-random-queue"
(def incoming-handler func))
Pushing a :fact
event to the c2s
channel will cause it to be published.
(let [[c2s s2c] chan-pair]
(async/>!! c2s {:kind :fact
:name "foo"
:key "bar"
:data [1 2]})
(async/>!! c2s {:kind :fact
:name "foo"
:key "baz"
:data [2 3]})
(get-call) => [:publish {:kind :fact
:name "foo"
:key "bar"
:data [1 2]}]
(get-call) => [:publish {:kind :fact
:name "foo"
:key "baz"
:data [2 3]}])
Pushing a :reg
event will cause it to register to a corresponding :fact
event.
(let [[c2s s2c] chan-pair]
(async/>!! c2s {:kind :reg
:name "x"
:key "y"})
(get-call) => [:register-events-to-queue "some-random-queue" {:kind :fact
:name "x"
:key "y"}])
When an event is received, it is pushed to the s2c
channel.
(let [[c2s s2c] chan-pair]
(incoming-handler {:kind :fact
:name "boo"
:key "tar"
:data [1 2]})
(let [[pushed chan] (async/alts!! [s2c
(async/timeout 1000)])]
chan => s2c
pushed => {:kind :fact
:name "boo"
:key "tar"
:data [1 2]}))
A :reg
event may contain a :get-existing
field. If such a field exists and is true
, a request is made to the database (through database-chan
) to retrieve all existing events matching the registration.
(let [[c2s s2c] chan-pair]
(async/>!! c2s {:kind :reg
:name "x"
:key "y"
:get-existing true})
;; Registration is the same
(get-call) => [:register-events-to-queue "some-random-queue" {:kind :fact
:name "x"
:key "y"}]
(let [[[query reply-chan] chan] (async/alts!! [database-chan
(async/timeout 1000)])]
chan => database-chan
(def reply-chan reply-chan)))
Aggregated results returned from the database are pushed to the s2c
channel.
(let [[c2s s2c] chan-pair]
(async/>!! reply-chan {:kind :fact
:name "some"
:key "event"
:data [1 2 3]
:ts 1000
:change 1})
(async/>!! reply-chan {:kind :fact
:name "some"
:key "other event"
:data [2 3 4]
:ts 2000
:change 1})
(async/>!! reply-chan {:kind :fact
:name "some"
:key "event"
:data [1 2 3]
:ts 3000
:change 1})
(async/close! reply-chan)
(let [[ev chan] (async/alts!! [s2c
(async/timeout 1000)])]
chan => s2c
ev => {:kind :fact
:name "some"
:key "event"
:data [1 2 3]
:ts 3000
:change 2})
(let [[ev chan] (async/alts!! [s2c
(async/timeout 1000)])]
chan => s2c
ev => {:kind :fact
:name "some"
:key "other event"
:data [2 3 4]
:ts 2000
:change 1}))
When the c2s
channel is closed, event-bridge
deletes its queue.
(let [[c2s s2c] chan-pair]
(async/close! c2s)
(get-call) => [:delete-queue "some-random-queue"])
To demonstrate how the above functions work together we build a small usage example.
In our example we have the following service functions:
sum-foo
that sums the values in the data of events of name "foo" and emits events named "foo-sum".count-ev
that counts the events of the different names and emits events of type "count" (it does not count "count" events).foo-from-count
that listens on "count" events and emits corresponding "foo" events with the same value.:integ ; Integration test. Does not run during continouts integration
(def foo-sum (atom 0))
(def ev-count (atom {}))
(def $ (di/injector))
(module $)
(di/do-with $ [serve]
(println "Registering services")
(serve (fn ;; sum-foo
[ev pub]
(swap! foo-sum (partial + (:data ev)))
(pub {:kind :fact
:name "foo-sum"
:key 0
:data @foo-sum}))
{:kind :fact
:name "foo"})
(serve (fn ;; count-ev
[ev pub]
(let [key (:name ev)]
(swap! ev-count (fn [m]
(let [old (or (m key) 0)
new (inc old)]
(assoc m key new))))
(pub {:kind :fact
:name "count"
:key key
:data (@ev-count key)})))
{:kind :fact})
(serve (fn ;; foo-from-count
[ev pub]
(pub {:kind :fact
:name "foo"
:key (:key ev)
:data (:data ev)}))
{:kind :fact
:name "count"}))
For the serve
resource to initialize, we need to provide an rabbitmq-config
resource with values needed for the underlying langohr
library to find the AMQP broker. We will use the default values.
:integ
(di/provide $ rabbitmq-config []
rmq/*default-config*)
(di/startup $)
Now our service is ready to roll.
All we need is to get the fire started is a little match – a single event.
:integ ; Integration test. Does not run during continouts integration
(di/do-with! $ [publish]
(println "Kicking the first event")
(publish {:kind :fact
:name "foo"
:key 0
:data 1}))
(println "waiting for the sum to reach ten-milion")
(while (< @foo-sum 10000000)
(Thread/sleep 100))
(println "done!")
rabbitmq-service
is a DI resource that represents a running RabbitMQ broker. It is a map containing the following properties:
:conn
– a connection object to a server.:chan
– an open channel.:alive
– an atom holding a Boolean value used for shutting down the conection.Initialization also declares the facts
exchange, which is a topic-based AMQP exchange.
(let [$ (di/injector {:rabbitmq-config rmq/*default-config*})]
(module $)
(di/startup $)
(di/do-with! $ [rabbitmq-service]
[(:conn rabbitmq-service)
(:chan rabbitmq-service)
@(:alive rabbitmq-service)])) => [..conn.. ..chan.. true]
(provided
(rmq/connect) => ..conn..
(lch/open ..conn..) => ..chan..
(le/declare ..chan.. facts-exch "topic") => irrelevant)
The dynamic variable langohr.core/*default-config*
controls the configuration. See the Langohr documentation for more information.
The event-routing-key
function returns a AMQP-conforming routing key for a given event map.
(event-routing-key {:kind :fact
:name "foo/bar"
:key 1234
:ts 1000
:data ["foo" :bar]
:change 1
:writers #{}
:readers #{}}) => "f.17cdeaefa5cc6022481c824e15a47a7726f593dd.7110eda4d09e062aa5e4a390b0a572ac0d2c0220"
The initial f
is derived from :kind :fact
. For :kind :rule
an exception is thrown.
(event-routing-key {:kind :rule
:name "foo/bar"
:key 1234
:ts 1000
:data ["foo" :bar]
:change 1
:writers #{}
:readers #{}}) => (throws "Only fact events are supported")
The ony fields that effect the routing key are kind
, name
and key
.
(let [base (event-routing-key {:kind :fact
:name "foo/bar"
:key 1234
:ts 1000
:data ["foo" :bar]
:change 1
:writers #{}
:readers #{}})]
(= (event-routing-key {:kind :fact
:name "foo/bar"
:key 1234
:ts 1001
:data ["fool" :baz]
:change -1
:writers #{:me}
:readers #{:you}}) base) => true
(= (event-routing-key {:kind :fact
:name "foo/baz" ; Different name
:key 1234
:ts 1001
:data ["fool" :baz]
:change -1
:writers #{:me}
:readers #{:you}}) base) => false
(= (event-routing-key {:kind :fact
:name "foo/bar"
:key [:foo :bar] ; Different key
:ts 1001
:data ["fool" :baz]
:change -1
:writers #{:me}
:readers #{:you}}) base) => false)
event-routing-key
can handle partial events. If :name
is omitted, a pattern that matches all facts is returned.
(event-routing-key {:kind :fact}) => "f.#"
If :name
is provided, but :key
is omitted, a pattern matching all keys with that name is returned.
(event-routing-key {:kind :fact
:name "foo/bar"}) => #"f\.[0-9a-f]+\.#"
handle-event
takes a function to be called when an event arrives and the :alive
atom for the service along with arguments provided by a subscription to a queue, so that the function (partial handle-event some-func alive)
can be used with lc/subscribe
.
The wrapped function can take one, two, or three parameters. If it takes one parameter, we pass this parameter a de-serialization of the event.
(def alive (atom true))
(def received (atom '()))
(defn my-func [ev]
(swap! received #(conj % ev)))
(def my-event {:kind :fact
:name "foo/bar"
:key 1234
:ts 1000
:data ["foo" :bar]
:change 1
:writers #{}
:readers #{}})
(def my-event-bin (nippy/freeze my-event))
; By running handle-event...
(handle-event my-func alive constantly :the-channel {:delivery-tag 777} my-event-bin) => nil
; we execute my-func, which adds the event to received
@received => [my-event]
If the wrapped function accepts two parameters, the second parameter is taken to be a publish
function, which publishes events on the facts exchange. The event handler (my-func
in the example below) does not have to specify all fields in the event it publishes (event2
in the example). All fields that are not specified in the event default to their values in the event that triggerred the function (event-that-was-sent
in the example).
(def event2 {:name "foo/baz"
:key 5555
:data [1 2 3 4]})
(defn my-func [ev publish]
(publish event2))
(def event-that-was-sent
(merge my-event event2))
(handle-event my-func alive constantly :the-channel :meta-attrs my-event-bin) => nil
(provided
(nippy/freeze event-that-was-sent) => ..bin..
(lb/publish :the-channel facts-exch (event-routing-key event2) ..bin.. :meta-attrs) => irrelevant)
When the alive
atom evaluates to false
, the publish function does nothing.
(reset! alive false)
(handle-event my-func alive constantly :the-channel :meta-attrs my-event-bin) => nil
; Nothing is published
If the wrapped function accepts three parameters, the third parameter is taken to be an ack
function, that explicitly acknowledges the received event.
(defn my-func [ev publish ack]
(ack))
(handle-event my-func alive constantly :the-channel {:delivery-tag "foo"} my-event-bin) => nil
(provided
(lb/ack :the-channel "foo") => irrelevant)
declare-private-queue
is made to allow the creation of private queues, to be used not for a service, but rather for a specific user session. Unlike declare-service, it does not bind an exchange to this queue.
It is a DI resource, based on rabbitmq-service
.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(di/do-with! $ [declare-private-queue]
(def declare-private-queue declare-private-queue)))
It takes no parameters, and calls lq/declare-server-named to create a new queue, named by the AMQP broker, exclusive to this connection, non-durable and auto-deletable.
(declare-private-queue) => ..queue..
(provided
(lq/declare-server-named :some-chan {:exclusive true}) => ..queue..)
delete-queue
deletes a queue of a given name.
It is a DI resource that depends on rabbitmq-service
.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(di/do-with! $ [delete-queue]
(def delete-queue delete-queue)))
It takes one parameter – the name of the queue to be deleted, and calls lq/delete
with that name.
(delete-queue ..queue..) => nil
(provided
(lq/delete :some-chan ..queue..) => irrelevant)
register-events-to-queue
takes a queue name and a partial events, and makes sure events matching this partial event are sent to this queue.
It is a DI resource based on rabbitmq-service
.
(let [$ (di/injector {:rabbitmq-service {:chan :some-chan}})]
(module $)
(di/startup $)
(di/do-with! $ [register-events-to-queue]
(def register-events-to-queue register-events-to-queue)))
When called, it calls event-routing-key to convert the given partial event to an AMQP routing key. Then it calls lq/bind to make sure matching events are placed in the given queue.
(register-events-to-queue ..queue.. ..partial..) => nil
(provided
(event-routing-key ..partial..) => ..routing-key..
(lq/bind :some-chan ..queue.. facts-exch {:routing-key ..routing-key..}) => irrelevant)