chatdebug

package
v2.34.3
Warning

This package is not in the latest version of its module.
Published: Jun 17, 2026 | License: AGPL-3.0 | Imports: 29 | Imported by: 0

Documentation

Constants

const BroadcastPubsubChannel = "chat_debug:broadcast"

BroadcastPubsubChannel is the shared pubsub channel for chat-debug events that are not scoped to a single chat, such as stale finalization sweeps.

const DefaultStaleThreshold = 5 * time.Minute

DefaultStaleThreshold is the fallback stale timeout for debug rows when no caller-provided value is supplied.

const MaxLabelLength = 200

MaxLabelLength is the maximum number of runes kept when building first_message labels for debug run summaries.

const MaxMessagePartTextLength = 10_000

MaxMessagePartTextLength is the rune limit for bounded text stored in request message parts. Longer text is truncated with an ellipsis.

const RedactedValue = "[REDACTED]"

RedactedValue replaces sensitive values in debug payloads.

Variables

AllOperations contains every Operation value. Update this when adding new constants above.

AllRunKinds contains every RunKind value. Update this when adding new constants above.

AllStatuses contains every Status value. Update this when adding new constants above.

var ErrNilModelResult = xerrors.New("language model returned nil result")

ErrNilModelResult is returned when the underlying language model returns a nil response or stream. Callers can match with errors.Is to distinguish this from provider-level failures.
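The matching described above can be sketched with the standard library alone. In this minimal example, errNilModelResult is a local stand-in for ErrNilModelResult and callModel is a hypothetical caller that wraps the sentinel; neither is part of this package.

```go
package main

import (
	"errors"
	"fmt"
)

// errNilModelResult is a local stand-in for chatdebug.ErrNilModelResult.
var errNilModelResult = errors.New("language model returned nil result")

// callModel is a hypothetical caller that wraps the sentinel with context.
func callModel() error {
	return fmt.Errorf("generate title: %w", errNilModelResult)
}

// isNilResult reports whether err is, or wraps, the nil-result sentinel.
func isNilResult(err error) bool {
	return errors.Is(err, errNilModelResult)
}

func main() {
	fmt.Println(isNilResult(callModel()))                    // wrapped sentinel
	fmt.Println(isNilResult(errors.New("provider: 503")))    // unrelated failure
}
```

Because errors.Is walks the wrap chain, the sentinel is still detectable after intermediate layers add context with %w.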

Functions

func CleanupStepCounter

func CleanupStepCounter(runID uuid.UUID)

CleanupStepCounter removes the per-run step counter and reference-count state. It is intended for tests and for later stacked branches that manage a real run lifecycle.

func ContextWithRun

func ContextWithRun(ctx context.Context, rc *RunContext) context.Context

ContextWithRun stores rc in ctx.

Step counter cleanup is reference-counted per RunID: each live RunContext increments a counter and runtime.AddCleanup decrements it when the struct is garbage collected. Shared state (step counters) is only deleted when the last RunContext for a given RunID becomes unreachable, preventing premature cleanup when multiple RunContext instances share the same RunID.

func ContextWithStep

func ContextWithStep(ctx context.Context, sc *StepContext) context.Context

ContextWithStep stores sc in ctx.

func ExtractFirstUserText

func ExtractFirstUserText(prompt fantasy.Prompt) string

ExtractFirstUserText extracts the plain text content from a fantasy.Prompt for the first user message. Used to derive first_message labels at run creation time.

func PubsubChannel

func PubsubChannel(chatID uuid.UUID) string

PubsubChannel returns the chat-scoped pubsub channel for debug events. Nil chat IDs use the shared broadcast channel so publishers and subscribers can coordinate through one discoverable helper.

func RedactHeaders

func RedactHeaders(h http.Header) map[string]string

RedactHeaders returns a flattened copy of h with sensitive values redacted.
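As a rough illustration of the flatten-and-redact behavior, the sketch below joins multi-value headers with ", " and replaces sensitive ones with the redaction marker. The list of sensitive header names and the join separator are assumptions made for this example; the package's actual rules are not shown on this page.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

const redactedValue = "[REDACTED]" // mirrors the RedactedValue constant

// sensitiveHeaders is an ASSUMED list for illustration only.
var sensitiveHeaders = map[string]bool{
	"Authorization": true,
	"Cookie":        true,
	"X-Api-Key":     true,
}

// redactHeadersSketch flattens h into one string per header name and
// replaces the values of sensitive headers with the redaction marker.
func redactHeadersSketch(h http.Header) map[string]string {
	out := make(map[string]string, len(h))
	for name, values := range h {
		if sensitiveHeaders[http.CanonicalHeaderKey(name)] {
			out[name] = redactedValue
			continue
		}
		out[name] = strings.Join(values, ", ")
	}
	return out
}

// demoHeaders builds a small header set with one secret and one
// multi-value header.
func demoHeaders() http.Header {
	h := http.Header{}
	h.Set("Authorization", "Bearer secret")
	h.Add("Accept", "text/plain")
	h.Add("Accept", "application/json")
	return h
}

func main() {
	flat := redactHeadersSketch(demoHeaders())
	fmt.Println(flat["Authorization"])
	fmt.Println(flat["Accept"])
}
```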

func RedactJSONSecrets

func RedactJSONSecrets(data []byte) []byte

RedactJSONSecrets redacts sensitive JSON values by key name. When the input is not valid JSON (truncated body, HTML error page, etc.) the raw bytes are replaced entirely with a diagnostic placeholder to avoid leaking credentials from malformed payloads.
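A minimal sketch of key-based redaction with a fail-closed fallback follows. The set of sensitive key names, the exact-match rule, and the placeholder text are all assumptions chosen for the example; only the overall shape (redact by key, replace unparseable input entirely) comes from the description above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

const redactedValue = "[REDACTED]"

// sensitiveKeys is an ASSUMED set; exact matching avoids hitting
// unrelated keys such as "max_tokens".
var sensitiveKeys = map[string]bool{
	"api_key": true, "authorization": true, "password": true, "token": true,
}

// placeholder is a hypothetical diagnostic value for unparseable input.
var placeholder = []byte(`"[unparseable payload omitted]"`)

// redactValue walks decoded JSON and replaces values under sensitive keys.
func redactValue(v any) any {
	switch t := v.(type) {
	case map[string]any:
		for k, child := range t {
			if sensitiveKeys[strings.ToLower(k)] {
				t[k] = redactedValue
			} else {
				t[k] = redactValue(child)
			}
		}
	case []any:
		for i, child := range t {
			t[i] = redactValue(child)
		}
	}
	return v
}

// redactJSONSketch never returns the raw input when parsing fails, so a
// truncated or non-JSON body cannot leak credentials.
func redactJSONSketch(data []byte) []byte {
	var doc any
	if err := json.Unmarshal(data, &doc); err != nil {
		return placeholder
	}
	out, err := json.Marshal(redactValue(doc))
	if err != nil {
		return placeholder
	}
	return out
}

func main() {
	in := []byte(`{"model":"m","api_key":"sk-123","nested":{"token":"t","n":1},"max_tokens":64}`)
	fmt.Println(string(redactJSONSketch(in)))
	fmt.Println(string(redactJSONSketch([]byte("<html>502</html>"))))
}
```

Note that re-marshaling through map[string]any sorts object keys, so the output ordering differs from the input; a real implementation may preserve order.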

func RedactNDJSONSecrets

func RedactNDJSONSecrets(data []byte) []byte

RedactNDJSONSecrets redacts sensitive values in newline-delimited JSON (NDJSON) payloads. Each non-empty line is treated as an independent JSON document and redacted individually. Lines that fail to parse are replaced with a diagnostic placeholder.
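The per-line handling can be sketched as below. For brevity this example only redacts two top-level keys and only accepts JSON objects; the key names and the placeholder string are assumptions, not the package's actual values.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

const redactedValue = "[REDACTED]"

// linePlaceholder is a hypothetical diagnostic value for bad lines.
var linePlaceholder = []byte(`"[unparseable line]"`)

// redactLine redacts one JSON object. Only the top-level keys "api_key"
// and "token" are handled here (an assumption made to keep this short).
func redactLine(line []byte) []byte {
	var obj map[string]any
	if err := json.Unmarshal(line, &obj); err != nil {
		return linePlaceholder
	}
	for _, k := range []string{"api_key", "token"} {
		if _, ok := obj[k]; ok {
			obj[k] = redactedValue
		}
	}
	out, err := json.Marshal(obj)
	if err != nil {
		return linePlaceholder
	}
	return out
}

// redactNDJSONSketch treats every non-empty line as its own document so
// that one malformed line does not discard the rest of the payload.
func redactNDJSONSketch(data []byte) []byte {
	lines := bytes.Split(data, []byte("\n"))
	for i, line := range lines {
		if len(bytes.TrimSpace(line)) == 0 {
			continue // empty lines are left untouched
		}
		lines[i] = redactLine(line)
	}
	return bytes.Join(lines, []byte("\n"))
}

func main() {
	in := []byte("{\"token\":\"abc\",\"n\":1}\nnot json\n\n{\"ok\":true}")
	fmt.Println(string(redactNDJSONSketch(in)))
}
```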

func ReuseStep

func ReuseStep(ctx context.Context) context.Context

ReuseStep marks ctx so wrapped model calls under it share one debug step.

func SeedSummary

func SeedSummary(label string) map[string]any

SeedSummary builds a base summary map with a first_message label. Returns nil if label is empty.

func TruncateLabel

func TruncateLabel(text string, maxLen int) string

TruncateLabel whitespace-normalizes and truncates text to maxLen runes. Returns "" if input is empty or whitespace-only.
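The label pipeline (normalize, truncate, seed) might look roughly like the sketch below. It is an approximation: the real TruncateLabel may, for example, append an ellipsis or treat a non-positive maxLen differently, and the function names here are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// truncateLabelSketch collapses runs of whitespace to single spaces and
// keeps at most maxLen runes. Returning "" for maxLen <= 0 is an assumption.
func truncateLabelSketch(text string, maxLen int) string {
	normalized := strings.Join(strings.Fields(text), " ")
	if normalized == "" || maxLen <= 0 {
		return ""
	}
	runes := []rune(normalized) // count runes, not bytes
	if len(runes) <= maxLen {
		return normalized
	}
	return string(runes[:maxLen])
}

// seedSummarySketch mirrors the documented SeedSummary contract:
// nil for an empty label, otherwise a map with a first_message entry.
func seedSummarySketch(label string) map[string]any {
	if label == "" {
		return nil
	}
	return map[string]any{"first_message": label}
}

func main() {
	label := truncateLabelSketch("  héllo \n wörld  ", 7)
	fmt.Printf("%q\n", label)
	fmt.Println(seedSummarySketch(label))
	fmt.Println(seedSummarySketch(truncateLabelSketch(" \t\n", 10)) == nil)
}
```

Truncating on runes rather than bytes keeps multi-byte characters intact at the cut point.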

func WrapModel

func WrapModel(
	model fantasy.LanguageModel,
	svc *Service,
	opts RecorderOptions,
) fantasy.LanguageModel

WrapModel returns model unchanged when debug recording is disabled, or a debug wrapper when a service is available.

Types

type Attempt

type Attempt struct {
	Number              int               `json:"number"`
	Status              string            `json:"status,omitempty"`
	Method              string            `json:"method,omitempty"`
	URL                 string            `json:"url,omitempty"`
	Path                string            `json:"path,omitempty"`
	StartedAt           string            `json:"started_at,omitempty"`
	FinishedAt          string            `json:"finished_at,omitempty"`
	RequestHeaders      map[string]string `json:"request_headers,omitempty"`
	RequestBody         []byte            `json:"request_body,omitempty"`
	ResponseStatus      int               `json:"response_status,omitempty"`
	ResponseHeaders     map[string]string `json:"response_headers,omitempty"`
	ResponseBody        []byte            `json:"response_body,omitempty"`
	Error               string            `json:"error,omitempty"`
	DurationMs          int64             `json:"duration_ms"`
	RetryClassification string            `json:"retry_classification,omitempty"`
	RetryDelayMs        int64             `json:"retry_delay_ms,omitempty"`
}

Attempt captures a single HTTP round trip made during a step.

type CreateRunParams

type CreateRunParams struct {
	ChatID              uuid.UUID
	RootChatID          uuid.UUID
	ParentChatID        uuid.UUID
	ModelConfigID       uuid.UUID
	TriggerMessageID    int64
	HistoryTipMessageID int64
	Kind                RunKind
	Status              Status
	Provider            string
	Model               string
	Summary             any
}

CreateRunParams contains friendly inputs for creating a debug run.

type CreateStepParams

type CreateStepParams struct {
	RunID               uuid.UUID
	ChatID              uuid.UUID
	StepNumber          int32
	Operation           Operation
	Status              Status
	HistoryTipMessageID int64
	NormalizedRequest   any
}

CreateStepParams contains friendly inputs for creating a debug step.

type DebugEvent

type DebugEvent struct {
	Kind   EventKind `json:"kind"`
	ChatID uuid.UUID `json:"chat_id"`
	RunID  uuid.UUID `json:"run_id"`
	StepID uuid.UUID `json:"step_id"`
}

DebugEvent is the lightweight pubsub envelope for chat debug updates.

type EventKind

type EventKind string

EventKind identifies the type of pubsub debug event.

const (
	// EventKindRunUpdate publishes a run mutation.
	EventKindRunUpdate EventKind = "run_update"
	// EventKindStepUpdate publishes a step mutation.
	EventKindStepUpdate EventKind = "step_update"
	// EventKindFinalize publishes a finalization signal.
	EventKindFinalize EventKind = "finalize"
	// EventKindDelete publishes a deletion signal.
	EventKindDelete EventKind = "delete"
)

type FinalizeRunParams

type FinalizeRunParams struct {
	RunID       uuid.UUID
	ChatID      uuid.UUID
	Status      Status
	SeedSummary map[string]any
	// Timeout for the aggregate + update calls. Zero defaults to 5s.
	Timeout time.Duration
}

FinalizeRunParams bundles the arguments for FinalizeRun.

type Operation

type Operation string

Operation identifies the model operation a step performed.

const (
	// OperationStream records a streaming model operation.
	OperationStream Operation = "stream"
	// OperationGenerate records a non-streaming generation operation.
	OperationGenerate Operation = "generate"
)

type RecorderOptions

type RecorderOptions struct {
	ChatID   uuid.UUID
	OwnerID  uuid.UUID
	Provider string
	Model    string
}

RecorderOptions identifies the chat/model context for debug recording.

type RecordingTransport

type RecordingTransport struct {
	// Base is the underlying transport. nil defaults to http.DefaultTransport.
	Base http.RoundTripper
}

RecordingTransport captures HTTP request/response data for debug steps. When the request context carries an attemptSink, it records each round trip. Otherwise it delegates directly.

func (*RecordingTransport) RoundTrip

func (t *RecordingTransport) RoundTrip(req *http.Request) (*http.Response, error)
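The record-or-delegate behavior can be sketched as an http.RoundTripper wrapper. The attemptLog type, the sinkKey context key, and the recorded string format are hypothetical stand-ins for the package's unexported attemptSink; fakeBase exists only so the example runs without network access.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"strings"
)

type sinkKey struct{}

// attemptLog is a hypothetical stand-in for the unexported attemptSink.
// It is not safe for concurrent use; a real sink would need locking.
type attemptLog struct{ entries []string }

type recordingTransportSketch struct {
	Base http.RoundTripper // nil falls back to http.DefaultTransport
}

func (t *recordingTransportSketch) RoundTrip(req *http.Request) (*http.Response, error) {
	base := t.Base
	if base == nil {
		base = http.DefaultTransport
	}
	sink, ok := req.Context().Value(sinkKey{}).(*attemptLog)
	if !ok {
		return base.RoundTrip(req) // no sink in context: delegate directly
	}
	resp, err := base.RoundTrip(req)
	if err != nil {
		sink.entries = append(sink.entries, req.Method+" "+req.URL.Path+" error")
		return nil, err
	}
	sink.entries = append(sink.entries,
		fmt.Sprintf("%s %s %d", req.Method, req.URL.Path, resp.StatusCode))
	return resp, nil
}

// fakeBase returns a canned 200 response.
type fakeBase struct{}

func (fakeBase) RoundTrip(req *http.Request) (*http.Response, error) {
	return &http.Response{
		StatusCode: http.StatusOK,
		Header:     make(http.Header),
		Body:       io.NopCloser(strings.NewReader("ok")),
		Request:    req,
	}, nil
}

// demo sends one request whose context carries a sink and returns what
// the transport recorded.
func demo() []string {
	sink := &attemptLog{}
	ctx := context.WithValue(context.Background(), sinkKey{}, sink)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://example.invalid/v1/chat", nil)
	if err != nil {
		panic(err)
	}
	client := &http.Client{Transport: &recordingTransportSketch{Base: fakeBase{}}}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	return sink.entries
}

func main() {
	fmt.Println(demo())
}
```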

type RunContext

type RunContext struct {
	RunID               uuid.UUID
	ChatID              uuid.UUID
	RootChatID          uuid.UUID // Zero means not set.
	ParentChatID        uuid.UUID // Zero means not set.
	ModelConfigID       uuid.UUID // Zero means not set.
	TriggerMessageID    int64     // Zero means not set.
	HistoryTipMessageID int64     // Zero means not set.
	Kind                RunKind
	Provider            string
	Model               string
}

RunContext carries identity and metadata for a debug run.

func RunFromContext

func RunFromContext(ctx context.Context) (*RunContext, bool)

RunFromContext returns the debug run context stored in ctx.
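ContextWithRun and RunFromContext follow the usual context-value pattern. The sketch below shows that pattern with a private key type; the runContextSketch struct is trimmed to a few string fields, and the reference counting described under ContextWithRun is deliberately left out.

```go
package main

import (
	"context"
	"fmt"
)

// runContextSketch is a trimmed, hypothetical version of RunContext.
type runContextSketch struct {
	RunID    string
	Provider string
	Model    string
}

// runKey is unexported so other packages cannot collide with or
// overwrite the stored value.
type runKey struct{}

func contextWithRunSketch(ctx context.Context, rc *runContextSketch) context.Context {
	return context.WithValue(ctx, runKey{}, rc)
}

// runFromContextSketch returns false when nothing (or a nil pointer)
// was stored; the nil check is an assumption about the real helper.
func runFromContextSketch(ctx context.Context) (*runContextSketch, bool) {
	rc, ok := ctx.Value(runKey{}).(*runContextSketch)
	return rc, ok && rc != nil
}

// demo looks up a run in a context that has one and in one that does not.
func demo() (runID string, okWith, okWithout bool) {
	base := context.Background()
	ctx := contextWithRunSketch(base, &runContextSketch{RunID: "run-1", Provider: "p", Model: "m"})
	rc, okWith := runFromContextSketch(ctx)
	_, okWithout = runFromContextSketch(base)
	return rc.RunID, okWith, okWithout
}

func main() {
	fmt.Println(demo())
}
```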

type RunKind

type RunKind string

RunKind identifies the kind of debug run being recorded.

const (
	// KindChatTurn records a standard chat turn.
	KindChatTurn RunKind = "chat_turn"
	// KindTitleGeneration records title generation for a chat.
	KindTitleGeneration RunKind = "title_generation"
	// KindQuickgen records quick-generation workflows.
	KindQuickgen RunKind = "quickgen"
	// KindCompaction records history compaction workflows.
	KindCompaction RunKind = "compaction"
)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service persists chat debug rows and fans out lightweight change events.

func NewService

func NewService(db database.Store, log slog.Logger, ps pubsub.Pubsub, opts ...ServiceOption) *Service

NewService constructs a chat debug persistence service.

func (*Service) AggregateRunSummary

func (s *Service) AggregateRunSummary(
	ctx context.Context,
	runID uuid.UUID,
	baseSummary map[string]any,
) (map[string]any, error)

AggregateRunSummary reads all steps for the given run, computes token totals, and merges them with the run's existing summary (preserving any seeded first_message label). The baseSummary parameter should be the current run summary (may be nil).

func (*Service) CreateRun

func (s *Service) CreateRun(
	ctx context.Context,
	params CreateRunParams,
) (database.ChatDebugRun, error)

CreateRun inserts a new debug run and emits a run update event.

func (*Service) CreateStep

func (s *Service) CreateStep(
	ctx context.Context,
	params CreateStepParams,
) (database.ChatDebugStep, error)

CreateStep inserts a new debug step and emits a step update event. It returns errRunFinalized if the parent run has already finished, or errRunNotFound if the run_id/chat_id pair does not match an existing run. The finalization guard is enforced atomically by the INSERT's CTE, which issues an UPDATE on the parent run (taking a row lock). This prevents concurrent FinalizeStale from setting finished_at between the check and the INSERT.

func (*Service) DeleteAfterMessageID

func (s *Service) DeleteAfterMessageID(
	ctx context.Context,
	chatID uuid.UUID,
	messageID int64,
	startedBefore time.Time,
) (int64, error)

DeleteAfterMessageID deletes debug data newer than the given message. The startedBefore bound scopes deletion to runs created before that instant so that retried cleanup does not remove runs created by a replacement turn that raced ahead of the retry window.

func (*Service) DeleteByChatID

func (s *Service) DeleteByChatID(
	ctx context.Context,
	chatID uuid.UUID,
	startedBefore time.Time,
) (int64, error)

DeleteByChatID deletes debug data for a chat and emits a delete event. The startedBefore bound scopes deletion to runs created before that instant so that retried cleanup does not remove runs created by a replacement turn that raced ahead of the retry window (for example, an unarchive that fires between the initial archive-cleanup attempt and its retry).

func (*Service) FinalizeRun

func (s *Service) FinalizeRun(ctx context.Context, p FinalizeRunParams) error

FinalizeRun aggregates the run summary, updates the run status, and cleans up the step counter. It detaches from the parent context's cancellation so finalization succeeds even when the request context is already done. Errors are returned, but callers that treat debug instrumentation as best-effort can safely discard them.
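One way to detach from a parent's cancellation while still bounding the work is context.WithoutCancel (Go 1.21+) combined with a fresh timeout. Whether FinalizeRun uses exactly this mechanism is an assumption; the sketch only illustrates the documented effect, including the 5s default for a zero Timeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// defaultFinalizeTimeout mirrors the documented default for a zero Timeout.
const defaultFinalizeTimeout = 5 * time.Second

// detachedContext keeps the values of parent but drops its cancellation,
// then bounds the work with its own timeout. Treating negative timeouts
// like zero is an assumption.
func detachedContext(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if timeout <= 0 {
		timeout = defaultFinalizeTimeout
	}
	return context.WithTimeout(context.WithoutCancel(parent), timeout)
}

// demo cancels a request context and reports whether each context is done.
func demo() (parentDone, detachedDone bool) {
	parent, cancelParent := context.WithCancel(context.Background())
	cancelParent() // simulate a request that has already ended

	ctx, cancel := detachedContext(parent, 0)
	defer cancel()
	return parent.Err() != nil, ctx.Err() != nil
}

func main() {
	parentDone, detachedDone := demo()
	fmt.Println("parent done:", parentDone)
	fmt.Println("detached done:", detachedDone)
}
```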

func (*Service) FinalizeStale

func (s *Service) FinalizeStale(
	ctx context.Context,
) (database.FinalizeStaleChatDebugRowsRow, error)

FinalizeStale finalizes stale in-flight debug rows and emits a broadcast.

func (*Service) IsEnabled

func (s *Service) IsEnabled(
	ctx context.Context,
	chatID uuid.UUID,
	ownerID uuid.UUID,
) bool

IsEnabled returns whether debug logging is enabled for the given chat.

func (*Service) SetStaleAfter

func (s *Service) SetStaleAfter(staleAfter time.Duration)

SetStaleAfter overrides the in-flight stale threshold used when finalizing abandoned debug rows. Zero or negative durations are ignored, leaving the current threshold (initial or previously overridden) unchanged. Active heartbeat goroutines are woken so they can re-read the (possibly shorter) interval immediately.

func (*Service) TouchStep

func (s *Service) TouchStep(
	ctx context.Context,
	stepID uuid.UUID,
	runID uuid.UUID,
	chatID uuid.UUID,
) error

TouchStep bumps the step's and its parent run's updated_at timestamps without changing any other fields. This prevents long-running operations (e.g. streaming) from being prematurely swept by FinalizeStale, which first marks runs stale by chat_debug_runs.updated_at and then cascades to steps whose run_id was just finalized.

func (*Service) UpdateRun

func (s *Service) UpdateRun(
	ctx context.Context,
	params UpdateRunParams,
) (database.ChatDebugRun, error)

UpdateRun updates an existing debug run and emits a run update event. When a terminal status is set without an explicit FinishedAt, the service auto-fills the timestamp so the row is immediately visible to the InsertChatDebugStep atomic guard (finished_at IS NULL). UpdateChatDebugRun itself enforces finished_at as write-once: once the column is populated, repeated auto-fills or explicit refreshes never overwrite the original completion timestamp, so calling this more than once on an already-finalized run is idempotent.

func (*Service) UpdateStep

func (s *Service) UpdateStep(
	ctx context.Context,
	params UpdateStepParams,
) (database.ChatDebugStep, error)

UpdateStep updates an existing debug step and emits a step update event. When a terminal status is set without an explicit FinishedAt, the service auto-fills the timestamp so the stale sweep does not leave terminal rows with finished_at = NULL.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption configures optional Service behavior.

func WithAlwaysEnable

func WithAlwaysEnable(always bool) ServiceOption

WithAlwaysEnable forces debug logging on for every chat regardless of the runtime admin and user opt-in settings. This is used for the deployment-level serpent flag.

func WithClock

func WithClock(c quartz.Clock) ServiceOption

WithClock overrides the default real clock. Tests inject quartz.NewMock(t) to control time-dependent behavior such as heartbeat tickers and FinalizeStale timestamps.

func WithStaleThreshold

func WithStaleThreshold(d time.Duration) ServiceOption

WithStaleThreshold overrides the default stale-row finalization threshold. Callers that already have a configurable in-flight chat timeout (e.g. chatd's InFlightChatStaleAfter) should pass it here so the two sweeps stay in sync.
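ServiceOption is an instance of the functional-options pattern. The sketch below reproduces the pattern with a stripped-down serviceSketch type; the field names are hypothetical, and ignoring non-positive thresholds in the option is an assumption borrowed from the SetStaleAfter description.

```go
package main

import (
	"fmt"
	"time"
)

// defaultStaleThreshold mirrors DefaultStaleThreshold.
const defaultStaleThreshold = 5 * time.Minute

// serviceSketch holds only the fields the options below touch.
type serviceSketch struct {
	staleAfter   time.Duration
	alwaysEnable bool
}

type serviceOptionSketch func(*serviceSketch)

func withAlwaysEnable(always bool) serviceOptionSketch {
	return func(s *serviceSketch) { s.alwaysEnable = always }
}

func withStaleThreshold(d time.Duration) serviceOptionSketch {
	return func(s *serviceSketch) {
		if d > 0 { // assumption: non-positive values keep the default
			s.staleAfter = d
		}
	}
}

// newServiceSketch applies defaults first, then each option in order.
func newServiceSketch(opts ...serviceOptionSketch) *serviceSketch {
	s := &serviceSketch{staleAfter: defaultStaleThreshold}
	for _, opt := range opts {
		opt(s)
	}
	return s
}

// demoThresholds reports the threshold with no options, with an
// override, and with an ignored zero value.
func demoThresholds() (def, custom, ignored string) {
	def = newServiceSketch().staleAfter.String()
	custom = newServiceSketch(withStaleThreshold(30 * time.Second)).staleAfter.String()
	ignored = newServiceSketch(withStaleThreshold(0), withAlwaysEnable(true)).staleAfter.String()
	return def, custom, ignored
}

func main() {
	fmt.Println(demoThresholds())
}
```

Options keep the NewService signature stable as new knobs are added, and tests can inject a clock or threshold without touching the required arguments.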

type Status

type Status string

Status identifies lifecycle state shared by runs and steps.

const (
	// StatusInProgress indicates work is still running.
	StatusInProgress Status = "in_progress"
	// StatusCompleted indicates work finished successfully.
	StatusCompleted Status = "completed"
	// StatusError indicates work finished with an error.
	StatusError Status = "error"
	// StatusInterrupted indicates work was canceled or interrupted.
	StatusInterrupted Status = "interrupted"
)

func ClassifyError

func ClassifyError(err error) Status

ClassifyError maps a run error to the appropriate debug status. nil → StatusCompleted, context.Canceled → StatusInterrupted, everything else → StatusError. Callers with additional classification rules (e.g. ErrInterrupted, ErrDynamicToolCall) should handle those before falling back to this helper.
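The documented mapping is small enough to sketch directly. Using errors.Is (so wrapped cancellations also count as interrupted) is an assumption about the real helper, and the status type here is a local copy of the constants above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type status string

const (
	statusCompleted   status = "completed"
	statusError       status = "error"
	statusInterrupted status = "interrupted"
)

// classifyErrorSketch follows the documented table:
// nil -> completed, context.Canceled -> interrupted, anything else -> error.
func classifyErrorSketch(err error) status {
	switch {
	case err == nil:
		return statusCompleted
	case errors.Is(err, context.Canceled):
		return statusInterrupted
	default:
		return statusError
	}
}

// Sample errors used by main and by tests.
var (
	wrappedCancel   = fmt.Errorf("stream: %w", context.Canceled)
	providerFailure = errors.New("provider: 500")
)

func main() {
	fmt.Println(classifyErrorSketch(nil))
	fmt.Println(classifyErrorSketch(wrappedCancel))
	fmt.Println(classifyErrorSketch(providerFailure))
	// A deadline is not a cancellation, so it falls through to "error".
	fmt.Println(classifyErrorSketch(context.DeadlineExceeded))
}
```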

func (Status) IsTerminal

func (s Status) IsTerminal() bool

IsTerminal reports whether the status represents a final state that should not be overwritten by stale callbacks.

func (Status) Priority

func (s Status) Priority() int

Priority returns a numeric ordering used to prevent stale callbacks from regressing a step's status. Higher values win over lower ones.
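To make the "higher values win" rule concrete, here is a sketch of a regression guard. The numeric ranking is entirely assumed (in_progress ranks 0, every terminal status ranks 1); the package's real Priority values are not documented on this page and may distinguish between terminal statuses.

```go
package main

import "fmt"

type status string

const (
	statusInProgress  status = "in_progress"
	statusCompleted   status = "completed"
	statusError       status = "error"
	statusInterrupted status = "interrupted"
)

// isTerminal treats every status other than in_progress as final.
func (s status) isTerminal() bool {
	return s == statusCompleted || s == statusError || s == statusInterrupted
}

// priority uses an ASSUMED two-level ranking for illustration only.
func (s status) priority() int {
	if s.isTerminal() {
		return 1
	}
	return 0
}

// nextStatus keeps current when the incoming status would be a regression,
// for example a late in_progress callback arriving after completion.
func nextStatus(current, incoming status) status {
	if incoming.priority() < current.priority() {
		return current
	}
	return incoming
}

func main() {
	fmt.Println(nextStatus(statusCompleted, statusInProgress)) // stale callback is ignored
	fmt.Println(nextStatus(statusInProgress, statusError))     // forward progress is applied
}
```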

type StepContext

type StepContext struct {
	StepID              uuid.UUID
	RunID               uuid.UUID
	ChatID              uuid.UUID
	StepNumber          int32
	Operation           Operation
	HistoryTipMessageID int64 // Zero means not set.
}

StepContext carries identity and metadata for a debug step.

func StepFromContext

func StepFromContext(ctx context.Context) (*StepContext, bool)

StepFromContext returns the debug step context stored in ctx.

type UpdateRunParams

type UpdateRunParams struct {
	ID         uuid.UUID
	ChatID     uuid.UUID
	Status     Status
	Summary    any
	FinishedAt time.Time
}

UpdateRunParams contains inputs for updating a debug run. Zero-valued fields are treated as "keep the existing value" by the COALESCE-based SQL query. Once a field is set it cannot be cleared back to NULL; this is intentional for the write-once-finalize lifecycle of debug rows.
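The "zero value means keep" semantics, together with the write-once finished_at rule described under UpdateRun, can be modeled in memory as follows. The runRow and updateRunParamsSketch types are hypothetical simplifications; the real behavior lives in the SQL query, which is not shown on this page.

```go
package main

import (
	"fmt"
	"time"
)

// runRow is a hypothetical in-memory stand-in for a chat_debug_runs row.
type runRow struct {
	Status     string
	FinishedAt time.Time
}

// updateRunParamsSketch carries only the fields this example needs.
type updateRunParamsSketch struct {
	Status     string
	FinishedAt time.Time
}

// applyUpdate mimics COALESCE(new, existing): zero-valued inputs leave
// the stored value alone, and FinishedAt is only ever written once.
func applyUpdate(row runRow, p updateRunParamsSketch) runRow {
	if p.Status != "" {
		row.Status = p.Status
	}
	if row.FinishedAt.IsZero() && !p.FinishedAt.IsZero() {
		row.FinishedAt = p.FinishedAt
	}
	return row
}

// demo finalizes a run, then applies a second update that omits Status
// and carries a later FinishedAt.
func demo() (status string, keptFirstFinish bool) {
	first := time.Date(2026, time.January, 2, 15, 0, 0, 0, time.UTC)
	row := runRow{Status: "in_progress"}
	row = applyUpdate(row, updateRunParamsSketch{Status: "completed", FinishedAt: first})
	row = applyUpdate(row, updateRunParamsSketch{FinishedAt: first.Add(time.Hour)})
	return row.Status, row.FinishedAt.Equal(first)
}

func main() {
	fmt.Println(demo())
}
```

A side effect of this design is that a field cannot be reset to its zero value through an update, which matches the "cannot be cleared back to NULL" note above.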

type UpdateStepParams

type UpdateStepParams struct {
	ID                 uuid.UUID
	ChatID             uuid.UUID
	Status             Status
	AssistantMessageID int64
	NormalizedResponse any
	Usage              any
	Attempts           []Attempt
	Error              any
	Metadata           any
	FinishedAt         time.Time
}

UpdateStepParams contains optional inputs for updating a debug step. Most payload fields are typed as any and serialized through nullJSON because their shape varies by provider. The Attempts field uses a concrete slice for compile-time safety where the schema is stable. Zero-valued fields are treated as "keep the existing value" by the COALESCE-based SQL query. Once set, fields cannot be cleared back to NULL. This is intentional for the write-once-finalize lifecycle of debug rows.
