Memakai Message Passing buat Mentransfer Data Antar Threads
Satu pendekatan yang makin populer buat memastikan konkurensi yang aman adalah message passing (pengiriman pesan), di mana threads atau actors berkomunikasi dengan mengirimkan pesan berisi data ke satu sama lain. Ide ini digambarkan dalam sebuah slogan dari dokumentasi bahasa Go: “Jangan berkomunikasi dengan membagikan (sharing) memori; sebaliknya, bagikan memori dengan berkomunikasi.”
Buat mencapai konkurensi pengiriman pesan ini, standard library Rust menyediakan sebuah implementasi dari saluran (channels). Sebuah channel adalah konsep pemrograman umum di mana data dikirim dari satu thread ke thread lainnya.
Kita bisa membayangkan sebuah channel di dalam pemrograman itu kayak saluran air yang mengalir ke satu arah, seperti sungai atau selokan. Kalau kita menaruh sesuatu kayak bebek karet ke dalam sungai, dia bakal mengalir ke hilir (downstream) sampai ke ujung saluran air tersebut.
Sebuah channel punya dua paruh: sebuah transmitter (pemancar/pengirim) dan sebuah receiver (penerima). Paruh transmitter adalah lokasi hulu (upstream) tempat kita menaruh bebek karetnya ke dalam sungai, dan paruh receiver adalah hilir tempat si bebek karet pada akhirnya berlabuh. Satu bagian dari kode kita memanggil method-method di transmitter dengan data yang mau kita kirim, dan bagian lain mengecek ujung penerima (receiving end) buat melihat pesan yang datang. Sebuah channel dikatakan closed (tertutup) kalau entah paruh transmitter atau receiver-nya di-drop (dibuang).
Di sini, kita bakal perlahan ngebangun sebuah program yang punya satu thread buat nge-generate nilai dan mengirimkannya ke dalam sebuah channel, dan satu thread lain yang bakal menerima nilai-nilai tersebut lalu mencetaknya ke layar. Kita bakal mengirim nilai-nilai sederhana antar threads memakai sebuah channel buat mengilustrasikan fitur ini. Begitu kita udah terbiasa sama tekniknya, kita bisa memakai channels buat threads mana aja yang butuh berkomunikasi satu sama lain, kayak sistem chat atau sistem di mana banyak threads melakukan bagian-bagian dari sebuah perhitungan lalu mengirim bagian-bagian tersebut ke satu thread yang mengagregasikan (mengumpulkan) hasilnya.
Pertama, di Listing 16-6, kita bakal membikin sebuah channel tapi tidak melakukan apa-apa dengannya. Perhatikan bahwa ini belum bisa di-compile karena Rust tidak tahu tipe nilai apa yang mau kita kirim lewat channel ini.
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel();
}
tx dan rxKita membikin sebuah channel baru memakai fungsi mpsc::channel; mpsc
adalah singkatan dari multiple producer, single consumer (banyak
penghasil, satu konsumen). Singkatnya, cara standard library Rust
mengimplementasikan channels berarti sebuah channel bisa punya banyak
ujung pengirim (sending ends) yang memproduksi nilai tapi cuma bisa punya
satu ujung penerima (receiving end) yang mengonsumsi nilai-nilai tersebut.
Bayangin ada banyak aliran sungai kecil yang ngalir bersatu jadi satu
sungai besar: apa pun yang dikirim ke salah satu sungai kecil itu bakal
berakhir di satu sungai besar tersebut di ujungnya. Kita bakal mulai dengan
satu produsen (producer) aja buat sekarang, tapi kita bakal nambahin banyak
produsen pas contoh ini udah bisa jalan.
Fungsi mpsc::channel mengembalikan sebuah tuple, yang elemen pertamanya
adalah ujung pengirim (the sending end)—yaitu si transmitter—dan elemen
keduanya adalah ujung penerima (the receiving end)—yaitu si receiver.
Singkatan tx dan rx secara tradisional sering dipakai di banyak bidang
untuk masing-masing transmitter dan receiver, jadi kita menamai variabel
kita dengan nama tersebut buat mengindikasikan setiap ujungnya. Kita
memakai statement let dengan sebuah pola (pattern) yang men-destructure
tuples tersebut; kita bakal membahas pemakaian pola di dalam statements
let dan destructuring di Bab 19. Buat sekarang, ketahui aja kalau
memakai statement let dengan cara ini adalah pendekatan yang nyaman buat
mengekstrak potongan-potongan dari tuple yang dikembalikan sama
mpsc::channel.
Mari kita pindahkan ujung pengirim (transmitting end) ke dalam spawned thread lalu suruh dia mengirim satu string supaya spawned thread tersebut berkomunikasi sama main thread, seperti yang ditunjukkan di Listing 16-7. Ini ibarat naruh bebek karet di bagian hulu sungai atau ngirim pesan chat dari satu thread ke thread lainnya.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
}
tx ke dalam sebuah spawned thread dan mengirim "hi"Sekali lagi, kita memakai thread::spawn buat membikin thread baru lalu
memakai move buat memindahkan tx ke dalam closure supaya spawned
thread tersebut memiliki (owns) tx. Spawned thread perlu memiliki si
transmitter supaya bisa mengirim pesan lewat channel.
Transmitter punya sebuah method send yang menerima nilai yang mau kita
kirim. Method send mengembalikan sebuah tipe Result<T, E>, jadi kalau
receiver-nya ternyata sudah di-drop dan tidak ada tempat lagi buat ngirim
nilai, operasi pengirimannya (send) bakal mengembalikan sebuah error. Di
contoh ini, kita memanggil unwrap buat panic seandainya terjadi error. Tapi
di aplikasi betulan (real application), kita bakal menanganinya dengan benar:
silakan kembali ke Bab 9 buat me-review strategi-strategi buat penanganan
error yang tepat.
Di Listing 16-8, kita bakal mengambil nilainya dari receiver di dalam main thread. Ini ibarat memungut bebek karet dari air di ujung hilir sungai atau menerima sebuah pesan chat.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
"hi" di dalam main thread dan mencetaknyaReceiver punya dua method yang berguna: recv dan try_recv. Kita
memakai recv, singkatan dari receive (menerima), yang bakal memblokir
eksekusi dari main thread dan menunggu sampai sebuah nilai dikirim lewat
channel tersebut. Begitu ada nilai yang dikirim, recv bakal
mengembalikannya di dalam sebuah Result<T, E>. Saat transmitter ditutup,
recv bakal mengembalikan sebuah error buat memberi sinyal bahwa tidak akan
ada lagi nilai yang bakal datang.
Method try_recv tidak melakukan pemblokiran (doesn’t block), tapi ia bakal
langsung mengembalikan sebuah Result<T, E>: nilai Ok yang memegang sebuah
pesan kalau pesannya lagi tersedia dan nilai Err kalau tidak ada pesan
sama sekali saat ini. Memakai try_recv berguna kalau thread ini punya
kerjaan lain yang harus dilakuin sambil nunggu pesan: kita bisa nulis
sebuah loop yang memanggil try_recv sesekali, menangani pesannya kalau
lagi tersedia, dan kalau enggak, ngerjain tugas lain dulu sebentar sampai
waktunya ngecek lagi.
Kita memakai recv di contoh ini buat kesederhanaan; kita tidak punya
kerjaan lain buat main thread selain menunggu pesan, jadi memblokir
main thread adalah pilihan yang tepat.
Pas kita menjalankan kode di Listing 16-8, kita bakal melihat nilai yang dicetak dari main thread:
Got: hi
Sempurna!
Channels dan Transfer Kepemilikan (Ownership Transference)
Aturan-aturan ownership (kepemilikan) memainkan peran vital dalam
pengiriman pesan karena mereka ngebantu kita nulis kode konkuren yang aman.
Mencegah error di pemrograman konkuren adalah keuntungan (advantage) dari
memikirkan tentang ownership di sepanjang program Rust kita. Mari kita
lakukan sebuah eksperimen buat nunjukin gimana channels dan ownership
bekerja bersama-sama mencegah timbulnya masalah: kita bakal nyoba memakai
sebuah nilai val di dalam spawned thread setelah kita ngirim nilai itu
lewat channel. Coba compile kode di Listing 16-9 buat melihat kenapa kode
ini tidak diizinkan.
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {val}");
});
let received = rx.recv().unwrap();
println!("Got: {received}");
}
val setelah kita mengirimnya lewat channelDi sini, kita mencoba buat mencetak val setelah kita mengirimnya lewat
channel via tx.send. Mengizinkan ini adalah ide yang buruk: begitu nilainya
sudah dikirim ke thread lain, thread tersebut bisa aja memodifikasi atau
men-drop-nya sebelum kita nyoba memakai nilai itu lagi. Secara potensial,
modifikasi yang dilakukan sama thread lain bisa menyebabkan error atau hasil
yang tidak disangka-sangka akibat adanya data yang tidak konsisten atau sudah
tidak eksis lagi. Namun, Rust ngasih kita error kalau kita mencoba men-compile
kode di Listing 16-9:
$ cargo run
Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
--> src/main.rs:10:27
|
8 | let val = String::from("hi");
| --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9 | tx.send(val).unwrap();
| --- value moved here
10 | println!("val is {val}");
| ^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)
For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error
Kesalahan konkurensi (concurrency mistake) kita ini sudah membikin sebuah error
compile-time. Fungsi send mengambil kepemilikan atas parameternya, dan
saat nilainya dipindahkan (moved), receiver bakal mengambil kepemilikannya.
Hal ini menghentikan kita dari memakai nilai itu secara tidak sengaja lagi
setelah mengirimnya; sistem ownership memastikan kalau semuanya aman terkendali.
Mengirim Banyak Nilai dan Melihat Receiver Menunggu
Kode di Listing 16-8 berhasil di-compile dan jalan, tapi dia tidak secara jelas menunjukkan ke kita kalau ada dua threads terpisah yang lagi ngobrol satu sama lain lewat channel.
Di Listing 16-10 kita sudah membuat beberapa modifikasi yang bakal membuktikan kalau kode di Listing 16-8 itu berjalan secara konkuren: spawned thread sekarang bakal mengirim banyak pesan dan ngasih jeda sebentar (pause) satu detik di antara setiap pesannya.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
}
Kali ini, spawned thread punya sebuah vector berisi string yang mau kita
kirim ke main thread. Kita iterasi ngelewatin mereka, ngirim setiap string-nya
satu-satu, dan ngasih jeda di antara setiap pengiriman dengan memanggil
fungsi thread::sleep beserta sebuah nilai Duration satu detik.
Di dalam main thread, kita tidak memanggil fungsi recv secara eksplisit
lagi: sebaliknya, kita memperlakukan rx sebagai sebuah iterator. Buat setiap
nilai yang diterima, kita bakal mencetaknya. Saat channel-nya ditutup,
iterasinya bakal berakhir.
Pas kita menjalankan kode di Listing 16-10, kita seharusnya melihat output berikut dengan jeda satu detik di antara setiap barisnya:
Got: hi
Got: from
Got: the
Got: thread
Karena kita tidak punya kode yang melakukan jeda atau penundaan (delays) di dalam for loop di main thread, kita bisa tahu kalau main thread tersebut lagi nunggu buat nerima nilai-nilai dari spawned thread.
Membikin Banyak Producers dengan Meng-clone si Transmitter
Tadi kita sempat menyebutkan kalau mpsc adalah singkatan dari multiple producer,
single consumer (banyak penghasil, satu konsumen). Mari kita manfaatin
mpsc ini lalu ekspansi kode di Listing 16-10 buat membikin beberapa threads
yang semuanya mengirim nilai ke satu receiver yang sama. Kita bisa melakukan
itu dengan meng-clone transmitter-nya, seperti yang ditunjukkan di Listing 16-11.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for received in rx {
println!("Got: {received}");
}
// --snip--
}
Kali ini, sebelum kita membikin spawned thread yang pertama, kita memanggil
clone pada transmitter-nya. Ini bakal ngasih kita sebuah transmitter baru
yang bisa kita teruskan ke spawned thread yang pertama. Kita meneruskan
transmitter aslinya ke sebuah spawned thread yang kedua. Hal ini ngasih
kita dua threads, di mana masing-masing mengirim pesan yang berbeda ke
satu receiver yang sama.
Pas kita menjalankan kodenya, output kita harusnya bakal kelihatan kurang lebih kayak gini:
Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you
kita mungkin bakal melihat nilai-nilainya dalam urutan yang berbeda,
tergantung dari sistem yang kita pakai. Inilah yang membikin konkurensi
jadi hal yang menarik sekaligus sulit. Kalau kita eksperimen sama
thread::sleep, ngasih dia nilai yang beda-beda di berbagai threads
tersebut, masing-masing jalan (run) bakal jadi makin tidak deterministik
dan menghasilkan output yang berbeda-beda setiap kalinya.
Sekarang setelah kita melihat gimana channels itu bekerja, mari kita lihat metode konkurensi yang berbeda.