Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Memakai Message Passing buat Mentransfer Data Antar Threads

Satu pendekatan yang makin populer buat memastikan konkurensi yang aman adalah message passing (pengiriman pesan), di mana threads atau actors berkomunikasi dengan mengirimkan pesan berisi data ke satu sama lain. Ide ini digambarkan dalam sebuah slogan dari dokumentasi bahasa Go: “Jangan berkomunikasi dengan membagikan (sharing) memori; sebaliknya, bagikan memori dengan berkomunikasi.”

Buat mencapai konkurensi pengiriman pesan ini, standard library Rust menyediakan sebuah implementasi dari saluran (channels). Sebuah channel adalah konsep pemrograman umum di mana data dikirim dari satu thread ke thread lainnya.

Kita bisa membayangkan sebuah channel di dalam pemrograman itu kayak saluran air yang mengalir ke satu arah, seperti sungai atau selokan. Kalau kita menaruh sesuatu kayak bebek karet ke dalam sungai, dia bakal mengalir ke hilir (downstream) sampai ke ujung saluran air tersebut.

Sebuah channel punya dua paruh: sebuah transmitter (pemancar/pengirim) dan sebuah receiver (penerima). Paruh transmitter adalah lokasi hulu (upstream) tempat kita menaruh bebek karetnya ke dalam sungai, dan paruh receiver adalah hilir tempat si bebek karet pada akhirnya berlabuh. Satu bagian dari kode kita memanggil method-method di transmitter dengan data yang mau kita kirim, dan bagian lain mengecek ujung penerima (receiving end) buat melihat pesan yang datang. Sebuah channel dikatakan closed (tertutup) kalau entah paruh transmitter atau receiver-nya di-drop (dibuang).

Di sini, kita bakal perlahan ngebangun sebuah program yang punya satu thread buat nge-generate nilai dan mengirimkannya ke dalam sebuah channel, dan satu thread lain yang bakal menerima nilai-nilai tersebut lalu mencetaknya ke layar. Kita bakal mengirim nilai-nilai sederhana antar threads memakai sebuah channel buat mengilustrasikan fitur ini. Begitu kita udah terbiasa sama tekniknya, kita bisa memakai channels buat threads mana aja yang butuh berkomunikasi satu sama lain, kayak sistem chat atau sistem di mana banyak threads melakukan bagian-bagian dari sebuah perhitungan lalu mengirim bagian-bagian tersebut ke satu thread yang mengagregasikan (mengumpulkan) hasilnya.

Pertama, di Listing 16-6, kita bakal membikin sebuah channel tapi tidak melakukan apa-apa dengannya. Perhatikan bahwa ini belum bisa di-compile karena Rust tidak tahu tipe nilai apa yang mau kita kirim lewat channel ini.

Filename: src/main.rs
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}
Listing 16-6: Membikin sebuah channel dan me-assign kedua paruhnya ke tx dan rx

Kita membikin sebuah channel baru memakai fungsi mpsc::channel; mpsc adalah singkatan dari multiple producer, single consumer (banyak penghasil, satu konsumen). Singkatnya, cara standard library Rust mengimplementasikan channels berarti sebuah channel bisa punya banyak ujung pengirim (sending ends) yang memproduksi nilai tapi cuma bisa punya satu ujung penerima (receiving end) yang mengonsumsi nilai-nilai tersebut. Bayangin ada banyak aliran sungai kecil yang ngalir bersatu jadi satu sungai besar: apa pun yang dikirim ke salah satu sungai kecil itu bakal berakhir di satu sungai besar tersebut di ujungnya. Kita bakal mulai dengan satu produsen (producer) aja buat sekarang, tapi kita bakal nambahin banyak produsen pas contoh ini udah bisa jalan.

Fungsi mpsc::channel mengembalikan sebuah tuple, yang elemen pertamanya adalah ujung pengirim (the sending end)—yaitu si transmitter—dan elemen keduanya adalah ujung penerima (the receiving end)—yaitu si receiver. Singkatan tx dan rx secara tradisional sering dipakai di banyak bidang untuk masing-masing transmitter dan receiver, jadi kita menamai variabel kita dengan nama tersebut buat mengindikasikan setiap ujungnya. Kita memakai statement let dengan sebuah pola (pattern) yang men-destructure tuples tersebut; kita bakal membahas pemakaian pola di dalam statements let dan destructuring di Bab 19. Buat sekarang, ketahui aja kalau memakai statement let dengan cara ini adalah pendekatan yang nyaman buat mengekstrak potongan-potongan dari tuple yang dikembalikan sama mpsc::channel.

