Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Menerapkan Konkurensi dengan Async

Di bagian ini, kita bakal menerapkan asinkron ke beberapa tantangan konkurensi yang sama dengan yang sudah kita tangani memakai threads di Bab 16. Karena kita sudah banyak ngomongin ide-ide kuncinya di sana, di bagian ini kita bakal fokus sama apa aja yang berbeda antara threads dan futures.

Di banyak kasus, API buat bekerja bareng konkurensi memakai asinkron itu mirip sekali sama yang dipakai buat threads. Di kasus lain, mereka jadinya lumayan berbeda. Bahkan kalau API-nya kelihatan mirip antara threads dan asinkron, mereka sering kali punya perilaku yang berbeda—dan mereka hampir selalu punya karakteristik performa yang berbeda.

Membikin Task (Tugas) Baru dengan spawn_task

Operasi pertama yang kita tangani di bagian “Membikin Thread Baru dengan spawn di Bab 16 adalah menghitung angka di dua threads terpisah. Mari kita lakukan hal yang sama memakai asinkron. Crate trpl menyediakan fungsi spawn_task yang kelihatannya mirip sekali sama API thread::spawn, dan fungsi sleep yang merupakan versi asinkron dari API thread::sleep. Kita bisa memakai keduanya bersama-sama buat mengimplementasikan contoh penghitungan angka tersebut, kayak yang ditunjukkan di Listing 17-6.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}
Listing 17-6: Membikin task baru buat mencetak satu hal sementara task utama mencetak hal lainnya

Sebagai titik awal, kita menyiapkan fungsi main kita pakai trpl::block_on supaya fungsi tingkat teratas kita bisa bersifat asinkron.

Catatan: Mulai dari titik ini di bab ini, setiap contoh bakal menyertakan kode pembungkus yang sama persis pakai trpl::block_on di main, jadi kita bakal sering mengabaikannya (skip) sama seperti kita mengabaikan main. Jangan lupa buat menyertakannya di dalam kode kita ya!

Terus kita menulis dua loops di dalam blok tersebut, masing-masing mengandung pemanggilan trpl::sleep, yang bakal nunggu selama setengah detik (500 milidetik) sebelum mengirim pesan berikutnya. Kita menaruh satu loop di dalam isi dari trpl::spawn_task dan satu lagi di dalam loop for tingkat teratas. Kita juga nambahin await setelah pemanggilan sleep.

Kode ini berperilaku mirip sama implementasi berbasis thread—termasuk fakta bahwa kita mungkin bakal melihat pesan-pesannya muncul dalam urutan yang berbeda di terminal kita pas kita menjalankannya:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Versi ini bakal berhenti begitu loop for di dalam isi blok asinkron utamanya selesai, karena task yang ditelurkan sama spawn_task bakal dimatikan pas fungsi main berakhir. Kalau kita pengen task tersebut jalan sampai kelar, kita perlu memakai join handle buat nungguin task pertamanya selesai. Pas pakai threads, kita memakai method join buat “memblokir” sampai thread-nya selesai jalan. Di Listing 17-7, kita bisa memakai await buat ngelakuin hal yang sama, karena task handle itu sendiri adalah sebuah future. Tipe Output-nya adalah sebuah Result, jadi kita juga meng-unwrap-nya setelah me-await-nya.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}
Listing 17-7: Memakai await bersama join handle buat menjalankan task sampai selesai

Versi yang sudah di-update ini bakal jalan sampai kedua loops selesai:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Sejauh ini, kelihatannya asinkron dan threads ngasih kita hasil yang mirip, cuma beda di sintaksnya aja: memakai await ketimbang memanggil join pada join handle, dan me-await pemanggilan sleep.

