檢視原始碼 Task (Elixir v1.16.2)

產生和等待任務的便利功能。

任務是專門在生命週期中執行特定動作的程序,通常與其他程序幾乎沒有或沒有任何通訊。任務最常見的用例是透過非同步計算值,將順序碼轉換為並行碼

task = Task.async(fn -> do_some_work() end)
res = do_some_other_work()
res + Task.await(task)

使用 async 產生的任務可以由呼叫程序 (且僅限呼叫程序) 等待,如上例所示。它們是透過產生一個程序來實作,該程序在執行指定的運算後,會傳送訊息給呼叫程序。

與使用 spawn/1 啟動的純粹程序相比,任務包含監控元資料,並在發生錯誤時記錄。

除了 async/1await/2 之外,任務也可以作為監督樹狀結構的一部分啟動,並在遠端節點上動態產生。我們將在接下來探討這些場景。

async 和 await

任務的常見用途之一是使用 Task.async/1 將順序碼轉換為並行碼,同時保留其語意。呼叫時,將建立一個新的程序,由呼叫程序連結和監控。任務動作完成後,將傳送一則訊息給呼叫程序,其中包含結果。

Task.await/2 用於讀取任務傳送的訊息。

使用 async 時,有兩件重要事項需要考量

  1. 如果您正在使用非同步任務,您必須等待回覆,因為它們總是會傳送。如果您不期待回覆,請考慮使用 Task.start_link/1,如下所述。

  2. 非同步任務會連結呼叫程序和產生的程序。這表示,如果呼叫程序發生故障,任務也會發生故障,反之亦然。這是故意的:如果要接收結果的程序不再存在,則完成運算沒有任何意義。如果您不希望這樣,您會想要使用後續部分中描述的受監督任務。

任務是程序

任務是程序,因此資料需要完全複製給它們。以下程式碼為例

large_data = fetch_large_data()
task = Task.async(fn -> do_some_work(large_data) end)
res = do_some_other_work()
res + Task.await(task)

以上程式碼複製了所有 large_data,這可能會消耗資源,具體取決於資料大小。有兩種方法可以解決這個問題。

首先,如果你只需要存取 large_data 的一部分,請考慮在任務之前將其提取出來

large_data = fetch_large_data()
subset_data = large_data.some_field
task = Task.async(fn -> do_some_work(subset_data) end)

或者,如果你可以將資料載入完全移至任務,那可能會更好

task = Task.async(fn ->
  large_data = fetch_large_data()
  do_some_work(large_data)
end)

動態監督任務

Task.Supervisor 模組允許開發人員動態建立多個監督任務。

一個簡短的範例是

{:ok, pid} = Task.Supervisor.start_link()

task =
  Task.Supervisor.async(pid, fn ->
    # Do something
  end)

Task.await(task)

但是,在大部分情況下,你會想要將任務監督器加入你的監督樹

Supervisor.start_link([
  {Task.Supervisor, name: MyApp.TaskSupervisor}
], strategy: :one_for_one)

現在,你可以使用 async/await,方法是傳遞監督器的名稱,而不是 pid

Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
  # Do something
end)
|> Task.await()

我們鼓勵開發人員盡可能依賴監督任務。監督任務可以提高可見度,了解在特定時間點有多少任務正在執行,並啟用各種模式,讓你能夠明確控制如何處理結果、錯誤和逾時。以下是摘要

此外,當您的應用程式關閉時,監督程式保證所有任務會在可設定的關閉期間內終止。有關支援的作業的詳細資訊,請參閱 Task.Supervisor 模組。

分散式任務

使用 Task.Supervisor,可以輕鬆地在節點間動態啟動任務

# On the remote node named :remote@local
Task.Supervisor.start_link(name: MyApp.DistSupervisor)

# On the client
supervisor = {MyApp.DistSupervisor, :remote@local}
Task.Supervisor.async(supervisor, MyMod, :my_fun, [arg1, arg2, arg3])