Mari kita pindahkan ujung pengirim (transmitting end) ke dalam spawned thread lalu suruh dia mengirim satu string supaya spawned thread tersebut berkomunikasi sama main thread, seperti yang ditunjukkan di Listing 16-7. Ini ibarat naruh bebek karet di bagian hulu sungai atau ngirim pesan chat dari satu thread ke thread lainnya.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}
Listing 16-7: Memindahkan tx ke dalam sebuah spawned thread dan mengirim "hi"

Sekali lagi, kita memakai thread::spawn buat membikin thread baru lalu memakai move buat memindahkan tx ke dalam closure supaya spawned thread tersebut memiliki (owns) tx. Spawned thread perlu memiliki si transmitter supaya bisa mengirim pesan lewat channel.

Transmitter punya sebuah method send yang menerima nilai yang mau kita kirim. Method send mengembalikan sebuah tipe Result<T, E>, jadi kalau receiver-nya ternyata sudah di-drop dan tidak ada tempat lagi buat ngirim nilai, operasi pengirimannya (send) bakal mengembalikan sebuah error. Di contoh ini, kita memanggil unwrap buat panic seandainya terjadi error. Tapi di aplikasi betulan (real application), kita bakal menanganinya dengan benar: silakan kembali ke Bab 9 buat me-review strategi-strategi buat penanganan error yang tepat.

Di Listing 16-8, kita bakal mengambil nilainya dari receiver di dalam main thread. Ini ibarat memungut bebek karet dari air di ujung hilir sungai atau menerima sebuah pesan chat.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-8: Menerima nilai "hi" di dalam main thread dan mencetaknya

Receiver punya dua method yang berguna: recv dan try_recv. Kita memakai recv, singkatan dari receive (menerima), yang bakal memblokir eksekusi dari main thread dan menunggu sampai sebuah nilai dikirim lewat channel tersebut. Begitu ada nilai yang dikirim, recv bakal mengembalikannya di dalam sebuah Result<T, E>. Saat transmitter ditutup, recv bakal mengembalikan sebuah error buat memberi sinyal bahwa tidak akan ada lagi nilai yang bakal datang.

Method try_recv tidak melakukan pemblokiran (doesn’t block), tapi ia bakal langsung mengembalikan sebuah Result<T, E>: nilai Ok yang memegang sebuah pesan kalau pesannya lagi tersedia dan nilai Err kalau tidak ada pesan sama sekali saat ini. Memakai try_recv berguna kalau thread ini punya kerjaan lain yang harus dilakuin sambil nunggu pesan: kita bisa nulis sebuah loop yang memanggil try_recv sesekali, menangani pesannya kalau lagi tersedia, dan kalau enggak, ngerjain tugas lain dulu sebentar sampai waktunya ngecek lagi.

Kita memakai recv di contoh ini buat kesederhanaan; kita tidak punya kerjaan lain buat main thread selain menunggu pesan, jadi memblokir main thread adalah pilihan yang tepat.

Pas kita menjalankan kode di Listing 16-8, kita bakal melihat nilai yang dicetak dari main thread:

Got: hi

Sempurna!

Channels dan Transfer Kepemilikan (Ownership Transference)

Aturan-aturan ownership (kepemilikan) memainkan peran vital dalam pengiriman pesan karena mereka ngebantu kita nulis kode konkuren yang aman. Mencegah error di pemrograman konkuren adalah keuntungan (advantage) dari memikirkan tentang ownership di sepanjang program Rust kita. Mari kita lakukan sebuah eksperimen buat nunjukin gimana channels dan ownership bekerja bersama-sama mencegah timbulnya masalah: kita bakal nyoba memakai sebuah nilai val di dalam spawned thread setelah kita ngirim nilai itu lewat channel. Coba compile kode di Listing 16-9 buat melihat kenapa kode ini tidak diizinkan.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {val}");
    });

    let received = rx.recv().unwrap();
    println!("Got: {received}");
}
Listing 16-9: Mencoba memakai val setelah kita mengirimnya lewat channel

