Clojure

core.async 教程

入门

core.async 库通过使用通道支持异步编程。

要使用 core.async,请声明对 Clojure 1.10.0 或更高版本以及最新 core.async 库的依赖关系

{:deps
 {org.clojure/clojure {:mvn/version "1.11.2"}
  org.clojure/core.async {:mvn/version "1.6.673"}}}

要开始使用 core.async,请在 REPL 中加载 clojure.core.async 命名空间

(require '[clojure.core.async :as a :refer [<!! >!! <! >!]])

或者在你的命名空间中包含它

(ns my.ns
  (:require [clojure.core.async :as a :refer [<!! >!! <! >!]]))

通道

值在类似队列的通道上传递。默认情况下,通道是无缓冲的(0 长度) - 它们需要生产者和消费者进行会合才能通过通道传输值。

使用 chan 创建一个无缓冲通道

(a/chan)

传递一个数字以创建具有固定缓冲区大小的通道

(a/chan 10)

close! 关闭一个通道以停止接受 put 操作。剩余的值仍然可供获取。已清空的通道在获取时返回 nil。不能显式地将 nil 发送到通道上!

(let [c (a/chan)]
  (a/close! c))

通道还可以使用自定义缓冲区,这些缓冲区对“满”情况有不同的策略。API 中提供了两个有用的示例。

;; Use `dropping-buffer` to drop newest values when the buffer is full:
(a/chan (a/dropping-buffer 10))

;; Use `sliding-buffer` to drop oldest values when the buffer is full:
(a/chan (a/sliding-buffer 10))

线程

在普通线程中,我们使用 >!!(阻塞 put)和 <!!(阻塞 take)通过通道进行通信。

(let [c (a/chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (a/close! c))

因为这些是阻塞调用,如果我们尝试在无缓冲通道上执行 put 操作,我们将阻塞主线程。我们可以使用 thread(类似于 future)在池线程中执行一个代码块并返回一个包含结果的通道。在这里,我们启动一个后台任务将“hello”放入通道,然后在当前线程中读取该值。

(let [c (a/chan)]
  (a/thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (a/close! c))

Go 块和 IOC 线程

go 宏在特殊的线程池中异步执行其代码块。会阻塞的通道操作将暂停执行而不是阻塞线程。此机制封装了在事件/回调系统中外部的反转控制。在 go 块内部,我们使用 >!(put)和 <!(take)。

在这里,我们将之前的通道示例转换为使用 go 块

(let [c (a/chan)]
  (a/go (>! c "hello"))
  (assert (= "hello" (<!! (a/go (<! c)))))
  (a/close! c))

我们没有使用显式的线程和阻塞调用,而是为生产者使用了 go 块。消费者使用 go 块进行 take,然后返回一个结果通道,我们从该通道执行阻塞 take 操作。

Alts

通道相对于队列的一个杀手级特性是能够同时等待多个通道(类似于套接字 select)。这是通过 alts!!(普通线程)或 go 块中的 alts! 完成的。

我们可以创建一个使用 alts 的后台线程,该线程组合两个通道中的任何一个的输入。alts!! 获取要执行的一组操作 - 从中获取的通道或要放入的 [通道值],并返回成功的值(put 操作为 nil)和通道

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/thread (while true
              (let [[v ch] (a/alts!! [c1 c2])]
                (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

打印(在标准输出上,可能在你的 repl 中不可见)

Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]

我们可以使用 alts! 用 go 块做同样的事情

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/go (while true
          (let [[v ch] (a/alts! [c1 c2])]
            (println "Read" v "from" ch))))
  (a/go (>! c1 "hi"))
  (a/go (>! c2 "there")))

由于 go 块是与线程无关的轻量级进程,因此我们可以拥有很多 go 块!在这里,我们创建了 1000 个 go 块,它们在 1000 个通道上说 hi。我们使用 alts!! 在它们准备好时读取它们。

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 创建一个等待指定毫秒数然后关闭的通道

(let [t (a/timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

我们可以将 timeout 与 alts! 结合使用以执行定时通道等待。在这里,我们等待 100 毫秒以查看通道上是否有值到达,然后放弃

(let [c (a/chan)
      begin (System/currentTimeMillis)]
  (a/alts!! [c (a/timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

更多信息

有关更多信息,请参阅以下内容

原文作者:Alex Miller