Clojure

Агенты и асинхронные действия

Содержание

Как и ссылки, агенты предоставляют общий доступ к изменяемому состоянию. Ссылки поддерживают согласованные, синхронные изменения множества хранилищ. Агенты обеспечивают независимое, асинхронное изменение отдельных хранилищ. Агенты привязаны к одному хранилищу в течении всей его жизни и позволяет проводить изменения хранилища (в новое состояние) в виде действий. Действия - это функции (возможно с аргументами), которые асинхронно применяются к состоянию агента и чьё возвращаемое значение становится новым состоянием агента. Так как действия являются функциями, они также могут быть мультиметодами, а следовательно действия потенциально полиморфны. Также, так как множество функций является открытым, то множество действий, поддерживаемых агентами также открыто, что резко контрастирует с циклами обработки сообщений предоставляемым другими языками программирования.

Агенты Clojure реактивны, не автономны - нет никакого императивного цикла сообщений и нет блокировок. Состояние агента само должно быть неизменяемым (предпочтительно экземпляром одной из коллекций Clojure) и состояние агента всегда немедленно становится доступно для чтения любым другим потоком (при помощи функции deref или макроса считывателя @) без каких бы то ни было сообщений, то есть отслеживание не требует согласования или координации.

Отправка действий агенту происходит в форме (send agent fn args*). sendsend-off) всегда немедленно завершаются. В какой-то момент позже, в другом потоке, произойдет следующее:

  1. Данная функция fn будет применена к состоянию агента и аргументам, если они предоставлены.

  2. Значение, возвращенное fn будет передано в функцию-валидатор, если она установлена для агента. Более подробно см. set-validator!.

  3. Если валидатор завершится успешно или его не существует, возвращенное значение данной функции fn станет новым состоянием агента.

  4. Если какие-либо наблюдатели были зарегистрированы для агента, они будут вызваны. Болле подробно см. add-watch.

  5. Если во время выполнения функции осуществлятся любые другие вызовы - напрямую или любым другим способом, они будут ждать, пока состояние агента не изменится.

Если действующая функция сгенерирует какое-либо исключение-ошибку, она будет сохранено агентом. Когда у агента имеется сохраненная ошибка, все последующие взаимодействия будут немедленно завершаться с исключением, пока список сохраненных ошибок в агенте не будет очищен. Ошибки агента могут быть просмотрены с помощью agent-error, агент может быть перезапущен с помощью restart-agent.

Действия всех агентов выполняются вперемешку в пуле потоков. В любой момент времени выполняется не более одного действия для каждого из агентов. Действия, переданные агенту из другого агента или потока будут обрабатываться в том порядке, в котором они были посланы. Эти действия могут быть перемешаны с другими действиями, посланными тому же агенту из других источников. Функция send должна использоваться для действий-вычислений. Функция send-off подходит для действий, которые связаны с блокирующими операциями ввода вывода.

Агенты интегрированы с STM (программной транзакционной памятью) - любые изменения, сделанные внутри транзакции, ожидают её окончания и отменяются, если транзакция отменяется или запускается заново.

При этом не требуется никакого пользовательского кода, как и везде при поддержке многопоточности в Clojure.

Заметим, что использование агентов запускает в фоне множество потоков не-демонов, которые будут предотвращать завершение работы JVM. Используйте shutdown-agents чтобы остановить работу таких потоков и позволить виртуальной машине отключиться.

Пример

Этот пример пересылает сообщение по кругу. Создается набор из N агентов, а затем последовательность из M действий посылается в первый агент и передается далее.

(defn relay [x i]
  (when (:next x)
    (send (:next x) relay i))
  (when (and (zero? i) (:report-queue x))
    (.put (:report-queue x) i))
  x)

(defn run [m n]
  (let [q (new java.util.concurrent.SynchronousQueue)
        hd (reduce (fn [next _] (agent {:next next}))
                   (agent {:report-queue q}) (range (dec m)))]
    (doseq [i (reverse (range n))]
      (send hd relay i))
    (.take q)))

; 1 million message sends:
(time (run 1000 1000))
->"Elapsed time: 2959.254 msecs"

Функции

Создать агент: agent

Просмотреть агент: deref (см. также макрос считывателя @) agent-error error-handler error-mode

Изменить состояние агента: send send-off restart-agent

Блокирующее ожидание на агенте: await await-for

Валидаторы: set-validator! get-validator

Наблюдатели: add-watch remove-watch

Управление потоками: shutdown-agents