Skip to content

Commit 59c6880

Browse files
committed
[web-console] Integration tests for retrieving connector errors
Refactor existing UI tests to re-use pipeline state management procedures Signed-off-by: Karakatiza666 <bulakh.96@gmail.com>
1 parent b28f581 commit 59c6880

File tree

7 files changed

+259
-169
lines changed

7 files changed

+259
-169
lines changed

js-packages/web-console/src/lib/components/pipelines/editor/performance/ConnectorErrors.svelte

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import InlineDropdown from '$lib/components/common/InlineDropdown.svelte'
66
import { Tooltip } from '$lib/components/common/Tooltip.svelte'
77
import InlineDrawer from '$lib/components/layout/InlineDrawer.svelte'
8+
import { getCaseDependentName } from '$lib/functions/felderaRelation'
89
import { formatDateTime } from '$lib/functions/format'
910
import {
1011
type ConnectorError,
1112
type InputEndpointStatus,
1213
type OutputEndpointStatus
1314
} from '$lib/services/manager'
1415
import { getInputConnectorStatus, getOutputConnectorStatus } from '$lib/services/pipelineManager'
15-
import { getCaseDependentName } from '$lib/functions/felderaRelation'
1616
1717
export type ConnectorErrorFilter = 'all' | 'parse' | 'transport' | 'encode'
1818
@@ -62,7 +62,9 @@
6262
tagsFilter = filter
6363
})
6464
65-
const strippedConnectorName = $derived(connectorName.slice(getCaseDependentName(relationName).name.length + 1))
65+
const strippedConnectorName = $derived(
66+
connectorName.slice(getCaseDependentName(relationName).name.length + 1)
67+
)
6668
6769
$effect(() => {
6870
pipelineName

js-packages/web-console/src/lib/services/pipelineManager.test.ts

Lines changed: 38 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,124 +2,84 @@
22
* Integration tests for pipelineManager.ts that require a running Feldera instance.
33
* Run via the 'integration' vitest project:
44
*
5-
* FELDERA_API_URL=http://localhost:8080 bun run test-integration
5+
* bun run test-integration
66
*/
77
import { afterAll, beforeAll, describe, expect, it } from 'vitest'
8-
9-
const BASE_URL = process.env.FELDERA_API_URL ?? 'http://localhost:8080'
8+
import {
9+
getInputConnectorStatus,
10+
getOutputConnectorStatus,
11+
putPipeline
12+
} from '$lib/services/pipelineManager'
13+
import {
14+
cleanupPipeline,
15+
configureTestClient,
16+
startPipelineAndWaitForRunning,
17+
waitForCompilation
18+
} from '$lib/services/testPipelineHelpers'
1019

1120
// Pipeline name is URL-safe by convention, but table/view/connector names are not.
1221
const PIPELINE_NAME = 'test-special-chars-connector-status'
1322

1423
// Names with dots and URL-unsafe characters
1524
const TABLE_NAME = 'my.input.table'
1625
const VIEW_NAME = 'my.output.view'
17-
const INPUT_CONNECTOR_NAME = 'http.input/special&name'
18-
const OUTPUT_CONNECTOR_NAME = 'http.output/special&name'
19-
20-
const api = (path: string, init?: RequestInit) =>
21-
fetch(`${BASE_URL}${path}`, {
22-
...init,
23-
headers: { 'Content-Type': 'application/json', ...init?.headers }
24-
})
26+
const INPUT_CONNECTOR_NAME = 'http-input_special-name'
27+
const OUTPUT_CONNECTOR_NAME = 'http-output_special-name'
2528

2629
describe('pipelineManager connector status with special characters', () => {
2730
beforeAll(async () => {
31+
configureTestClient()
32+
2833
// Clean up any leftover pipeline from a previous run
29-
const del = await api(`/v0/pipelines/${PIPELINE_NAME}`, { method: 'DELETE' })
30-
if (del.ok || del.status === 404) {
31-
// ok
32-
} else {
33-
throw new Error(`Cleanup failed: ${del.status} ${await del.text()}`)
34-
}
34+
await cleanupPipeline(PIPELINE_NAME)
3535

3636
// SQL program with quoted identifiers containing dots
3737
const programCode = `
3838
CREATE TABLE "${TABLE_NAME}" (id INT NOT NULL, val VARCHAR)
3939
WITH (
4040
'connectors' = '[{
4141
"name": "${INPUT_CONNECTOR_NAME}",
42-
"transport": { "name": "http_input" },
42+
"transport": { "name": "url_input", "config": { "path": "https://feldera.com/test-data.json" } },
4343
"format": { "name": "json", "config": { "update_format": "raw" } }
4444
}]'
4545
);
46-
CREATE VIEW "${VIEW_NAME}" AS SELECT * FROM "${TABLE_NAME}"
46+
CREATE VIEW "${VIEW_NAME}"
4747
WITH (
4848
'connectors' = '[{
4949
"name": "${OUTPUT_CONNECTOR_NAME}",
50-
"transport": { "name": "http_output" },
51-
"format": { "name": "json", "config": { "update_format": "raw" } }
50+
"transport": { "name": "file_output", "config": { "path": "/tmp/feldera-test-output.json" } },
51+
"format": { "name": "json" }
5252
}]'
53-
);
53+
)
54+
AS SELECT * FROM "${TABLE_NAME}";
5455
`
5556

5657
// Create the pipeline
57-
const put = await api(`/v0/pipelines/${PIPELINE_NAME}`, {
58-
method: 'PUT',
59-
body: JSON.stringify({
60-
name: PIPELINE_NAME,
61-
description: 'Integration test for special character handling',
62-
program_code: programCode,
63-
runtime_config: {}
64-
})
58+
await putPipeline(PIPELINE_NAME, {
59+
name: PIPELINE_NAME,
60+
description: 'Integration test for special character handling',
61+
program_code: programCode,
62+
runtime_config: {}
6563
})
66-
expect(put.ok, `PUT pipeline: ${put.status} ${await put.clone().text()}`).toBe(true)
67-
68-
// Wait for compilation
69-
for (let i = 0; i < 120; i++) {
70-
const res = await api(`/v0/pipelines/${PIPELINE_NAME}`)
71-
const info = await res.json()
72-
if (info.program_status === 'Success') break
73-
if (info.program_status?.SqlError || info.program_status?.RustError || info.program_status?.SystemError) {
74-
throw new Error(`Compilation failed: ${JSON.stringify(info.program_status)}`)
75-
}
76-
await new Promise((r) => setTimeout(r, 1000))
77-
}
78-
79-
// Start the pipeline
80-
const start = await api(`/v0/pipelines/${PIPELINE_NAME}/start`, { method: 'POST' })
81-
expect(start.ok, `POST start: ${start.status} ${await start.clone().text()}`).toBe(true)
8264

83-
// Wait for pipeline to be running
84-
for (let i = 0; i < 60; i++) {
85-
const res = await api(`/v0/pipelines/${PIPELINE_NAME}`)
86-
const info = await res.json()
87-
if (info.deployment_status === 'Running') break
88-
if (info.deployment_error) {
89-
throw new Error(`Deployment failed: ${JSON.stringify(info.deployment_error)}`)
90-
}
91-
await new Promise((r) => setTimeout(r, 1000))
92-
}
65+
await waitForCompilation(PIPELINE_NAME, 120_000)
66+
await startPipelineAndWaitForRunning(PIPELINE_NAME, 60_000)
9367
}, 180_000)
9468

9569
afterAll(async () => {
96-
// Shutdown and delete
97-
await api(`/v0/pipelines/${PIPELINE_NAME}/shutdown`, { method: 'POST' })
98-
// Wait for shutdown
99-
for (let i = 0; i < 30; i++) {
100-
const res = await api(`/v0/pipelines/${PIPELINE_NAME}`)
101-
const info = await res.json()
102-
if (info.deployment_status === 'Shutdown') break
103-
await new Promise((r) => setTimeout(r, 1000))
104-
}
105-
await api(`/v0/pipelines/${PIPELINE_NAME}`, { method: 'DELETE' })
70+
await cleanupPipeline(PIPELINE_NAME)
10671
}, 60_000)
10772

10873
it('getInputConnectorStatus succeeds with dot and URL-unsafe characters in table and connector name', async () => {
109-
const url = `/v0/pipelines/${PIPELINE_NAME}/tables/${encodeURIComponent(TABLE_NAME)}/connectors/${encodeURIComponent(INPUT_CONNECTOR_NAME)}/status`
110-
const res = await api(url)
111-
expect(res.ok, `GET input connector status: ${res.status} ${await res.clone().text()}`).toBe(true)
112-
const body = await res.json()
113-
expect(body).toHaveProperty('num_parse_errors')
114-
expect(body).toHaveProperty('num_transport_errors')
74+
const body = await getInputConnectorStatus(PIPELINE_NAME, TABLE_NAME, INPUT_CONNECTOR_NAME)
75+
console.log('body', JSON.stringify(body))
76+
expect(body).toHaveProperty(['metrics', 'num_parse_errors'])
77+
expect(body).toHaveProperty(['metrics', 'num_transport_errors'])
11578
})
11679

11780
it('getOutputConnectorStatus succeeds with dot and URL-unsafe characters in view and connector name', async () => {
118-
const url = `/v0/pipelines/${PIPELINE_NAME}/views/${encodeURIComponent(VIEW_NAME)}/connectors/${encodeURIComponent(OUTPUT_CONNECTOR_NAME)}/status`
119-
const res = await api(url)
120-
expect(res.ok, `GET output connector status: ${res.status} ${await res.clone().text()}`).toBe(true)
121-
const body = await res.json()
122-
expect(body).toHaveProperty('num_encode_errors')
123-
expect(body).toHaveProperty('num_transport_errors')
81+
const body = await getOutputConnectorStatus(PIPELINE_NAME, VIEW_NAME, OUTPUT_CONNECTOR_NAME)
82+
expect(body).toHaveProperty(['metrics', 'num_encode_errors'])
83+
expect(body).toHaveProperty(['metrics', 'num_transport_errors'])
12484
})
12585
})
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/**
2+
* Shared test helpers for pipeline lifecycle management.
3+
*
4+
* Used by both Playwright e2e tests and Vitest integration tests to avoid
5+
* duplicating pipeline create/compile/start/stop/delete boilerplate.
6+
*/
7+
8+
import { client } from '$lib/services/manager/client.gen'
9+
import {
10+
deletePipeline,
11+
type ExtendedPipeline,
12+
getExtendedPipeline,
13+
getPipelineStatus,
14+
postPipelineAction,
15+
programStatusOf,
16+
putPipeline
17+
} from '$lib/services/pipelineManager'
18+
19+
export { type ExtendedPipeline }
20+
21+
/**
22+
* Configure the API client base URL for tests.
23+
*
24+
* Reads from `FELDERA_TEST_API_ORIGIN` (shared) or `PLAYWRIGHT_API_ORIGIN`
25+
* (Playwright-specific), falling back to `http://localhost:8080`.
26+
*/
27+
export function configureTestClient() {
28+
const origin = (
29+
process.env.FELDERA_TEST_API_ORIGIN ??
30+
process.env.PLAYWRIGHT_API_ORIGIN ??
31+
'http://localhost:8080'
32+
).replace(/\/$/, '')
33+
client.setConfig({ baseUrl: origin })
34+
return origin
35+
}
36+
37+
/**
38+
* Poll `getExtendedPipeline` until `predicate` returns true or `timeoutMs` elapses.
39+
*/
40+
export async function waitForExtendedPipeline(
41+
pipelineName: string,
42+
predicate: (p: ExtendedPipeline) => boolean,
43+
timeoutMs = 120_000
44+
): Promise<ExtendedPipeline> {
45+
const deadline = Date.now() + timeoutMs
46+
while (Date.now() < deadline) {
47+
const pipeline = await getExtendedPipeline(pipelineName)
48+
if (predicate(pipeline)) return pipeline
49+
await new Promise((r) => setTimeout(r, 2000))
50+
}
51+
const pipeline = await getExtendedPipeline(pipelineName)
52+
throw new Error(
53+
`waitForExtendedPipeline timed out for "${pipelineName}". ` +
54+
`deploy=${pipeline.deploymentStatus} status=${JSON.stringify(pipeline.status)} storage=${pipeline.storageStatus}`
55+
)
56+
}
57+
58+
/**
59+
* Poll `getPipelineStatus` until `predicate` returns true or `timeoutMs` elapses.
60+
*/
61+
export async function waitForPipelineStatus(
62+
pipelineName: string,
63+
predicate: (status: string) => boolean,
64+
timeoutMs = 120_000
65+
) {
66+
const deadline = Date.now() + timeoutMs
67+
while (Date.now() < deadline) {
68+
const { status } = await getPipelineStatus(pipelineName)
69+
if (predicate(status as string)) return status
70+
await new Promise((r) => setTimeout(r, 1000))
71+
}
72+
const { status } = await getPipelineStatus(pipelineName)
73+
throw new Error(
74+
`waitForPipelineStatus timed out for "${pipelineName}". status=${JSON.stringify(status)}`
75+
)
76+
}
77+
78+
/**
79+
* Wait for a pipeline's SQL/Rust compilation to finish successfully.
80+
* Throws on compilation errors.
81+
*/
82+
export async function waitForCompilation(pipelineName: string, timeoutMs = 600_000) {
83+
const deadline = Date.now() + timeoutMs
84+
while (Date.now() < deadline) {
85+
const { status } = await getPipelineStatus(pipelineName)
86+
const programStatus = programStatusOf(status)
87+
if (programStatus === 'Success') return
88+
if (
89+
programStatus === 'SqlError' ||
90+
programStatus === 'RustError' ||
91+
programStatus === 'SystemError'
92+
) {
93+
throw new Error(`Compilation failed for "${pipelineName}": ${programStatus}`)
94+
}
95+
await new Promise((r) => setTimeout(r, 2000))
96+
}
97+
throw new Error(`Compilation timed out for "${pipelineName}"`)
98+
}
99+
100+
/**
101+
* Start a pipeline and wait until it reaches the Running state.
102+
* Automatically handles the AwaitingApproval → approve_changes transition.
103+
*/
104+
export async function startPipelineAndWaitForRunning(pipelineName: string, timeoutMs = 60_000) {
105+
await postPipelineAction(pipelineName, 'start')
106+
const deadline = Date.now() + timeoutMs
107+
while (Date.now() < deadline) {
108+
const { status } = await getPipelineStatus(pipelineName)
109+
if (status === 'Running') return
110+
if (status === 'AwaitingApproval') {
111+
await postPipelineAction(pipelineName, 'approve_changes')
112+
}
113+
await new Promise((r) => setTimeout(r, 1000))
114+
}
115+
throw new Error(`Pipeline "${pipelineName}" did not reach Running within ${timeoutMs}ms`)
116+
}
117+
118+
/**
119+
* Stop a pipeline and wait until it reaches the Stopped state.
120+
*/
121+
export async function stopPipelineAndWaitForStopped(pipelineName: string, timeoutMs = 30_000) {
122+
try {
123+
await postPipelineAction(pipelineName, 'stop')
124+
} catch {
125+
// Ignore if already stopped
126+
}
127+
const deadline = Date.now() + timeoutMs
128+
while (Date.now() < deadline) {
129+
const { status } = await getPipelineStatus(pipelineName)
130+
if (status === 'Stopped') return
131+
await new Promise((r) => setTimeout(r, 1000))
132+
}
133+
throw new Error(`Pipeline "${pipelineName}" did not reach Stopped within ${timeoutMs}ms`)
134+
}
135+
136+
/**
137+
* Fully clean up a pipeline: stop → wait → delete. Ignores errors at every step.
138+
*/
139+
export async function cleanupPipeline(pipelineName: string) {
140+
try {
141+
await stopPipelineAndWaitForStopped(pipelineName)
142+
} catch {}
143+
try {
144+
await deletePipeline(pipelineName)
145+
} catch {}
146+
}
147+
148+
const WARMUP_PIPELINE = '__test_warmup__'
149+
150+
/**
151+
* Warm the Rust compilation cache by creating, compiling, and deleting a
152+
* minimal pipeline. The first compilation in a fresh Feldera image compiles
153+
* many Rust crates from scratch, so this prevents individual tests from
154+
* timing out.
155+
*/
156+
export async function warmCompilationCache() {
157+
console.log('Warming Rust compilation cache…')
158+
159+
// Clean up any leftover warmup pipeline
160+
try {
161+
await deletePipeline(WARMUP_PIPELINE)
162+
} catch {}
163+
164+
try {
165+
await putPipeline(WARMUP_PIPELINE, {
166+
name: WARMUP_PIPELINE,
167+
program_code: 'CREATE TABLE _warmup (id INT);',
168+
program_config: { profile: 'unoptimized' }
169+
})
170+
} catch (e) {
171+
console.error('warmCompilationCache: putPipeline failed:', e)
172+
throw e
173+
}
174+
175+
await waitForCompilation(WARMUP_PIPELINE)
176+
console.log('Compilation cache is warm.')
177+
178+
await deletePipeline(WARMUP_PIPELINE)
179+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/**
2+
* Vitest globalSetup for integration test projects.
3+
*
4+
* Configures the API client and warms the Rust compilation cache,
5+
* mirroring what tests/global-setup.ts does for Playwright.
6+
*/
7+
8+
import { configureTestClient, warmCompilationCache } from '$lib/services/testPipelineHelpers'
9+
10+
export async function setup() {
11+
configureTestClient()
12+
await warmCompilationCache()
13+
}

0 commit comments

Comments
 (0)