[improvement](streaming-job) Add RETRYING status to distinguish abnormal jobs #62238
JNSimba wants to merge 5 commits into apache:master
Conversation
…mal jobs from normal ones
Previously, when a streaming job encountered recoverable errors (task failure,
meta fetch failure, scheduling failure), it transitioned to PAUSED — the same
state used for user-initiated pause. This made it impossible to distinguish
healthy jobs from problematic ones in `show streaming jobs`.
This change introduces a new RETRYING status for recoverable errors, keeping
PAUSED exclusively for user-initiated pause and unrecoverable errors
(TOO_MANY_FAILURE_ROWS, CANNOT_RESUME).
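The new split could be modeled roughly like this (a hypothetical, simplified sketch; `JobStatusSketch`, `isRunningLike()`, and `onError()` are illustrative names, not the actual fe-core API):

```java
// Hypothetical, simplified model of the status split described above;
// names mirror the PR but this is not the real fe-core implementation.
enum JobStatusSketch {
    PENDING, RUNNING, PAUSED, RETRYING, FINISHED, STOPPED;

    // The PR includes RETRYING in isRunning(): a retrying job is still
    // actively managed by the scheduler, unlike PAUSED which waits for
    // an explicit user RESUME.
    boolean isRunningLike() {
        return this == RUNNING || this == RETRYING;
    }

    // Recoverable errors now map to RETRYING; unrecoverable ones
    // (TOO_MANY_FAILURE_ROWS, CANNOT_RESUME) stay PAUSED.
    static JobStatusSketch onError(boolean recoverable) {
        return recoverable ? RETRYING : PAUSED;
    }
}
```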
State transition diagram after this change:
```
CREATE JOB
|
v
+---------+
| PENDING |
+----+----+
| createStreamingTask(), autoResumeCount = 0
v
+---------+
+------------| RUNNING |------------------+
| +--+----+--+ |
| | | |
user PAUSE task fail/ | hasReachedEnd
(MANUAL_PAUSE) meta fail/ | |
| sched fail | v
| (recoverable)| +----------+
| | data quality | FINISHED |
| | error +----------+
| | (unrecoverable)
v v |
+--------+ +----------+ |
| PAUSED | | RETRYING | |
+---+----+ +----+-----+ |
| | v
user RESUME backoff, +--------+
| recreate | PAUSED |
v task +--------+
+---------+ |
| PENDING | task result
+----+----+ / \
| success fail
v | |
RUNNING v v
RUNNING RETRYING
(count=0) (keep, count++)
RETRYING --> user PAUSE --> PAUSED
any non-final --> user STOP --> STOPPED
```
Key changes:
- JobStatus: add RETRYING enum, include in isRunning()
- AbstractJob: allow RETRYING transitions
- StreamingJobSchedulerTask: new handleRetryingState() with backoff and task
recreation; PAUSED case does nothing (waits for user RESUME)
- StreamingInsertJob: onStreamTaskFail/fetchMeta -> RETRYING instead of PAUSED;
onStreamTaskSuccess: RETRYING->RUNNING + reset autoResumeCount;
gsonPostProcess: null jobStatus fallback to PAUSED for downgrade safety
- ResumeJobCommand: reject RESUME on RETRYING jobs
- StreamingTaskScheduler: schedule failure -> RETRYING
- UT: add RETRYING transition tests in AbstractJobStatusTest
- Regression tests: update expected status from PAUSED to RETRYING
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
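The backoff used by `handleRetryingState()` is not spelled out above; a minimal sketch of a capped exponential backoff (the base delay, cap, and exact formula are assumptions, not values from this patch) might look like:

```java
// Illustrative capped exponential backoff between retry attempts.
// BASE_MS and CAP_MS are assumed values, not the ones in the patch.
class RetryBackoff {
    static final long BASE_MS = 1_000L;   // assumed first-retry delay
    static final long CAP_MS = 60_000L;   // assumed upper bound

    static long delayMs(int autoResumeCount) {
        // double the delay on each consecutive failure, never exceeding the cap;
        // the shift is clamped so the long cannot overflow
        long delay = BASE_MS << Math.min(autoResumeCount, 16);
        return Math.min(delay, CAP_MS);
    }
}
```

On success the job resets `autoResumeCount` to 0 (per the key changes above), so the next failure starts again from the base delay.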
run buildall
…bStatusTest Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
run buildall
RETRYING handles its own retry logic internally; it should never go back to PENDING. Only PAUSED can go to PENDING via user RESUME. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
/review
I found 3 issues that should be addressed before merge.
- `handleRetryingState()` can create overlapping retry tasks while the job remains in RETRYING.
- Async failure paths can overwrite a concurrent manual pause and flip the job back to RETRYING.
- The updated CDC regression now waits for RETRYING, but the later ALTER JOB step still only accepts PAUSED jobs.
Critical checkpoint conclusions:
- Goal of the task: Partially achieved. The new RETRYING status is added and surfaced, but the end-to-end retry/manual-control flow is still inconsistent.
- Modification size/focus: Small and focused, but not all parallel paths were updated consistently.
- Concurrency: Not safe yet. The timer-scheduler thread, streaming-task callback thread, and user pause/resume commands can race in ways that either spawn duplicate retry tasks or lose a manual pause.
- Lifecycle/static initialization: No new static-init issue found. The main lifecycle concern is the retry task lifecycle noted above.
- Configuration: No new configuration items.
- Compatibility/incompatible change: A new persisted enum value is introduced; the `gsonPostProcess()` fallback covers the obvious downgrade deserialization case.
- Parallel code paths: Not fully updated. Manual repair/alter flow and some async failure paths still assume PAUSED.
- Special conditional checks: The retry/manual-pause conditions need stronger guarding; current checks are incomplete.
- Test coverage: Added UT/regression coverage for visible status changes, but missing coverage for manual-pause races, overlapping retry tasks, and alter-from-RETRYING recovery.
- Observability: Existing logs/metrics look adequate for this patch; no blocking observability gap found.
- Transaction/persistence: No additional transaction or edit-log issue identified in this diff.
- Data writes/modifications: Not safe yet, because overlapping retry tasks can perform duplicate work against the same streaming job.
- FE/BE variable passing: Not applicable.
- Performance: Potential duplicate retry task creation can waste scheduler/execution resources.
- Other issues: See inline comments.
I did not run FE UT or regression tests locally in this review environment.
```java
            return;
        }
        streamingInsertJob.createStreamingTask();
        streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
```
handleRetryingState() now creates a new streaming task but keeps the job in RETRYING. Because needScheduleTask() returns true for RETRYING and there is no guard here for an already-active runningStreamTask, the next timer-driven StreamingJobSchedulerTask can come back through this branch again before the first retry attempt finishes and call createStreamingTask() a second time. That opens the door to two concurrent streaming tasks for the same job. The old flow moved back to PENDING, which avoided this duplicate-task window.
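One way to close this window, sketched here with an illustrative stand-alone guard rather than actual Doris code, is to track whether a retry attempt is already in flight before creating another task:

```java
// Illustrative guard against duplicate retry tasks, as suggested above:
// only create a new streaming task if no retry attempt is in flight.
// This is a sketch, not the real StreamingJobSchedulerTask logic.
final class RetryGuard {
    private boolean taskInFlight = false;

    // returns true exactly once until onTaskFinished() is called
    synchronized boolean tryStartRetry() {
        if (taskInFlight) {
            return false; // a previous retry attempt is still running
        }
        taskInFlight = true;
        return true;
    }

    synchronized void onTaskFinished() {
        taskInFlight = false;
    }
}
```

With such a guard, a second timer-driven pass through the RETRYING branch would observe `tryStartRetry() == false` and skip `createStreamingTask()` until the first attempt reports back.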
```java
            updateJobStatus(JobStatus.RUNNING);
        }
        if (offsetProvider.hasReachedEnd()) {
            // offset provider has reached a natural end, mark job as finished
```
fetchMeta() explicitly preserves MANUAL_PAUSE_ERR before switching to RETRYING, but this task-failure path now unconditionally flips the job to RETRYING. If the user runs PAUSE JOB while a streaming task is failing, onStreamTaskFail() overwrites the manual pause reason/status and the job starts auto-retrying again. That violates the new contract that PAUSED is reserved for user-initiated pause and unrecoverable errors. This path needs the same manual-pause guard as fetchMeta(); the scheduling-failure path has the same race.
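The guard being asked for can be sketched as a pure transition function (illustrative names and types, not the real `StreamingInsertJob` API):

```java
// Sketch of the manual-pause guard: an async task failure must not
// overwrite a user-initiated pause. Names are illustrative only.
class FailureTransition {
    enum Status { RUNNING, PAUSED, RETRYING }

    static Status onTaskFail(Status current, boolean manualPauseRequested,
                             boolean recoverable) {
        // Preserve MANUAL_PAUSE_ERR semantics: if the user paused (or a
        // pause is in flight), the failure callback keeps the job PAUSED
        // instead of flipping it back to auto-retrying.
        if (manualPauseRequested || current == Status.PAUSED) {
            return Status.PAUSED;
        }
        return recoverable ? Status.RETRYING : Status.PAUSED;
    }
}
```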
```diff
 def s = sql """select status from jobs("type"="insert") where Name='${jobName}'"""
 log.info("status after wrong cred resume: " + s)
-s.size() == 1 && s.get(0).get(0) == "PAUSED"
+s.size() == 1 && s.get(0).get(0) == "RETRYING"
```
Once this assertion is updated to wait for RETRYING, the later ALTER JOB in scenario C no longer works: AlterJobCommand.validate() still rejects anything except PAUSED. Before this patch the auto-error state was PAUSED, so the alter step was reachable; now this test needs an explicit PAUSE JOB before the alter, or the product code needs to allow altering a RETRYING job.
Pull request overview
Adds a new RETRYING job status for streaming insert jobs so “auto-recovering after recoverable errors” is distinguishable from user-paused / unrecoverable paused states.
Changes:
- Introduces `JobStatus.RETRYING` and treats it as a “running-like” state (`JobStatus.isRunning()`).
- Updates streaming job scheduler/task flow to use RETRYING + exponential backoff + task recreation, and updates streaming job failure/success transitions accordingly.
- Updates unit + regression tests to expect RETRYING instead of PAUSED for recoverable-error scenarios.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/test_streaming_job_schedule_task_error.groovy | Expects RETRYING for schedule failure cases |
| regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_task_retry.groovy | Expects RETRYING during task retry scenarios |
| regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_fetch_meta_error.groovy | Expects RETRYING when fetch-meta fails recoverably |
| regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_alter_aksk.groovy | Expects RETRYING after altering credentials to invalid values |
| regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_postgres_latest_alter_cred.groovy | Expects RETRYING after wrong credential resume |
| fe/fe-core/src/test/java/org/apache/doris/job/base/AbstractJobStatusTest.java | Adds RETRYING transition tests |
| fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/ResumeJobCommand.java | Blocks manual RESUME while job is RETRYING |
| fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java | Sets job to RETRYING on scheduling failures |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java | Adds handleRetryingState() backoff + task recreation; PAUSED no longer auto-resumes |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Moves recoverable failures to RETRYING; resets to RUNNING on success; adds Gson downgrade fallback |
| fe/fe-core/src/main/java/org/apache/doris/job/common/JobStatus.java | Adds enum value + includes in isRunning() |
| fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java | Allows RETRYING transitions and cancels tasks on entering RETRYING |
```java
        if (JobStatus.RETRYING.equals(job.getJobStatus())) {
            throw new Exception("Job is in RETRYING state, cannot be manually resumed");
        }
```
Throwing a generic Exception here will be treated as an internal error in some call paths and loses the structured “analysis/user error” handling used elsewhere in commands (e.g., AlterJobStatusCommand.validate() uses AnalysisException). Consider throwing AnalysisException (or a job-specific exception type used by other job commands) and include the job name/status in the message so users get a clear, actionable error.
```java
        if (streamingInsertJob.hasReachedEnd()) {
            streamingInsertJob.updateJobStatus(JobStatus.FINISHED);
            return;
        }
```
handleRetryingState() recreates tasks directly but does not call streamingInsertJob.replayOffsetProviderIfNeed() like handlePendingState() does (see handlePendingState() around line 71). Since the previous auto-resume path transitioned to PENDING first, this replay step may be skipped when a job is persisted/restored in RETRYING, potentially creating tasks with an un-replayed offset provider after FE restart. Consider replaying the offset provider in handleRetryingState() before createStreamingTask() (and/or reusing the PENDING flow internally while keeping the external status as RETRYING).
Suggested change:
```java
            }
        }
        streamingInsertJob.replayOffsetProviderIfNeed();
```
```diff
 StreamingInsertJob job =
         (StreamingInsertJob) Env.getCurrentEnv().getJobManager().getJob(task.getJobId());
 job.setFailureReason(new FailureReason(e.getMessage()));
 try {
-    job.updateJobStatus(JobStatus.PAUSED);
+    job.updateJobStatus(JobStatus.RETRYING);
 } catch (JobException ex) {
-    log.warn("Failed to pause job {} after task {} scheduling failed",
+    log.warn("Failed to set job {} to RETRYING after task {} scheduling failed",
             task.getJobId(), task.getTaskId(), ex);
 }
```
In the scheduling failure handler, Env.getCurrentEnv().getJobManager().getJob(task.getJobId()) can return null (e.g., job dropped concurrently), which would cause an NPE at job.setFailureReason(...)/job.updateJobStatus(...). Add a null-check before using job (and consider logging that the job no longer exists) to avoid the scheduler thread dying unexpectedly.
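A null-safe shape of that handler, sketched with stand-in types instead of the real `JobManager`/`StreamingInsertJob` classes:

```java
// Illustrative null-safe version of the scheduling-failure handler: the
// job lookup can return null if the job was dropped concurrently, so it
// is checked before the job is touched. Types are stand-ins, not Doris code.
class ScheduleFailureHandler {
    static String onScheduleFail(java.util.Map<Long, String> jobRegistry, long jobId) {
        String job = jobRegistry.get(jobId);
        if (job == null) {
            // log and bail out instead of letting the scheduler thread die on NPE
            return "job " + jobId + " no longer exists, skip RETRYING transition";
        }
        return "job " + jobId + " -> RETRYING";
    }
}
```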
```java
        job.createTasks(TaskType.STREAMING, null);
        job.updateJobStatus(JobStatus.RETRYING);
        Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
```
testRetryingCancelsTasks calls job.createTasks(...) directly, which does not add tasks to AbstractJob’s runningTasks (only commonCreateTasks()/initTasks() does). As a result, updateJobStatus(RETRYING) won’t actually cancel anything and the test doesn’t validate the intended behavior implied by its name. Consider creating tasks via commonCreateTasks(TaskType.STREAMING, ...) (or calling initTasks(...)) and asserting that the task’s cancel logic was invoked and/or canceledTaskCount incremented.
Suggested change:
```diff
-job.createTasks(TaskType.STREAMING, null);
-job.updateJobStatus(JobStatus.RETRYING);
-Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
+job.commonCreateTasks(TaskType.STREAMING, null);
+job.updateJobStatus(JobStatus.RETRYING);
+Assertions.assertEquals(JobStatus.RETRYING, job.getJobStatus());
+Assertions.assertEquals(1, job.canceledTaskCount);
```
…d pauseOrRetry, remove unnecessary replay logic
- Forbid RETRYING -> PENDING transition in AbstractJob (RETRYING handles its own retry; only PAUSED goes to PENDING via user RESUME)
- Add pauseOrRetry() in StreamingInsertJob: unrecoverable errors (TOO_MANY_FAILURE_ROWS, CANNOT_RESUME) go to PAUSED, others to RETRYING
- Remove replayOnCloudMode/hasReachedEnd from handleRetryingState since RETRYING is not persisted to editlog (same as RUNNING); FE restart recovers to PENDING via handlePendingState
- Remove invalid testPendingFromRetrying UT, add testPendingFromRetryingIsInvalid
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
- Adds a RETRYING status to the JobStatus enum for streaming jobs, so users can distinguish between healthy running jobs and jobs that are encountering errors and auto-retrying.
- Previously both cases shared the PAUSED status, making it impossible to tell them apart in `show streaming jobs`.
- After this change, PAUSED is exclusively for user-initiated pause and unrecoverable errors, while RETRYING indicates the job is auto-recovering with exponential backoff.

State transition diagram
Key changes
- `JobStatus.java`: add RETRYING enum, include in `isRunning()`
- `AbstractJob.java`: allow RETRYING transitions
- `StreamingJobSchedulerTask.java`: `handleRetryingState()` with backoff + task recreation; PAUSED does nothing
- `StreamingInsertJob.java`: `onStreamTaskFail`/`fetchMeta` → RETRYING; `onStreamTaskSuccess` → RUNNING + reset count; `gsonPostProcess` null fallback for downgrade safety
- `ResumeJobCommand.java`: reject RESUME on RETRYING jobs
- `StreamingTaskScheduler.java`: schedule failure → RETRYING
- `AbstractJobStatusTest.java`: RETRYING transition tests

Test plan
- `AbstractJobStatusTest` covers all RETRYING state transitions
- `test_streaming_insert_job_alter_aksk` — alter to wrong credentials, verify RETRYING status
- `test_streaming_insert_job_fetch_meta_error` — debug point fetch meta failure, verify RETRYING
- `test_streaming_job_schedule_task_error` — debug point schedule failure, verify RETRYING
- `test_streaming_insert_job_task_retry` — task timeout, verify RETRYING
- `test_streaming_job_cdc_stream_postgres_latest_alter_cred` — wrong PG credentials, verify RETRYING

🤖 Generated with Claude Code