Skip to content

Commit dbb8045

Browse files
committed
trunk: some changes to the online-nnet2 threaded decoding program to fix an issue, noticed by Nagendra and Tanel, where endpointing does not work when simulate-realtime-decoding=false.
git-svn-id: https://svn.code.sf.net/p/kaldi/code/trunk@5016 5e6a8d80-dfce-4ca6-a32a-6e07a63d50c8
1 parent 38ab1f4 commit dbb8045

File tree

4 files changed

+121
-2
lines changed

4 files changed

+121
-2
lines changed

src/online2/online-nnet2-decoding-threaded.cc

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ SingleUtteranceNnet2DecoderThreaded::SingleUtteranceNnet2DecoderThreaded(
119119
config_(config), am_nnet_(am_nnet), tmodel_(tmodel), sampling_rate_(0.0),
120120
num_samples_received_(0), input_finished_(false),
121121
feature_pipeline_(feature_info),
122+
num_samples_discarded_(0),
122123
silence_weighting_(tmodel, feature_info.silence_weighting_config),
123124
decodable_(tmodel),
124125
num_frames_decoded_(0), decoder_(fst, config_.decoder_opts),
@@ -176,6 +177,10 @@ SingleUtteranceNnet2DecoderThreaded::~SingleUtteranceNnet2DecoderThreaded() {
176177
delete input_waveform_.front();
177178
input_waveform_.pop_front();
178179
}
180+
while (!processed_waveform_.empty()) {
181+
delete processed_waveform_.front();
182+
processed_waveform_.pop_front();
183+
}
179184
}
180185

181186
void SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(
@@ -200,6 +205,22 @@ void SingleUtteranceNnet2DecoderThreaded::AcceptWaveform(
200205
waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kProducer);
201206
}
202207

