檢視原始碼 串流 (Elixir v1.16.2)
用於建立和組合串流的函式。
串流是可以組合的延遲列舉 (關於列舉的簡介,請參閱 Enum
模組)。任何在列舉期間逐一產生元素的列舉都稱為串流。例如,Elixir 的 Range
就是一個串流
iex> range = 1..5
1..5
iex> Enum.map(range, &(&1 * 2))
[2, 4, 6, 8, 10]
在上面的範例中,當我們對範圍進行映射時,列舉的元素會在列舉期間逐一建立。Stream
模組允許我們對範圍進行映射,而不會觸發其列舉
iex> range = 1..3
iex> stream = Stream.map(range, &(&1 * 2))
iex> Enum.map(stream, &(&1 + 1))
[3, 5, 7]
請注意,我們從一個範圍開始,然後建立一個串流,目的是將範圍中的每個元素乘以 2。此時,沒有進行任何運算。只有當呼叫 Enum.map/2
時,我們才會實際列舉範圍中的每個元素,將其乘以 2 並加上 1。我們說 Stream
中的函式是延遲的,而 Enum
中的函式是急切的。
由於串流具有延遲性,因此在處理大型 (甚至無限) 集合時它們非常有用。當使用 Enum
串連許多運算時,會建立中間清單,而 Stream
會建立一個運算食譜,稍後再執行。讓我們看另一個範例
1..3
|> Enum.map(&IO.inspect(&1))
|> Enum.map(&(&1 * 2))
|> Enum.map(&IO.inspect(&1))
1
2
3
2
4
6
#=> [2, 4, 6]
請注意,我們首先列印清單中的每個元素,然後將每個元素乘以 2,最後列印每個新值。在此範例中,清單列舉了三次。讓我們看一個使用串流的範例
stream = 1..3
|> Stream.map(&IO.inspect(&1))
|> Stream.map(&(&1 * 2))
|> Stream.map(&IO.inspect(&1))
Enum.to_list(stream)
1
2
2
4
3
6
#=> [2, 4, 6]
雖然最終結果相同,但列印元素的順序已變更!使用串流時,我們會列印第一個元素,然後列印其兩倍。在此範例中,清單僅列舉了一次!
這就是我們前面所說的串流是可以組合的延遲列舉。請注意,我們可以多次呼叫 Stream.map/2
,有效地組合串流並保持其延遲性。只有當您呼叫 Enum
模組中的函式時,才會執行運算。
與 Enum
類似,此模組中的函式以線性時間運作。這表示執行操作所需時間與串列長度以相同速率增加。這在 Stream.map/2
等操作中是預期的。畢竟,如果我們想要遍歷串流中的每個元素,串流越長,我們需要遍歷的元素就越多,而且所需時間就越長。
建立串流
Elixir 標準函式庫中有許多函式會傳回串流,以下是一些範例:
IO.stream/2
- 一次串流一個輸入列URI.query_decoder/1
- 一次解碼一個查詢字串
此模組也提供許多建立串流的便利函式,例如 Stream.cycle/1
、Stream.unfold/2
、Stream.resource/3
等。
請注意,此模組中的函式保證會傳回可列舉物件。由於可列舉物件可以有不同的形式(結構、匿名函式等),因此此模組中的函式可能會傳回任何這些形式,而且這可能會隨時變更。例如,今天傳回匿名函式的函式,在未來的版本中可能會傳回結構。
摘要
函式
依據 fun
傳回相同值的緩衝元素,將 enum
分割成區塊。
chunk_every(enum, count, count)
的捷徑。
將可列舉物件串流成區塊,每個區塊包含 count
個元素,其中每個新區塊從可列舉物件中的 step
個元素開始。
使用細緻的控制來分割 enum
,以在發出每個區塊時進行控制。
建立一個串流,列舉可列舉中的每個可列舉。
建立一個串流,列舉第一個參數,接著是第二個參數。
建立一個串流,無限次循環給定的可列舉。
建立一個串流,僅發射與最後發射元素不同的元素。
建立一個串流,僅發射元素,如果在元素上呼叫 fun
的結果與在最後發射元素上呼叫 fun
的(儲存)結果不同。
延遲從可列舉中刪除下一個 n
個元素。
建立一個串流,從可列舉中刪除每個 nth
個元素。
延遲刪除可列舉的元素,而給定函數傳回真值。
在串流中複製給定元素 n
次。
對每個元素執行給定函數。
建立一個串流,根據列舉中給定的函數篩選元素。
將給定的 fun
映射到 enumerable
上,並壓平結果。
延遲在列舉的每個元素之間插入 intersperse_element
。
建立一個串流,在給定的週期 n
(以毫秒為單位)後發射一個值。
將串流值注入給定的可收集作為副作用。
發射一系列值,從 start_value
開始。後續值是透過對前一個值呼叫 next_fun
來產生的。
建立一個串流,將在列舉中套用給定的函數。
建立一個串流,將對可列舉中的每個 nth
個元素套用給定的函數。
建立一個串流,將根據列舉中給定的函數拒絕元素。
傳回透過重複呼叫 generator_fun
所產生的串流。
對給定的資源發射一系列值。
執行給定的串流。
建立一個串流,將給定的函數套用於每個元素,發射結果,並將相同的結果用作下一個運算的累加器。使用可列舉中的第一個元素作為起始值。
建立一個串流,將給定的函式套用至每個元素,發射結果,並將相同的結果用作下一個運算的累加器。使用給定的 acc
作為起始值。
延遲取得列舉中的下一個 計數
個元素,並停止列舉。
建立一個串流,從列舉中取得每個 第 n 個
元素。
延遲取得列舉中的元素,同時給定的函式傳回真值。
建立一個串流,在 n
毫秒後發射單一值。
轉換現有的串流。
類似於 Stream.transform/5
,但未提供 last_fun
。
使用基於函式的開始、最後和之後的回呼轉換現有的串流。
針對給定的累加器發射一系列值。
建立一個串流,僅發射唯一的元素。
建立一個串流,僅發射唯一的元素,方法是移除函式 fun
傳回重複元素的元素。
建立一個串流,其中列舉中的每個元素將與其索引一起封裝在元組中。
將有限集合的列舉中對應的元素壓縮成一個元組串流。
將兩個列舉壓縮在一起,延遲執行。
延遲將有限集合的列舉中對應的元素壓縮成一個新的列舉,並在執行時使用 zip_fun
函式轉換它們。
延遲將兩個列舉中對應的元素壓縮成一個新的列舉,並在執行時使用 zip_fun
函式轉換它們。
類型
函式
@spec chunk_by(Enumerable.t(), (element() -> any())) :: Enumerable.t()
依據 fun
傳回相同值的緩衝元素,將 enum
分割成區塊。
僅當 fun
傳回新值或 enum
結束時,才會發射元素。
範例
iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
iex> Enum.to_list(stream)
[[1], [2, 2], [3], [4, 4, 6], [7, 7]]
@spec chunk_every(Enumerable.t(), pos_integer()) :: Enumerable.t()
chunk_every(enum, count, count)
的捷徑。
@spec chunk_every( Enumerable.t(), pos_integer(), pos_integer(), Enumerable.t() | :discard ) :: Enumerable.t()
將可列舉物件串流成區塊,每個區塊包含 count
個元素,其中每個新區塊從可列舉物件中的 step
個元素開始。
step
為選用參數,若未傳遞,則預設為 count
,亦即區塊不會重疊。只要集合結束或發射不完整的區塊,區塊化就會停止。
如果最後一個區塊沒有 count
個元素來填滿區塊,則會從 leftover
取用元素來填入區塊。如果 leftover
沒有足夠的元素來填滿區塊,則會傳回一個部分區塊,其元素少於 count
個。
如果在 leftover
中提供 :discard
,則最後一個區塊會被捨棄,除非它剛好有 count
個元素。
範例
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list()
[[1, 2], [3, 4], [5, 6]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, :discard) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list()
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]
iex> Stream.chunk_every([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list()
[[1, 2, 3], [4, 5, 6]]
iex> Stream.chunk_every([1, 2, 3, 4], 3, 3, Stream.cycle([0])) |> Enum.to_list()
[[1, 2, 3], [4, 0, 0]]
@spec chunk_while( Enumerable.t(), acc(), (element(), acc() -> {:cont, chunk, acc()} | {:cont, acc()} | {:halt, acc()}), (acc() -> {:cont, chunk, acc()} | {:cont, acc()}) ) :: Enumerable.t() when chunk: any()
使用細緻的控制來分割 enum
,以在發出每個區塊時進行控制。
chunk_fun
接收目前的元素和累加器,並且必須傳回 {:cont, element, acc}
來發射給定的區塊並繼續使用累加器,或傳回 {:cont, acc}
來不發射任何區塊並繼續使用傳回的累加器。
after_fun
在反覆運算完成時被呼叫,並且也必須傳回 {:cont, element, acc}
或 {:cont, acc}
。
範例
iex> chunk_fun = fn element, acc ->
...> if rem(element, 2) == 0 do
...> {:cont, Enum.reverse([element | acc]), []}
...> else
...> {:cont, [element | acc]}
...> end
...> end
iex> after_fun = fn
...> [] -> {:cont, []}
...> acc -> {:cont, Enum.reverse(acc), []}
...> end
iex> stream = Stream.chunk_while(1..10, [], chunk_fun, after_fun)
iex> Enum.to_list(stream)
[[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]
@spec concat(Enumerable.t()) :: Enumerable.t()
建立一個串流,列舉可列舉中的每個可列舉。
範例
iex> stream = Stream.concat([1..3, 4..6, 7..9])
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6, 7, 8, 9]
@spec concat(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
建立一個串流,列舉第一個參數,接著是第二個參數。
範例
iex> stream = Stream.concat(1..3, 4..6)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6]
iex> stream1 = Stream.cycle([1, 2, 3])
iex> stream2 = Stream.cycle([4, 5, 6])
iex> stream = Stream.concat(stream1, stream2)
iex> Enum.take(stream, 6)
[1, 2, 3, 1, 2, 3]
@spec cycle(Enumerable.t()) :: Enumerable.t()
建立一個串流,無限次循環給定的可列舉。
範例
iex> stream = Stream.cycle([1, 2, 3])
iex> Enum.take(stream, 5)
[1, 2, 3, 1, 2]
@spec dedup(Enumerable.t()) :: Enumerable.t()
建立一個串流,僅發射與最後發射元素不同的元素。
此函式只需要儲存最後發出的元素。
元素使用 ===/2
進行比較。
範例
iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3, 2, 1]
@spec dedup_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()
建立一個串流,僅發射元素,如果在元素上呼叫 fun
的結果與在最後發射元素上呼叫 fun
的(儲存)結果不同。
範例
iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}, {1, :x}]
@spec drop(Enumerable.t(), integer()) :: Enumerable.t()
延遲從可列舉中刪除下一個 n
個元素。
如果給定負數 n
,它將從集合中刪除最後 n
個元素。請注意,實作此項功能的機制會延遲發出任何元素,直到列舉發出 n
個其他元素。
範例
iex> stream = Stream.drop(1..10, 5)
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]
iex> stream = Stream.drop(1..10, -5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()
建立一個串流,從可列舉中刪除每個 nth
個元素。
第一個元素總是會被刪除,除非 nth
為 0。
nth
必須是非負整數。
範例
iex> stream = Stream.drop_every(1..10, 2)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]
iex> stream = Stream.drop_every(1..1000, 1)
iex> Enum.to_list(stream)
[]
iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec drop_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
延遲刪除可列舉的元素,而給定函數傳回真值。
範例
iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]
@spec duplicate(any(), non_neg_integer()) :: Enumerable.t()
在串流中複製給定元素 n
次。
n
是大於或等於 0
的整數。
如果 n
是 0
,則會傳回一個空的串流。
範例
iex> stream = Stream.duplicate("hello", 0)
iex> Enum.to_list(stream)
[]
iex> stream = Stream.duplicate("hi", 1)
iex> Enum.to_list(stream)
["hi"]
iex> stream = Stream.duplicate("bye", 2)
iex> Enum.to_list(stream)
["bye", "bye"]
iex> stream = Stream.duplicate([1, 2], 3)
iex> Enum.to_list(stream)
[[1, 2], [1, 2], [1, 2]]
@spec each(Enumerable.t(), (element() -> term())) :: Enumerable.t()
對每個元素執行給定函數。
串流中的值不會改變,因此此函式可用於將副作用 (例如列印) 加入串流。如果要產生不同的串流,請參閱 map/2
。
範例
iex> stream = Stream.each([1, 2, 3], fn x -> send(self(), x) end)
iex> Enum.to_list(stream)
iex> receive do: (x when is_integer(x) -> x)
1
iex> receive do: (x when is_integer(x) -> x)
2
iex> receive do: (x when is_integer(x) -> x)
3
@spec filter(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
建立一個串流,根據列舉中給定的函數篩選元素。
範例
iex> stream = Stream.filter([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[2]
@spec flat_map(Enumerable.t(), (element() -> Enumerable.t())) :: Enumerable.t()
將給定的 fun
映射到 enumerable
上,並壓平結果。
此函式會傳回一個新的串流,其建立方式是將呼叫 fun
的結果附加在 enumerable
的每個元素上。
範例
iex> stream = Stream.flat_map([1, 2, 3], fn x -> [x, x * 2] end)
iex> Enum.to_list(stream)
[1, 2, 2, 4, 3, 6]
iex> stream = Stream.flat_map([1, 2, 3], fn x -> [[x]] end)
iex> Enum.to_list(stream)
[[1], [2], [3]]
@spec intersperse(Enumerable.t(), any()) :: Enumerable.t()
延遲在列舉的每個元素之間插入 intersperse_element
。
範例
iex> Stream.intersperse([1, 2, 3], 0) |> Enum.to_list()
[1, 0, 2, 0, 3]
iex> Stream.intersperse([1], 0) |> Enum.to_list()
[1]
iex> Stream.intersperse([], 0) |> Enum.to_list()
[]
@spec interval(timer()) :: Enumerable.t()
建立一個串流,在給定的週期 n
(以毫秒為單位)後發射一個值。
發出的值是一個從 0
開始的遞增計數器。此操作會在每次串流新元素時,以指定的間隔封鎖呼叫者。
不要使用此函式來產生數字序列。如果不需要封鎖呼叫者程序,請改用 Stream.iterate(0, & &1 + 1)
。
範例
iex> Stream.interval(10) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
@spec into(Enumerable.t(), Collectable.t(), (term() -> term())) :: Enumerable.t()
將串流值注入給定的可收集作為副作用。
@spec iterate(element(), (element() -> element())) :: Enumerable.t()
發射一系列值,從 start_value
開始。後續值是透過對前一個值呼叫 next_fun
來產生的。
範例
iex> Stream.iterate(0, &(&1 + 1)) |> Enum.take(5)
[0, 1, 2, 3, 4]
@spec map(Enumerable.t(), (element() -> any())) :: Enumerable.t()
建立一個串流,將在列舉中套用給定的函數。
範例
iex> stream = Stream.map([1, 2, 3], fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6]
@spec map_every(Enumerable.t(), non_neg_integer(), (element() -> any())) :: Enumerable.t()
建立一個串流,將對可列舉中的每個 nth
個元素套用給定的函數。
第一個元素總是傳遞給指定的函式。
nth
必須是非負整數。
範例
iex> stream = Stream.map_every(1..10, 2, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 2, 6, 4, 10, 6, 14, 8, 18, 10]
iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]
iex> stream = Stream.map_every(1..5, 0, fn x -> x * 2 end)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec reject(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
建立一個串流,將根據列舉中給定的函數拒絕元素。
範例
iex> stream = Stream.reject([1, 2, 3], fn x -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[1, 3]
@spec repeatedly((-> element())) :: Enumerable.t()
傳回透過重複呼叫 generator_fun
所產生的串流。
範例
# Although not necessary, let's seed the random algorithm
iex> :rand.seed(:exsss, {1, 2, 3})
iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
[0.5455598952593053, 0.6039309974353404, 0.6684893034823949]
@spec resource( (-> acc()), (acc() -> {[element()], acc()} | {:halt, acc()}), (acc() -> term()) ) :: Enumerable.t()
對給定的資源發射一系列值。
類似於 transform/3
,但透過 start_fun
延遲計算初始累積值,並在列舉結束時執行 after_fun
(在成功和失敗的情況下)。
透過使用前一個累積器(初始值為 start_fun
傳回的結果)呼叫 next_fun
產生後續值,它必須傳回包含要發射的元素清單和下一個累積器的元組。如果傳回 {:halt, acc}
,列舉就會結束。
正如函式名稱所暗示的,此函式可用於從資源串流值。
範例
Stream.resource(
fn -> File.open!("sample") end,
fn file ->
case IO.read(file, :line) do
data when is_binary(data) -> {[data], file}
_ -> {:halt, file}
end
end,
fn file -> File.close(file) end
)
iex> Stream.resource(
...> fn ->
...> {:ok, pid} = StringIO.open("string")
...> pid
...> end,
...> fn pid ->
...> case IO.getn(pid, "", 1) do
...> :eof -> {:halt, pid}
...> char -> {[char], pid}
...> end
...> end,
...> fn pid -> StringIO.close(pid) end
...> ) |> Enum.to_list()
["s", "t", "r", "i", "n", "g"]
@spec run(Enumerable.t()) :: :ok
執行給定的串流。
當需要執行串流以產生副作用,且對其傳回結果不感興趣時,這會很有用。
範例
開啟一個檔案,將所有 #
替換為 %
,並串流到另一個檔案,而不用將整個檔案載入記憶體
File.stream!("/path/to/file")
|> Stream.map(&String.replace(&1, "#", "%"))
|> Stream.into(File.stream!("/path/to/other/file"))
|> Stream.run()
@spec scan(Enumerable.t(), (element(), acc() -> any())) :: Enumerable.t()
建立一個串流,將給定的函數套用於每個元素,發射結果,並將相同的結果用作下一個運算的累加器。使用可列舉中的第一個元素作為起始值。
範例
iex> stream = Stream.scan(1..5, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec scan(Enumerable.t(), acc(), (element(), acc() -> any())) :: Enumerable.t()
建立一個串流,將給定的函式套用至每個元素,發射結果,並將相同的結果用作下一個運算的累加器。使用給定的 acc
作為起始值。
範例
iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]
@spec take(Enumerable.t(), integer()) :: Enumerable.t()
延遲取得列舉中的下一個 計數
個元素,並停止列舉。
如果給定負數 count
,將取最後 count
個值。因此,會完整列舉集合,在記憶體中保留最多 2 * count
個元素。到達集合尾端後,將執行最後 count
個元素。因此,對無限集合使用負數 count
永遠不會傳回。
範例
iex> stream = Stream.take(1..100, 5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
iex> stream = Stream.take(1..100, -5)
iex> Enum.to_list(stream)
[96, 97, 98, 99, 100]
iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
iex> Enum.to_list(stream)
[1, 2, 3, 1, 2]
@spec take_every(Enumerable.t(), non_neg_integer()) :: Enumerable.t()
建立一個串流,從列舉中取得每個 第 n 個
元素。
第一個元素總是會包含,除非 nth
是 0。
nth
必須是非負整數。
範例
iex> stream = Stream.take_every(1..10, 2)
iex> Enum.to_list(stream)
[1, 3, 5, 7, 9]
iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
iex> stream = Stream.take_every(1..1000, 0)
iex> Enum.to_list(stream)
[]
@spec take_while(Enumerable.t(), (element() -> as_boolean(term()))) :: Enumerable.t()
延遲取得列舉中的元素,同時給定的函式傳回真值。
範例
iex> stream = Stream.take_while(1..100, &(&1 <= 5))
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]
@spec timer(timer()) :: Enumerable.t()
建立一個串流,在 n
毫秒後發射單一值。
發出的值是 0
。此操作會封鎖呼叫者一段指定時間,直到串流元素。
範例
iex> Stream.timer(10) |> Enum.to_list()
[0]
@spec transform(Enumerable.t(), acc, fun) :: Enumerable.t() when fun: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), acc: any()
轉換現有的串流。
它預期一個累加器和一個接收兩個參數的函式,串流元素和更新的累加器。它必須傳回一個元組,其中第一個元素是一個新的串流(通常是清單)或原子 :halt
,第二個元素是下一個元素要使用的累加器。
注意:此函式等同於 Enum.flat_map_reduce/3
,但此函式在處理串流後不會傳回累加器。
範例
Stream.transform/3
很實用,因為它可以用作此模組中定義許多函式的基礎。例如,我們可以實作 Stream.take(enum, n)
如下
iex> enum = 1001..9999
iex> n = 3
iex> stream = Stream.transform(enum, 0, fn i, acc ->
...> if acc < n, do: {[i], acc + 1}, else: {:halt, acc}
...> end)
iex> Enum.to_list(stream)
[1001, 1002, 1003]
Stream.transform/5
進一步概括此函式,允許封裝資源。
@spec transform(Enumerable.t(), start_fun, reducer, after_fun) :: Enumerable.t() when start_fun: (-> acc), reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), after_fun: (acc -> term()), acc: any()
類似於 Stream.transform/5
,但未提供 last_fun
。
此函式可以視為 Stream.resource/3
與 Stream.transform/3
的組合。
@spec transform(Enumerable.t(), start_fun, reducer, last_fun, after_fun) :: Enumerable.t() when start_fun: (-> acc), reducer: (element(), acc -> {Enumerable.t(), acc} | {:halt, acc}), last_fun: (acc -> {Enumerable.t(), acc} | {:halt, acc}), after_fun: (acc -> term()), acc: any()
使用基於函式的開始、最後和之後的回呼轉換現有的串流。
一旦轉換開始,start_fun
會被呼叫來計算初始累加器。然後,對於可列舉中的每個元素,reducer
函式會使用元素和累加器呼叫,傳回新元素和新累加器,如同 transform/3
。
收集完成後,last_fun
會呼叫累加器來發出任何剩餘的項目。然後呼叫 after_fun
來關閉任何資源,但不會發出任何新項目。last_fun
僅在給定的可列舉成功終止時呼叫(因為它已完成或已自行停止)。after_fun
總是被呼叫,因此 after_fun
必須用於關閉資源。
@spec unfold(acc(), (acc() -> {element(), acc()} | nil)) :: Enumerable.t()
針對給定的累加器發射一系列值。
透過呼叫 next_fun
並使用先前的累加器來產生後續值,它必須傳回一個包含目前值和下一個累加器的元組。如果它傳回 nil
,列舉就會結束。
範例
建立一個會倒數並在零之前停止的串流
iex> Stream.unfold(5, fn
...> 0 -> nil
...> n -> {n, n - 1}
...> end) |> Enum.to_list()
[5, 4, 3, 2, 1]
如果 next_fun
從未傳回 nil
,傳回的串流將是無限的
iex> Stream.unfold(0, fn
...> n -> {n, n + 1}
...> end) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
iex> Stream.unfold(1, fn
...> n -> {n, n * 2}
...> end) |> Enum.take(10)
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
@spec uniq(Enumerable.t()) :: Enumerable.t()
建立一個串流,僅發射唯一的元素。
請記住,為了知道一個元素是否唯一,此函式需要儲存串流發出的所有唯一值。因此,如果串流是無限的,儲存的元素數量將無限增加,永遠不會被垃圾回收。
範例
iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list()
[1, 2, 3]
@spec uniq_by(Enumerable.t(), (element() -> term())) :: Enumerable.t()
建立一個串流,僅發射唯一的元素,方法是移除函式 fun
傳回重複元素的元素。
函式 fun
會將每個元素對應到一個用於判斷兩個元素是否重複的詞彙。
請記住,為了知道一個元素是否唯一,此函式需要儲存串流發出的所有唯一值。因此,如果串流是無限的,儲存的元素數量將無限增加,永遠不會被垃圾回收。
範例
iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list()
[{1, :x}, {2, :y}]
iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list()
[a: {:tea, 2}, c: {:coffee, 1}]
@spec with_index(Enumerable.t(), integer()) :: Enumerable.t()
建立一個串流,其中列舉中的每個元素將與其索引一起封裝在元組中。
如果給定 offset
,我們將從給定的偏移量開始索引,而不是從零開始。
範例
iex> stream = Stream.with_index([1, 2, 3])
iex> Enum.to_list(stream)
[{1, 0}, {2, 1}, {3, 2}]
iex> stream = Stream.with_index([1, 2, 3], 3)
iex> Enum.to_list(stream)
[{1, 3}, {2, 4}, {3, 5}]
@spec zip(enumerables) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t()
將有限集合的列舉中對應的元素壓縮成一個元組串流。
只要給定集合中的任何可列舉完成,合併就會結束。
範例
iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle(["foo", "bar", "baz"])
iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list()
[{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}]
@spec zip(Enumerable.t(), Enumerable.t()) :: Enumerable.t()
將兩個列舉壓縮在一起,延遲執行。
任何一個列舉完成時,就會完成壓縮。
範例
iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle([:a, :b, :c])
iex> Stream.zip(concat, cycle) |> Enum.to_list()
[{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}]
@spec zip_with(enumerables, (Enumerable.t() -> term())) :: Enumerable.t() when enumerables: [Enumerable.t()] | Enumerable.t()
延遲將有限集合的列舉中對應的元素壓縮成一個新的列舉,並在執行時使用 zip_fun
函式轉換它們。
enumerables
中每個列舉的第一個元素會放入一個清單,然後傳遞給一元函數 zip_fun
。接著,每個列舉的第二個元素會放入一個清單,並傳遞給 zip_fun
,以此類推,直到 enumerables
中的任何一個列舉完成。
傳回一個新的列舉,其結果為呼叫 zip_fun
。
範例
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat], fn [a, b] -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with([concat, concat, 1..3], fn [a, b, c] -> a + b + c end) |> Enum.to_list()
[3, 6, 9]
@spec zip_with(Enumerable.t(), Enumerable.t(), (term(), term() -> term())) :: Enumerable.t()
延遲將兩個列舉中對應的元素壓縮成一個新的列舉,並在執行時使用 zip_fun
函式轉換它們。
zip_fun
會呼叫 enumerable1
的第一個元素和 enumerable2
的第一個元素,接著呼叫每個元素的第二個元素,以此類推,直到任一列舉完成。
範例
iex> concat = Stream.concat(1..3, 4..6)
iex> Stream.zip_with(concat, concat, fn a, b -> a + b end) |> Enum.to_list()
[2, 4, 6, 8, 10, 12]