請注意,在使用分散式任務時,應使用 Task.Supervisor.async/5 函式,該函式預期明確的模組、函式和引數,而不是使用 Task.Supervisor.async/3(使用匿名函式)。這是因為匿名函式預期在所有相關節點上都存在相同的模組版本。請查看 Agent 模組文件,以取得有關分散式程序的更多資訊,因為其中描述的限制適用於整個生態系統。

靜態監督的任務

Task 模組實作 child_spec/1 函式,這允許它直接在常規 Supervisor(而不是 Task.Supervisor)下啟動,方法是傳遞一個包含要執行的函式的元組

Supervisor.start_link([
  {Task, fn -> :some_work end}
], strategy: :one_for_one)

當您需要在設定監督樹時執行一些步驟時,這通常很有用。例如:預熱快取、記錄初始化狀態等。

如果您不希望將 Task 程式碼直接放在 Supervisor 下,您可以將 Task 包裝在自己的模組中,類似於使用 GenServerAgent 的方式

defmodule MyTask do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    # ...
  end
end

然後將它傳遞給監督程式

Supervisor.start_link([
  {MyTask, arg}
], strategy: :one_for_one)

由於這些任務受到監督,且與呼叫者沒有直接關聯,因此無法等待它們。預設情況下,函式 Task.start/1Task.start_link/1 適用於 fire-and-forget 任務,您不必在意結果或它是否成功完成。

使用 Task

當你 use Task 時,Task 模組會定義一個 child_spec/1 函式,因此你的模組可以用作監督樹中的子模組。

use Task 定義一個 child_spec/1 函式,允許定義的模組置於監督樹之下。產生的 child_spec/1 可以使用下列選項自訂

  • :id - 子規格識別碼,預設為目前的模組
  • :restart - 子模組應該重新啟動的時機,預設為 :temporary
  • :shutdown - 如何關閉子模組,可以立即關閉或給予時間讓它關閉

GenServerAgentSupervisor 相反,Task 的預設 :restart:temporary。這表示即使任務崩潰,任務也不會重新啟動。如果你希望任務在非成功退出時重新啟動,請執行

use Task, restart: :transient

如果你希望任務總是重新啟動

use Task, restart: :permanent

請參閱 Supervisor 模組中的「子規格」章節,以取得更詳細的資訊。緊接在 use Task 之前的 @doc 註解會附加到產生的 child_spec/1 函式。

祖先和呼叫者追蹤

每當你啟動一個新程序時,Elixir 會透過程序字典中的 $ancestors 鍵為該程序的父程序加上註解。這通常用於追蹤監督樹內的階層。

例如,我們建議開發人員總是啟動一個監督者下的任務。這提供了更高的可見性,並允許你在節點關閉時控制這些任務如何終止。這可能看起來像 Task.Supervisor.start_child(MySupervisor, task_function)。這表示,儘管你的程式碼是呼叫任務的程式碼,但任務的實際祖先是監督者,因為監督者才是實際啟動任務的程式碼。

為了追蹤你的程式碼和任務之間的關係,我們使用程序字典中的 $callers 鍵。因此,假設上述 Task.Supervisor 呼叫,我們有

[your code] -- calls --> [supervisor] ---- spawns --> [task]

這表示我們儲存下列關係

[your code]              [supervisor] <-- ancestor -- [task]
    ^                                                  |
    |--------------------- caller ---------------------|

目前程序呼叫者的清單可從程序字典中透過 Process.get(:"$callers") 擷取。這將會傳回 nil 或清單 [pid_n, ..., pid2, pid1],其中至少有一個項目,pid_n 是呼叫目前程序的 PID,pid2 呼叫 pid_n,而 pid2 則是由 pid1 呼叫。

如果任務發生異常,呼叫者欄位會包含在 :callers 鍵下的記錄訊息中繼資料中。

摘要

類型

任務不透明參考。

t()

任務類型。

函式

任務結構。

啟動必須等待的任務。

