檢視原始碼 分散式任務和標籤

在本章節中,我們將回到 :kv 應用程式並新增一個路由層,它將允許我們根據儲存區名稱在節點之間分配請求。

路由層將接收下列格式的路由表

[
  {?a..?m, :"foo@computer-name"},
  {?n..?z, :"bar@computer-name"}
]

路由器會根據表格檢查儲存區名稱的第一個位元組,並根據該位元組將請求傳送至適當的節點。例如,以字母「a」開頭的儲存區(?a 代表字母「a」的 Unicode 碼位)將會傳送至節點 foo@computer-name

如果相符的項目指向評估請求的節點,則表示我們已完成路由,而此節點將執行請求的操作。如果相符的項目指向不同的節點,我們將把請求傳遞給該節點,該節點會查看自己的路由表(可能與第一個節點中的路由表不同),並採取相應的動作。如果沒有相符的項目,則會引發錯誤。

注意:在本章節中,我們將在同一台機器上使用兩個節點。您可以自由地在同一個網路中使用兩個(或更多)不同的機器,但您需要做一些準備工作。首先,您需要確保所有機器都有 ~/.erlang.cookie 檔案,且值完全相同。然後,您需要保證 epmd 在未被封鎖的埠上執行(您可以執行 epmd -d 以取得偵錯資訊)。

我們的首支分散式程式碼

Elixir 內建連接節點和在節點之間交換資訊的功能。事實上,當在分散式環境中工作時,我們使用相同的程序、訊息傳遞和接收訊息的概念,因為 Elixir 程序是位置透明的。這表示在傳送訊息時,收件程序是否在同一個節點或另一個節點並不重要,VM 在這兩種情況下都能傳遞訊息。

為了執行分散式程式碼,我們需要以一個名稱啟動 VM。名稱可以很短(在同一個網路中時)或很長(需要完整的電腦位址)。讓我們開始一個新的 IEx 會話

$ iex --sname foo

您現在可以看到提示略有不同,並顯示節點名稱後接電腦名稱

Interactive Elixir - press Ctrl+C to exit (type h() ENTER for help)
iex(foo@jv)1>

我的電腦名稱為 jv,因此在上面的範例中,我看到 foo@jv,但您會得到不同的結果。我們將在以下範例中使用 foo@computer-name,而您在嘗試程式碼時應適當地更新它們。

讓我們在此 shell 中定義名為 Hello 的模組

iex> defmodule Hello do
...>   def world, do: IO.puts "hello world"
...> end

如果您在同一個網路中擁有另一台已安裝 Erlang 和 Elixir 的電腦,您可以在其上啟動另一個 shell。如果您沒有,您可以在另一個終端機中啟動另一個 IEx 會話。無論哪一種情況,都給它 bar 的簡短名稱

$ iex --sname bar

請注意,在此新的 IEx 會話中,我們無法存取 Hello.world/0

iex> Hello.world
** (UndefinedFunctionError) function Hello.world/0 is undefined (module Hello is not available)
    Hello.world()

然而,我們可以從 bar@computer-namefoo@computer-name 上產生一個新程序!讓我們試試看(其中 @computer-name 是您在本地看到的)

iex> Node.spawn_link(:"foo@computer-name", fn -> Hello.world() end)
#PID<9014.59.0>
hello world

Elixir 在另一個節點上產生了一個程序,並傳回其 PID。然後程式碼在 Hello.world/0 函數存在的另一個節點上執行,並呼叫該函數。請注意,「hello world」的結果列印在目前的節點 bar 上,而不是 foo 上。換句話說,要列印的訊息從 foo 傳送回 bar。這是因為在另一個節點(foo)上產生的程序知道所有輸出都應傳送回原始節點!

我們可以像往常一樣傳送和接收 Node.spawn_link/2 傳回的 PID 的訊息。讓我們嘗試一個快速的 ping-pong 範例