Perbedaan besarnya adalah kita tidak perlu menelurkan (spawn) thread sistem operasi lainnya buat melakukan hal ini. Faktanya, kita bahkan tidak perlu menelurkan sebuah task di sini. Karena blok asinkron dikompilasi jadi futures anonim, kita bisa menaruh tiap loop di dalam blok asinkron lalu menyuruh runtime menjalankan keduanya sampai selesai memakai fungsi trpl::join.

Di bagian “Menunggu Semua Thread sampai Selesai” di Bab 16, kita menunjukkan gimana cara memakai method join pada tipe JoinHandle yang dikembalikan pas kita memanggil std::thread::spawn. Fungsi trpl::join itu mirip, tapi buat futures. Pas kita ngasih dia dua futures, dia bakal memproduksi satu future baru yang output-nya adalah sebuah tuple berisi output dari tiap future yang kita masukkan tadi begitu keduanya selesai. Jadi, di Listing 17-8, kita memakai trpl::join buat nungguin baik fut1 maupun fut2 sampai selesai. Kita tidak me-await fut1 dan fut2 melainkan me-await future baru yang dihasilkan sama trpl::join. Kita ngabaiin output-nya, karena dia cuma sebuah tuple berisi dua nilai unit.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}
Listing 17-8: Memakai trpl::join buat menunggu dua futures anonim

Pas kita jalankan ini, kita melihat kedua futures jalan sampai selesai:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Nah, kita bakal melihat urutan yang sama persis di tiap jalannya, yang mana berbeda sekali sama apa yang kita lihat di threads dan di trpl::spawn_task di Listing 17-7. Itu gara-gara fungsi trpl::join ini sifatnya adil (fair), yang artinya dia mengecek tiap future dengan frekuensi yang sama, ganti- gantian di antara mereka, dan tidak pernah membiarkan satu pun balapan (race) mendahului yang lain kalau yang lainnya juga sudah siap. Kalau pakai threads, sistem operasilah yang menentukan thread mana yang mau dicek dan seberapa lama dia dibiarkan jalan. Kalau di Rust asinkron, runtime-lah yang menentukan task mana yang mau dicek. (Di praktiknya, detail-detailnya jadi rumit karena sebuah async runtime mungkin memakai threads sistem operasi di balik layar sebagai bagian dari caranya mengelola konkurensi, jadi menjamin keadilan bisa jadi kerjaan ekstra buat sebuah runtime—tapi tetap mungkin kok!) Runtimes tidak diwajibkan buat menjamin keadilan buat sembarang operasi, dan mereka sering kali menawarkan API yang berbeda-beda biar kita bisa milih mau yang adil atau nggak.

Cobain deh beberapa variasi me-await futures ini dan lihat apa yang mereka lakukan:

  • Hapus blok asinkron dari salah satu atau kedua loops tersebut.
  • Await tiap blok asinkron seketika setelah mendefinisikannya.
  • Bungkus cuma loop pertama saja di dalam blok asinkron, dan await future hasilnya setelah isi dari loop kedua.

Buat tantangan ekstra, coba deh tebak bakal kayak apa output-nya di tiap kasus sebelum kita menjalankan kodenya!

Mengirim Data di Antara Dua Task Memakai Message Passing

Berbagi data antar futures juga bakal terasa familier: kita bakal memakai message passing lagi, tapi kali ini pakai versi asinkron dari tipe-tipe dan fungsi-fungsinya. Kita bakal ngambil jalan yang agak beda dibanding waktu di bagian “Transfer Data antar Threads Memakai Message Passing” di Bab 16 buat mengilustrasikan beberapa perbedaan kunci antara konkurensi berbasis thread dan berbasis futures. Di Listing 17-9, kita bakal mulai dengan cuma satu blok asinkron saja—tidak menelurkan sebuah task terpisah sebagaimana dulu kita menelurkan sebuah thread terpisah.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}
Listing 17-9: Membikin sebuah async channel dan memberikan kedua bagiannya ke tx dan rx