啟動必須等待的任務。

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的函式 fun

傳回一個串流,其中給定的函式 (modulefunction_name) 會並行對應到 enumerable 中的每個元素。

等待任務回覆並傳回。

等待多個任務的回覆並傳回。

傳回在監督者下啟動任務的規格。

啟動一個任務,並立即以給定的 result 完成。

忽略現有的任務。

取消連結並關閉任務,然後檢查是否有回覆。

啟動任務。

使用給定的 fun 作為監督樹的一部分啟動任務。

使用給定的 modulefunctionargs 作為監督樹的一部分啟動任務。

暫時封鎖呼叫者程序,等待任務回覆。

在給定的時間間隔內讓出多個任務。

類型

@opaque ref()

任務不透明參考。

@type t() :: %Task{mfa: mfa(), owner: pid(), pid: pid() | nil, ref: ref()}

任務類型。

請參閱 %Task{} 以取得結構中每個欄位的資訊。

函數

任務結構。

它包含這些欄位

  • :mfa - 一個三元素元組,包含在 async/1async/3 中啟動任務的模組、函數名稱和元數

  • :owner - 啟動任務的程序的 PID

  • :pid - 任務程序的 PID;如果沒有特別指定給任務的程序,則為 nil

  • :ref - 用作任務監視器參考的不透明術語

@spec async((-> any())) :: t()

啟動必須等待的任務。

fun 必須是一個零元數匿名函數。此函數會產生一個與呼叫者程序連結並受其監視的程序。會傳回一個包含相關資訊的 Task 結構。

如果您啟動一個 async,您必須等待。這是透過呼叫 Task.await/2Task.yield/2,然後在傳回的任務上呼叫 Task.shutdown/2 來完成的。或者,如果您在 GenServer 內產生一個任務,那麼 GenServer 將自動為您等待,並使用任務回應和關聯的 :DOWN 訊息呼叫 GenServer.handle_info/2

閱讀 Task 模組文件,以取得有關非同步任務一般用法的更多資訊。

連結

此函數會產生一個與呼叫者程序連結並受其監視的程序。連結的部分很重要,因為如果父程序死亡,它會中止任務。它還保證在加入非同步呼叫後,async/await 之前的程式碼具有相同的屬性。例如,想像您有這個

x = heavy_fun()
y = some_fun()
x + y

現在您想讓 heavy_fun() 非同步

x = Task.async(&heavy_fun/0)
y = some_fun()
Task.await(x) + y

和以前一樣,如果 heavy_fun/0 失敗,整個運算都會失敗,包括呼叫者程序。如果您不希望任務失敗,則必須以與沒有非同步呼叫時相同的方式變更 heavy_fun/0 程式碼。例如,傳回 {:ok, val} | :error 結果或在更極端的案例中,使用 try/rescue。換句話說,非同步任務應被視為呼叫者程序的延伸,而不是將其與所有錯誤隔離的機制。

如果您不想將呼叫者連結到任務,則必須使用 Task.Supervisor 的受監督任務並呼叫 Task.Supervisor.async_nolink/2

無論如何,避免下列任何一種情況

  • 設定 :trap_exittrue - 攔截退出應該只在特殊情況下使用,因為它會讓你的程序不僅對任務退出免疫,也對任何其他程序退出免疫。

    此外,即使攔截退出,呼叫 await 仍會在任務終止且未傳回其結果時退出。

  • 解除連結以 async/await 啟動的任務程序。如果你解除連結程序,而任務不屬於任何監督者,那麼呼叫者程序死亡時,你可能會留下未完成的任務。

元資料

使用此函數建立的任務會在 :mfa 元資料欄位中儲存 :erlang.apply/2,它會在內部用來套用匿名函數。如果你想要使用另一個函數作為元資料,請使用 async/3

連結到此函數

async(module, function_name, args)

檢視原始碼
@spec async(module(), atom(), [term()]) :: t()

啟動必須等待的任務。