208+
int32 SingleUtteranceNnet2DecoderThreaded::NumWaveformPiecesPending() {
209+
// Note RE locking: what we really want here is just to lock the mutex. As a
210+
// side effect, because of the way the synchronizer code works, it will also
211+
// increment the semaphore and might wake up the consumer thread. This will
212+
// possibly make it do a little useless work (go around a loop once), but
213+
// won't really do any harm. Perhaps we should have implemented a version of
214+
// the Lock function that takes no arguments.
215+
if (!waveform_synchronizer_.Lock(ThreadSynchronizer::kProducer)) {
216+
KALDI_ERR << "Failure locking mutex: decoding aborted.";
217+
}
218+
int32 ans = input_waveform_.size();
219+
waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kProducer);
220+
return ans;
221+
}
222+
223+
203224
int32 SingleUtteranceNnet2DecoderThreaded::NumFramesReceivedApprox() const {
204225
return num_samples_received_ /
205226
(sampling_rate_ * feature_pipeline_.FrameShiftInSeconds());
@@ -237,6 +258,55 @@ void SingleUtteranceNnet2DecoderThreaded::FinalizeDecoding() {
237258
decoder_.FinalizeDecoding();
238259
}
239260

261+
BaseFloat SingleUtteranceNnet2DecoderThreaded::GetRemainingWaveform(
262+
Vector<BaseFloat> *waveform) const {
263+
if (KALDI_PTHREAD_PTR(threads_[0]) != 0) {
264+
KALDI_ERR << "It is an error to call GetRemainingWaveform before Wait().";
265+
}
266+
int64 num_samples_stored = 0; // number of samples we still have.
267+
std::vector< Vector<BaseFloat>* > all_pieces;
268+
std::deque< Vector<BaseFloat>* >::const_iterator iter;
269+
for (iter = input_waveform_.begin(); iter != input_waveform_.end(); ++iter) {
270+
num_samples_stored += (*iter)->Dim();
271+
all_pieces.push_back(*iter);
272+
}
273+
for (iter = processed_waveform_.begin(); iter != processed_waveform_.end();
274+
++iter) {
275+
num_samples_stored += (*iter)->Dim();
276+
all_pieces.push_back(*iter);
277+
}
278+
// put the pieces in chronological order.
279+
std::reverse(all_pieces.begin(), all_pieces.end());
280+
int64 samples_shift_per_frame =
281+
sampling_rate_ * feature_pipeline_.FrameShiftInSeconds();
282+
int64 num_samples_to_discard = samples_shift_per_frame * num_frames_decoded_;
283+
KALDI_ASSERT(num_samples_to_discard >= num_samples_discarded_);
284+
285+
// num_samp_discard is how many samples we must discard from our stored
286+
// samples.
287+
int64 num_samp_discard = num_samples_to_discard - num_samples_discarded_,
288+
num_samp_keep = num_samples_stored - num_samp_discard;
289+
KALDI_ASSERT(num_samp_discard <= num_samples_stored && num_samp_keep >= 0);
290+
waveform->Resize(num_samp_keep, kUndefined);
291+
int32 offset = 0; // offset in output waveform. assume output waveform is no
292+
// larger than int32.
293+
for (size_t i = 0; i < all_pieces.size(); i++) {
294+
Vector<BaseFloat> *this_piece = all_pieces[i];
295+
int32 this_dim = this_piece->Dim();
296+
if (num_samp_discard >= this_dim) {
297+
num_samp_discard -= this_dim;
298+
} else {
299+
// normal case is num_samp_discard = 0.
300+
int32 this_dim_keep = this_dim - num_samp_discard;
301+
waveform->Range(offset, this_dim_keep).CopyFromVec(
302+
this_piece->Range(num_samp_discard, this_dim_keep));
303+
offset += this_dim_keep;
304+
num_samp_discard = 0;
305+
}
306+
}
307+
KALDI_ASSERT(offset == num_samp_keep && num_samp_discard == 0);
308+
return sampling_rate_;
309+
}
240310

241311
void SingleUtteranceNnet2DecoderThreaded::GetAdaptationState(
242312
OnlineIvectorExtractorAdaptationState *adaptation_state) {
@@ -413,11 +483,23 @@ bool SingleUtteranceNnet2DecoderThreaded::FeatureComputation(
413483
while (num_frames_usable < config_.nnet_batch_size &&
414484
!input_waveform_.empty()) {
415485
feature_pipeline_.AcceptWaveform(sampling_rate_, *input_waveform_.front());
416-
delete input_waveform_.front();
486+
processed_waveform_.push_back(input_waveform_.front());
417487
input_waveform_.pop_front();
418488
num_frames_ready = feature_pipeline_.NumFramesReady();
419489
num_frames_usable = num_frames_ready - num_frames_output;
420490
}
491+
// Delete already-processed pieces of waveform if we have already decoded
492+
// those frames. (If not already decoded, we keep them around for the
493+
// sake of GetRemainingWaveform()).
494+
int32 samples_shift_per_frame =
495+
sampling_rate_ * feature_pipeline_.FrameShiftInSeconds();
496+
while (!processed_waveform_.empty() &&
497+
num_samples_discarded_ + processed_waveform_.front()->Dim() <
498+
samples_shift_per_frame * num_frames_decoded_) {
499+
num_samples_discarded_ += processed_waveform_.front()->Dim();
500+
delete processed_waveform_.front();
501+
processed_waveform_.pop_front();
502+
}
421503
return waveform_synchronizer_.UnlockSuccess(ThreadSynchronizer::kConsumer);
422504
}
423505
}
@@ -605,4 +687,3 @@ bool SingleUtteranceNnet2DecoderThreaded::EndpointDetected(
605687

606688

607689
} // namespace kaldi
608-

src/online2/online-nnet2-decoding-threaded.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,18 @@ class SingleUtteranceNnet2DecoderThreaded {
201201
const OnlineNnet2FeaturePipelineInfo &feature_info,
202202
const OnlineIvectorExtractorAdaptationState &adaptation_state);
203203

204+
205+
204206
/// You call this to provide this class with more waveform to decode. This
205207
/// call is, for all practical purposes, non-blocking.
206208
void AcceptWaveform(BaseFloat samp_freq,
207209
const VectorBase<BaseFloat> &wave_part);
208210

211+
/// Returns the number of pieces of waveform that are still waiting to be
212+
/// processed. This may be useful for calling code to judge whether to supply
213+
/// more waveform or to wait.
214+
int32 NumWaveformPiecesPending();
215+
209216
/// You call this to inform the class that no more waveform will be provided;
210217
/// this allows it to flush out the last few frames of features, and is
211218
/// necessary if you want to call Wait() to wait until all decoding is done.
@@ -285,6 +292,12 @@ class SingleUtteranceNnet2DecoderThreaded {
285292
/// You may only call this function after either calling TerminateDecoding() or
286293
/// InputFinished, and then Wait(). Otherwise it is an error.
287294
void GetAdaptationState(OnlineIvectorExtractorAdaptationState *adaptation_state);
295+
296+
/// Gets the remaining, un-decoded part of the waveform and returns the sample
297+
/// rate. May only be called after Wait(), and it only makes sense to call
298+
/// this if you called TerminateDecoding() before Wait(). The idea is that
299+
/// you can then provide this un-decoded piece of waveform to another decoder.
300+
BaseFloat GetRemainingWaveform(Vector<BaseFloat> *waveform_out) const;
288301

289302
~SingleUtteranceNnet2DecoderThreaded();
290303
private:
@@ -349,6 +362,8 @@ class SingleUtteranceNnet2DecoderThreaded {
349362
// sampling_rate_ is only needed for checking that it matches the config.
350363
bool input_finished_;
351364
std::deque< Vector<BaseFloat>* > input_waveform_;
365+
366+
352367
ThreadSynchronizer waveform_synchronizer_;
353368

354369
// feature_pipeline_ is accessed by the nnet-evaluation thread, by the main
@@ -358,6 +373,15 @@ class SingleUtteranceNnet2DecoderThreaded {
358373
OnlineNnet2FeaturePipeline feature_pipeline_;
359374
Mutex feature_pipeline_mutex_;
360375

376+
// The next two variables are required only for implementation of the function
377+
// GetRemainingWaveform(). After we take waveform from the input_waveform_
378+
// queue to be processed into features, we put them onto this deque. Then we
379+
// discard from this queue any that we can discard because we have already
380+
// decoded those frames (see num_frames_decoded_), and we increment
381+
// num_samples_discarded_ by the corresponding number of samples.
382+
std::deque< Vector<BaseFloat>* > processed_waveform_;
383+
int64 num_samples_discarded_;
384+
361385
// This object is used to control the (optional) downweighting of silence in iVector estimation,
362386
// which is based on the decoder traceback.
363387
OnlineSilenceWeighting silence_weighting_;

src/online2bin/online2-wav-nnet2-latgen-threaded.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ int main(int argc, char *argv[]) {
219219
: samp_remaining;
220220

221221
SubVector<BaseFloat> wave_part(data, samp_offset, num_samp);
222+
223+
// The endpointing code won't work if we let the waveform be given to
224+
// the decoder all at once, because we'll exit this while loop, and
225+
// the endpointing happens inside this while loop. The next statement
226+
// is intended to prevent this from happening.
227+
while (do_endpointing &&
228+
decoder.NumWaveformPiecesPending() * chunk_length_secs > 2.0)
229+
Sleep(0.5);
230+
222231
decoder.AcceptWaveform(samp_freq, wave_part);
223232

224233
samp_offset += num_samp;

tools/Makefile

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ fortran_opt = $(shell gcc -v 2>&1 | perl -e '$$x = join(" ", <STDIN>); if($$x =~
166166
# if you want Open Blas to use multiple threads. then you could set,
167167
# for example, OPENBLAS_NUM_THREADS=2 in your path.sh so that the
168168
# runtime knows how many threads to use.
169+
# Note: if you ever get the error "Program is Terminated. Because you tried to
170+
# allocate too many memory regions.", this is because OpenBLAS has a fixed
171+
# buffer size for the number of threads and you might have gone beyond that. It
172+
# may possibly help to add e.g. NUM_THREADS=64 to the command line below (after
173+
# $(MAKE)).
169174
openblas_compiled:
170175
-git clone git://github.com/xianyi/OpenBLAS
171176
cd OpenBLAS; sed 's:# FCOMMON_OPT = -frecursive:FCOMMON_OPT = -frecursive:' < Makefile.rule >tmp && mv tmp Makefile.rule

0 commit comments

Comments (0)