Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Streams: Futures dalam Urutan

Ingat kembali gimana cara kita memakai receiver buat asinkron channel kita di awal bab ini di bagian “Message Passing”. Method asinkron recv memproduksi serangkaian item seiring berjalannya waktu. Ini adalah salah satu bentuk dari pola yang jauh lebih umum yang dikenal sebagai stream. Banyak konsep yang secara alami bisa direpresentasikan sebagai streams: item-item yang mulai tersedia di dalam antrean (queue), potongan data yang ditarik secara bertahap dari sistem file pas set datanya terlalu besar buat memori komputer, atau data yang tiba melalui jaringan seiring waktu. Karena streams adalah futures, kita bisa memakainya bareng jenis future lainnya dan menggabungkannya dengan cara-cara yang menarik. Misalnya, kita bisa mengumpulkan (batch up) kejadian-kejadian (events) buat menghindari terlalu banyak pemanggilan jaringan, mengatur batas waktu (timeouts) pada urutan operasi yang berjalan lama, atau membatasi (throttle) kejadian-kejadian UI buat menghindari melakukan pekerjaan yang sia-sia.

Kita sudah pernah melihat serangkaian item di Bab 13, pas kita melihat trait Iterator di bagian “Trait Iterator dan Method next, tapi ada dua perbedaan antara iterator dan asinkron channel receiver. Perbedaan pertama adalah waktu: iterator itu sinkron, sedangkan channel receiver itu asinkron. Perbedaan kedua adalah API-nya. Pas bekerja langsung dengan Iterator, kita memanggil method next-nya yang sinkron. Khusus buat stream trpl::Receiver, kita memanggil method recv yang asinkron sebagai gantinya. Di luar itu, API-API ini terasa sangat mirip, dan kemiripan itu bukan kebetulan kok. Sebuah stream itu kayak bentuk asinkron dari iterasi. Meskipun trpl::Receiver secara spesifik menunggu buat menerima pesan, tapi API stream yang buat tujuan umum itu jauh lebih luas: dia menyediakan item berikutnya sama kayak yang dilakukan Iterator, tapi secara asinkron.

Kemiripan antara iterator dan streams di Rust artinya kita sebenarnya bisa membikin sebuah stream dari iterator apa saja. Sama kayak iterator, kita bisa bekerja bareng sebuah stream dengan memanggil method next-nya lalu me-await output-nya, kayak di Listing 17-21, yang mana kode ini belum bisa di-compile.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-21: Membikin sebuah stream dari sebuah iterator dan mencetak nilai-nilainya

Kita mulai dengan sebuah array angka, yang kemudian kita ubah jadi sebuah iterator lalu memanggil map buat melipatgandakan semua nilainya. Terus kita mengubah iterator tersebut jadi sebuah stream menggunakan fungsi trpl::stream_from_iter. Berikutnya, kita melakukan loop melewati item-item di dalam stream tersebut pas mereka tiba memakai loop while let.

Sayangnya, pas kita coba menjalankan kodenya, ia tidak bisa di-compile melainkan malah melaporkan kalau tidak ada method next yang tersedia:

error[E0599]: no method named `next` found for struct `tokio_stream::iter::Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Sesuai penjelasan output ini, alasan error compiler-nya adalah karena kita butuh trait yang tepat berada di dalam scope supaya bisa memakai method next. Mengingat pembahasan kita sejauh ini, kita mungkin secara wajar berekspektasi kalau trait tersebut adalah Stream, tapi sebenarnya ia adalah StreamExt. Singkatan dari extension (ekstensi), Ext adalah pola yang umum di komunitas Rust buat memperluas satu trait dengan trait lainnya.

Trait Stream mendefinisikan sebuah low-level interface yang secara efektif menggabungkan trait Iterator dan Future. StreamExt menyuplai sekumpulan API tingkat lebih tinggi di atas Stream, termasuk method next sekaligus method pembantu lainnya yang mirip sama yang disediakan oleh trait Iterator. Stream dan StreamExt belum menjadi bagian dari standard library milik Rust, tapi mayoritas crates di ekosistem memakai definisi yang serupa.

Solusi buat error compiler tadi adalah dengan menambahkan statement use buat trpl::StreamExt, kayak yang ditunjukkan di Listing 17-22.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::block_on(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        // --snip--
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}
Listing 17-22: Berhasil memakai sebuah iterator sebagai dasar buat sebuah stream

Dengan semua bagian itu disatukan, kode ini berjalan sesuai yang kita mau! Bahkan, sekarang karena kita punya StreamExt di dalam scope, kita bisa memakai semua method pembantunya, persis kayak iterator.