Di sini, kita memakai trpl::channel, sebuah versi asinkron dari API multiple- producer, single-consumer channel yang kita pakai bareng threads dulu di Bab 16. Versi asinkron dari API-nya cuma beda sedikit dari versi berbasis thread: ia memakai receiver rx yang bersifat mutable bukannya immutable, dan method recv-nya menghasilkan sebuah future yang perlu kita await bukannya menghasilkan nilainya secara langsung. Sekarang kita bisa mengirim pesan dari sisi pengirim ke sisi penerima. Perhatikan bahwa kita tidak harus menelurkan sebuah thread terpisah atau bahkan sebuah task; kita cuma butuh me-await pemanggilan rx.recv.

Method Receiver::recv yang sinkron di std::mpsc::channel memblokir sampai ia menerima pesan. Method trpl::Receiver::recv tidak memblokir, karena ia sifatnya asinkron. Alih-alih memblokir, ia menyerahkan kontrol kembali ke runtime sampai entah ada pesan yang diterima atau sisi pengirim (send side) dari channel tersebut ditutup. Sebaliknya, kita tidak me-await pemanggilan send, karena ia tidak memblokir. Dia tidak perlu memblokir karena channel yang kita pakai buat mengirim ini sifatnya unbounded (tidak dibatasi).

Catatan: Karena semua kode asinkron ini jalan di dalam blok asinkron di dalam pemanggilan trpl::block_on, semua hal di dalamnya bisa menghindari memblokir. Namun, kode di luar blok tersebut bakal memblokir pada saat fungsi block_on dikembalikan. Itulah poin utama dari fungsi trpl::block_on: ia membiarkan kita memilih di mana harus memblokir pada sekumpulan kode inkron, dan dengan begitu bisa bertransisi antara kode sinkron dan asinkron.

Perhatikan dua hal soal contoh ini. Pertama, pesannya bakal tiba seketika itu juga. Kedua, meskipun kita memakai sebuah future di sini, belum ada konkurensi sama sekali. Semua hal di dalam listing tersebut terjadi secara berurutan (sequence), persis kayak yang bakal terjadi kalau seandainya tidak ada futures yang dilibatkan.

Mari kita beresin bagian pertamanya dengan mengirim serangkaian pesan dan tidur di antara mereka, kayak yang ditunjukkan di Listing 17-10.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}
Listing 17-10: Mengirim dan menerima banyak pesan melewati async channel dan tidur dengan sebuah await di antara tiap pesan

Selain mengirim pesannya, kita juga perlu menerimanya. Di kasus ini, karena kita tahu ada berapa banyak pesan yang masuk, kita bisa melakukan itu secara manual dengan memanggil rx.recv().await sebanyak empat kali. Tapi di dunia nyata, kita umumnya bakal nungguin pesan yang jumlahnya tidak diketahui, jadi kita butuh terus menunggu sampai kita yakin sudah tidak ada pesan lagi.

Di Listing 16-10, kita memakai loop for buat memproses semua item yang diterima dari sebuah channel yang sinkron. Namun, Rust belum punya cara buat memakai loop for dengan serangkaian item yang diproduksi secara asinkron, jadi kita perlu memakai jenis loop yang belum pernah kita lihat sebelumnya: yaitu perulangan bersyarat while let. Ini adalah versi loop dari konstruk if let yang sudah kita lihat di Bab 6 di bagian “Control Flow Singkat Memakai if let dan let...else. Loop ini bakal terus dijalankan selama pattern yang ditentukannya terus cocok sama nilainya.

Pemanggilan rx.recv menghasilkan sebuah future, yang kemudian kita await. Runtime bakal me-pause future tersebut sampai dia siap. Begitu sebuah pesan tiba, future tersebut bakal selesai (resolve) jadi Some(message) sebanyak jumlah pesan yang tiba. Saat channel-nya ditutup, terlepas dari apakah ada pesan yang tiba atau nggak, future tersebut bakal selesai jadi None buat mengindikasikan kalau sudah tidak ada lagi nilainya dan oleh karena itu kita harus berhenti melakukan polling—yakni, berhenti me-await.

