#include #include #include "caffe/data_reader.hpp" #include "caffe/layers/base_data_layer.hpp" #include "caffe/parallel.hpp" #include "caffe/util/blocking_queue.hpp" namespace caffe { template class BlockingQueue::sync { public: mutable boost::mutex mutex_; boost::condition_variable condition_; }; template BlockingQueue::BlockingQueue() : sync_(new sync()) { } template void BlockingQueue::push(const T& t) { boost::mutex::scoped_lock lock(sync_->mutex_); queue_.push(t); lock.unlock(); sync_->condition_.notify_one(); } template bool BlockingQueue::try_pop(T* t) { boost::mutex::scoped_lock lock(sync_->mutex_); if (queue_.empty()) { return false; } *t = queue_.front(); queue_.pop(); return true; } template T BlockingQueue::pop(const string& log_on_wait) { boost::mutex::scoped_lock lock(sync_->mutex_); while (queue_.empty()) { if (!log_on_wait.empty()) { LOG_EVERY_N(INFO, 1000)<< log_on_wait; } sync_->condition_.wait(lock); } T t = queue_.front(); queue_.pop(); return t; } template bool BlockingQueue::try_peek(T* t) { boost::mutex::scoped_lock lock(sync_->mutex_); if (queue_.empty()) { return false; } *t = queue_.front(); return true; } template T BlockingQueue::peek() { boost::mutex::scoped_lock lock(sync_->mutex_); while (queue_.empty()) { sync_->condition_.wait(lock); } return queue_.front(); } template size_t BlockingQueue::size() const { boost::mutex::scoped_lock lock(sync_->mutex_); return queue_.size(); } template class BlockingQueue*>; template class BlockingQueue*>; template class BlockingQueue; template class BlockingQueue >; template class BlockingQueue*>; template class BlockingQueue*>; } // namespace caffe