Menerapkan Konkurensi dengan Async
Di bagian ini, kita bakal menerapkan asinkron ke beberapa tantangan konkurensi yang sama dengan yang sudah kita tangani memakai threads di Bab 16. Karena kita sudah banyak ngomongin ide-ide kuncinya di sana, di bagian ini kita bakal fokus sama apa aja yang berbeda antara threads dan futures.
Di banyak kasus, API buat bekerja bareng konkurensi memakai asinkron itu mirip sekali sama yang dipakai buat threads. Di kasus lain, mereka jadinya lumayan berbeda. Bahkan kalau API-nya kelihatan mirip antara threads dan asinkron, mereka sering kali punya perilaku yang berbeda—dan mereka hampir selalu punya karakteristik performa yang berbeda.
Membikin Task (Tugas) Baru dengan spawn_task
Operasi pertama yang kita tangani di bagian “Membikin Thread Baru dengan
spawn” di Bab 16 adalah menghitung angka di dua
threads terpisah. Mari kita lakukan hal yang sama memakai asinkron. Crate
trpl menyediakan fungsi spawn_task yang kelihatannya mirip sekali sama API
thread::spawn, dan fungsi sleep yang merupakan versi asinkron dari API
thread::sleep. Kita bisa memakai keduanya bersama-sama buat
mengimplementasikan contoh penghitungan angka tersebut, kayak yang ditunjukkan
di Listing 17-6.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
Sebagai titik awal, kita menyiapkan fungsi main kita pakai trpl::block_on
supaya fungsi tingkat teratas kita bisa bersifat asinkron.
Catatan: Mulai dari titik ini di bab ini, setiap contoh bakal menyertakan kode pembungkus yang sama persis pakai
trpl::block_ondimain, jadi kita bakal sering mengabaikannya (skip) sama seperti kita mengabaikanmain. Jangan lupa buat menyertakannya di dalam kode kita ya!
Terus kita menulis dua loops di dalam blok tersebut, masing-masing mengandung
pemanggilan trpl::sleep, yang bakal nunggu selama setengah detik (500
milidetik) sebelum mengirim pesan berikutnya. Kita menaruh satu loop di dalam
isi dari trpl::spawn_task dan satu lagi di dalam loop for tingkat teratas.
Kita juga nambahin await setelah pemanggilan sleep.
Kode ini berperilaku mirip sama implementasi berbasis thread—termasuk fakta bahwa kita mungkin bakal melihat pesan-pesannya muncul dalam urutan yang berbeda di terminal kita pas kita menjalankannya:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
Versi ini bakal berhenti begitu loop for di dalam isi blok asinkron utamanya
selesai, karena task yang ditelurkan sama spawn_task bakal dimatikan pas
fungsi main berakhir. Kalau kita pengen task tersebut jalan sampai kelar,
kita perlu memakai join handle buat nungguin task pertamanya selesai. Pas
pakai threads, kita memakai method join buat “memblokir” sampai thread-nya
selesai jalan. Di Listing 17-7, kita bisa memakai await buat ngelakuin hal yang
sama, karena task handle itu sendiri adalah sebuah future. Tipe Output-nya
adalah sebuah Result, jadi kita juga meng-unwrap-nya setelah me-await-nya.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
});
}
await bersama join handle buat menjalankan task sampai selesaiVersi yang sudah di-update ini bakal jalan sampai kedua loops selesai:
hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Sejauh ini, kelihatannya asinkron dan threads ngasih kita hasil yang mirip,
cuma beda di sintaksnya aja: memakai await ketimbang memanggil join pada
join handle, dan me-await pemanggilan sleep.
Perbedaan besarnya adalah kita tidak perlu menelurkan (spawn) thread sistem
operasi lainnya buat melakukan hal ini. Faktanya, kita bahkan tidak perlu
menelurkan sebuah task di sini. Karena blok asinkron dikompilasi jadi
futures anonim, kita bisa menaruh tiap loop di dalam blok asinkron lalu
menyuruh runtime menjalankan keduanya sampai selesai memakai fungsi
trpl::join.
Di bagian “Menunggu Semua Thread sampai Selesai”
di Bab 16, kita menunjukkan gimana cara memakai method join pada tipe
JoinHandle yang dikembalikan pas kita memanggil std::thread::spawn. Fungsi
trpl::join itu mirip, tapi buat futures. Pas kita ngasih dia dua futures,
dia bakal memproduksi satu future baru yang output-nya adalah sebuah tuple
berisi output dari tiap future yang kita masukkan tadi begitu keduanya
selesai. Jadi, di Listing 17-8, kita memakai trpl::join buat nungguin baik
fut1 maupun fut2 sampai selesai. Kita tidak me-await fut1 dan fut2
melainkan me-await future baru yang dihasilkan sama trpl::join. Kita
ngabaiin output-nya, karena dia cuma sebuah tuple berisi dua nilai unit.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let fut1 = async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
trpl::join buat menunggu dua futures anonimPas kita jalankan ini, kita melihat kedua futures jalan sampai selesai:
hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!
Nah, kita bakal melihat urutan yang sama persis di tiap jalannya, yang mana
berbeda sekali sama apa yang kita lihat di threads dan di trpl::spawn_task
di Listing 17-7. Itu gara-gara fungsi trpl::join ini sifatnya adil (fair),
yang artinya dia mengecek tiap future dengan frekuensi yang sama, ganti-
gantian di antara mereka, dan tidak pernah membiarkan satu pun balapan (race)
mendahului yang lain kalau yang lainnya juga sudah siap. Kalau pakai threads,
sistem operasilah yang menentukan thread mana yang mau dicek dan seberapa
lama dia dibiarkan jalan. Kalau di Rust asinkron, runtime-lah yang menentukan
task mana yang mau dicek. (Di praktiknya, detail-detailnya jadi rumit karena
sebuah async runtime mungkin memakai threads sistem operasi di balik layar
sebagai bagian dari caranya mengelola konkurensi, jadi menjamin keadilan bisa
jadi kerjaan ekstra buat sebuah runtime—tapi tetap mungkin kok!) Runtimes
tidak diwajibkan buat menjamin keadilan buat sembarang operasi, dan mereka
sering kali menawarkan API yang berbeda-beda biar kita bisa milih mau yang adil
atau nggak.
Cobain deh beberapa variasi me-await futures ini dan lihat apa yang mereka lakukan:
- Hapus blok asinkron dari salah satu atau kedua loops tersebut.
- Await tiap blok asinkron seketika setelah mendefinisikannya.
- Bungkus cuma loop pertama saja di dalam blok asinkron, dan await future hasilnya setelah isi dari loop kedua.
Buat tantangan ekstra, coba deh tebak bakal kayak apa output-nya di tiap kasus sebelum kita menjalankan kodenya!
Mengirim Data di Antara Dua Task Memakai Message Passing
Berbagi data antar futures juga bakal terasa familier: kita bakal memakai message passing lagi, tapi kali ini pakai versi asinkron dari tipe-tipe dan fungsi-fungsinya. Kita bakal ngambil jalan yang agak beda dibanding waktu di bagian “Transfer Data antar Threads Memakai Message Passing” di Bab 16 buat mengilustrasikan beberapa perbedaan kunci antara konkurensi berbasis thread dan berbasis futures. Di Listing 17-9, kita bakal mulai dengan cuma satu blok asinkron saja—tidak menelurkan sebuah task terpisah sebagaimana dulu kita menelurkan sebuah thread terpisah.
extern crate trpl; // required for mdbook test
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let val = String::from("hi");
tx.send(val).unwrap();
let received = rx.recv().await.unwrap();
println!("received '{received}'");
});
}
tx dan rxDi sini, kita memakai trpl::channel, sebuah versi asinkron dari API multiple-
producer, single-consumer channel yang kita pakai bareng threads dulu di Bab
16. Versi asinkron dari API-nya cuma beda sedikit dari versi berbasis thread:
ia memakai receiver rx yang bersifat mutable bukannya immutable, dan
method recv-nya menghasilkan sebuah future yang perlu kita await bukannya
menghasilkan nilainya secara langsung. Sekarang kita bisa mengirim pesan dari
sisi pengirim ke sisi penerima. Perhatikan bahwa kita tidak harus menelurkan
sebuah thread terpisah atau bahkan sebuah task; kita cuma butuh me-await
pemanggilan rx.recv.
Method Receiver::recv yang sinkron di std::mpsc::channel memblokir sampai
ia menerima pesan. Method trpl::Receiver::recv tidak memblokir, karena ia
sifatnya asinkron. Alih-alih memblokir, ia menyerahkan kontrol kembali ke
runtime sampai entah ada pesan yang diterima atau sisi pengirim (send side)
dari channel tersebut ditutup. Sebaliknya, kita tidak me-await pemanggilan
send, karena ia tidak memblokir. Dia tidak perlu memblokir karena channel
yang kita pakai buat mengirim ini sifatnya unbounded (tidak dibatasi).
Catatan: Karena semua kode asinkron ini jalan di dalam blok asinkron di dalam pemanggilan
trpl::block_on, semua hal di dalamnya bisa menghindari memblokir. Namun, kode di luar blok tersebut bakal memblokir pada saat fungsiblock_ondikembalikan. Itulah poin utama dari fungsitrpl::block_on: ia membiarkan kita memilih di mana harus memblokir pada sekumpulan kode inkron, dan dengan begitu bisa bertransisi antara kode sinkron dan asinkron.
Perhatikan dua hal soal contoh ini. Pertama, pesannya bakal tiba seketika itu juga. Kedua, meskipun kita memakai sebuah future di sini, belum ada konkurensi sama sekali. Semua hal di dalam listing tersebut terjadi secara berurutan (sequence), persis kayak yang bakal terjadi kalau seandainya tidak ada futures yang dilibatkan.
Mari kita beresin bagian pertamanya dengan mengirim serangkaian pesan dan tidur di antara mereka, kayak yang ditunjukkan di Listing 17-10.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
await di antara tiap pesanSelain mengirim pesannya, kita juga perlu menerimanya. Di kasus ini, karena kita
tahu ada berapa banyak pesan yang masuk, kita bisa melakukan itu secara manual
dengan memanggil rx.recv().await sebanyak empat kali. Tapi di dunia nyata,
kita umumnya bakal nungguin pesan yang jumlahnya tidak diketahui, jadi kita
butuh terus menunggu sampai kita yakin sudah tidak ada pesan lagi.
Di Listing 16-10, kita memakai loop for buat memproses semua item yang
diterima dari sebuah channel yang sinkron. Namun, Rust belum punya cara buat
memakai loop for dengan serangkaian item yang diproduksi secara asinkron,
jadi kita perlu memakai jenis loop yang belum pernah kita lihat sebelumnya:
yaitu perulangan bersyarat while let. Ini adalah versi loop dari konstruk
if let yang sudah kita lihat di Bab 6 di bagian “Control Flow Singkat Memakai
if let dan let...else”. Loop ini bakal terus
dijalankan selama pattern yang ditentukannya terus cocok sama nilainya.
Pemanggilan rx.recv menghasilkan sebuah future, yang kemudian kita await.
Runtime bakal me-pause future tersebut sampai dia siap. Begitu sebuah
pesan tiba, future tersebut bakal selesai (resolve) jadi Some(message)
sebanyak jumlah pesan yang tiba. Saat channel-nya ditutup, terlepas dari
apakah ada pesan yang tiba atau nggak, future tersebut bakal selesai jadi
None buat mengindikasikan kalau sudah tidak ada lagi nilainya dan oleh karena
itu kita harus berhenti melakukan polling—yakni, berhenti me-await.
Loop while let menggabungkan ini semua. Kalau hasil dari pemanggilan
rx.recv().await adalah Some(message), kita dapet akses ke pesannya dan kita
bisa memakainya di dalam isi loop, persis kayak pas pakai if let. Kalau
hasilnya None, loop-nya berakhir. Setiap kali loop-nya selesai, dia kena
await point lagi, jadi runtime me-pause-nya lagi sampai ada pesan lain
yang tiba.
Kodenya sekarang berhasil mengirim dan menerima semua pesan tersebut. Sayangnya, masih ada beberapa masalah. Salah satunya, pesan-pesannya tidak tiba dalam interval setengah detik. Mereka tiba semuanya sekaligus, 2 detik (2.000 milidetik) setelah kita menyalakan programnya. Terus, program ini juga tidak pernah berakhir! Sebaliknya, ia menunggu selamanya buat pesan baru. Kita bakal perlu mematikannya memakai ctrl-C.
Kode di Dalam Satu Blok Asinkron Dieksekusi secara Linear
Mari kita mulai dengan menyelidiki kenapa pesan-pesannya datang sekaligus
setelah penundaan penuh (full delay), bukannya datang dengan jeda di tiap
pesannya. Di dalam sebuah blok asinkron tertentu, urutan munculnya keyword
await di dalam kode juga merupakan urutan saat mereka dieksekusi pas programnya
jalan.
Cuma ada satu blok asinkron di Listing 17-10, jadi semua hal di dalamnya jalan
secara linear. Masih belum ada konkurensi. Semua pemanggilan tx.send terjadi,
diselingi sama semua pemanggilan trpl::sleep dan await points terkaitnya.
Baru setelah itu loop while let dapat giliran buat melewati titik await
apa pun pada pemanggilan recv.
Buat mendapatkan perilaku yang kita mau, di mana jeda tidurnya (sleep delay)
terjadi di antara tiap pesan, kita perlu menaruh operasi tx dan rx di dalam
blok asinkronnya masing-masing, kayak yang ditunjukkan di Listing 17-11. Terus
si runtime bisa mengeksekusi masing-masing blok secara terpisah memakai
trpl::join, persis kayak di Listing 17-8. Sekali lagi, kita me-await hasil
pemanggilan trpl::join, bukannya me-await futures individunya. Kalau kita
me-await futures individunya secara berurutan, kita ujung-ujungnya cuma
bakal balik lagi ke alur sekuensial—persis kayak apa yang mau kita hindari.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
send dan recv ke dalam blok asinkronnya masing-masing dan menunggu futures dari blok-blok tersebutDengan kode yang sudah di-update di Listing 17-11, pesan-pesannya dicetak dalam interval 500 milidetik, bukannya buru-buru sekaligus setelah 2 detik.
Memindahkan Kepemilikan (Ownership) ke dalam Blok Asinkron
Meskipun begitu, programnya tetap tidak pernah berakhir, karena cara loop
while let berinteraksi sama trpl::join:
- Future yang dikembalikan dari
trpl::joincuma selesai begitu kedua futures yang diberikan ke dia sudah selesai. - Future
tx_futselesai begitu dia kelar tidur setelah mengirim pesan terakhir di dalamvals. - Future
rx_futtidak bakal selesai sampai loopwhile letberakhir. - Loop
while lettidak bakal berakhir sampai me-awaitrx.recvmenghasilkanNone. - Me-await
rx.recvbakal mengembalikanNonecuma kalau sisi lain dari channel-nya sudah ditutup. - Channel-nya bakal tutup cuma kalau kita memanggil
rx.closeatau pas sisi pengirimnya,tx, di-drop. - Kita tidak memanggil
rx.closedi mana pun, dantxtidak bakal di-drop sampai blok asinkron terluar yang diberikan ketrpl::block_onberakhir. - Blok tersebut tidak bisa berakhir karena dia terhambat (blocked) menunggu
trpl::joinselesai, yang mana membawa kita balik lagi ke urutan paling atas dari daftar ini.
Saat ini, blok asinkron tempat kita mengirim pesannya cuma meminjam tx
karena mengirim pesan tidak mewajibkan adanya kepemilikan, tapi kalau seandainya
kita bisa memindahkan (move) tx ke dalam blok asinkron tersebut, dia bakal
di-drop begitu blok itu berakhir. Di Bab 13 di bagian “Menangkap Referensi
atau Memindahkan Kepemilikan”, kita sudah
mempelajari cara memakai keyword move bersama closures, dan, seperti yang
dibahas di bagian “Memakai move Closures bersama Threads” di Bab 16, kita sering kali perlu memindahkan data ke dalam
closures saat bekerja dengan threads. Dinamika dasar yang sama ini juga
berlaku buat blok asinkron, jadi keyword move juga bekerja di blok asinkron
sama seperti di closures.
Di Listing 17-12, kita mengubah blok yang dipakai buat mengirim pesan dari
async jadi async move.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async move {
// --snip--
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
Pas kita menjalankan versi kode yang ini, dia bakal berhenti secara anggun begitu pesan terakhir sudah dikirim dan diterima. Berikutnya, mari kita lihat apa yang butuh diubah buat mengirim data dari lebih dari satu future.
Menggabungkan (Joining) Sejumlah Futures dengan Macro join!
Async channel ini juga merupakan multiple-producer channel, jadi kita bisa
memanggil clone pada tx kalau kita mau mengirim pesan dari banyak futures,
kayak yang ditunjukkan di Listing 17-13.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join!(tx1_fut, tx_fut, rx_fut);
});
}
Pertama, kita meng-clone tx, membikin tx1 di luar blok asinkron pertama.
Kita memindahkan tx1 ke dalam blok tersebut sama kayak yang kita lakukan
sebelumnya sama tx. Terus, belakangan, kita memindahkan tx asli ke dalam
blok asinkron baru, di mana kita mengirim lebih banyak pesan dengan jeda yang
sedikit lebih lambat. Kebetulan kita menaruh blok asinkron baru ini setelah blok
asinkron buat menerima pesan, tapi kita juga bisa kok menaruhnya sebelumnya.
Kuncinya adalah urutan pas futures-nya di-await, bukan urutan pas mereka
dibikin.
Kedua blok asinkron buat mengirim pesannya wajib berupa blok async move supaya
baik tx maupun tx1 di-drop pas blok-blok tersebut selesai. Kalau tidak,
kita bakal balik lagi ke perulangan tiada henti yang sama kayak di awal tadi.
Terakhir, kita beralih dari trpl::join ke trpl::join! buat menangani
future tambahannya: macro join! me-await jumlah futures yang sembarang
asalkan kita sudah tahu jumlah futures-nya pas masa kompilasi. Kita bakal
ngebahas soal menunggu sekumpulan futures yang jumlahnya tidak diketahui nanti
di bab ini.
Sekarang kita bisa melihat semua pesan dari kedua futures pengirimnya, dan karena futures pengirimnya memakai jeda yang sedikit berbeda setelah mengirim, pesan-pesannya juga diterima dalam interval yang berbeda tersebut:
received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'
Kita sudah mengeksplorasi cara memakai message passing buat mengirim data antar futures, gimana kode di dalam blok asinkron jalan secara sekuensial, gimana cara memindahkan kepemilikan ke dalam blok asinkron, dan gimana cara menggabungkan banyak futures. Berikutnya, mari kita bahas gimana dan kenapa harus kasih tahu runtime kalau dia bisa beralih ke tugas lain.