類似於 async/1,但要啟動的函數是由指定的 modulefunction_nameargs 指定。出於反映目的,modulefunction_name 和其元數會儲存在 :mfa 欄位中,作為一個元組。

連結到此函數

async_stream(enumerable, fun, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()

傳回一個串流,在 enumerable 中的每個元素上並行執行給定的函式 fun

作用與 async_stream/5 相同,但使用匿名函數,而不是模組函數引數元組。 fun 必須是一個元數的匿名函數。

每個 enumerable 元素都會傳遞為引數給指定的函數 fun,並由其自己的任務處理。這些任務會連結到呼叫者程序,類似於 async/1

範例

非同步計算每個字串中的碼點,然後使用 reduce 將這些計數加總。

iex> strings = ["long string", "longer string", "there are many of these"]
iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
47

請參閱 async_stream/5 以了解討論、選項和更多範例。

連結到此函數

async_stream(enumerable, module, function_name, args, options \\ [])

檢視原始碼 (自 1.4.0 起)
@spec async_stream(Enumerable.t(), module(), atom(), [term()], keyword()) ::
  Enumerable.t()

傳回一個串流,其中給定的函式 (modulefunction_name) 會並行對應到 enumerable 中的每個元素。

enumerable 的每個元素都會預先加入給定的 args,並由其自己的任務處理。這些任務會連結到中間程序,然後再連結到呼叫程序。這表示任務失敗會終止呼叫程序,而呼叫程序失敗會終止所有任務。

串流時,每個任務會在成功完成時發出 {:ok, value},或是在呼叫程序捕捉退出時發出 {:exit, reason}。使用 :zip_input_on_exit 選項,可以讓退出有 {:exit, {element, reason}}。結果的順序取決於 :ordered 選項的值。

可以透過選項控制並發程度和任務允許執行的時間(請參閱下方的「選項」區段)。

考慮使用 Task.Supervisor.async_stream/6 在監督程序下啟動任務。如果你發現自己捕捉退出以確保任務中的錯誤不會終止呼叫程序,請考慮使用 Task.Supervisor.async_stream_nolink/6 啟動未連結到呼叫程序的任務。

選項

  • :max_concurrency - 設定同時執行的任務最大數量。預設為 System.schedulers_online/0

  • :ordered - 是否應以與輸入串流相同的順序傳回結果。當輸出有順序時,Elixir 可能需要緩衝結果才能以原始順序發出。將此選項設定為 false 會停用緩衝,但代價是移除順序。當你只將任務用於副作用時,這也很有用。請注意,不論 :ordered 設定為何,任務都會非同步處理。如果你需要依序處理元素,請考慮改用 Enum.map/2Enum.each/2

  • :timeout - 每個任務允許執行的最長時間(以毫秒或 :infinity 表示)。預設為 5000

  • :on_timeout - 當任務逾時時要執行的動作。可能的數值為

    • :exit(預設) - 呼叫者(產生任務的程序)會結束。
    • :kill_task - 逾時的任務會被終止。該任務發出的值為 {:exit, :timeout}
  • :zip_input_on_exit -(自 v1.14.0 起)將原始輸入新增至 :exit 元組。該任務發出的值為 {:exit, {input, reason}},其中 input 是在處理過程中導致離開的集合元素。預設為 false

範例

讓我們建立一個串流,然後列舉它

stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)

可以使用 :max_concurrency 選項增加或減少並行性。例如,如果任務是 IO 密集型的,則可以增加值

max_concurrency = System.schedulers_online() * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)

如果您不關心運算結果,可以使用 Stream.run/1 執行串流。此外,設定 ordered: false,因為您也不關心結果的順序

stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
Stream.run(stream)

第一個完成的非同步任務

您也可以使用 async_stream/3 執行 M 個任務,並找出 N 個完成的任務。例如

[
  &heavy_call_1/0,
  &heavy_call_2/0,
  &heavy_call_3/0
]
|> Task.async_stream(fn fun -> fun.() end, ordered: false, max_concurrency: 3)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.take(2)

