檢視原始碼 Task (Elixir v1.16.2)
產生和等待任務的便利功能。
任務是專門在生命週期中執行特定動作的程序,通常與其他程序幾乎沒有或沒有任何通訊。任務最常見的用例是透過非同步計算值,將順序碼轉換為並行碼
task = Task.async(fn -> do_some_work() end)
res = do_some_other_work()
res + Task.await(task)
使用 async
產生的任務可以由呼叫程序 (且僅限呼叫程序) 等待,如上例所示。它們是透過產生一個程序來實作,該程序在執行指定的運算後,會傳送訊息給呼叫程序。
與使用 spawn/1
啟動的純粹程序相比,任務包含監控元資料,並在發生錯誤時記錄。
除了 async/1
和 await/2
之外,任務也可以作為監督樹狀結構的一部分啟動,並在遠端節點上動態產生。我們將在接下來探討這些場景。
async 和 await
任務的常見用途之一是使用 Task.async/1
將順序碼轉換為並行碼,同時保留其語意。呼叫時,將建立一個新的程序,由呼叫程序連結和監控。任務動作完成後,將傳送一則訊息給呼叫程序,其中包含結果。
Task.await/2
用於讀取任務傳送的訊息。
使用 async
時,有兩件重要事項需要考量
如果您正在使用非同步任務,您必須等待回覆,因為它們總是會傳送。如果您不期待回覆,請考慮使用
Task.start_link/1
,如下所述。非同步任務會連結呼叫程序和產生的程序。這表示,如果呼叫程序發生故障,任務也會發生故障,反之亦然。這是故意的:如果要接收結果的程序不再存在,則完成運算沒有任何意義。如果您不希望這樣,您會想要使用後續部分中描述的受監督任務。
任務是程序
任務是程序,因此資料需要完全複製給它們。以下程式碼為例
large_data = fetch_large_data()
task = Task.async(fn -> do_some_work(large_data) end)
res = do_some_other_work()
res + Task.await(task)
以上程式碼複製了所有 large_data
,這可能會消耗資源,具體取決於資料大小。有兩種方法可以解決這個問題。
首先,如果你只需要存取 large_data
的一部分,請考慮在任務之前將其提取出來
large_data = fetch_large_data()
subset_data = large_data.some_field
task = Task.async(fn -> do_some_work(subset_data) end)
或者,如果你可以將資料載入完全移至任務,那可能會更好
task = Task.async(fn ->
large_data = fetch_large_data()
do_some_work(large_data)
end)
動態監督任務
Task.Supervisor
模組允許開發人員動態建立多個監督任務。
一個簡短的範例是
{:ok, pid} = Task.Supervisor.start_link()
task =
Task.Supervisor.async(pid, fn ->
# Do something
end)
Task.await(task)
但是,在大部分情況下,你會想要將任務監督器加入你的監督樹
Supervisor.start_link([
{Task.Supervisor, name: MyApp.TaskSupervisor}
], strategy: :one_for_one)
現在,你可以使用 async/await,方法是傳遞監督器的名稱,而不是 pid
Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
# Do something
end)
|> Task.await()
我們鼓勵開發人員盡可能依賴監督任務。監督任務可以提高可見度,了解在特定時間點有多少任務正在執行,並啟用各種模式,讓你能夠明確控制如何處理結果、錯誤和逾時。以下是摘要
使用
Task.Supervisor.start_child/2
允許你在不關心其結果或是否成功完成時啟動一個 fire-and-forget 任務。使用
Task.Supervisor.async/2
+Task.await/2
允許你同時執行任務並取得其結果。如果任務失敗,呼叫者也會失敗。使用
Task.Supervisor.async_nolink/2
+Task.yield/2
+Task.shutdown/2
允許你同時執行任務並在給定的時間範圍內取得其結果或失敗原因。如果任務失敗,呼叫者不會失敗。你會在yield
或shutdown
上收到錯誤原因。
此外,當您的應用程式關閉時,監督程式保證所有任務會在可設定的關閉期間內終止。有關支援的作業的詳細資訊,請參閱 Task.Supervisor
模組。
分散式任務
使用 Task.Supervisor
,可以輕鬆地在節點間動態啟動任務
# On the remote node named :remote@local
Task.Supervisor.start_link(name: MyApp.DistSupervisor)
# On the client
supervisor = {MyApp.DistSupervisor, :remote@local}
Task.Supervisor.async(supervisor, MyMod, :my_fun, [arg1, arg2, arg3])
請注意,在使用分散式任務時,應使用 Task.Supervisor.async/5
函式,該函式預期明確的模組、函式和引數,而不是使用 Task.Supervisor.async/3
(使用匿名函式)。這是因為匿名函式預期在所有相關節點上都存在相同的模組版本。請查看 Agent
模組文件,以取得有關分散式程序的更多資訊,因為其中描述的限制適用於整個生態系統。
靜態監督的任務
Task
模組實作 child_spec/1
函式,這允許它直接在常規 Supervisor
(而不是 Task.Supervisor
)下啟動,方法是傳遞一個包含要執行的函式的元組
Supervisor.start_link([
{Task, fn -> :some_work end}
], strategy: :one_for_one)
當您需要在設定監督樹時執行一些步驟時,這通常很有用。例如:預熱快取、記錄初始化狀態等。
如果您不希望將 Task 程式碼直接放在 Supervisor
下,您可以將 Task
包裝在自己的模組中,類似於使用 GenServer
或 Agent
的方式
defmodule MyTask do
use Task
def start_link(arg) do
Task.start_link(__MODULE__, :run, [arg])
end
def run(arg) do
# ...
end
end
然後將它傳遞給監督程式
Supervisor.start_link([
{MyTask, arg}
], strategy: :one_for_one)
由於這些任務受到監督,且與呼叫者沒有直接關聯,因此無法等待它們。預設情況下,函式 Task.start/1
和 Task.start_link/1
適用於 fire-and-forget 任務,您不必在意結果或它是否成功完成。
使用 Task
當你
use Task
時,Task
模組會定義一個child_spec/1
函式,因此你的模組可以用作監督樹中的子模組。
use Task
定義一個 child_spec/1
函式,允許定義的模組置於監督樹之下。產生的 child_spec/1
可以使用下列選項自訂
:id
- 子規格識別碼,預設為目前的模組:restart
- 子模組應該重新啟動的時機,預設為:temporary
:shutdown
- 如何關閉子模組,可以立即關閉或給予時間讓它關閉
與 GenServer
、Agent
和 Supervisor
相反,Task 的預設 :restart
為 :temporary
。這表示即使任務崩潰,任務也不會重新啟動。如果你希望任務在非成功退出時重新啟動,請執行
use Task, restart: :transient
如果你希望任務總是重新啟動
use Task, restart: :permanent
請參閱 Supervisor
模組中的「子規格」章節,以取得更詳細的資訊。緊接在 use Task
之前的 @doc
註解會附加到產生的 child_spec/1
函式。
祖先和呼叫者追蹤
每當你啟動一個新程序時,Elixir 會透過程序字典中的 $ancestors
鍵為該程序的父程序加上註解。這通常用於追蹤監督樹內的階層。
例如,我們建議開發人員總是啟動一個監督者下的任務。這提供了更高的可見性,並允許你在節點關閉時控制這些任務如何終止。這可能看起來像 Task.Supervisor.start_child(MySupervisor, task_function)
。這表示,儘管你的程式碼是呼叫任務的程式碼,但任務的實際祖先是監督者,因為監督者才是實際啟動任務的程式碼。
為了追蹤你的程式碼和任務之間的關係,我們使用程序字典中的 $callers
鍵。因此,假設上述 Task.Supervisor
呼叫,我們有
[your code] -- calls --> [supervisor] ---- spawns --> [task]
這表示我們儲存下列關係
[your code] [supervisor] <-- ancestor -- [task]
^ |
|--------------------- caller ---------------------|
目前程序呼叫者的清單可從程序字典中透過 Process.get(:"$callers")
擷取。這將會傳回 nil
或清單 [pid_n, ..., pid2, pid1]
,其中至少有一個項目,pid_n
是呼叫目前程序的 PID,pid2
呼叫 pid_n
,而 pid2
則是由 pid1
呼叫。
如果任務發生異常,呼叫者欄位會包含在 :callers
鍵下的記錄訊息中繼資料中。
摘要
函式
任務結構。
啟動必須等待的任務。
啟動必須等待的任務。
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的函式 fun
。
傳回一個串流,其中給定的函式 (module
和 function_name
) 會並行對應到 enumerable
中的每個元素。
等待任務回覆並傳回。
等待多個任務的回覆並傳回。
傳回在監督者下啟動任務的規格。
啟動一個任務,並立即以給定的 result
完成。
忽略現有的任務。
取消連結並關閉任務,然後檢查是否有回覆。
啟動任務。
使用給定的 fun
作為監督樹的一部分啟動任務。
使用給定的 module
、function
和 args
作為監督樹的一部分啟動任務。
暫時封鎖呼叫者程序,等待任務回覆。
在給定的時間間隔內讓出多個任務。
類型
函數
任務結構。
它包含這些欄位
啟動必須等待的任務。
fun
必須是一個零元數匿名函數。此函數會產生一個與呼叫者程序連結並受其監視的程序。會傳回一個包含相關資訊的 Task
結構。
如果您啟動一個 async
,您必須等待。這是透過呼叫 Task.await/2
或 Task.yield/2
,然後在傳回的任務上呼叫 Task.shutdown/2
來完成的。或者,如果您在 GenServer
內產生一個任務,那麼 GenServer
將自動為您等待,並使用任務回應和關聯的 :DOWN
訊息呼叫 GenServer.handle_info/2
。
閱讀 Task
模組文件,以取得有關非同步任務一般用法的更多資訊。
連結
此函數會產生一個與呼叫者程序連結並受其監視的程序。連結的部分很重要,因為如果父程序死亡,它會中止任務。它還保證在加入非同步呼叫後,async/await 之前的程式碼具有相同的屬性。例如,想像您有這個
x = heavy_fun()
y = some_fun()
x + y
現在您想讓 heavy_fun()
非同步
x = Task.async(&heavy_fun/0)
y = some_fun()
Task.await(x) + y
和以前一樣,如果 heavy_fun/0
失敗,整個運算都會失敗,包括呼叫者程序。如果您不希望任務失敗,則必須以與沒有非同步呼叫時相同的方式變更 heavy_fun/0
程式碼。例如,傳回 {:ok, val} | :error
結果或在更極端的案例中,使用 try/rescue
。換句話說,非同步任務應被視為呼叫者程序的延伸,而不是將其與所有錯誤隔離的機制。
如果您不想將呼叫者連結到任務,則必須使用 Task.Supervisor
的受監督任務並呼叫 Task.Supervisor.async_nolink/2
。
無論如何,避免下列任何一種情況
設定
:trap_exit
為true
- 攔截退出應該只在特殊情況下使用,因為它會讓你的程序不僅對任務退出免疫,也對任何其他程序退出免疫。此外,即使攔截退出,呼叫
await
仍會在任務終止且未傳回其結果時退出。解除連結以
async
/await
啟動的任務程序。如果你解除連結程序,而任務不屬於任何監督者,那麼呼叫者程序死亡時,你可能會留下未完成的任務。
元資料
使用此函數建立的任務會在 :mfa
元資料欄位中儲存 :erlang.apply/2
,它會在內部用來套用匿名函數。如果你想要使用另一個函數作為元資料,請使用 async/3
。
啟動必須等待的任務。
類似於 async/1
,但要啟動的函數是由指定的 module
、function_name
和 args
指定。出於反映目的,module
、function_name
和其元數會儲存在 :mfa
欄位中,作為一個元組。
@spec async_stream(Enumerable.t(), (term() -> term()), keyword()) :: Enumerable.t()
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的函式 fun
。
作用與 async_stream/5
相同,但使用匿名函數,而不是模組函數引數元組。 fun
必須是一個元數的匿名函數。
每個 enumerable
元素都會傳遞為引數給指定的函數 fun
,並由其自己的任務處理。這些任務會連結到呼叫者程序,類似於 async/1
。
範例
非同步計算每個字串中的碼點,然後使用 reduce 將這些計數加總。
iex> strings = ["long string", "longer string", "there are many of these"]
iex> stream = Task.async_stream(strings, fn text -> text |> String.codepoints() |> Enum.count() end)
iex> Enum.reduce(stream, 0, fn {:ok, num}, acc -> num + acc end)
47
請參閱 async_stream/5
以了解討論、選項和更多範例。
@spec async_stream(Enumerable.t(), module(), atom(), [term()], keyword()) :: Enumerable.t()
傳回一個串流,其中給定的函式 (module
和 function_name
) 會並行對應到 enumerable
中的每個元素。
enumerable
的每個元素都會預先加入給定的 args
,並由其自己的任務處理。這些任務會連結到中間程序,然後再連結到呼叫程序。這表示任務失敗會終止呼叫程序,而呼叫程序失敗會終止所有任務。
串流時,每個任務會在成功完成時發出 {:ok, value}
,或是在呼叫程序捕捉退出時發出 {:exit, reason}
。使用 :zip_input_on_exit
選項,可以讓退出有 {:exit, {element, reason}}
。結果的順序取決於 :ordered
選項的值。
可以透過選項控制並發程度和任務允許執行的時間(請參閱下方的「選項」區段)。
考慮使用 Task.Supervisor.async_stream/6
在監督程序下啟動任務。如果你發現自己捕捉退出以確保任務中的錯誤不會終止呼叫程序,請考慮使用 Task.Supervisor.async_stream_nolink/6
啟動未連結到呼叫程序的任務。
選項
:max_concurrency
- 設定同時執行的任務最大數量。預設為System.schedulers_online/0
。:ordered
- 是否應以與輸入串流相同的順序傳回結果。當輸出有順序時,Elixir 可能需要緩衝結果才能以原始順序發出。將此選項設定為 false 會停用緩衝,但代價是移除順序。當你只將任務用於副作用時,這也很有用。請注意,不論:ordered
設定為何,任務都會非同步處理。如果你需要依序處理元素,請考慮改用Enum.map/2
或Enum.each/2
。:timeout
- 每個任務允許執行的最長時間(以毫秒或:infinity
表示)。預設為5000
。:on_timeout
- 當任務逾時時要執行的動作。可能的數值為:exit
(預設) - 呼叫者(產生任務的程序)會結束。:kill_task
- 逾時的任務會被終止。該任務發出的值為{:exit, :timeout}
。
:zip_input_on_exit
-(自 v1.14.0 起)將原始輸入新增至:exit
元組。該任務發出的值為{:exit, {input, reason}}
,其中input
是在處理過程中導致離開的集合元素。預設為false
。
範例
讓我們建立一個串流,然後列舉它
stream = Task.async_stream(collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
可以使用 :max_concurrency
選項增加或減少並行性。例如,如果任務是 IO 密集型的,則可以增加值
max_concurrency = System.schedulers_online() * 2
stream = Task.async_stream(collection, Mod, :expensive_fun, [], max_concurrency: max_concurrency)
Enum.to_list(stream)
如果您不關心運算結果,可以使用 Stream.run/1
執行串流。此外,設定 ordered: false
,因為您也不關心結果的順序
stream = Task.async_stream(collection, Mod, :expensive_fun, [], ordered: false)
Stream.run(stream)
第一個完成的非同步任務
您也可以使用 async_stream/3
執行 M 個任務,並找出 N 個完成的任務。例如
[
&heavy_call_1/0,
&heavy_call_2/0,
&heavy_call_3/0
]
|> Task.async_stream(fn fun -> fun.() end, ordered: false, max_concurrency: 3)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.take(2)
在上面的範例中,我們執行三個任務,並等待前兩個完成。我們使用 Stream.filter/2
將自己限制在僅完成的任務,然後使用 Enum.take/2
擷取 N 個項目。請注意,設定 ordered: false
和 max_concurrency: M
(其中 M 是任務數)非常重要,以確保所有呼叫同時執行。
注意:非同步 + 取用未繫結
如果您可能需要處理大量項目,並只保留部分結果,您可能會處理比預期更多的項目。讓我們看一個範例
1..100
|> Task.async_stream(fn i ->
Process.sleep(100)
IO.puts(to_string(i))
end)
|> Enum.take(10)
在具有 8 個核心的機器上執行上面的範例將處理 16 個項目,即使您只想要 10 個元素,因為 async_stream/3
會同時處理項目。這是因為它會一次處理 8 個元素。然後所有 8 個元素幾乎同時完成,導致 8 個元素被啟動處理。在這些額外的 8 個元素中,只有 2 個會被使用,其餘的會被終止。
根據問題,您可以過濾或限制元素數量
1..100
|> Stream.take(10)
|> Task.async_stream(fn i ->
Process.sleep(100)
IO.puts(to_string(i))
end)
|> Enum.to_list()
在其他情況下,您可能想要調整 :max_concurrency
來限制可能過度處理的元素數量,但會降低並行性。您也可以設定要取用的元素數量為 :max_concurrency
的倍數。例如,在上述範例中設定 max_concurrency: 5
。
等待任務回覆並傳回。
如果任務處理程序中斷,呼叫程序將會因為與任務相同的理由而中斷。
可以提供逾時時間(以毫秒或 :infinity
表示),預設值為 5000
。如果超過逾時時間,呼叫程序將會中斷。如果任務處理程序連結到呼叫程序(當任務以 async
啟動時),任務處理程序也會中斷。如果任務處理程序正在攔截中斷或未連結到呼叫程序,它將會繼續執行。
此函式假設任務的監視器仍然處於活動狀態,或監視器的 :DOWN
訊息在訊息佇列中。如果已經解除監視,或已經收到訊息,此函式將會等待逾時時間以等待訊息。
此函式只能針對任何給定的任務呼叫一次。如果您想要多次檢查長時間執行的任務是否已完成運算,請改用 yield/2
。
範例
iex> task = Task.async(fn -> 1 + 1 end)
iex> Task.await(task)
2
與 OTP 行為相容
不建議在 OTP 行為(例如 GenServer
)中 await
長時間執行的任務。您應該在 GenServer.handle_info/2
回呼中比對來自任務的訊息。
GenServer 會在 handle_info/2
上收到兩則訊息
{ref, result}
- 回覆訊息,其中ref
是task.ref
傳回的監視器參考,而result
是任務結果{:DOWN, ref, :process, pid, reason}
- 由於所有任務也受到監視,您也會收到Process.monitor/1
傳送的:DOWN
訊息。如果您收到沒有回覆的:DOWN
訊息,表示任務已中斷
另一個需要考慮的事項是,由 Task.async/1
啟動的任務始終連結到其呼叫者,您可能不希望如果任務中斷,GenServer 也會中斷。因此,最好改在 OTP 行為中使用 Task.Supervisor.async_nolink/3
。為了完整起見,以下是啟動任務並處理其結果的 GenServer 範例
defmodule GenServerTaskExample do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, :ok, opts)
end
def init(_opts) do
# We will keep all running tasks in a map
{:ok, %{tasks: %{}}}
end
# Imagine we invoke a task from the GenServer to access a URL...
def handle_call(:some_message, _from, state) do
url = ...
task = Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn -> fetch_url(url) end)
# After we start the task, we store its reference and the url it is fetching
state = put_in(state.tasks[task.ref], url)
{:reply, :ok, state}
end
# If the task succeeds...
def handle_info({ref, result}, state) do
# The task succeed so we can demonitor its reference
Process.demonitor(ref, [:flush])
{url, state} = pop_in(state.tasks[ref])
IO.puts "Got #{inspect(result)} for URL #{inspect url}"
{:noreply, state}
end
# If the task fails...
def handle_info({:DOWN, ref, _, _, reason}, state) do
{url, state} = pop_in(state.tasks[ref])
IO.puts "URL #{inspect url} failed with reason #{inspect(reason)}"
{:noreply, state}
end
end
定義伺服器後,您需要在您的監督樹中啟動上述任務監督器和 GenServer
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor},
{GenServerTaskExample, name: MyApp.GenServerTaskExample}
]
Supervisor.start_link(children, strategy: :one_for_one)
等待多個任務的回覆並傳回。
此函式接收一串工作,並在指定時間間隔內等待回覆。它會傳回結果串列,順序與 tasks
輸入引數中提供的任務相同。
如果任何工作程序死亡,呼叫程序將會以與該工作相同的理由結束。
逾時時間可以是毫秒數或 :infinity
,預設值為 5000
。如果超過逾時時間,呼叫程序將會結束。任何連結到呼叫程序的工作程序(也就是使用 async
啟動工作時的情況)也會結束。任何捕捉結束或未連結到呼叫程序的工作程序將會繼續執行。
此函式假設工作監視器仍處於活動狀態,或監視器的 :DOWN
訊息在訊息佇列中。如果任何工作已被取消監視,或訊息已接收,此函式將會等待逾時時間的持續時間。
此函式只能針對任何特定工作呼叫一次。如果您想要多次檢查長時間執行的工作是否已完成運算,請改用 yield_many/2
。
與 OTP 行為相容
不建議在 GenServer
等 OTP 行為中 await
長時間執行的工作。有關更多資訊,請參閱 await/2
。
範例
iex> tasks = [
...> Task.async(fn -> 1 + 1 end),
...> Task.async(fn -> 2 + 3 end)
...> ]
iex> Task.await_many(tasks)
[2, 5]
@spec child_spec(term()) :: Supervisor.child_spec()
傳回在監督者下啟動任務的規格。
arg
傳遞為 Task.start_link/1
的引數,在規格的 :start
欄位中。
有關更多資訊,請參閱 Supervisor
模組、Supervisor.child_spec/2
函式和 Supervisor.child_spec/0
型別。
啟動一個任務,並立即以給定的 result
完成。
與 async/1
不同,此任務不會產生連結的程序。它可以像其他任何任務一樣等待或讓出。
用法
在某些情況下,建立一個「已完成」的任務很有用,它代表一個已經執行並產生結果的任務。例如,在處理資料時,你可能可以在將資料傳送出去做進一步處理之前,就判斷出某些輸入無效
def process(data) do
tasks =
for entry <- data do
if invalid_input?(entry) do
Task.completed({:error, :invalid_input})
else
Task.async(fn -> further_process(entry) end)
end
end
Task.await_many(tasks)
end
在許多情況下,可以避免使用 Task.completed/1
,而直接回傳結果。通常只有在處理混合非同步時,才需要這個變體,也就是一組輸入會部分同步處理,部分非同步處理。
忽略現有的任務。
這表示任務會繼續執行,但它會取消連結,你無法再讓出、等待或關閉它。
如果在忽略任務之前收到回覆,則回傳 {:ok, reply}
;如果任務在忽略之前已結束,則回傳 {:exit, reason}
;否則回傳 nil
。
重要:避免使用 Task.async/1,3
,然後立即忽略任務。如果你要啟動任務而不關心其結果,請改用 Task.Supervisor.start_child/2
。
取消連結並關閉任務,然後檢查是否有回覆。
如果在關閉任務時收到回覆,則回傳 {:ok, reply}
;如果任務已結束,則回傳 {:exit, reason}
;否則回傳 nil
。關閉後,你無法再等待或讓出它。
第二個參數是逾時或 :brutal_kill
。如果逾時,會傳送 :shutdown
結束訊號給任務程序,如果它沒有在逾時內結束,則會將它終止。使用 :brutal_kill
會直接終止任務。如果任務異常終止(可能是被另一個程序終止),此函式會以相同原因結束。
在終止呼叫端時,不需要呼叫此函式,除非以 :normal
原因結束,或任務正在攔截結束。如果呼叫端以 :normal
以外的原因結束,且任務沒有攔截結束,呼叫端的結束訊號會停止任務。呼叫端可以用 :shutdown
原因結束,以關閉所有連結的程序,包括沒有攔截結束的任務,而不會產生任何記錄訊息。
如果任務沒有連結的程序,例如由 Task.completed/1
啟動的任務,我們會檢查回應或錯誤,但不會關閉程序。
如果任務的監視器已解除監視或已收到,且訊息佇列中沒有等待的回應,此函式會傳回 {:exit, :noproc}
,因為無法判斷退出原因。
啟動任務。
fun
必須是零元匿名函式。
這只應在任務用於副作用(例如 I/O)且您對其結果或是否成功完成不感興趣時使用。
如果當前節點關閉,即使任務尚未完成,節點也會終止。因此,我們建議改用 Task.Supervisor.start_child/2
,它允許您透過 :shutdown
選項控制關閉時間。
啟動任務。
這只應在任務用於副作用(例如 I/O)且您對其結果或是否成功完成不感興趣時使用。
如果當前節點關閉,即使任務尚未完成,節點也會終止。因此,我們建議改用 Task.Supervisor.start_child/2
,它允許您透過 :shutdown
選項控制關閉時間。
使用給定的 fun
作為監督樹的一部分啟動任務。
fun
必須是零元匿名函式。
這用於在監督樹下啟動靜態監督的任務。
使用給定的 module
、function
和 args
作為監督樹的一部分啟動任務。
這用於在監督樹下啟動靜態監督的任務。
暫時封鎖呼叫者程序,等待任務回覆。
如果收到回應,傳回 {:ok, reply}
;如果沒有收到回應,傳回 nil
;如果任務已退出,傳回 {:exit, reason}
。請記住,通常任務失敗也會導致擁有該任務的程序退出。因此,如果符合下列至少一個條件,此函式可能會傳回 {:exit, reason}
- 任務程序以原因
:normal
退出 - 任務未連結到呼叫端(任務以
Task.Supervisor.async_nolink/2
或Task.Supervisor.async_nolink/4
啟動) - 呼叫端正在攔截退出
可以給定逾時時間(以毫秒為單位或 :infinity
),預設值為 5000
。如果在收到任務訊息之前時間用盡,此函式會傳回 nil
,且監視器將保持作用中。因此,可以在同一個任務上多次呼叫 yield/2
。
此函式假設任務的監視器仍處於作用中,或監視器的 :DOWN
訊息在訊息佇列中。如果已解除監視或已收到訊息,此函式會等待逾時時間以等待訊息。
如果您打算在任務未在 timeout
毫秒內回應時關閉任務,您應該將此與 shutdown/1
串連,如下所示
case Task.yield(task, timeout) || Task.shutdown(task) do
{:ok, result} ->
result
nil ->
Logger.warning("Failed to get a result in #{timeout}ms")
nil
end
如果您打算檢查任務,但在逾時後讓它繼續執行,您可以將此與 ignore/1
串連,如下所示
case Task.yield(task, timeout) || Task.ignore(task) do
{:ok, result} ->
result
nil ->
Logger.warning("Failed to get a result in #{timeout}ms")
nil
end
這可確保如果任務在 timeout
之後但在 shutdown/1
被呼叫之前完成,您仍會取得結果,因為 shutdown/1
被設計為處理此案例並傳回結果。
@spec yield_many([t()], timeout()) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]
@spec yield_many([t()], limit: pos_integer(), timeout: timeout(), on_timeout: :nothing | :ignore | :kill_task ) :: [{t(), {:ok, term()} | {:exit, term()} | nil}]
在給定的時間間隔內讓出多個任務。
此函式接收任務清單,並在給定的時間間隔內等待其回應。它傳回一個由兩元素組成的元組清單,任務為第一個元素,產生的結果為第二個元素。傳回清單中的任務將與 tasks
輸入參數中提供的任務順序相同。
類似於 yield/2
,每個任務的結果將會是
{:ok, term}
如果任務已在給定的時間區間內成功回報其結果{:exit, reason}
如果任務已終止nil
如果任務持續執行,可能是因為已達到限制或超過逾時時間
查看 yield/2
以取得更多資訊。
範例
Task.yield_many/2
允許開發人員產生多個任務並在給定的時間範圍內擷取收到的結果。如果我們將它與 Task.shutdown/2
(或 Task.ignore/1
)結合使用,它允許我們收集那些結果並取消(或忽略)那些未及時回覆的任務。
讓我們看一個範例。
tasks =
for i <- 1..10 do
Task.async(fn ->
Process.sleep(i * 1000)
i
end)
end
tasks_with_results = Task.yield_many(tasks, timeout: 5000)
results =
Enum.map(tasks_with_results, fn {task, res} ->
# Shut down the tasks that did not reply nor exit
res || Task.shutdown(task, :brutal_kill)
end)
# Here we are matching only on {:ok, value} and
# ignoring {:exit, _} (crashed tasks) and `nil` (no replies)
for {:ok, value} <- results do
IO.inspect(value)
end
在上面的範例中,我們建立從 1 到 10 秒的睡眠任務,並傳回它們睡眠的秒數。如果你一次執行所有程式碼,你應該會看到 1 到 5 被印出,因為那些是已在給定時間內回覆的任務。所有其他任務都將使用 Task.shutdown/2
呼叫關閉。
作為一種便利,你可以透過指定 :on_timeout
選項為 :kill_task
(或 :ignore
)來達成類似於上述的行為。如果你希望在逾時時退出呼叫者程序,請參閱 Task.await_many/2
。
選項
第二個參數是逾時或選項,預設為此
:limit
- 要等待的最大任務數。如果在逾時之前達到限制,此函式會立即傳回,而不會觸發:on_timeout
行為:timeout
- 每個任務允許執行的最長時間(以毫秒或:infinity
表示)。預設為5000
。:on_timeout
- 當任務逾時時要執行的動作。可能的數值為:nothing
- 不做任何事(預設)。任務仍然可以在稍後等待、產生、忽略或關閉。:ignore
- 任務的結果將會被忽略。:kill_task
- 逾時的任務會被終止。