Clojure

并发编程

当今的系统必须处理许多同时进行的任务,并利用多核 CPU 的强大功能。使用线程来实现这一点可能非常困难,因为同步会带来很多复杂性。Clojure 通过多种方式简化了多线程编程。由于核心数据结构是不可变的,因此它们可以轻松地在线程之间共享。但是,程序中通常需要发生状态更改。Clojure 作为一种实用的语言,允许状态更改,但提供了机制来确保在状态更改时保持一致性,同时使开发人员不必使用锁等手动避免冲突。通过 软件事务内存系统 (STM)(通过 dosyncrefref-setalter 等公开),支持以同步协调的方式在线程之间共享更改状态。 代理系统支持以异步独立的方式在线程之间共享更改状态。 原子系统支持以同步独立的方式在线程之间共享更改状态。通过 动态变量系统(通过 defbinding 等公开),支持在线程内隔离更改状态。

在所有情况下,Clojure 并没有替换 Java 线程系统,而是与之协同工作。Clojure 函数是 java.util.concurrent.Callable,因此它们可以与 Executor 框架等一起使用。

大部分内容在 并发屏幕录像中进行了更详细的介绍。

Refs 是对对象的可变引用。它们可以在事务期间 ref-setalter 为不同的对象,这些事务以 dosync 块分隔。事务中的所有 ref 修改要么全部发生,要么全部不发生。此外,事务中对 ref 的读取反映了特定时间点引用世界的快照,即每个事务都与其他事务隔离。如果两个尝试修改相同引用的事务之间发生冲突,其中一个事务将被重试。所有这些都在没有显式锁定的情况下发生。

在此示例中,创建了一个包含整数的 Refs 向量(refs),然后设置了一组线程(pool)来运行多次迭代以递增每个 Ref(tasks)。这会产生极大的争用,但会产生正确的结果。无需锁!

(import '(java.util.concurrent Executors))

(defn test-stm [nitems nthreads niters]
  (let [refs  (map ref (repeat nitems 0))
        pool  (Executors/newFixedThreadPool nthreads)
        tasks (map (fn [t]
                      (fn []
                        (dotimes [n niters]
                          (dosync
                            (doseq [r refs]
                              (alter r + 1 t))))))
                   (range nthreads))]
    (doseq [future (.invokeAll pool tasks)]
      (.get future))
    (.shutdown pool)
    (map deref refs)))

(test-stm 10 10 10000)
-> (550000 550000 550000 550000 550000 550000 550000 550000 550000 550000)

在典型用法中,refs 可以引用 Clojure 集合,由于这些集合是持久且不可变的,因此可以有效地支持多个事务同时进行的投机“修改”。不应将可变对象放入 refs 中。

默认情况下,Vars 是静态的,但是使用 元数据 定义的 Vars 的每个线程绑定将其标记为动态。 动态变量 也是对对象的可变引用。它们具有一个(线程共享的)根绑定,可以通过 def 建立,并且可以使用*set!* 设置,但前提是它们已使用 binding 在线程本地绑定到新的存储位置。这些绑定以及随后对这些绑定的任何修改将仅在线程内由绑定块动态范围内的代码看到。嵌套绑定遵循堆栈协议,并在控制退出绑定块时展开。

(def ^:dynamic *v*)

(defn incv [n] (set! *v* (+ *v* n)))

(defn test-vars [nthreads niters]
  (let [pool (Executors/newFixedThreadPool nthreads)
        tasks (map (fn [t]
                     #(binding [*v* 0]
                        (dotimes [n niters]
                          (incv t))
                        *v*))
                   (range nthreads))]
      (let [ret (.invokeAll pool tasks)]
        (.shutdown pool)
        (map #(.get %) ret))))

(test-vars 10 1000000)
-> (0 1000000 2000000 3000000 4000000 5000000 6000000 7000000 8000000 9000000)
(set! *v* 4)
-> java.lang.IllegalStateException: Can't change/establish root binding of: *v* with set

动态变量提供了一种在调用栈上的不同点之间进行通信的方法,而无需污染中间调用的参数列表和返回值。此外,动态变量支持一种面向上下文的编程风格。由于使用 defn 定义的 fns 存储在 vars 中,因此它们也可以动态重新绑定。

(defn ^:dynamic say [& args]
  (apply str args))

(defn loves [x y]
  (say x " loves " y))

(defn test-rebind []
  (println (loves "ricky" "lucy"))
  (let [say-orig say]
    (binding [say (fn [& args]
                      (println "Logging say")
                      (apply say-orig args))]
      (println (loves "fred" "ethel")))))

(test-rebind)

ricky loves lucy
Logging say
fred loves ethel