iex> pid = Node.spawn_link(:"foo@computer-name", fn ->
...>   receive do
...>     {:ping, client} -> send(client, :pong)
...>   end
...> end)
#PID<9014.59.0>
iex> send(pid, {:ping, self()})
{:ping, #PID<0.73.0>}
iex> flush()
:pong
:ok

從我們的快速探索中,我們可以得出結論,我們應該使用 Node.spawn_link/2 在遠端節點上產生程序,每當我們需要執行分散式運算時。然而,我們在整個指南中了解到,如果可能的話,應避免在監督樹之外產生程序,因此我們需要尋找其他選項。

有三個比 Node.spawn_link/2 更好的替代方案,我們可以在我們的實作中使用

  1. 我們可以使用 Erlang 的 :erpc 模組在遠端節點上執行函式。在上述 bar@computer-name shell 內,你可以呼叫 :erpc.call(:"foo@computer-name", Hello, :world, []),它會印出「hello world」

  2. 我們可以在另一個節點上執行伺服器,並透過 GenServer API 將要求傳送給該節點。例如,你可以使用 GenServer.call({name, node}, arg) 或傳遞遠端程序 PID 作為第一個引數來呼叫遠端節點上的伺服器

  3. 我們可以使用 任務,我們在 前一章 中已經學習過,因為它們可以在本機和遠端節點上產生

上述選項具有不同的屬性。GenServer 會在單一伺服器上序列化你的要求,而任務會在遠端節點上有效地非同步執行,唯一的序列化點是主管所執行的產生。

對於我們的路由層,我們將使用任務,但也可以自由探索其他替代方案。

async/await

到目前為止,我們已經探索了在隔離中啟動和執行的任務,而不考慮其回傳值。然而,有時執行任務來計算值並稍後讀取其結果是有用的。為此,任務也提供了 async/await 模式

task = Task.async(fn -> compute_something_expensive() end)
res  = compute_something_else()
res + Task.await(task)

async/await 提供了一個非常簡單的機制來並行計算值。不僅如此,async/await 也可以與我們在前面章節中使用的相同 Task.Supervisor 一起使用。我們只需要呼叫 Task.Supervisor.async/2 而不是 Task.Supervisor.start_child/2,並使用 Task.await/2 來稍後讀取結果。

分散式任務

分散式任務與受監督任務完全相同。唯一的區別是我們在主管上產生任務時傳遞節點名稱。從 :kv 應用程式開啟 lib/kv/supervisor.ex。我們將任務主管新增為樹狀結構的最後一個子節點

{Task.Supervisor, name: KV.RouterTasks},

現在,讓我們再次啟動兩個命名節點,但位於 :kv 應用程式內

$ iex --sname foo -S mix
$ iex --sname bar -S mix

bar@computer-name 內部,我們現在可以直接透過主管在另一個節點上產生任務

iex> task = Task.Supervisor.async({KV.RouterTasks, :"foo@computer-name"}, fn ->
...>   {:ok, node()}
...> end)
%Task{
  mfa: {:erlang, :apply, 2},
  owner: #PID<0.122.0>,
  pid: #PID<12467.88.0>,
  ref: #Reference<0.0.0.400>
}
iex> Task.await(task)
{:ok, :"foo@computer-name"}

我們的首要分佈式任務是擷取執行任務的節點名稱。請注意,我們已提供匿名函式給 Task.Supervisor.async/2,但在分佈式案例中,最好明確提供模組、函式和引數

iex> task = Task.Supervisor.async({KV.RouterTasks, :"foo@computer-name"}, Kernel, :node, [])
%Task{
  mfa: {Kernel, :node, 0},
  owner: #PID<0.122.0>,
  pid: #PID<12467.89.0>,
  ref: #Reference<0.0.0.404>
}
iex> Task.await(task)
:"foo@computer-name"

差異在於,匿名函式要求目標節點與呼叫者擁有完全相同的程式碼版本。使用模組、函式和引數較為健全,因為你只需要在給定的模組中找到具有匹配元數的函式即可。

有了這項知識,我們終於可以撰寫路由程式碼了。

路由層

lib/kv/router.ex 建立一個檔案,其內容如下

defmodule KV.Router do
  @doc """
  Dispatch the given `mod`, `fun`, `args` request
  to the appropriate node based on the `bucket`.
  """
  def route(bucket, mod, fun, args) do
    # Get the first byte of the binary
    first = :binary.first(bucket)

    # Try to find an entry in the table() or raise
    entry =
      Enum.find(table(), fn {enum, _node} ->
        first in enum
      end) || no_entry_error(bucket)

    # If the entry node is the current node
    if elem(entry, 1) == node() do
      apply(mod, fun, args)
    else
      {KV.RouterTasks, elem(entry, 1)}
      |> Task.Supervisor.async(KV.Router, :route, [bucket, mod, fun, args])
      |> Task.await()
    end
  end

  defp no_entry_error(bucket) do
    raise "could not find entry for #{inspect bucket} in table #{inspect table()}"
  end

  @doc """
  The routing table.
  """
  def table do
    # Replace computer-name with your local machine name
    [{?a..?m, :"foo@computer-name"}, {?n..?z, :"bar@computer-name"}]
  end
end

讓我們撰寫一個測試,以驗證我們的路由器是否運作。建立一個名為 test/kv/router_test.exs 的檔案,其中包含

defmodule KV.RouterTest do
  use ExUnit.Case, async: true

  test "route requests across nodes" do
    assert KV.Router.route("hello", Kernel, :node, []) ==
             :"foo@computer-name"
    assert KV.Router.route("world", Kernel, :node, []) ==
             :"bar@computer-name"
  end

  test "raises on unknown entries" do
    assert_raise RuntimeError, ~r/could not find entry/, fn ->
      KV.Router.route(<<0>>, Kernel, :node, [])
    end
  end
end

第一個測試呼叫 Kernel.node/0,它會傳回目前節點的名稱,根據儲存區名稱「hello」和「world」。根據我們目前的路由表,我們應該分別取得 foo@computer-namebar@computer-name 作為回應。

第二個測試檢查程式碼是否會對未知項目引發錯誤。

為了執行第一個測試,我們需要執行兩個節點。移至 apps/kv 並重新啟動名為 bar 的節點,該節點將由測試使用。

$ iex --sname bar -S mix

現在使用下列指令執行測試

$ elixir --sname foo -S mix test

測試應該會通過。

測試篩選器和標籤

儘管我們的測試通過了,但我們的測試結構變得更複雜。特別是,僅使用 mix test 執行測試會導致我們的套件失敗,因為我們的測試需要連線到另一個節點。

幸運的是,ExUnit 附帶標記測試的功能,讓我們可以根據這些標籤執行特定的回呼,甚至完全篩選測試。我們已在上一個章節中使用 :capture_log 標籤,其語意是由 ExUnit 本身指定的。

這次我們在 test/kv/router_test.exs 中加入 :distributed 標籤

@tag :distributed
test "route requests across nodes" do

撰寫 @tag :distributed 等同於撰寫 @tag distributed: true

標記測試後,我們現在可以檢查節點是否在網路中存活,如果沒有,我們可以排除所有分散式測試。在 :kv 應用程式中開啟 test/test_helper.exs 並新增下列內容

exclude =
  if Node.alive?(), do: [], else: [distributed: true]

ExUnit.start(exclude: exclude)

現在使用 mix test 執行測試

$ mix test
Excluding tags: [distributed: true]

.......

Finished in 0.05 seconds
9 tests, 0 failures, 1 excluded

這次所有測試都通過了,ExUnit 警告我們分散式測試已被排除。如果您使用 $ elixir --sname foo -S mix test 執行測試,只要 bar@computer-name 節點可用,就會執行一個額外的測試並成功通過。

mix test 命令也允許我們動態包含和排除標籤。例如,我們可以執行 $ mix test --include distributed 來執行分散式測試,而不論 test/test_helper.exs 中設定的值為何。我們也可以傳遞 --exclude 來從命令列中排除特定標籤。最後,--only 可用於僅執行具有特定標籤的測試

$ elixir --sname foo -S mix test --only distributed

您可以在 ExUnit.Case 模組文件檔中閱讀更多關於篩選器、標籤和預設標籤的資訊。

將所有內容連接起來

現在我們的路由系統已經到位,讓我們變更 KVServer 以使用路由器。將 KVServer.Command 中的 lookup/2 函式從此

defp lookup(bucket, callback) do
  case KV.Registry.lookup(KV.Registry, bucket) do
    {:ok, pid} -> callback.(pid)
    :error -> {:error, :not_found}
  end
end

替換為此

defp lookup(bucket, callback) do
  case KV.Router.route(bucket, KV.Registry, :lookup, [KV.Registry, bucket]) do
    {:ok, pid} -> callback.(pid)
    :error -> {:error, :not_found}
  end
end

我們沒有直接查詢登錄檔,而是使用路由器來比對特定節點。然後我們會取得一個 pid,它可以來自叢集中的任何程序。從現在開始,GETPUTDELETE 要求都會路由到適當的節點。

讓我們也確定在建立新儲存區時,它會出現在正確的節點上。將 KVServer.Command 中的 run/1 函式(與 :create 命令相符)替換為下列內容

def run({:create, bucket}) do
  case KV.Router.route(bucket, KV.Registry, :create, [KV.Registry, bucket]) do
    pid when is_pid(pid) -> {:ok, "OK\r\n"}
    _ -> {:error, "FAILED TO CREATE BUCKET"}
  end
end

現在如果您執行測試,您會看到檢查伺服器互動的現有測試會失敗,因為它會嘗試使用路由表。若要解決此失敗,請變更 :kv_server 應用程式的 test_helper.exs,就像我們對 :kv 所做的一樣,並將 @tag :distributed 也新增到此測試

@tag :distributed
test "server interaction", %{socket: socket} do

但是,請記住,透過將測試分散,我們可能會減少執行它的頻率,因為我們可能不會在每次測試執行時都進行分散式設定。我們將在下一章中學習如何解決這個問題,方法是有效地學習如何讓路由表可設定。

總結

在分佈式方面,我們僅觸及皮毛。

在我們所有的範例中,我們仰賴 Erlang 在每次有請求時自動連接節點的能力。例如,當我們呼叫 Node.spawn_link(:"foo@computer-name", fn -> Hello.world() end) 時,Erlang 會自動連接到所述節點並啟動新的程序。然而,您可能也想要使用 Node.connect/1Node.disconnect/1 來採取更明確的方法來連接。

預設情況下,Erlang 會建立一個完全網狀網路,這表示所有節點都彼此連接。在此拓撲結構下,Erlang 分佈式已知可以擴展到同一個叢集中的數十個節點。Erlang 也有隱藏節點的概念,這允許開發人員組建自訂拓撲,如在 Partisan 等專案中所見。

在實際環境中,您可能隨時都有節點連接和斷開連接。在這種情況下,您需要提供節點可發現性。像 libclusterdns_cluster 等函式庫提供了使用 DNS、Kubernetes 等進行節點可發現性的多種策略。

在實際生活中使用的分散式鍵值儲存需要考慮節點可能隨時上下線,以及在節點間遷移儲存區的事實。更進一步來說,儲存區通常需要在節點之間複製,因此節點的故障不會導致整個儲存區遺失。這個程序稱為複製。我們的實作不會嘗試解決此類問題。相反地,我們假設節點數量是固定的,因此使用固定的路由表。

這些主題乍看之下可能令人望而生畏,但請記住,大多數 Elixir 框架會為您抽象化這些問題。例如,在使用 Phoenix 網路框架 時,其即插即用的抽象化會負責傳送訊息和追蹤使用者如何加入和離開叢集。然而,如果您對分散式系統有興趣,還有很多東西可以探索。以下是一些額外的參考

您還可以在整個 Erlang 生態系統中找到許多用於建構分散式系統的函式庫。現在,是時候回到我們的簡單分散式鍵值儲存,並學習如何為生產環境設定和封裝它。