Streams: Futures dalam Urutan
Ingat kembali gimana cara kita memakai receiver buat asinkron channel kita di
awal bab ini di bagian “Message Passing”.
Method asinkron recv memproduksi serangkaian item seiring berjalannya waktu.
Ini adalah salah satu bentuk dari pola yang jauh lebih umum yang dikenal sebagai
stream. Banyak konsep yang secara alami bisa direpresentasikan sebagai
streams: item-item yang mulai tersedia di dalam antrean (queue), potongan data
yang ditarik secara bertahap dari sistem file pas set datanya terlalu besar buat
memori komputer, atau data yang tiba melalui jaringan seiring waktu. Karena
streams adalah futures, kita bisa memakainya bareng jenis future lainnya
dan menggabungkannya dengan cara-cara yang menarik. Misalnya, kita bisa
mengumpulkan (batch up) kejadian-kejadian (events) buat menghindari terlalu
banyak pemanggilan jaringan, mengatur batas waktu (timeouts) pada urutan
operasi yang berjalan lama, atau membatasi (throttle) kejadian-kejadian UI
buat menghindari melakukan pekerjaan yang sia-sia.
Kita sudah pernah melihat serangkaian item di Bab 13, pas kita melihat trait
Iterator di bagian “Trait Iterator dan Method next”, tapi ada dua perbedaan antara iterator dan asinkron channel
receiver. Perbedaan pertama adalah waktu: iterator itu sinkron, sedangkan
channel receiver itu asinkron. Perbedaan kedua adalah API-nya. Pas bekerja
langsung dengan Iterator, kita memanggil method next-nya yang sinkron.
Khusus buat stream trpl::Receiver, kita memanggil method recv yang
asinkron sebagai gantinya. Di luar itu, API-API ini terasa sangat mirip, dan
kemiripan itu bukan kebetulan kok. Sebuah stream itu kayak bentuk asinkron
dari iterasi. Meskipun trpl::Receiver secara spesifik menunggu buat menerima
pesan, tapi API stream yang buat tujuan umum itu jauh lebih luas: dia
menyediakan item berikutnya sama kayak yang dilakukan Iterator, tapi secara
asinkron.
Kemiripan antara iterator dan streams di Rust artinya kita sebenarnya bisa
membikin sebuah stream dari iterator apa saja. Sama kayak iterator, kita bisa
bekerja bareng sebuah stream dengan memanggil method next-nya lalu me-await
output-nya, kayak di Listing 17-21, yang mana kode ini belum bisa di-compile.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Kita mulai dengan sebuah array angka, yang kemudian kita ubah jadi sebuah
iterator lalu memanggil map buat melipatgandakan semua nilainya. Terus kita
mengubah iterator tersebut jadi sebuah stream menggunakan fungsi
trpl::stream_from_iter. Berikutnya, kita melakukan loop melewati item-item
di dalam stream tersebut pas mereka tiba memakai loop while let.
Sayangnya, pas kita coba menjalankan kodenya, ia tidak bisa di-compile
melainkan malah melaporkan kalau tidak ada method next yang tersedia:
error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Sesuai penjelasan output ini, alasan error compiler-nya adalah karena kita
butuh trait yang tepat berada di dalam scope supaya bisa memakai method
next. Mengingat pembahasan kita sejauh ini, kita mungkin secara wajar
berekspektasi kalau trait tersebut adalah Stream, tapi sebenarnya ia adalah
StreamExt. Singkatan dari extension (ekstensi), Ext adalah pola yang umum
di komunitas Rust buat memperluas satu trait dengan trait lainnya.
Trait Stream mendefinisikan sebuah low-level interface yang secara efektif
menggabungkan trait Iterator dan Future. StreamExt menyuplai sekumpulan
API tingkat lebih tinggi di atas Stream, termasuk method next sekaligus
method pembantu lainnya yang mirip sama yang disediakan oleh trait Iterator.
Stream dan StreamExt belum menjadi bagian dari standard library milik
Rust, tapi mayoritas crates di ekosistem memakai definisi yang serupa.
Solusi buat error compiler tadi adalah dengan menambahkan statement use buat
trpl::StreamExt, kayak yang ditunjukkan di Listing 17-22.
extern crate trpl; // required for mdbook test
use trpl::StreamExt;
fn main() {
trpl::block_on(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
// --snip--
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Dengan semua bagian itu disatukan, kode ini berjalan sesuai yang kita mau!
Bahkan, sekarang karena kita punya StreamExt di dalam scope, kita bisa
memakai semua method pembantunya, persis kayak iterator.