在上面的範例中,我們執行三個任務,並等待前兩個完成。我們使用 Stream.filter/2 將自己限制在僅完成的任務,然後使用 Enum.take/2 擷取 N 個項目。請注意,設定 ordered: falsemax_concurrency: M(其中 M 是任務數)非常重要,以確保所有呼叫同時執行。

注意:非同步 + 取用未繫結

如果您可能需要處理大量項目,並只保留部分結果,您可能會處理比預期更多的項目。讓我們看一個範例

1..100
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.take(10)

在具有 8 個核心的機器上執行上面的範例將處理 16 個項目,即使您只想要 10 個元素,因為 async_stream/3 會同時處理項目。這是因為它會一次處理 8 個元素。然後所有 8 個元素幾乎同時完成,導致 8 個元素被啟動處理。在這些額外的 8 個元素中,只有 2 個會被使用,其餘的會被終止。

根據問題,您可以過濾或限制元素數量

1..100
|> Stream.take(10)
|> Task.async_stream(fn i ->
  Process.sleep(100)
  IO.puts(to_string(i))
end)
|> Enum.to_list()

在其他情況下,您可能想要調整 :max_concurrency 來限制可能過度處理的元素數量,但會降低並行性。您也可以設定要取用的元素數量為 :max_concurrency 的倍數。例如,在上述範例中設定 max_concurrency: 5

連結到此函數

await(task, timeout \\ 5000)

檢視原始碼
@spec await(t(), timeout()) :: term()

等待任務回覆並傳回。

如果任務處理程序中斷,呼叫程序將會因為與任務相同的理由而中斷。

可以提供逾時時間(以毫秒或 :infinity 表示),預設值為 5000。如果超過逾時時間,呼叫程序將會中斷。如果任務處理程序連結到呼叫程序(當任務以 async 啟動時),任務處理程序也會中斷。如果任務處理程序正在攔截中斷或未連結到呼叫程序,它將會繼續執行。

此函式假設任務的監視器仍然處於活動狀態,或監視器的 :DOWN 訊息在訊息佇列中。如果已經解除監視,或已經收到訊息,此函式將會等待逾時時間以等待訊息。

此函式只能針對任何給定的任務呼叫一次。如果您想要多次檢查長時間執行的任務是否已完成運算,請改用 yield/2

範例

iex> task = Task.async(fn -> 1 + 1 end)
iex> Task.await(task)
2

與 OTP 行為相容

不建議在 OTP 行為(例如 GenServer)中 await 長時間執行的任務。您應該在 GenServer.handle_info/2 回呼中比對來自任務的訊息。

GenServer 會在 handle_info/2 上收到兩則訊息

  • {ref, result} - 回覆訊息,其中 reftask.ref 傳回的監視器參考,而 result 是任務結果

  • {:DOWN, ref, :process, pid, reason} - 由於所有任務也受到監視,您也會收到 Process.monitor/1 傳送的 :DOWN 訊息。如果您收到沒有回覆的 :DOWN 訊息,表示任務已中斷

另一個需要考慮的事項是,由 Task.async/1 啟動的任務始終連結到其呼叫者,您可能不希望如果任務中斷,GenServer 也會中斷。因此,最好改在 OTP 行為中使用 Task.Supervisor.async_nolink/3。為了完整起見,以下是啟動任務並處理其結果的 GenServer 範例