Loop while let menggabungkan ini semua. Kalau hasil dari pemanggilan rx.recv().await adalah Some(message), kita dapet akses ke pesannya dan kita bisa memakainya di dalam isi loop, persis kayak pas pakai if let. Kalau hasilnya None, loop-nya berakhir. Setiap kali loop-nya selesai, dia kena await point lagi, jadi runtime me-pause-nya lagi sampai ada pesan lain yang tiba.

Kodenya sekarang berhasil mengirim dan menerima semua pesan tersebut. Sayangnya, masih ada beberapa masalah. Salah satunya, pesan-pesannya tidak tiba dalam interval setengah detik. Mereka tiba semuanya sekaligus, 2 detik (2.000 milidetik) setelah kita menyalakan programnya. Terus, program ini juga tidak pernah berakhir! Sebaliknya, ia menunggu selamanya buat pesan baru. Kita bakal perlu mematikannya memakai ctrl-C.

Kode di Dalam Satu Blok Asinkron Dieksekusi secara Linear

Mari kita mulai dengan menyelidiki kenapa pesan-pesannya datang sekaligus setelah penundaan penuh (full delay), bukannya datang dengan jeda di tiap pesannya. Di dalam sebuah blok asinkron tertentu, urutan munculnya keyword await di dalam kode juga merupakan urutan saat mereka dieksekusi pas programnya jalan.

Cuma ada satu blok asinkron di Listing 17-10, jadi semua hal di dalamnya jalan secara linear. Masih belum ada konkurensi. Semua pemanggilan tx.send terjadi, diselingi sama semua pemanggilan trpl::sleep dan await points terkaitnya. Baru setelah itu loop while let dapat giliran buat melewati titik await apa pun pada pemanggilan recv.

Buat mendapatkan perilaku yang kita mau, di mana jeda tidurnya (sleep delay) terjadi di antara tiap pesan, kita perlu menaruh operasi tx dan rx di dalam blok asinkronnya masing-masing, kayak yang ditunjukkan di Listing 17-11. Terus si runtime bisa mengeksekusi masing-masing blok secara terpisah memakai trpl::join, persis kayak di Listing 17-8. Sekali lagi, kita me-await hasil pemanggilan trpl::join, bukannya me-await futures individunya. Kalau kita me-await futures individunya secara berurutan, kita ujung-ujungnya cuma bakal balik lagi ke alur sekuensial—persis kayak apa yang mau kita hindari.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-11: Memisahkan send dan recv ke dalam blok asinkronnya masing-masing dan menunggu futures dari blok-blok tersebut

Dengan kode yang sudah di-update di Listing 17-11, pesan-pesannya dicetak dalam interval 500 milidetik, bukannya buru-buru sekaligus setelah 2 detik.

Memindahkan Kepemilikan (Ownership) ke dalam Blok Asinkron

Meskipun begitu, programnya tetap tidak pernah berakhir, karena cara loop while let berinteraksi sama trpl::join:

  • Future yang dikembalikan dari trpl::join cuma selesai begitu kedua futures yang diberikan ke dia sudah selesai.
  • Future tx_fut selesai begitu dia kelar tidur setelah mengirim pesan terakhir di dalam vals.
  • Future rx_fut tidak bakal selesai sampai loop while let berakhir.
  • Loop while let tidak bakal berakhir sampai me-await rx.recv menghasilkan None.
  • Me-await rx.recv bakal mengembalikan None cuma kalau sisi lain dari channel-nya sudah ditutup.
  • Channel-nya bakal tutup cuma kalau kita memanggil rx.close atau pas sisi pengirimnya, tx, di-drop.
  • Kita tidak memanggil rx.close di mana pun, dan tx tidak bakal di-drop sampai blok asinkron terluar yang diberikan ke trpl::block_on berakhir.
  • Blok tersebut tidak bisa berakhir karena dia terhambat (blocked) menunggu trpl::join selesai, yang mana membawa kita balik lagi ke urutan paling atas dari daftar ini.

