[improvement](streaming-job) Add RETRYING status to distinguish abnormal jobs #62238

Closed
JNSimba wants to merge 5 commits into apache:master from JNSimba:addretrystatus

Conversation

@JNSimba
Member

@JNSimba JNSimba commented Apr 8, 2026

Summary

  • Add a new RETRYING status to JobStatus enum for streaming jobs, so users can distinguish between healthy running jobs and jobs that are encountering errors and auto-retrying.
  • Previously, both user-initiated pause and recoverable errors shared the same PAUSED status, making it impossible to tell them apart in `show streaming jobs`.
  • Now PAUSED is exclusively for user-initiated pause and unrecoverable errors, while RETRYING indicates the job is auto-recovering with exponential backoff.
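Conceptually, the enum-level change can be sketched as follows. This is a simplified illustration, not the actual Doris `JobStatus` source — the real enum has more values and helpers; only the RETRYING/`isRunning()` relationship is taken from this PR:

```java
// Simplified sketch of the JobStatus change; the real Doris enum has more
// values and helper methods. The key point: RETRYING is "running-like".
enum JobStatus {
    PENDING, RUNNING, RETRYING, PAUSED, STOPPED, FINISHED;

    boolean isRunning() {
        // RETRYING counts as running so the scheduler keeps driving the job.
        return this == RUNNING || this == RETRYING;
    }
}

public class JobStatusSketch {
    public static void main(String[] args) {
        System.out.println(JobStatus.RETRYING.isRunning()); // true
        System.out.println(JobStatus.PAUSED.isRunning());   // false
    }
}
```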

State transition diagram

                          CREATE JOB
                              |
                              v
                         +---------+
                         | PENDING |
                         +----+----+
                              | createStreamingTask(), autoResumeCount = 0
                              v
                         +---------+
            +------------|  RUNNING |------------------+
            |            +--+----+--+                  |
            |               |    |                     |
       user PAUSE      task fail/ |               hasReachedEnd
      (MANUAL_PAUSE)  meta fail/  |                    |
            |         sched fail  |                    v
            |        (recoverable)|               +----------+
            |               |    data quality     | FINISHED |
            |               |    error            +----------+
            |               |  (unrecoverable)
            v               v        |
       +--------+     +----------+   |
       | PAUSED |     | RETRYING |   |
       +---+----+     +----+-----+   |
           |               |         v
      user RESUME     backoff,   +--------+
           |          recreate   | PAUSED |
           v          task       +--------+
       +---------+         |
       | PENDING |    task result
       +----+----+    /         \
            |       success     fail
            v       |            |
         RUNNING    v            v
                 RUNNING      RETRYING
                (count=0)    (keep, count++)

       RETRYING --> user PAUSE --> PAUSED
       any non-final --> user STOP --> STOPPED

Key changes

File Change
JobStatus.java Add RETRYING enum, include in isRunning()
AbstractJob.java Allow RETRYING state transitions
StreamingJobSchedulerTask.java New handleRetryingState() with backoff + task recreation; PAUSED does nothing
StreamingInsertJob.java onStreamTaskFail/fetchMeta → RETRYING; onStreamTaskSuccess → RUNNING + reset count; gsonPostProcess null fallback for downgrade safety
ResumeJobCommand.java Reject RESUME on RETRYING jobs
StreamingTaskScheduler.java Schedule failure → RETRYING
AbstractJobStatusTest.java Add RETRYING transition tests
Regression tests (5 files) Update expected status from PAUSED to RETRYING
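The exponential backoff mentioned for `handleRetryingState()` can be sketched roughly like this. `BASE_MS`, `MAX_MS`, and `nextDelayMs` are illustrative names and values, not the actual constants in `StreamingJobSchedulerTask`:

```java
// Hypothetical backoff sketch: the delay doubles per failed auto-resume
// attempt (autoResumeCount) and is capped. Constants are assumptions.
public class RetryBackoff {
    static final long BASE_MS = 1_000L;
    static final long MAX_MS = 60_000L;

    // Delay = BASE_MS * 2^autoResumeCount, capped at MAX_MS.
    static long nextDelayMs(int autoResumeCount) {
        long delay = BASE_MS << Math.min(autoResumeCount, 10);
        return Math.min(delay, MAX_MS);
    }

    public static void main(String[] args) {
        // 1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000
        for (int i = 0; i < 8; i++) {
            System.out.println(nextDelayMs(i));
        }
    }
}
```

On success the job transitions back to RUNNING and the count resets to 0, so a later failure starts again from the base delay.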

Test plan

  • UT: AbstractJobStatusTest covers all RETRYING state transitions
  • Regression: test_streaming_insert_job_alter_aksk — alter to wrong credentials, verify RETRYING status
  • Regression: test_streaming_insert_job_fetch_meta_error — debug point fetch meta failure, verify RETRYING
  • Regression: test_streaming_job_schedule_task_error — debug point schedule failure, verify RETRYING
  • Regression: test_streaming_insert_job_task_retry — task timeout, verify RETRYING
  • Regression: CDC test_streaming_job_cdc_stream_postgres_latest_alter_cred — wrong PG credentials, verify RETRYING

🤖 Generated with Claude Code

…mal jobs from normal ones

Previously, when a streaming job encountered recoverable errors (task failure,
meta fetch failure, scheduling failure), it transitioned to PAUSED — the same
state used for user-initiated pause. This made it impossible to distinguish
healthy jobs from problematic ones in `show streaming jobs`.

This change introduces a new RETRYING status for recoverable errors, keeping
PAUSED exclusively for user-initiated pause and unrecoverable errors
(TOO_MANY_FAILURE_ROWS, CANNOT_RESUME).


Key changes:
- JobStatus: add RETRYING enum, include in isRunning()
- AbstractJob: allow RETRYING transitions
- StreamingJobSchedulerTask: new handleRetryingState() with backoff and task
  recreation; PAUSED case does nothing (waits for user RESUME)
- StreamingInsertJob: onStreamTaskFail/fetchMeta -> RETRYING instead of PAUSED;
  onStreamTaskSuccess: RETRYING->RUNNING + reset autoResumeCount;
  gsonPostProcess: null jobStatus fallback to PAUSED for downgrade safety
- ResumeJobCommand: reject RESUME on RETRYING jobs
- StreamingTaskScheduler: schedule failure -> RETRYING
- UT: add RETRYING transition tests in AbstractJobStatusTest
- Regression tests: update expected status from PAUSED to RETRYING

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@Thearas
Contributor

Thearas commented Apr 8, 2026

Thank you for your contribution to Apache Doris.
Don't know what should be done next? See How to process your PR.

Please clearly describe your PR:

  1. What problem was fixed (it's best to include specific error reporting information). How it was fixed.
  2. Which behaviors were modified. What was the previous behavior, what is it now, why was it modified, and what possible impacts might there be.
  3. What features were added. Why was this function added?
  4. Which code was refactored and why was this part of the code refactored?
  5. Which functions were optimized and what is the difference before and after the optimization?

@JNSimba
Member Author

JNSimba commented Apr 8, 2026

run buildall

…bStatusTest

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba
Member Author

JNSimba commented Apr 9, 2026

run buildall

RETRYING handles its own retry logic internally, it should never go
back to PENDING. Only PAUSED can go to PENDING via user RESUME.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
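The rule stated above can be expressed as a small transition table. This is an assumed subset for illustration only — the real `AbstractJob` checks more states (STOPPED, FINISHED) and more edges:

```java
import java.util.EnumSet;
import java.util.Map;

// Illustrative transition table; an assumed subset of AbstractJob's rules.
public class TransitionSketch {
    enum S { PENDING, RUNNING, RETRYING, PAUSED }

    static final Map<S, EnumSet<S>> ALLOWED = Map.of(
        S.PENDING,  EnumSet.of(S.RUNNING),
        S.RUNNING,  EnumSet.of(S.RETRYING, S.PAUSED),
        S.RETRYING, EnumSet.of(S.RUNNING, S.RETRYING, S.PAUSED), // never PENDING
        S.PAUSED,   EnumSet.of(S.PENDING)                        // user RESUME
    );

    static boolean canGo(S from, S to) {
        return ALLOWED.getOrDefault(from, EnumSet.noneOf(S.class)).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canGo(S.RETRYING, S.PENDING)); // forbidden by this PR
        System.out.println(canGo(S.PAUSED, S.PENDING));   // only path to PENDING
    }
}
```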
@JNSimba
Member Author

JNSimba commented Apr 9, 2026

/review

Contributor

@github-actions github-actions Bot left a comment


I found 3 issues that should be addressed before merge.

  1. handleRetryingState() can create overlapping retry tasks while the job remains in RETRYING.
  2. Async failure paths can overwrite a concurrent manual pause and flip the job back to RETRYING.
  3. The updated CDC regression now waits for RETRYING, but the later ALTER JOB step still only accepts PAUSED jobs.

Critical checkpoint conclusions:

  • Goal of the task: Partially achieved. The new RETRYING status is added and surfaced, but the end-to-end retry/manual-control flow is still inconsistent.
  • Modification size/focus: Small and focused, but not all parallel paths were updated consistently.
  • Concurrency: Not safe yet. The timer-scheduler thread, streaming-task callback thread, and user pause/resume commands can race in ways that either spawn duplicate retry tasks or lose a manual pause.
  • Lifecycle/static initialization: No new static-init issue found. The main lifecycle concern is the retry task lifecycle noted above.
  • Configuration: No new configuration items.
  • Compatibility/incompatible change: A new persisted enum value is introduced; the gsonPostProcess() fallback covers the obvious downgrade deserialization case.
  • Parallel code paths: Not fully updated. Manual repair/alter flow and some async failure paths still assume PAUSED.
  • Special conditional checks: The retry/manual-pause conditions need stronger guarding; current checks are incomplete.
  • Test coverage: Added UT/regression coverage for visible status changes, but missing coverage for manual-pause races, overlapping retry tasks, and alter-from-RETRYING recovery.
  • Observability: Existing logs/metrics look adequate for this patch; no blocking observability gap found.
  • Transaction/persistence: No additional transaction or edit-log issue identified in this diff.
  • Data writes/modifications: Not safe yet, because overlapping retry tasks can perform duplicate work against the same streaming job.
  • FE/BE variable passing: Not applicable.
  • Performance: Potential duplicate retry task creation can waste scheduler/execution resources.
  • Other issues: See inline comments.

I did not run FE UT or regression tests locally in this review environment.

```java
    return;
}
streamingInsertJob.createStreamingTask();
streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
```
Contributor


handleRetryingState() now creates a new streaming task but keeps the job in RETRYING. Because needScheduleTask() returns true for RETRYING and there is no guard here for an already-active runningStreamTask, the next timer-driven StreamingJobSchedulerTask can come back through this branch again before the first retry attempt finishes and call createStreamingTask() a second time. That opens the door to two concurrent streaming tasks for the same job. The old flow moved back to PENDING, which avoided this duplicate-task window.
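One possible guard for this, sketched with an assumed `retryTaskActive` flag (not existing Doris code): a compare-and-set ensures only one timer tick can create a retry task while the previous attempt is still in flight.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical duplicate-task guard; field and method names are illustrative.
public class RetryGuard {
    private final AtomicBoolean retryTaskActive = new AtomicBoolean(false);

    // Only the tick that flips false -> true may call createStreamingTask().
    boolean tryStartRetryTask() {
        return retryTaskActive.compareAndSet(false, true);
    }

    // Called from the task's success/failure callback to allow the next attempt.
    void onRetryTaskDone() {
        retryTaskActive.set(false);
    }

    public static void main(String[] args) {
        RetryGuard g = new RetryGuard();
        System.out.println(g.tryStartRetryTask()); // first timer tick: creates task
        System.out.println(g.tryStartRetryTask()); // overlapping tick: skipped
        g.onRetryTaskDone();
        System.out.println(g.tryStartRetryTask()); // next attempt allowed
    }
}
```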

```java
    updateJobStatus(JobStatus.RUNNING);
}
if (offsetProvider.hasReachedEnd()) {
    // offset provider has reached a natural end, mark job as finished
```
Contributor


fetchMeta() explicitly preserves MANUAL_PAUSE_ERR before switching to RETRYING, but this task-failure path now unconditionally flips the job to RETRYING. If the user runs PAUSE JOB while a streaming task is failing, onStreamTaskFail() overwrites the manual pause reason/status and the job starts auto-retrying again. That violates the new contract that PAUSED is reserved for user-initiated pause and unrecoverable errors. This path needs the same manual-pause guard as fetchMeta(); the scheduling-failure path has the same race.
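A minimal sketch of such a guard, with illustrative names rather than the actual Doris fields — the failure callback checks for a concurrent manual pause under the same lock before flipping the status:

```java
// Self-contained sketch; Status values and the manualPause flag are
// simplified stand-ins for jobStatus / MANUAL_PAUSE_ERR in StreamingInsertJob.
public class ManualPauseGuard {
    enum Status { RUNNING, RETRYING, PAUSED }

    private Status status = Status.RUNNING;
    private boolean manualPause = false;

    synchronized void pauseManually() {
        manualPause = true;
        status = Status.PAUSED;
    }

    // Task-failure callback: never override a concurrent manual pause.
    synchronized void onStreamTaskFail() {
        if (manualPause) {
            return; // user paused while the task was failing; keep PAUSED
        }
        status = Status.RETRYING;
    }

    Status status() { return status; }

    public static void main(String[] args) {
        ManualPauseGuard job = new ManualPauseGuard();
        job.pauseManually();
        job.onStreamTaskFail();
        System.out.println(job.status()); // PAUSED, not RETRYING
    }
}
```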

```diff
 def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
 log.info("status after wrong cred resume: " + s)
-s.size() == 1 && s.get(0).get(0) == "PAUSED"
+s.size() == 1 && s.get(0).get(0) == "RETRYING"
```
Contributor


Once this assertion is updated to wait for RETRYING, the later ALTER JOB in scenario C no longer works: AlterJobCommand.validate() still rejects anything except PAUSED. Before this patch the auto-error state was PAUSED, so the alter step was reachable; now this test needs an explicit PAUSE JOB before the alter, or the product code needs to allow altering a RETRYING job.
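If the product-code route is chosen, the validation could be relaxed roughly as below. This is a hypothetical sketch, not the actual `AlterJobCommand.validate()` body, and the exception type is simplified:

```java
// Hypothetical sketch: allow ALTER JOB on RETRYING jobs as well as PAUSED.
public class AlterValidateSketch {
    enum Status { RUNNING, RETRYING, PAUSED }

    static void validateAlterable(Status s) {
        if (s != Status.PAUSED && s != Status.RETRYING) {
            throw new IllegalStateException(
                    "only PAUSED or RETRYING jobs can be altered, current: " + s);
        }
    }

    public static void main(String[] args) {
        validateAlterable(Status.RETRYING); // the CDC test's alter step passes
        try {
            validateAlterable(Status.RUNNING);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The alternative fix is purely test-side: issue an explicit `PAUSE JOB` before the alter step.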

Contributor

Copilot AI left a comment


Pull request overview

Adds a new RETRYING job status for streaming insert jobs so “auto-recovering after recoverable errors” is distinguishable from user-paused / unrecoverable paused states.

Changes:

  • Introduces JobStatus.RETRYING and treats it as a “running-like” state (JobStatus.isRunning()).
  • Updates streaming job scheduler/task flow to use RETRYING + exponential backoff + task recreation, and updates streaming job failure/success transitions accordingly.
  • Updates unit + regression tests to expect RETRYING instead of PAUSED for recoverable-error scenarios.

Reviewed changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
regression-test/suites/job_p0/streaming_job/test_streaming_job_schedule_task_error.groovy Expects RETRYING for schedule failure cases
regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy Expects RETRYING during task retry scenarios
regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy Expects RETRYING when fetch-meta fails recoverably
regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter_aksk.groovy Expects RETRYING after altering credentials to invalid values
regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy Expects RETRYING after wrong credential resume
fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java Adds RETRYING transition tests
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java Blocks manual RESUME while job is RETRYING
fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java Sets job to RETRYING on scheduling failures
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java Adds handleRetryingState() backoff + task recreation; PAUSED no longer auto-resumes
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java Moves recoverable failures to RETRYING; resets to RUNNING on success; adds Gson downgrade fallback
fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java Adds enum value + includes in isRunning()
fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java Allows RETRYING transitions and cancels tasks on entering RETRYING


Comment on lines +46 to +48
```java
if (JobStatus.RETRYING.equals(job.getJobStatus())) {
    throw new Exception("Job is in RETRYING state, cannot be manually resumed");
}
```

Copilot AI Apr 9, 2026


Throwing a generic Exception here will be treated as an internal error in some call paths and loses the structured “analysis/user error” handling used elsewhere in commands (e.g., AlterJobStatusCommand.validate() uses AnalysisException). Consider throwing AnalysisException (or a job-specific exception type used by other job commands) and include the job name/status in the message so users get a clear, actionable error.

```java
if (streamingInsertJob.hasReachedEnd()) {
    streamingInsertJob.updateJobStatus(JobStatus.FINISHED);
    return;
}
```

Copilot AI Apr 9, 2026


handleRetryingState() recreates tasks directly but does not call streamingInsertJob.replayOffsetProviderIfNeed() like handlePendingState() does (see handlePendingState() around line 71). Since the previous auto-resume path transitioned to PENDING first, this replay step may be skipped when a job is persisted/restored in RETRYING, potentially creating tasks with an un-replayed offset provider after FE restart. Consider replaying the offset provider in handleRetryingState() before createStreamingTask() (and/or reusing the PENDING flow internally while keeping the external status as RETRYING).

Suggested change

```diff
     }
 }
+streamingInsertJob.replayOffsetProviderIfNeed();
```

Comment on lines 93 to 101
```diff
 StreamingInsertJob job =
         (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
 job.setFailureReason(new FailureReason(e.getMessage()));
 try {
-    job.updateJobStatus(JobStatus.PAUSED);
+    job.updateJobStatus(JobStatus.RETRYING);
 } catch (JobException ex) {
-    log.warn("Failed to pause job {} after task {} scheduling failed",
+    log.warn("Failed to set job {} to RETRYING after task {} scheduling failed",
             task.getJobId(), task.getTaskId(), ex);
 }
```

Copilot AI Apr 9, 2026


In the scheduling failure handler, Env.getCurrentEnv().getJobManager().getJob(task.getJobId()) can return null (e.g., job dropped concurrently), which would cause an NPE at job.setFailureReason(...)/job.updateJobStatus(...). Add a null-check before using job (and consider logging that the job no longer exists) to avoid the scheduler thread dying unexpectedly.

Comment on lines +275 to +277
```java
job.createTasks(TaskType.STREAMING, null);
job.updateJobStatus(JobStatus.RETRYING);
Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
```

Copilot AI Apr 9, 2026


testRetryingCancelsTasks calls job.createTasks(...) directly, which does not add tasks to AbstractJob’s runningTasks (only commonCreateTasks()/initTasks() does). As a result, updateJobStatus(RETRYING) won’t actually cancel anything and the test doesn’t validate the intended behavior implied by its name. Consider creating tasks via commonCreateTasks(TaskType.STREAMING, ...) (or calling initTasks(...)) and asserting that the task’s cancel logic was invoked and/or canceledTaskCount incremented.

Suggested change

```diff
-job.createTasks(TaskType.STREAMING, null);
-job.updateJobStatus(JobStatus.RETRYING);
-Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
+job.commonCreateTasks(TaskType.STREAMING, null);
+job.updateJobStatus(JobStatus.RETRYING);
+Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
+Assertions.assertEquals(1, job.canceledTaskCount);
```

JNSimba and others added 2 commits April 9, 2026 11:12
…d pauseOrRetry, remove unnecessary replay logic

- Forbid RETRYING -> PENDING transition in AbstractJob (RETRYING handles
  its own retry, only PAUSED goes to PENDING via user RESUME)
- Add pauseOrRetry() in StreamingInsertJob: unrecoverable errors
  (TOO_MANY_FAILURE_ROWS, CANNOT_RESUME) go to PAUSED, others to RETRYING
- Remove replayOnCloudMode/hasReachedEnd from handleRetryingState since
  RETRYING is not persisted to editlog (same as RUNNING), FE restart
  recovers to PENDING via handlePendingState
- Remove invalid testPendingFromRetrying UT, add testPendingFromRetryingIsInvalid

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JNSimba JNSimba closed this Apr 9, 2026