defmodule GenServerTaskExample do
  use GenServer

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def init(_opts) do
    # We will keep all running tasks in a map
    {:ok, %{tasks: %{}}}
  end

  # Imagine we invoke a task from the GenServer to access a URL...
  def handle_call(:some_message, _from, state) do
    url = ...
    task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)

    # After we start the task, we store its reference and the url it is fetching
    state = put_in(state.tasks[task.ref], url)

    {:reply, :ok, state}
  end

  # If the task succeeds...
  def handle_info({ref, result}, state) do
    # The task succeed so we can demonitor its reference
    Process.demonitor(ref, [:flush])

    {url, state} = pop_in(state.tasks[ref])
    IO.puts "Got #{inspect(result)} for URL #{inspect url}"
    {:noreply, state}
  end

  # If the task fails...
  def handle_info({:DOWN, ref, _, _, reason}, state) do
    {url, state} = pop_in(state.tasks[ref])
    IO.puts "URL #{inspect url} failed with reason #{inspect(reason)}"
    {:noreply, state}
  end
end

定義伺服器後,您需要在您的監督樹中啟動上述任務監督器和 GenServer

children = [
  {Task.Supervisor, name: MyApp.TaskSupervisor},
  {GenServerTaskExample, name: MyApp.GenServerTaskExample}
]

Supervisor.start_link(children, strategy: :one_for_one)
連結到此函數

await_many(tasks, timeout \\ 5000)

檢視原始碼 (自 1.11.0 起)
@spec await_many([t()], timeout()) :: [term()]

等待多個任務的回覆並傳回。

此函式接收一串工作,並在指定時間間隔內等待回覆。它會傳回結果串列,順序與 tasks 輸入引數中提供的任務相同。

如果任何工作程序死亡,呼叫程序將會以與該工作相同的理由結束。

逾時時間可以是毫秒數或 :infinity,預設值為 5000。如果超過逾時時間,呼叫程序將會結束。任何連結到呼叫程序的工作程序(也就是使用 async 啟動工作時的情況)也會結束。任何捕捉結束或未連結到呼叫程序的工作程序將會繼續執行。

此函式假設工作監視器仍處於活動狀態,或監視器的 :DOWN 訊息在訊息佇列中。如果任何工作已被取消監視,或訊息已接收,此函式將會等待逾時時間的持續時間。

此函式只能針對任何特定工作呼叫一次。如果您想要多次檢查長時間執行的工作是否已完成運算,請改用 yield_many/2

與 OTP 行為相容

不建議在 GenServer 等 OTP 行為中 await 長時間執行的工作。有關更多資訊,請參閱 await/2

範例

iex> tasks = [
...>   Task.async(fn -> 1 + 1 end),
...>   Task.async(fn -> 2 + 3 end)
...> ]
iex> Task.await_many(tasks)
[2, 5]
連結到此函數

child_spec(arg)

檢視原始碼 (自 1.5.0 起)
@spec child_spec(term()) :: Supervisor.child_spec()

傳回在監督者下啟動任務的規格。

arg 傳遞為 Task.start_link/1 的引數,在規格的 :start 欄位中。

有關更多資訊,請參閱 Supervisor 模組、Supervisor.child_spec/2 函式和 Supervisor.child_spec/0 型別。

連結到此函數

completed(result)

檢視原始碼 (自 1.13.0 起)
@spec completed(any()) :: t()

啟動一個任務,並立即以給定的 result 完成。

async/1 不同,此任務不會產生連結的程序。它可以像其他任何任務一樣等待或讓出。

用法

在某些情況下,建立一個「已完成」的任務很有用,它代表一個已經執行並產生結果的任務。例如,在處理資料時,你可能可以在將資料傳送出去做進一步處理之前,就判斷出某些輸入無效

def process(data) do
  tasks =
    for entry <- data do
      if invalid_input?(entry) do
        Task.completed({:error, :invalid_input})
      else
        Task.async(fn -> further_process(entry) end)
      end
    end

  Task.await_many(tasks)
end

在許多情況下,可以避免使用 Task.completed/1,而直接回傳結果。通常只有在處理混合非同步時,才需要這個變體,也就是一組輸入會部分同步處理,部分非同步處理。

連結到此函數

ignore(task)

檢視原始碼 (自 1.13.0 起)
@spec ignore(t()) :: {:ok, term()} | {:exit, term()} | nil

忽略現有的任務。