Di sini, kita mencoba buat mencetak val setelah kita mengirimnya lewat channel via tx.send. Mengizinkan ini adalah ide yang buruk: begitu nilainya sudah dikirim ke thread lain, thread tersebut bisa aja memodifikasi atau men-drop-nya sebelum kita nyoba memakai nilai itu lagi. Secara potensial, modifikasi yang dilakukan sama thread lain bisa menyebabkan error atau hasil yang tidak disangka-sangka akibat adanya data yang tidak konsisten atau sudah tidak eksis lagi. Namun, Rust ngasih kita error kalau kita mencoba men-compile kode di Listing 16-9:

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:27
   |
 8 |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
 9 |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {val}");
   |                           ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` (bin "message-passing") due to 1 previous error

Kesalahan konkurensi (concurrency mistake) kita ini sudah membikin sebuah error compile-time. Fungsi send mengambil kepemilikan atas parameternya, dan saat nilainya dipindahkan (moved), receiver bakal mengambil kepemilikannya. Hal ini menghentikan kita dari memakai nilai itu secara tidak sengaja lagi setelah mengirimnya; sistem ownership memastikan kalau semuanya aman terkendali.

Mengirim Banyak Nilai dan Melihat Receiver Menunggu

Kode di Listing 16-8 berhasil di-compile dan jalan, tapi dia tidak secara jelas menunjukkan ke kita kalau ada dua threads terpisah yang lagi ngobrol satu sama lain lewat channel.

Di Listing 16-10 kita sudah membuat beberapa modifikasi yang bakal membuktikan kalau kode di Listing 16-8 itu berjalan secara konkuren: spawned thread sekarang bakal mengirim banyak pesan dan ngasih jeda sebentar (pause) satu detik di antara setiap pesannya.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }
}
Listing 16-10: Mengirim banyak pesan dan ngasih jeda (pause) di antara setiap pesannya

Kali ini, spawned thread punya sebuah vector berisi string yang mau kita kirim ke main thread. Kita iterasi ngelewatin mereka, ngirim setiap string-nya satu-satu, dan ngasih jeda di antara setiap pengiriman dengan memanggil fungsi thread::sleep beserta sebuah nilai Duration satu detik.

Di dalam main thread, kita tidak memanggil fungsi recv secara eksplisit lagi: sebaliknya, kita memperlakukan rx sebagai sebuah iterator. Buat setiap nilai yang diterima, kita bakal mencetaknya. Saat channel-nya ditutup, iterasinya bakal berakhir.

Pas kita menjalankan kode di Listing 16-10, kita seharusnya melihat output berikut dengan jeda satu detik di antara setiap barisnya:

Got: hi
Got: from
Got: the
Got: thread

Karena kita tidak punya kode yang melakukan jeda atau penundaan (delays) di dalam for loop di main thread, kita bisa tahu kalau main thread tersebut lagi nunggu buat nerima nilai-nilai dari spawned thread.

Membikin Banyak Producers dengan Meng-clone si Transmitter

Tadi kita sempat menyebutkan kalau mpsc adalah singkatan dari multiple producer, single consumer (banyak penghasil, satu konsumen). Mari kita manfaatin mpsc ini lalu ekspansi kode di Listing 16-10 buat membikin beberapa threads yang semuanya mengirim nilai ke satu receiver yang sama. Kita bisa melakukan itu dengan meng-clone transmitter-nya, seperti yang ditunjukkan di Listing 16-11.

Filename: src/main.rs
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {received}");
    }

    // --snip--
}
Listing 16-11: Mengirim banyak pesan dari banyak producers

Kali ini, sebelum kita membikin spawned thread yang pertama, kita memanggil clone pada transmitter-nya. Ini bakal ngasih kita sebuah transmitter baru yang bisa kita teruskan ke spawned thread yang pertama. Kita meneruskan transmitter aslinya ke sebuah spawned thread yang kedua. Hal ini ngasih kita dua threads, di mana masing-masing mengirim pesan yang berbeda ke satu receiver yang sama.

Pas kita menjalankan kodenya, output kita harusnya bakal kelihatan kurang lebih kayak gini:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

kita mungkin bakal melihat nilai-nilainya dalam urutan yang berbeda, tergantung dari sistem yang kita pakai. Inilah yang membikin konkurensi jadi hal yang menarik sekaligus sulit. Kalau kita eksperimen sama thread::sleep, ngasih dia nilai yang beda-beda di berbagai threads tersebut, masing-masing jalan (run) bakal jadi makin tidak deterministik dan menghasilkan output yang berbeda-beda setiap kalinya.

Sekarang setelah kita melihat gimana channels itu bekerja, mari kita lihat metode konkurensi yang berbeda.