|
2 | 2 | * Integration tests for pipelineManager.ts that require a running Feldera instance. |
3 | 3 | * Run via the 'integration' vitest project: |
4 | 4 | * |
5 | | - * FELDERA_API_URL=http://localhost:8080 bun run test-integration |
| 5 | + * bun run test-integration |
6 | 6 | */ |
7 | 7 | import { afterAll, beforeAll, describe, expect, it } from 'vitest' |
8 | | - |
9 | | -const BASE_URL = process.env.FELDERA_API_URL ?? 'http://localhost:8080' |
| 8 | +import { getInputConnectorStatus, getOutputConnectorStatus, putPipeline } from '$lib/services/pipelineManager' |
| 9 | +import { |
| 10 | + cleanupPipeline, |
| 11 | + configureTestClient, |
| 12 | + startPipelineAndWaitForRunning, |
| 13 | + waitForCompilation |
| 14 | +} from '$lib/services/testPipelineHelpers' |
10 | 15 |
|
11 | 16 | // Pipeline name is URL-safe by convention, but table/view/connector names are not. |
12 | 17 | const PIPELINE_NAME = 'test-special-chars-connector-status' |
13 | 18 |
|
14 | 19 | // Names with dots and URL-unsafe characters |
15 | 20 | const TABLE_NAME = 'my.input.table' |
16 | 21 | const VIEW_NAME = 'my.output.view' |
17 | | -const INPUT_CONNECTOR_NAME = 'http.input/special&name' |
18 | | -const OUTPUT_CONNECTOR_NAME = 'http.output/special&name' |
19 | | - |
20 | | -const api = (path: string, init?: RequestInit) => |
21 | | - fetch(`${BASE_URL}${path}`, { |
22 | | - ...init, |
23 | | - headers: { 'Content-Type': 'application/json', ...init?.headers } |
24 | | - }) |
| 22 | +const INPUT_CONNECTOR_NAME = 'http-input_special-name' |
| 23 | +const OUTPUT_CONNECTOR_NAME = 'http-output_special-name' |
25 | 24 |
|
26 | 25 | describe('pipelineManager connector status with special characters', () => { |
27 | 26 | beforeAll(async () => { |
| 27 | + configureTestClient() |
| 28 | + |
28 | 29 | // Clean up any leftover pipeline from a previous run |
29 | | - const del = await api(`/v0/pipelines/${PIPELINE_NAME}`, { method: 'DELETE' }) |
30 | | - if (del.ok || del.status === 404) { |
31 | | - // ok |
32 | | - } else { |
33 | | - throw new Error(`Cleanup failed: ${del.status} ${await del.text()}`) |
34 | | - } |
| 30 | + await cleanupPipeline(PIPELINE_NAME) |
35 | 31 |
|
36 | 32 | // SQL program with quoted identifiers containing dots |
37 | 33 | const programCode = ` |
38 | 34 | CREATE TABLE "${TABLE_NAME}" (id INT NOT NULL, val VARCHAR) |
39 | 35 | WITH ( |
40 | 36 | 'connectors' = '[{ |
41 | 37 | "name": "${INPUT_CONNECTOR_NAME}", |
42 | | - "transport": { "name": "http_input" }, |
| 38 | + "transport": { "name": "url_input", "config": { "path": "https://feldera.com/test-data.json" } }, |
43 | 39 | "format": { "name": "json", "config": { "update_format": "raw" } } |
44 | 40 | }]' |
45 | 41 | ); |
46 | | - CREATE VIEW "${VIEW_NAME}" AS SELECT * FROM "${TABLE_NAME}" |
| 42 | + CREATE VIEW "${VIEW_NAME}" |
47 | 43 | WITH ( |
48 | 44 | 'connectors' = '[{ |
49 | 45 | "name": "${OUTPUT_CONNECTOR_NAME}", |
50 | | - "transport": { "name": "http_output" }, |
51 | | - "format": { "name": "json", "config": { "update_format": "raw" } } |
| 46 | + "transport": { "name": "file_output", "config": { "path": "/tmp/feldera-test-output.json" } }, |
| 47 | + "format": { "name": "json" } |
52 | 48 | }]' |
53 | | - ); |
| 49 | + ) |
| 50 | + AS SELECT * FROM "${TABLE_NAME}"; |
54 | 51 | ` |
55 | 52 |
|
56 | 53 | // Create the pipeline |
57 | | - const put = await api(`/v0/pipelines/${PIPELINE_NAME}`, { |
58 | | - method: 'PUT', |
59 | | - body: JSON.stringify({ |
60 | | - name: PIPELINE_NAME, |
61 | | - description: 'Integration test for special character handling', |
62 | | - program_code: programCode, |
63 | | - runtime_config: {} |
64 | | - }) |
| 54 | + await putPipeline(PIPELINE_NAME, { |
| 55 | + name: PIPELINE_NAME, |
| 56 | + description: 'Integration test for special character handling', |
| 57 | + program_code: programCode, |
| 58 | + runtime_config: {} |
65 | 59 | }) |
66 | | - expect(put.ok, `PUT pipeline: ${put.status} ${await put.clone().text()}`).toBe(true) |
67 | | - |
68 | | - // Wait for compilation |
69 | | - for (let i = 0; i < 120; i++) { |
70 | | - const res = await api(`/v0/pipelines/${PIPELINE_NAME}`) |
71 | | - const info = await res.json() |
72 | | - if (info.program_status === 'Success') break |
73 | | - if (info.program_status?.SqlError || info.program_status?.RustError || info.program_status?.SystemError) { |
74 | | - throw new Error(`Compilation failed: ${JSON.stringify(info.program_status)}`) |
75 | | - } |
76 | | - await new Promise((r) => setTimeout(r, 1000)) |
77 | | - } |
78 | | - |
79 | | - // Start the pipeline |
80 | | - const start = await api(`/v0/pipelines/${PIPELINE_NAME}/start`, { method: 'POST' }) |
81 | | - expect(start.ok, `POST start: ${start.status} ${await start.clone().text()}`).toBe(true) |
82 | 60 |
|
83 | | - // Wait for pipeline to be running |
84 | | - for (let i = 0; i < 60; i++) { |
85 | | - const res = await api(`/v0/pipelines/${PIPELINE_NAME}`) |
86 | | - const info = await res.json() |
87 | | - if (info.deployment_status === 'Running') break |
88 | | - if (info.deployment_error) { |
89 | | - throw new Error(`Deployment failed: ${JSON.stringify(info.deployment_error)}`) |
90 | | - } |
91 | | - await new Promise((r) => setTimeout(r, 1000)) |
92 | | - } |
| 61 | + await waitForCompilation(PIPELINE_NAME, 120_000) |
| 62 | + await startPipelineAndWaitForRunning(PIPELINE_NAME, 60_000) |
93 | 63 | }, 180_000) |
94 | 64 |
|
95 | 65 | afterAll(async () => { |
96 | | - // Shutdown and delete |
97 | | - await api(`/v0/pipelines/${PIPELINE_NAME}/shutdown`, { method: 'POST' }) |
98 | | - // Wait for shutdown |
99 | | - for (let i = 0; i < 30; i++) { |
100 | | - const res = await api(`/v0/pipelines/${PIPELINE_NAME}`) |
101 | | - const info = await res.json() |
102 | | - if (info.deployment_status === 'Shutdown') break |
103 | | - await new Promise((r) => setTimeout(r, 1000)) |
104 | | - } |
105 | | - await api(`/v0/pipelines/${PIPELINE_NAME}`, { method: 'DELETE' }) |
| 66 | + await cleanupPipeline(PIPELINE_NAME) |
106 | 67 | }, 60_000) |
107 | 68 |
|
108 | 69 | it('getInputConnectorStatus succeeds with dot and URL-unsafe characters in table and connector name', async () => { |
109 | | - const url = `/v0/pipelines/${PIPELINE_NAME}/tables/${encodeURIComponent(TABLE_NAME)}/connectors/${encodeURIComponent(INPUT_CONNECTOR_NAME)}/status` |
110 | | - const res = await api(url) |
111 | | - expect(res.ok, `GET input connector status: ${res.status} ${await res.clone().text()}`).toBe(true) |
112 | | - const body = await res.json() |
113 | | - expect(body).toHaveProperty('num_parse_errors') |
114 | | - expect(body).toHaveProperty('num_transport_errors') |
| 70 | + const body = await getInputConnectorStatus(PIPELINE_NAME, TABLE_NAME, INPUT_CONNECTOR_NAME) |
| 71 | + console.log('body', JSON.stringify(body)) |
| 72 | + expect(body).toHaveProperty(['metrics', 'num_parse_errors']) |
| 73 | + expect(body).toHaveProperty(['metrics', 'num_transport_errors']) |
115 | 74 | }) |
116 | 75 |
|
117 | 76 | it('getOutputConnectorStatus succeeds with dot and URL-unsafe characters in view and connector name', async () => { |
118 | | - const url = `/v0/pipelines/${PIPELINE_NAME}/views/${encodeURIComponent(VIEW_NAME)}/connectors/${encodeURIComponent(OUTPUT_CONNECTOR_NAME)}/status` |
119 | | - const res = await api(url) |
120 | | - expect(res.ok, `GET output connector status: ${res.status} ${await res.clone().text()}`).toBe(true) |
121 | | - const body = await res.json() |
122 | | - expect(body).toHaveProperty('num_encode_errors') |
123 | | - expect(body).toHaveProperty('num_transport_errors') |
| 77 | + const body = await getOutputConnectorStatus(PIPELINE_NAME, VIEW_NAME, OUTPUT_CONNECTOR_NAME) |
| 78 | + expect(body).toHaveProperty(['metrics', 'num_encode_errors']) |
| 79 | + expect(body).toHaveProperty(['metrics', 'num_transport_errors']) |
124 | 80 | }) |
125 | 81 | }) |
0 commit comments