檢視原始碼 Task.Supervisor (Elixir v1.16.2)
任務監督器。
此模組定義了一個可動態監督任務的監督器。
任務監督器在沒有子項目的情況下啟動,通常在監督器和名稱之下
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
Supervisor.start_link(children, strategy: :one_for_one)
子項目規範中提供的選項記載於 start_link/1
中。
啟動後,您可以直接在監督器下啟動任務,例如
task = Task.Supervisor.async(MyApp.TaskSupervisor, fn ->
:do_some_work
end)
請參閱 Task
模組以取得更多範例。
可擴充性和分割
Task.Supervisor
是負責啟動其他程序的單一程序。在某些應用程式中, Task.Supervisor
可能會成為瓶頸。為了解決此問題,您可以啟動多個 Task.Supervisor
執行個體,然後選擇一個隨機執行個體來啟動任務。
取代
children = [
{Task.Supervisor, name: MyApp.TaskSupervisor}
]
和
Task.Supervisor.async(MyApp.TaskSupervisor, fn -> :do_some_work end)
您可以執行下列動作
children = [
{PartitionSupervisor,
child_spec: Task.Supervisor,
name: MyApp.TaskSupervisors}
]
然後
Task.Supervisor.async(
{:via, PartitionSupervisor, {MyApp.TaskSupervisors, self()}},
fn -> :do_some_work end
)
在上述程式碼中,我們啟動一個分割監督器,預設會為您機器中的每個核心啟動一個動態監督器。接著,您不必透過名稱呼叫 Task.Supervisor
,而是使用 {:via, PartitionSupervisor, {name, key}}
格式透過分割監督器呼叫,其中 name
是分割監督器名稱, key
是路由金鑰。我們選擇 self()
作為路由金鑰,這表示每個程序將會指派一個現有的任務監督器。請閱讀 PartitionSupervisor
文件以取得更多資訊。
名稱註冊
Task.Supervisor
與 GenServer
繫結至相同的名稱註冊規則。在 GenServer
文件中進一步了解它們。
摘要
函式
啟動可以等待的任務。
啟動可以等待的任務。
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的函式 fun
。
傳回一個串流,其中給定的函式 (module
和 function
) 並行對應到 enumerable
中的每個元素。
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的 function
。
傳回一個串流,其中給定的函式 (module
和 function
) 並行對應到 enumerable
中的每個元素。
傳回所有子項 PID,但不包括正在重新啟動的子項。
將任務作為給定 supervisor
的子項啟動。
將任務作為給定 supervisor
的子項啟動。
啟動新的監督程式。
終止具有給定 pid
的子項。
類型
@type option() :: DynamicSupervisor.option() | DynamicSupervisor.init_option()
start_link
使用的選項值
函式
@spec async(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()
啟動可以等待的任務。
supervisor
必須是 Supervisor
中定義的參考。工作仍然會連結至呼叫者,詳情請參閱 Task.async/1
,非連結變體請參閱 async_nolink/3
。
如果 supervisor
已達到最大子項數,則會引發錯誤。
選項
:shutdown
-:brutal_kill
如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。
啟動可以等待的任務。
supervisor
必須是 Supervisor
中定義的參考。工作仍然會連結至呼叫者,詳情請參閱 Task.async/1
,非連結變體請參閱 async_nolink/3
。
如果 supervisor
已達到最大子項數,則會引發錯誤。
選項
:shutdown
-:brutal_kill
如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。
@spec async_nolink(Supervisor.supervisor(), (-> any()), Keyword.t()) :: Task.t()
啟動可以等待的任務。
supervisor
必須是 Supervisor
中定義的參考。工作不會連結至呼叫者,詳情請參閱 Task.async/1
。
如果 supervisor
已達到最大子項數,則會引發錯誤。
請注意,此函式要求工作監督者將 :temporary
作為 :restart
選項(預設值),因為 async_nolink/3
保留工作的一個直接參考,如果工作重新啟動,此參考會遺失。
選項
:shutdown
-:brutal_kill
如果工作必須在關閉時直接終止,或表示逾時值(毫秒)的整數,預設為 5000 毫秒。工作必須攔截退出,逾時才有作用。
與 OTP 行為相容
如果您在 GenServer
等 OTP 行為中使用 async_nolink
建立工作,您應該在 GenServer.handle_info/2
回呼中比對來自工作的訊息。
工作傳送的回覆格式為 {ref, result}
,其中 ref
是工作結構保留的監視器參考,result
是工作函式的傳回值。
請記住,不論使用 async_nolink
建立的工作如何終止,呼叫者的程序總是會收到 :DOWN
訊息,其中 ref
值與工作結構保留的值相同。如果工作正常終止,:DOWN
訊息中的原因將會是 :normal
。
範例
通常,當合理預期任務可能會失敗,而且您不希望它中斷呼叫者時,您會使用 async_nolink/3
。讓我們看一個 GenServer
用於執行單一任務並追蹤其狀態的範例
defmodule MyApp.Server do
use GenServer
# ...
def start_task do
GenServer.call(__MODULE__, :start_task)
end
# In this case the task is already running, so we just return :ok.
def handle_call(:start_task, _from, %{ref: ref} = state) when is_reference(ref) do
{:reply, :ok, state}
end
# The task is not running yet, so let's start it.
def handle_call(:start_task, _from, %{ref: nil} = state) do
task =
Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
...
end)
# We return :ok and the server will continue running
{:reply, :ok, %{state | ref: task.ref}}
end
# The task completed successfully
def handle_info({ref, answer}, %{ref: ref} = state) do
# We don't care about the DOWN message now, so let's demonitor and flush it
Process.demonitor(ref, [:flush])
# Do something with the result and then return
{:noreply, %{state | ref: nil}}
end
# The task failed
def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
# Log and possibly restart the task...
{:noreply, %{state | ref: nil}}
end
end
啟動可以等待的任務。
supervisor
必須是 Supervisor
中定義的參考。工作不會連結至呼叫者,詳情請參閱 Task.async/1
。
如果 supervisor
已達到最大子項數,則會引發錯誤。
請注意,此函式需要任務監督程式具有 :temporary
作為 :restart
選項(預設值),因為 async_nolink/5
保留對任務的直接參照,如果任務重新啟動,則會遺失此參照。
@spec async_stream( Supervisor.supervisor(), Enumerable.t(), (term() -> term()), keyword() ) :: Enumerable.t()
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的函式 fun
。
enumerable
中的每個元素都作為引數傳遞給指定的函式 fun
,並由其自己的任務處理。這些任務將在指定的 supervisor
下產生,並連結到呼叫者程序,類似於 async/3
。
請參閱 async_stream/6
以了解討論、選項和範例。
async_stream(supervisor, enumerable, module, function, args, options \\ [])
檢視原始碼 (自 1.4.0 起)@spec async_stream( Supervisor.supervisor(), Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t()
傳回一個串流,其中給定的函式 (module
和 function
) 並行對應到 enumerable
中的每個元素。
每個元素都將附加到指定的 args
之前,並由其自己的任務處理。這些任務將在指定的 supervisor
下產生,並連結到呼叫者程序,類似於 async/5
。
串流時,每個任務在成功完成時會發出 {:ok, value}
,如果呼叫者正在捕捉退出,則會發出 {:exit, reason}
。結果的順序取決於 :ordered
選項的值。
可以透過選項控制並行處理的層級和任務允許執行的時間(請參閱以下「選項」區段)。
如果您發現自己正在捕捉退出以處理非同步串流中的退出,請考慮使用 async_stream_nolink/6
來啟動未連結到呼叫程序的任務。
選項
:max_concurrency
- 設定同時執行的最大任務數。預設為System.schedulers_online/0
。:ordered
- 是否應以與輸入串流相同的順序傳回結果。當您有大型串流且不希望在傳送結果前進行緩衝時,此選項很有用。當您將任務用於副作用時,這也很有用。預設為true
。:timeout
- 在未收到任務回覆(在所有正在執行的任務中)的情況下等待的最大時間(以毫秒為單位)。預設為5000
。:on_timeout
- 在任務逾時時要執行的動作。可能的值為:exit
(預設) - 產生任務的程序會結束。:kill_task
- 逾時的任務會被終止。為該任務發出的值為{:exit, :timeout}
。
:zip_input_on_exit
-(自 v1.14.0 起)將原始輸入新增至:exit
元組。為該任務發出的值為{:exit, {input, reason}}
,其中input
是在處理期間導致結束的集合元素。預設為false
。:shutdown
-:brutal_kill
如果必須在關閉時直接終止任務,或表示逾時值的整數。預設為5000
毫秒。任務必須攔截結束才能讓逾時生效。
範例
讓我們建立一個串流,然後列舉它
stream = Task.Supervisor.async_stream(MySupervisor, collection, Mod, :expensive_fun, [])
Enum.to_list(stream)
@spec async_stream_nolink( Supervisor.supervisor(), Enumerable.t(), (term() -> term()), keyword() ) :: Enumerable.t()
傳回一個串流,在 enumerable
中的每個元素上並行執行給定的 function
。
在 enumerable
中的每個元素都作為引數傳遞給指定的函式 fun
,並由其自己的任務處理。這些任務將在指定的 supervisor
下產生,並且不會連結到呼叫程序,類似於 async_nolink/3
。
請參閱 async_stream/6
以取得討論和範例。
async_stream_nolink(supervisor, enumerable, module, function, args, options \\ [])
檢視原始碼 (自 1.4.0 起)@spec async_stream_nolink( Supervisor.supervisor(), Enumerable.t(), module(), atom(), [term()], keyword() ) :: Enumerable.t()
傳回一個串流,其中給定的函式 (module
和 function
) 並行對應到 enumerable
中的每個元素。
enumerable
中的每個元素都將加到指定的 args
之前,並由它自己的工作處理。這些工作將在指定的 supervisor
下產生,且不會連結到呼叫者程序,類似於 async_nolink/5
。
請參閱 async_stream/6
以了解討論、選項和範例。
@spec children(Supervisor.supervisor()) :: [pid()]
傳回所有子項 PID,但不包括正在重新啟動的子項。
請注意,在記憶體不足的情況下,監督大量子程序時呼叫此函數可能會導致記憶體不足例外狀況。
@spec start_child(Supervisor.supervisor(), (-> any()), keyword()) :: DynamicSupervisor.on_start_child()
將任務作為給定 supervisor
的子項啟動。
Task.Supervisor.start_child(MyTaskSupervisor, fn ->
IO.puts "I am running in a task"
end)
請注意,產生的程序不會連結到呼叫者,只會連結到監督者。如果工作需要執行副作用(例如 I/O),而您對其結果或是否成功完成不感興趣,則此命令很有用。
選項
:restart
- 重新啟動策略,可能是:temporary
(預設值)、:transient
或:permanent
。:temporary
表示工作從不重新啟動,:transient
表示如果結束不是:normal
、:shutdown
或{:shutdown, reason}
,則重新啟動。:permanent
重新啟動策略表示總是重新啟動。:shutdown
-:brutal_kill
如果工作必須在關閉時直接終止,或表示逾時值的整數,預設為 5000 毫秒。工作必須攔截結束才能讓逾時生效。
@spec start_child(Supervisor.supervisor(), module(), atom(), [term()], keyword()) :: DynamicSupervisor.on_start_child()
將任務作為給定 supervisor
的子項啟動。
類似於 start_child/3
,但工作由指定的 module
、fun
和 args
指定。
@spec start_link([option()]) :: Supervisor.on_start()
啟動新的監督程式。
範例
工作監督者通常使用元組格式在監督樹下啟動
{Task.Supervisor, name: MyApp.TaskSupervisor}
您也可以透過直接呼叫 start_link/1
來啟動它
Task.Supervisor.start_link(name: MyApp.TaskSupervisor)
但這只建議用於指令碼,應避免在生產程式碼中使用。一般來說,程序應始終在監督樹內啟動。
選項
:name
- 用於註冊監督者名稱,支援的值說明在GenServer
模組文件中的名稱註冊
區段下;:max_restarts
、:max_seconds
和:max_children
- 如DynamicSupervisor
中所指定;
此函數也可以接收 :restart
和 :shutdown
作為選項,但這兩個選項已過時,現在建議將它們直接傳遞給 start_child
。
@spec terminate_child(Supervisor.supervisor(), pid()) :: :ok | {:error, :not_found}
終止具有給定 pid
的子項。