檢視原始碼 Task.Supervisor (Elixir v1.16.2)

任務監督器。

此模組定義了一個可動態監督任務的監督器。

任務監督器在沒有子項目的情況下啟動,通常在監督器和名稱之下

children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

Supervisor.start_link(children, strategy: :one_for_one)

子項目規範中提供的選項記載於 start_link/1 中。

啟動後,您可以直接在監督器下啟動任務,例如

task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
  :do_some_work
end)

請參閱 Task 模組以取得更多範例。

可擴充性和分割

Task.Supervisor 是負責啟動其他程序的單一程序。在某些應用程式中, Task.Supervisor 可能會成為瓶頸。為了解決此問題,您可以啟動多個 Task.Supervisor 執行個體,然後選擇一個隨機執行個體來啟動任務。

取代

children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor}
]

Task.Supervisor.async(MyApp.TaskSupervisor, fn -> :do_some_work end)

您可以執行下列動作

children = [
  {PartitionSupervisor,
   child_spec: Task.Supervisor,
   name: MyApp.TaskSupervisors}
]

然後

Task.Supervisor.async(
  {:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
  fn -> :do_some_work end
)

在上述程式碼中,我們啟動一個分割監督器,預設會為您機器中的每個核心啟動一個動態監督器。接著,您不必透過名稱呼叫 Task.Supervisor,而是使用 {:via, PartitionSupervisor, {name, key}} 格式透過分割監督器呼叫,其中 name 是分割監督器名稱, key 是路由金鑰。我們選擇 self() 作為路由金鑰,這表示每個程序將會指派一個現有的任務監督器。請閱讀 PartitionSupervisor 文件以取得更多資訊。

名稱註冊

Task.SupervisorGenServer 繫結至相同的名稱註冊規則。在 GenServer 文件中進一步了解它們。

摘要

類型

start_link 使用的選項值

函式

啟動可以等待的任務。

啟動可以等待的任務。

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的函式 fun

傳回一個串流,其中給定的函式 (modulefunction) 並行對應到 enumerable 中的每個元素。

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的 function

傳回一個串流,其中給定的函式 (modulefunction) 並行對應到 enumerable 中的每個元素。

傳回所有子項 PID,但不包括正在重新啟動的子項。

將任務作為給定 supervisor 的子項啟動。

將任務作為給定 supervisor 的子項啟動。

啟動新的監督程式。

終止具有給定 pid 的子項。

類型

start_link 使用的選項值

函式

連結至這個函式

async(supervisor, fun, options \\ [])

檢視原始碼
@spec async(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()

啟動可以等待的任務。

supervisor 必須是 Supervisor 中定義的參考。工作仍然會連結至呼叫者,詳情請參閱 Task.async/1,非連結變體請參閱 async_nolink/3

如果 supervisor 已達到最大子項數,則會引發錯誤。

選項

  • :shutdown - :brutal_kill 如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。
連結至這個函式

async(supervisor, module, fun, args, options \\ [])

檢視原始碼
@spec async(Supervisor.supervisor(), module(), atom(), [term()], Keyword.t()) ::
  Task.t()

啟動可以等待的任務。

supervisor 必須是 Supervisor 中定義的參考。工作仍然會連結至呼叫者,詳情請參閱 Task.async/1,非連結變體請參閱 async_nolink/3

如果 supervisor 已達到最大子項數,則會引發錯誤。

選項

  • :shutdown - :brutal_kill 如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。
連結至這個函式

async_nolink(supervisor, fun, options \\ [])

檢視原始碼
@spec async_nolink(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()

啟動可以等待的任務。

supervisor 必須是 Supervisor 中定義的參考。工作不會連結至呼叫者,詳情請參閱 Task.async/1

如果 supervisor 已達到最大子項數,則會引發錯誤。

請注意,此函式要求工作監督者將 :temporary 作為 :restart 選項(預設值),因為 async_nolink/3 保留工作的一個直接參考,如果工作重新啟動,此參考會遺失。

選項

  • :shutdown - :brutal_kill 如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。

與 OTP 行為相容

如果您在 GenServer 等 OTP 行為中使用 async_nolink 建立工作,您應該在 GenServer.handle_info/2 回呼中比對來自工作的訊息。

工作傳送的回覆格式為 {ref, result},其中 ref 是工作結構保留的監視器參考,result 是工作函式的傳回值。

請記住,不論使用 async_nolink 建立的工作如何終止,呼叫者的程序總是會收到 :DOWN 訊息,其中 ref 值與工作結構保留的值相同。如果工作正常終止,:DOWN 訊息中的原因將會是 :normal

範例

通常,當合理預期任務可能會失敗,而且您不希望它中斷呼叫者時,您會使用 async_nolink/3。讓我們看一個 GenServer 用於執行單一任務並追蹤其狀態的範例

defmodule MyApp.Server do
  use GenServer

  # ...

  def start_task do
    GenServer.call(__MODULE__, :start_task)
  end

  # In this case the task is already running, so we just return :ok.
  def handle_call(:start_task, _from, %{ref: ref} = state) when is_reference(ref) do
    {:reply, :ok, state}
  end

  # The task is not running yet, so let's start it.
  def handle_call(:start_task, _from, %{ref: nil} = state) do
    task =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
        ...
      end)

    # We return :ok and the server will continue running
    {:reply, :ok, %{state | ref: task.ref}}
  end

  # The task completed successfully
  def handle_info({ref, answer}, %{ref: ref} = state) do
    # We don't care about the DOWN message now, so let's demonitor and flush it
    Process.demonitor(ref, [:flush])
    # Do something with the result and then return
    {:noreply, %{state | ref: nil}}
  end

  # The task failed
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    # Log and possibly restart the task...
    {:noreply, %{state | ref: nil}}
  end
end
連結至這個函式

async_nolink(supervisor, module, fun, args, options \\ [])

檢視原始碼
@spec async_nolink(Supervisor.supervisor(), module(), atom(), [term()], Keyword.t()) ::
  Task.t()

啟動可以等待的任務。

supervisor 必須是 Supervisor 中定義的參考。工作不會連結至呼叫者,詳情請參閱 Task.async/1

如果 supervisor 已達到最大子項數,則會引發錯誤。

請注意,此函式需要任務監督程式具有 :temporary 作為 :restart 選項(預設值),因為 async_nolink/5 保留對任務的直接參照,如果任務重新啟動,則會遺失此參照。

連結至這個函式

async_stream(supervisor, enumerable, fun, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream(
  Supervisor.supervisor(),
  Enumerable.t(),
  (term() -> term()),
  keyword()
) ::
  Enumerable.t()

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的函式 fun

enumerable 中的每個元素都作為引數傳遞給指定的函式 fun,並由其自己的任務處理。這些任務將在指定的 supervisor 下產生,並連結到呼叫者程序,類似於 async/3

請參閱 async_stream/6 以了解討論、選項和範例。

連結至這個函式

async_stream(supervisor, enumerable, module, function, args, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream(
  Supervisor.supervisor(),
  Enumerable.t(),
  module(),
  atom(),
  [term()],
  keyword()
) ::
  Enumerable.t()

傳回一個串流,其中給定的函式 (modulefunction) 並行對應到 enumerable 中的每個元素。

每個元素都將附加到指定的 args 之前,並由其自己的任務處理。這些任務將在指定的 supervisor 下產生,並連結到呼叫者程序,類似於 async/5

串流時,每個任務在成功完成時會發出 {:ok, value},如果呼叫者正在捕捉退出,則會發出 {:exit, reason}。結果的順序取決於 :ordered 選項的值。

可以透過選項控制並行處理的層級和任務允許執行的時間(請參閱以下「選項」區段)。

如果您發現自己正在捕捉退出以處理非同步串流中的退出,請考慮使用 async_stream_nolink/6 來啟動未連結到呼叫程序的任務。

選項

  • :max_concurrency - 設定同時執行的最大任務數。預設為 System.schedulers_online/0

  • :ordered - 是否應以與輸入串流相同的順序傳回結果。當您有大型串流且不希望在傳送結果前進行緩衝時,此選項很有用。當您將任務用於副作用時,這也很有用。預設為 true

  • :timeout - 在未收到任務回覆(在所有正在執行的任務中)的情況下等待的最大時間(以毫秒為單位)。預設為 5000

  • :on_timeout - 在任務逾時時要執行的動作。可能的值為

    • :exit(預設) - 產生任務的程序會結束。
    • :kill_task - 逾時的任務會被終止。為該任務發出的值為 {:exit, :timeout}
  • :zip_input_on_exit -(自 v1.14.0 起)將原始輸入新增至 :exit 元組。為該任務發出的值為 {:exit, {input, reason}},其中 input 是在處理期間導致結束的集合元素。預設為 false

  • :shutdown - :brutal_kill 如果必須在關閉時直接終止任務,或表示逾時值的整數。預設為 5000 毫秒。任務必須攔截結束才能讓逾時生效。

範例

讓我們建立一個串流,然後列舉它

stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
連結至這個函式

async_stream_nolink(supervisor, enumerable, fun, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream_nolink(
  Supervisor.supervisor(),
  Enumerable.t(),
  (term() -> term()),
  keyword()
) ::
  Enumerable.t()

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的 function

enumerable 中的每個元素都作為引數傳遞給指定的函式 fun,並由其自己的任務處理。這些任務將在指定的 supervisor 下產生,並且不會連結到呼叫程序,類似於 async_nolink/3

請參閱 async_stream/6 以取得討論和範例。

連結至這個函式

async_stream_nolink(supervisor, enumerable, module, function, args, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream_nolink(
  Supervisor.supervisor(),
  Enumerable.t(),
  module(),
  atom(),
  [term()],
  keyword()
) :: Enumerable.t()

傳回一個串流,其中給定的函式 (modulefunction) 並行對應到 enumerable 中的每個元素。

enumerable 中的每個元素都將加到指定的 args 之前,並由它自己的工作處理。這些工作將在指定的 supervisor 下產生,且不會連結到呼叫者程序,類似於 async_nolink/5

請參閱 async_stream/6 以了解討論、選項和範例。

@spec children(Supervisor.supervisor()) :: [pid()]

傳回所有子項 PID,但不包括正在重新啟動的子項。

請注意,在記憶體不足的情況下,監督大量子程序時呼叫此函數可能會導致記憶體不足例外狀況。

連結至這個函式

start_child(supervisor, fun, options \\ [])

檢視原始碼
@spec start_child(Supervisor.supervisor(), (-> any()), keyword()) ::
  DynamicSupervisor.on_start_child()

將任務作為給定 supervisor 的子項啟動。

Task.Supervisor.start_child(MyTaskSupervisor, fn ->
  IO.puts "I am running in a task"
end)

請注意,產生的程序不會連結到呼叫者,只會連結到監督者。如果工作需要執行副作用(例如 I/O),而您對其結果或是否成功完成不感興趣,則此命令很有用。

選項

  • :restart - 重新啟動策略,可能是 :temporary(預設值)、:transient:permanent:temporary 表示工作從不重新啟動,:transient 表示如果結束不是 :normal:shutdown{:shutdown, reason},則重新啟動。 :permanent 重新啟動策略表示總是重新啟動。

  • :shutdown - :brutal_kill 如果工作必須在關閉時直接終止,或表示逾時值的整數,預設為 5000 毫秒。工作必須攔截結束才能讓逾時生效。

連結至這個函式

start_child(supervisor, module, fun, args, options \\ [])

檢視原始碼

將任務作為給定 supervisor 的子項啟動。

類似於 start_child/3,但工作由指定的 modulefunargs 指定。

@spec start_link([option()]) :: Supervisor.on_start()

啟動新的監督程式。

範例

工作監督者通常使用元組格式在監督樹下啟動

{Task.Supervisor, name: MyApp.TaskSupervisor}

您也可以透過直接呼叫 start_link/1 來啟動它

Task.Supervisor.start_link(name: MyApp.TaskSupervisor)

但這只建議用於指令碼,應避免在生產程式碼中使用。一般來說,程序應始終在監督樹內啟動。

選項

  • :name - 用於註冊監督者名稱,支援的值說明在 GenServer 模組文件中的 名稱註冊 區段下;

  • :max_restarts:max_seconds:max_children - 如 DynamicSupervisor 中所指定;

此函數也可以接收 :restart:shutdown 作為選項,但這兩個選項已過時,現在建議將它們直接傳遞給 start_child

連結至這個函式

terminate_child(supervisor, pid)

檢視原始碼
@spec terminate_child(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}

終止具有給定 pid 的子項。