@@ -1387,41 +1387,67 @@ Status MasterSession::DoRunWithLocalExecution(
13871387 pss.collect_rpcs = ph->should_collect_rpcs ();
13881388 }
13891389
1390- TF_RETURN_IF_ERROR (rcg->RunPartitions (env_, step_id, count,
1391- execution_state_.get (), &pss, opts, req,
1392- resp, cancellation_manager_, false ));
1390+ Status s =
1391+ rcg->RunPartitions (env_, step_id, count, execution_state_.get (), &pss,
1392+ opts, req, resp, cancellation_manager_, false );
1393+ if (s.ok ()) {
1394+ pss.end_micros = Env::Default ()->NowMicros ();
13931395
1394- pss.end_micros = Env::Default ()->NowMicros ();
1395-
1396- // Schedule post-processing and cleanup to be done asynchronously.
1396+ // Schedule post-processing and cleanup to be done asynchronously.
1397+ rcg->ProcessStats (step_id, &pss, execution_state_.get (), ph.get (),
1398+ req.options (), resp->mutable_metadata ());
1399+ } else if (errors::IsCancelled (s)) {
1400+ mutex_lock l (mu_);
1401+ if (closed_) {
1402+ if (garbage_collected_) {
1403+ s = errors::Cancelled (
1404+ " Step was cancelled because the session was garbage collected due "
1405+ " to inactivity." );
1406+ } else {
1407+ s = errors::Cancelled (
1408+ " Step was cancelled by an explicit call to `Session::Close()`." );
1409+ }
1410+ }
1411+ }
13971412 rcg->Ref ();
1398- rcg->ProcessStats (step_id, &pss, execution_state_.get (), ph.get (),
1399- req.options (), resp->mutable_metadata ());
14001413 rcg->CleanupPartitionsAsync (step_id, [rcg](const Status& s) {
14011414 if (!s.ok ()) {
14021415 LOG (ERROR ) << " Cleanup partition error: " << s;
14031416 }
14041417 rcg->Unref ();
14051418 });
1406- return Status::OK () ;
1419+ return s ;
14071420}
14081421
14091422Status MasterSession::Close () {
1423+ {
1424+ mutex_lock l (mu_);
1425+ closed_ = true ; // All subsequent calls to Run() or Extend() will fail.
1426+ }
14101427 cancellation_manager_->StartCancel ();
14111428 std::vector<ReffedClientGraph*> to_unref;
14121429 {
14131430 mutex_lock l (mu_);
14141431 while (num_running_ != 0 ) {
14151432 num_running_is_zero_.wait (l);
14161433 }
1417- closed_ = true ; // All subsequent calls to Run() or Extend() will fail.
14181434 ClearRunsTable (&to_unref, &run_graphs_);
14191435 ClearRunsTable (&to_unref, &partial_run_graphs_);
14201436 }
14211437 for (ReffedClientGraph* rcg : to_unref) rcg->Unref ();
14221438 return Status::OK ();
14231439}
14241440
1441+ void MasterSession::GarbageCollect () {
1442+ {
1443+ mutex_lock l (mu_);
1444+ closed_ = true ;
1445+ garbage_collected_ = true ;
1446+ }
1447+ cancellation_manager_->StartCancel ();
1448+ Unref ();
1449+ }
1450+
14251451MasterSession::RunState::RunState (const std::vector<string>& input_names,
14261452 const std::vector<string>& output_names,
14271453 ReffedClientGraph* rcg, const uint64 step_id,
0 commit comments