{:deps
{org.clojure/clojure {:mvn/version "1.11.2"}
org.clojure/core.async {:mvn/version "1.6.673"}}}
该 core.async 库通过使用通道支持异步编程。
要使用 core.async,请声明对 Clojure 1.10.0 或更高版本以及最新 core.async 库的依赖关系
{:deps
{org.clojure/clojure {:mvn/version "1.11.2"}
org.clojure/core.async {:mvn/version "1.6.673"}}}
要开始使用 core.async,请在 REPL 中加载 clojure.core.async
命名空间
(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])
或者在你的命名空间中包含它
(ns my.ns
(:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))
值在类似队列的通道上传递。默认情况下,通道是无缓冲的(0 长度) - 它们需要生产者和消费者进行会合才能通过通道传输值。
使用 chan
创建一个无缓冲通道
(a/chan)
传递一个数字以创建具有固定缓冲区大小的通道
(a/chan 10)
close!
关闭一个通道以停止接受 put 操作。剩余的值仍然可供获取。已清空的通道在获取时返回 nil。不能显式地将 nil 发送到通道上!
(let [c (a/chan)]
(a/close! c))
通道还可以使用自定义缓冲区,这些缓冲区对“满”情况有不同的策略。API 中提供了两个有用的示例。
;; Use `dropping-buffer` to drop newest values when the buffer is full:
(a/chan (a/dropping-buffer 10))
;; Use `sliding-buffer` to drop oldest values when the buffer is full:
(a/chan (a/sliding-buffer 10))
在普通线程中,我们使用 >!!
(阻塞 put)和 <!!
(阻塞 take)通过通道进行通信。
(let [c (a/chan 10)]
(>!! c "hello")
(assert (= "hello" (<!! c)))
(a/close! c))
因为这些是阻塞调用,如果我们尝试在无缓冲通道上执行 put 操作,我们将阻塞主线程。我们可以使用 thread
(类似于 future
)在池线程中执行一个代码块并返回一个包含结果的通道。在这里,我们启动一个后台任务将“hello”放入通道,然后在当前线程中读取该值。
(let [c (a/chan)]
(a/thread (>!! c "hello"))
(assert (= "hello" (<!! c)))
(a/close! c))
go
宏在特殊的线程池中异步执行其代码块。会阻塞的通道操作将暂停执行而不是阻塞线程。此机制封装了在事件/回调系统中外部的反转控制。在 go
块内部,我们使用 >!
(put)和 <!
(take)。
在这里,我们将之前的通道示例转换为使用 go 块
(let [c (a/chan)]
(a/go (>! c "hello"))
(assert (= "hello" (<!! (a/go (<! c)))))
(a/close! c))
我们没有使用显式的线程和阻塞调用,而是为生产者使用了 go 块。消费者使用 go 块进行 take,然后返回一个结果通道,我们从该通道执行阻塞 take 操作。
通道相对于队列的一个杀手级特性是能够同时等待多个通道(类似于套接字 select)。这是通过 alts!!
(普通线程)或 go 块中的 alts!
完成的。
我们可以创建一个使用 alts 的后台线程,该线程组合两个通道中的任何一个的输入。alts!!
获取要执行的一组操作 - 从中获取的通道或要放入的 [通道值],并返回成功的值(put 操作为 nil)和通道
(let [c1 (a/chan)
c2 (a/chan)]
(a/thread (while true
(let [[v ch] (a/alts!! [c1 c2])]
(println "Read" v "from" ch))))
(>!! c1 "hi")
(>!! c2 "there"))
打印(在标准输出上,可能在你的 repl 中不可见)
Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
我们可以使用 alts! 用 go 块做同样的事情
(let [c1 (a/chan)
c2 (a/chan)]
(a/go (while true
(let [[v ch] (a/alts! [c1 c2])]
(println "Read" v "from" ch))))
(a/go (>! c1 "hi"))
(a/go (>! c2 "there")))
由于 go 块是与线程无关的轻量级进程,因此我们可以拥有很多 go 块!在这里,我们创建了 1000 个 go 块,它们在 1000 个通道上说 hi。我们使用 alts!! 在它们准备好时读取它们。
(let [n 1000
cs (repeatedly n a/chan)
begin (System/currentTimeMillis)]
(doseq [c cs] (a/go (>! c "hi")))
(dotimes [i n]
(let [[v c] (a/alts!! cs)]
(assert (= "hi" v))))
(println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
timeout
创建一个等待指定毫秒数然后关闭的通道
(let [t (a/timeout 100)
begin (System/currentTimeMillis)]
(<!! t)
(println "Waited" (- (System/currentTimeMillis) begin)))
我们可以将 timeout 与 alts!
结合使用以执行定时通道等待。在这里,我们等待 100 毫秒以查看通道上是否有值到达,然后放弃
(let [c (a/chan)
begin (System/currentTimeMillis)]
(a/alts!! [c (a/timeout 100)])
(println "Gave up after" (- (System/currentTimeMillis) begin)))
原文作者:Alex Miller