這表示任務會繼續執行,但它會取消連結,你無法再讓出、等待或關閉它。

如果在忽略任務之前收到回覆,則回傳 {:ok, reply};如果任務在忽略之前已結束,則回傳 {:exit, reason};否則回傳 nil

重要:避免使用 Task.async/1,3,然後立即忽略任務。如果你要啟動任務而不關心其結果,請改用 Task.Supervisor.start_child/2

連結到此函數

shutdown(task, shutdown \\ 5000)

檢視原始碼
@spec shutdown(t(), timeout() | :brutal_kill) :: {:ok, term()} | {:exit, term()} | nil

取消連結並關閉任務,然後檢查是否有回覆。

如果在關閉任務時收到回覆,則回傳 {:ok, reply};如果任務已結束,則回傳 {:exit, reason};否則回傳 nil。關閉後,你無法再等待或讓出它。

第二個參數是逾時或 :brutal_kill。如果逾時,會傳送 :shutdown 結束訊號給任務程序,如果它沒有在逾時內結束,則會將它終止。使用 :brutal_kill 會直接終止任務。如果任務異常終止(可能是被另一個程序終止),此函式會以相同原因結束。

在終止呼叫端時,不需要呼叫此函式,除非以 :normal 原因結束,或任務正在攔截結束。如果呼叫端以 :normal 以外的原因結束,且任務沒有攔截結束,呼叫端的結束訊號會停止任務。呼叫端可以用 :shutdown 原因結束,以關閉所有連結的程序,包括沒有攔截結束的任務,而不會產生任何記錄訊息。

如果任務沒有連結的程序,例如由 Task.completed/1 啟動的任務,我們會檢查回應或錯誤,但不會關閉程序。

如果任務的監視器已解除監視或已收到,且訊息佇列中沒有等待的回應,此函式會傳回 {:exit, :noproc},因為無法判斷退出原因。

@spec start((-> any())) :: {:ok, pid()}

啟動任務。

fun 必須是零元匿名函式。

這只應在任務用於副作用(例如 I/O)且您對其結果或是否成功完成不感興趣時使用。

如果當前節點關閉,即使任務尚未完成,節點也會終止。因此,我們建議改用 Task.Supervisor.start_child/2,它允許您透過 :shutdown 選項控制關閉時間。

連結到此函數

start(module, function_name, args)

檢視原始碼
@spec start(module(), atom(), [term()]) :: {:ok, pid()}

啟動任務。

這只應在任務用於副作用(例如 I/O)且您對其結果或是否成功完成不感興趣時使用。

如果當前節點關閉,即使任務尚未完成,節點也會終止。因此,我們建議改用 Task.Supervisor.start_child/2,它允許您透過 :shutdown 選項控制關閉時間。

@spec start_link((-> any())) :: {:ok, pid()}

使用給定的 fun 作為監督樹的一部分啟動任務。

fun 必須是零元匿名函式。

這用於在監督樹下啟動靜態監督的任務。

連結到此函數

start_link(module, function, args)

檢視原始碼
@spec start_link(module(), atom(), [term()]) :: {:ok, pid()}

使用給定的 modulefunctionargs 作為監督樹的一部分啟動任務。

這用於在監督樹下啟動靜態監督的任務。

連結到此函數

yield(task, timeout \\ 5000)

檢視原始碼
@spec yield(t(), timeout()) :: {:ok, term()} | {:exit, term()} | nil

暫時封鎖呼叫者程序,等待任務回覆。

如果收到回應,傳回 {:ok, reply};如果沒有收到回應,傳回 nil;如果任務已退出,傳回 {:exit, reason}。請記住,通常任務失敗也會導致擁有該任務的程序退出。因此,如果符合下列至少一個條件,此函式可能會傳回 {:exit, reason}

可以給定逾時時間(以毫秒為單位或 :infinity),預設值為 5000。如果在收到任務訊息之前時間用盡,此函式會傳回 nil,且監視器將保持作用中。因此,可以在同一個任務上多次呼叫 yield/2

