From c60f0768735851efe0856dbf147c8a92b88fec25 Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Fri, 13 Dec 2019 15:40:34 +1100 Subject: [PATCH 1/2] Move to using crossbeam and add a blocking fixed-length queue option. --- Cargo.toml | 1 + src/lib.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 123 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1599b0d..f7ee0b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ categories = ["concurrency", "os"] [dependencies] num_cpus = "1.6" +crossbeam-channel = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 5d168cf..2d2bf58 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -78,14 +78,16 @@ //! assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23); //! ``` +extern crate crossbeam_channel; extern crate num_cpus; use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::{Arc, Condvar, Mutex}; use std::thread; +use crossbeam_channel as cbc; + trait FnBox { fn call_box(self: Box); } @@ -96,7 +98,7 @@ impl FnBox for F { } } -type Thunk<'a> = Box; +type Thunk<'a> = Box; struct Sentinel<'a> { shared_data: &'a Arc, @@ -159,6 +161,7 @@ pub struct Builder { num_threads: Option, thread_name: Option, thread_stack_size: Option, + queue_len: Option, } impl Builder { @@ -176,6 +179,7 @@ impl Builder { num_threads: None, thread_name: None, thread_stack_size: None, + queue_len: None, } } @@ -211,6 +215,48 @@ impl Builder { self } + /// Set the maximum number of pending jobs that can be queued to + /// the [`ThreadPool`]. Once the queue is full further calls will + /// block until slots become available. A `len` of 0 will always + /// block until a thread is available. If not specified, defaults + /// to unlimited. + /// + /// [`ThreadPool`]: struct.ThreadPool.html + /// + /// # Panics + /// + /// This method will panic if `len` is less-than 0; + /// + /// # Examples + /// + /// With a single thread and a queue len of 1, the final execute + /// will have to wait until the first job finishes to be queued. + /// + /// ``` + /// use std::thread; + /// + /// let pool = threadpool::Builder::new() + /// .num_threads(1) + /// .queue_len(1) + /// .build(); + /// + /// for _ in 0..2 { + /// pool.execute(|| { + /// println!("Hello from a worker thread! I'm going to rest now...") + /// thread::sleep(Duration::from_secs(10)); + /// println!("All done!") + /// }) + /// } + /// + /// pool.execute(|| { + /// println!("Hello from 10 seconds in the future!"); + /// }); + /// ``` + pub fn queue_len(mut self, len: usize) -> Builder { + self.queue_len = Some(len); + self + } + /// Set the thread name for each of the threads spawned by the built [`ThreadPool`]. If not /// specified, threads spawned by the thread pool will be unnamed. /// @@ -279,7 +325,10 @@ impl Builder { /// .build(); /// ``` pub fn build(self) -> ThreadPool { - let (tx, rx) = channel::>(); + let (tx, rx) = self.queue_len.map_or( + cbc::unbounded(), + |len| cbc::bounded(len) + ); let num_threads = self.num_threads.unwrap_or_else(num_cpus::get); @@ -310,7 +359,7 @@ impl Builder { struct ThreadPoolSharedData { name: Option, - job_receiver: Mutex>>, + job_receiver: Mutex>>, empty_trigger: Mutex<()>, empty_condvar: Condvar, join_generation: AtomicUsize, @@ -343,7 +392,7 @@ pub struct ThreadPool { // // This is the only such Sender, so when it is dropped all subthreads will // quit. - jobs: Sender>, + jobs: cbc::Sender>, shared_data: Arc, } @@ -780,7 +829,7 @@ mod test { use super::{Builder, ThreadPool}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::mpsc::{channel, sync_channel}; - use std::sync::{Arc, Barrier}; + use std::sync::{Arc, Barrier, Mutex}; use std::thread::{self, sleep}; use std::time::Duration; @@ -928,7 +977,7 @@ mod test { b1.wait(); } - tx.send(1).is_ok(); + tx.send(1).unwrap(); }); } @@ -1324,4 +1373,70 @@ mod test { clock_thread.join().unwrap(); } + + #[test] + fn test_bounded_pool() { + let pool = Builder::new() + .num_threads(1) + .queue_len(1) + .build(); + let end = Arc::new(Barrier::new(2)); + let count = Arc::new(Mutex::new(0)); + + fn inc_wait(c: &Arc>, val: i64, millis: i64) -> bool{ + for _ in 0..millis/10 { + { + let l = c.lock().unwrap(); + if *l == val { + return true; + } + } + sleep(Duration::from_millis(10)); + } + return false; + } + + // Lock up the only thread + let e1 = end.clone(); + let c1 = count.clone(); + pool.execute(move || { + { + let mut c = c1.lock().unwrap(); + *c += 1; + } + e1.wait(); + }); + + // Wait for it to be ready + assert!(inc_wait(&count, 1, 1000)); + assert_eq!(pool.queued_count(), 0); + + // Schedule 2nd job; sits on the queue + let e2 = end.clone(); + let c2 = count.clone(); + pool.execute(move || { + { + let mut c = c2.lock().unwrap(); + *c += 1; + } + e2.wait(); + }); + + assert!(!inc_wait(&count, 2, 1000)); + assert_eq!(pool.queued_count(), 1); + + // Third attempt should block + let c3 = count.clone(); + thread::spawn(move || { + pool.execute(move || { + } ); + { + let mut c = c3.lock().unwrap(); + *c += 1; + } + }); + assert!(!inc_wait(&count, 2, 1000)); + + end.wait(); + } } From 23034ff1b6174657a3046be8915683a8e3ca3bfa Mon Sep 17 00:00:00 2001 From: Steve Smith Date: Fri, 13 Dec 2019 15:49:19 +1100 Subject: [PATCH 2/2] Fixes to example code. --- src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2d2bf58..fdb9eb2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -234,6 +234,7 @@ impl Builder { /// /// ``` /// use std::thread; + /// use std::time::Duration; /// /// let pool = threadpool::Builder::new() /// .num_threads(1) @@ -242,9 +243,9 @@ impl Builder { /// /// for _ in 0..2 { /// pool.execute(|| { - /// println!("Hello from a worker thread! I'm going to rest now...") + /// println!("Hello from a worker thread! I'm going to rest now..."); /// thread::sleep(Duration::from_secs(10)); - /// println!("All done!") + /// println!("All done!"); /// }) /// } ///