Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ categories = ["concurrency", "os"]

[dependencies]
num_cpus = "1.6"
crossbeam-channel = "0.4"
130 changes: 123 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@
//! assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23);
//! ```

extern crate crossbeam_channel;
extern crate num_cpus;

use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

use crossbeam_channel as cbc;

trait FnBox {
fn call_box(self: Box<Self>);
}
Expand All @@ -96,7 +98,7 @@ impl<F: FnOnce()> FnBox for F {
}
}

type Thunk<'a> = Box<FnBox + Send + 'a>;
type Thunk<'a> = Box<dyn FnBox + Send + 'a>;

struct Sentinel<'a> {
shared_data: &'a Arc<ThreadPoolSharedData>,
Expand Down Expand Up @@ -159,6 +161,7 @@ pub struct Builder {
num_threads: Option<usize>,
thread_name: Option<String>,
thread_stack_size: Option<usize>,
queue_len: Option<usize>,
}

impl Builder {
Expand All @@ -176,6 +179,7 @@ impl Builder {
num_threads: None,
thread_name: None,
thread_stack_size: None,
queue_len: None,
}
}

Expand Down Expand Up @@ -211,6 +215,49 @@ impl Builder {
self
}

/// Set the maximum number of pending jobs that can be queued to
/// the [`ThreadPool`]. Once the queue is full further calls will
/// block until slots become available. A `len` of 0 will always
/// block until a thread is available. If not specified, defaults
/// to unlimited.
///
/// [`ThreadPool`]: struct.ThreadPool.html
///
/// # Panics
///
/// This method will panic if `len` is less-than 0;
///
/// # Examples
///
/// With a single thread and a queue len of 1, the final execute
/// will have to wait until the first job finishes to be queued.
///
/// ```
/// use std::thread;
/// use std::time::Duration;
///
/// let pool = threadpool::Builder::new()
/// .num_threads(1)
/// .queue_len(1)
/// .build();
///
/// for _ in 0..2 {
/// pool.execute(|| {
/// println!("Hello from a worker thread! I'm going to rest now...");
/// thread::sleep(Duration::from_secs(10));
/// println!("All done!");
/// })
/// }
///
/// pool.execute(|| {
/// println!("Hello from 10 seconds in the future!");
/// });
/// ```
pub fn queue_len(mut self, len: usize) -> Builder {
self.queue_len = Some(len);
self
}

/// Set the thread name for each of the threads spawned by the built [`ThreadPool`]. If not
/// specified, threads spawned by the thread pool will be unnamed.
///
Expand Down Expand Up @@ -279,7 +326,10 @@ impl Builder {
/// .build();
/// ```
pub fn build(self) -> ThreadPool {
let (tx, rx) = channel::<Thunk<'static>>();
let (tx, rx) = self.queue_len.map_or(
cbc::unbounded(),
|len| cbc::bounded(len)
);

let num_threads = self.num_threads.unwrap_or_else(num_cpus::get);

Expand Down Expand Up @@ -310,7 +360,7 @@ impl Builder {

struct ThreadPoolSharedData {
name: Option<String>,
job_receiver: Mutex<Receiver<Thunk<'static>>>,
job_receiver: Mutex<cbc::Receiver<Thunk<'static>>>,
empty_trigger: Mutex<()>,
empty_condvar: Condvar,
join_generation: AtomicUsize,
Expand Down Expand Up @@ -343,7 +393,7 @@ pub struct ThreadPool {
//
// This is the only such Sender, so when it is dropped all subthreads will
// quit.
jobs: Sender<Thunk<'static>>,
jobs: cbc::Sender<Thunk<'static>>,
shared_data: Arc<ThreadPoolSharedData>,
}

Expand Down Expand Up @@ -780,7 +830,7 @@ mod test {
use super::{Builder, ThreadPool};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, sync_channel};
use std::sync::{Arc, Barrier};
use std::sync::{Arc, Barrier, Mutex};
use std::thread::{self, sleep};
use std::time::Duration;

Expand Down Expand Up @@ -928,7 +978,7 @@ mod test {
b1.wait();
}

tx.send(1).is_ok();
tx.send(1).unwrap();
});
}

Expand Down Expand Up @@ -1324,4 +1374,70 @@ mod test {

clock_thread.join().unwrap();
}

#[test]
fn test_bounded_pool() {
let pool = Builder::new()
.num_threads(1)
.queue_len(1)
.build();
let end = Arc::new(Barrier::new(2));
let count = Arc::new(Mutex::new(0));

fn inc_wait(c: &Arc<Mutex<i64>>, val: i64, millis: i64) -> bool{
for _ in 0..millis/10 {
{
let l = c.lock().unwrap();
if *l == val {
return true;
}
}
sleep(Duration::from_millis(10));
}
return false;
}

// Lock up the only thread
let e1 = end.clone();
let c1 = count.clone();
pool.execute(move || {
{
let mut c = c1.lock().unwrap();
*c += 1;
}
e1.wait();
});

// Wait for it to be ready
assert!(inc_wait(&count, 1, 1000));
assert_eq!(pool.queued_count(), 0);

// Schedule 2nd job; sits on the queue
let e2 = end.clone();
let c2 = count.clone();
pool.execute(move || {
{
let mut c = c2.lock().unwrap();
*c += 1;
}
e2.wait();
});

assert!(!inc_wait(&count, 2, 1000));
assert_eq!(pool.queued_count(), 1);

// Third attempt should block
let c3 = count.clone();
thread::spawn(move || {
pool.execute(move || {
} );
{
let mut c = c3.lock().unwrap();
*c += 1;
}
});
assert!(!inc_wait(&count, 2, 1000));

end.wait();
}
}