此函式假設任務的監視器仍處於作用中,或監視器的 :DOWN 訊息在訊息佇列中。如果已解除監視或已收到訊息,此函式會等待逾時時間以等待訊息。

如果您打算在任務未在 timeout 毫秒內回應時關閉任務,您應該將此與 shutdown/1 串連,如下所示

case Task.yield(task, timeout) || Task.shutdown(task) do
  {:ok, result} ->
    result

  nil ->
    Logger.warning("Failed to get a result in #{timeout}ms")
    nil
end

如果您打算檢查任務,但在逾時後讓它繼續執行,您可以將此與 ignore/1 串連,如下所示

case Task.yield(task, timeout) || Task.ignore(task) do
  {:ok, result} ->
    result

  nil ->
    Logger.warning("Failed to get a result in #{timeout}ms")
    nil
end

這可確保如果任務在 timeout 之後但在 shutdown/1 被呼叫之前完成,您仍會取得結果,因為 shutdown/1 被設計為處理此案例並傳回結果。

連結到此函數

yield_many(tasks, opts \\ [])

檢視原始碼
@spec yield_many([t()], timeout()) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]
@spec yield_many([t()],
  limit: pos_integer(),
  timeout: timeout(),
  on_timeout: :nothing | :ignore | :kill_task
) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]

在給定的時間間隔內讓出多個任務。

此函式接收任務清單,並在給定的時間間隔內等待其回應。它傳回一個由兩元素組成的元組清單,任務為第一個元素,產生的結果為第二個元素。傳回清單中的任務將與 tasks 輸入參數中提供的任務順序相同。

類似於 yield/2,每個任務的結果將會是

  • {:ok, term} 如果任務已在給定的時間區間內成功回報其結果
  • {:exit, reason} 如果任務已終止
  • nil 如果任務持續執行,可能是因為已達到限制或超過逾時時間

查看 yield/2 以取得更多資訊。

範例

Task.yield_many/2 允許開發人員產生多個任務並在給定的時間範圍內擷取收到的結果。如果我們將它與 Task.shutdown/2(或 Task.ignore/1)結合使用,它允許我們收集那些結果並取消(或忽略)那些未及時回覆的任務。

讓我們看一個範例。

tasks =
  for i <- 1..10 do
    Task.async(fn ->
      Process.sleep(i * 1000)
      i
    end)
  end

tasks_with_results = Task.yield_many(tasks, timeout: 5000)

results =
  Enum.map(tasks_with_results, fn {task, res} ->
    # Shut down the tasks that did not reply nor exit
    res || Task.shutdown(task, :brutal_kill)
  end)

# Here we are matching only on {:ok, value} and
# ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
for {:ok, value} <- results do
  IO.inspect(value)
end

在上面的範例中,我們建立從 1 到 10 秒的睡眠任務,並傳回它們睡眠的秒數。如果你一次執行所有程式碼,你應該會看到 1 到 5 被印出,因為那些是已在給定時間內回覆的任務。所有其他任務都將使用 Task.shutdown/2 呼叫關閉。

作為一種便利,你可以透過指定 :on_timeout 選項為 :kill_task(或 :ignore)來達成類似於上述的行為。如果你希望在逾時時退出呼叫者程序,請參閱 Task.await_many/2

選項

第二個參數是逾時或選項,預設為此

  • :limit - 要等待的最大任務數。如果在逾時之前達到限制,此函式會立即傳回,而不會觸發 :on_timeout 行為

  • :timeout - 每個任務允許執行的最長時間(以毫秒或 :infinity 表示)。預設為 5000

  • :on_timeout - 當任務逾時時要執行的動作。可能的數值為

    • :nothing - 不做任何事(預設)。任務仍然可以在稍後等待、產生、忽略或關閉。
    • :ignore - 任務的結果將會被忽略。
    • :kill_task - 逾時的任務會被終止。