Saat ini, blok asinkron tempat kita mengirim pesannya cuma meminjam tx karena mengirim pesan tidak mewajibkan adanya kepemilikan, tapi kalau seandainya kita bisa memindahkan (move) tx ke dalam blok asinkron tersebut, dia bakal di-drop begitu blok itu berakhir. Di Bab 13 di bagian “Menangkap Referensi atau Memindahkan Kepemilikan”, kita sudah mempelajari cara memakai keyword move bersama closures, dan, seperti yang dibahas di bagian “Memakai move Closures bersama Threads” di Bab 16, kita sering kali perlu memindahkan data ke dalam closures saat bekerja dengan threads. Dinamika dasar yang sama ini juga berlaku buat blok asinkron, jadi keyword move juga bekerja di blok asinkron sama seperti di closures.

Di Listing 17-12, kita mengubah blok yang dipakai buat mengirim pesan dari async jadi async move.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}
Listing 17-12: Revisi dari kode di Listing 17-11 yang mematikan program secara benar begitu selesai

Pas kita menjalankan versi kode yang ini, dia bakal berhenti secara anggun begitu pesan terakhir sudah dikirim dan diterima. Berikutnya, mari kita lihat apa yang butuh diubah buat mengirim data dari lebih dari satu future.

Menggabungkan (Joining) Sejumlah Futures dengan Macro join!

Async channel ini juga merupakan multiple-producer channel, jadi kita bisa memanggil clone pada tx kalau kita mau mengirim pesan dari banyak futures, kayak yang ditunjukkan di Listing 17-13.

Filename: src/main.rs
extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::block_on(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}
Listing 17-13: Memakai multiple producers bersama blok asinkron

Pertama, kita meng-clone tx, membikin tx1 di luar blok asinkron pertama. Kita memindahkan tx1 ke dalam blok tersebut sama kayak yang kita lakukan sebelumnya sama tx. Terus, belakangan, kita memindahkan tx asli ke dalam blok asinkron baru, di mana kita mengirim lebih banyak pesan dengan jeda yang sedikit lebih lambat. Kebetulan kita menaruh blok asinkron baru ini setelah blok asinkron buat menerima pesan, tapi kita juga bisa kok menaruhnya sebelumnya. Kuncinya adalah urutan pas futures-nya di-await, bukan urutan pas mereka dibikin.

Kedua blok asinkron buat mengirim pesannya wajib berupa blok async move supaya baik tx maupun tx1 di-drop pas blok-blok tersebut selesai. Kalau tidak, kita bakal balik lagi ke perulangan tiada henti yang sama kayak di awal tadi.

Terakhir, kita beralih dari trpl::join ke trpl::join! buat menangani future tambahannya: macro join! me-await jumlah futures yang sembarang asalkan kita sudah tahu jumlah futures-nya pas masa kompilasi. Kita bakal ngebahas soal menunggu sekumpulan futures yang jumlahnya tidak diketahui nanti di bab ini.

Sekarang kita bisa melihat semua pesan dari kedua futures pengirimnya, dan karena futures pengirimnya memakai jeda yang sedikit berbeda setelah mengirim, pesan-pesannya juga diterima dalam interval yang berbeda tersebut:

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Kita sudah mengeksplorasi cara memakai message passing buat mengirim data antar futures, gimana kode di dalam blok asinkron jalan secara sekuensial, gimana cara memindahkan kepemilikan ke dalam blok asinkron, dan gimana cara menggabungkan banyak futures. Berikutnya, mari kita bahas gimana dan kenapa harus kasih tahu runtime kalau dia bisa beralih ke tugas lain.