Preface
Goal: A simple case example of concurrency using Clojure.
I have been wondering,
if Clojure
have concurency with actor model,
using sender and receiver.
After spending a days, I finally find what I need,
and I share these code for you.
6: Concurrency with Channel using Go Macro
For starter, there is a good reading. But this won’t be enough, as I’ll show you later.
Reference Reading
Clojure Project File
We need to a more setup, so that all script in this artcile could be handled.
:profiles {…
:t14-async {:main t14-async}
:t15-loop {:main t15-loop}
:t16-recursion {:main t16-recursion}
}
Setting Up The Asynchronous Library
First, we need to setup the dependency.
(ns t14-async
(:gen-class)
(:use my-songs)
(:require [clojure.core.async :as a]))
(def tag-chan (a/chan))
Simple Asynchrounous Case
Then we can send and receive using put!
and take!
.
(a/go
(while true
(a/take! tag-chan println))
)
(defn -main [& args]
(doseq [song songs]
(if (not (nil? (:tags song)))
(doseq [tag (:tags song)]
(a/put! tag-chan tag)
))))
With the result similar as below string
:
$ lein with-profile t14-async run
60s
jazz
60s
rock
70s
rock
70s
pop
Sender and Receiver
We can, rewrite above code with actor pattern,
using sender
and receiver
.
(ns t14-async
(:gen-class)
(:use my-songs)
(:require [clojure.core.async :as async]))
(def tag-chan (async/chan))
(defn receiver []
(while true
(async/take! tag-chan println))
)
(async/go (receiver))
(defn sender []
(doseq [song songs]
(if (not (nil? (:tags song)))
(doseq [tag (:tags song)]
(async/put! tag-chan tag)
))))
(defn -main [& args] (sender))
With the result exactly the same as above.
This is the basic of sender
and receiver
.
Or in other term called producer
and customer
.
Simplified Scope
We can arrange the namespace so we have shorter code
(ns t15-loop
(:gen-class)
(:use my-songs)
(:require [clojure.core.async
:as async
:refer [>! <! >!! <!! go chan close!]]))
(def tag-chan (chan))
Breaking The Loop
Real life concurrency, require a stop signal, so we can quit the loop. This is a little bit tricky with clojure.
(go (do
(def tags (atom []))
(loop [tag (<!! tag-chan)]
(when (not (= tag 0))
(swap! tags conj tag)
(println @tags)
(recur (<!! tag-chan))
))
(println "save me")
(println @tags)
))
(defn -main [& args]
(do
(doseq [song songs]
(if (not (nil? (:tags song)))
(doseq [tag (:tags song)]
(>!! tag-chan tag)
)))
(>!! tag-chan 0)
))
With the result similar as below:
$ lein with-profile t15-loop run
[60s]
[60s jazz]
[60s jazz 60s]
[60s jazz 60s rock]
[60s jazz 60s rock 70s]
[60s jazz 60s rock 70s rock]
[60s jazz 60s rock 70s rock 70s]
[60s jazz 60s rock 70s rock 70s pop]
save me
[60s jazz 60s rock 70s rock 70s pop]
The code above rely on this part:
(loop [tag (<!! tag-chan)]
(when (not (= tag 0))
(swap! tags conj tag)
(println @tags)
(recur (<!! tag-chan))
))
Laziness in Asynchronous
I have a problem with laziness.
It is not me that lazy.
This issue is in context of lazy call in clojure
.
What if we get rid of this unwanted (println @tags)
code line?
(go (do
(def tags (atom []))
(loop [tag (<!! tag-chan)]
(when (not (= tag 0))
(swap! tags conj tag)
(recur (<!! tag-chan))
))
(println "save me")
(println @tags)
))
The result is not what we have expected, as shown below:
$ lein with-profile t15-loop run
save me
The last (println @tags)
won’t print at all!
What am I supposed to do?
7: Concurrency with Future
Luckily we have other concurrency feature called future
.
Mutable using Atom
We can utilize future
instead of go macro
.
(defn receiver [] (do
(def tags (atom []))
(loop [tag (<!! tag-chan)]
(when (not (= tag 0))
(swap! tags conj tag)
(recur (<!! tag-chan))
))
(println @tags)
(close! tag-chan)
))
(defn sender []
(do
(doseq [song songs]
(if (not (nil? (:tags song)))
(doseq [tag (:tags song)]
(>!! tag-chan tag))))
(>!! tag-chan 0)))
(defn -main [& args]
(future (receiver))
(sender)
(shutdown-agents)
The result is as good as what we expected, as shown below:
$ lein with-profile t15-loop run
[60s jazz 60s rock 70s rock 70s pop]
Immutable Using Recursion
Immutability is very common in functinal programming.
As an alternate approach, we can use recursive instead of using atom
.
(defn receiver
([] (receiver []))
([tags]
(let [tag (<!! tag-chan)]
(if (not= tag 0)
(receiver (conj tags tag))
(println tags))
)))
With the result exactly the same as above.
$ lein with-profile t16-recursion run
[60s jazz 60s rock 70s rock 70s pop]
Which one is better, is depend on the use case.
What is Next 🤔?
We should be ready for our next topic, concurrency case example.
Consider continue reading [ Scala - Playing with Records - Part One ].