-
Notifications
You must be signed in to change notification settings - Fork 46
Expand file tree
/
Copy pathgenerators.mjs
More file actions
135 lines (104 loc) · 3.87 KB
/
generators.mjs
File metadata and controls
135 lines (104 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
'use strict';
import { allGenerators } from './generators/index.mjs';
import logger from './logger/index.mjs';
import { isAsyncGenerator, createStreamingCache } from './streaming.mjs';
import createWorkerPool from './threading/index.mjs';
import createParallelWorker from './threading/parallel.mjs';
// Logger obtained from `logger.child('generators')`; every debug message
// emitted by the orchestration code in this module goes through it.
const generatorsLogger = logger.child('generators');
/**
 * Creates a generator orchestration system that manages the execution of
 * documentation generators in dependency order, with support for parallel
 * processing and streaming results.
 *
 * Each call returns an independent orchestrator with its own generator cache,
 * streaming cache and worker pool reference.
 *
 * @returns {{ runGenerators: (configuration: import('./utils/configuration/types').Configuration) => Promise<unknown[]> }}
 */
const createGenerator = () => {
  /**
   * Scheduled generators keyed by name. Every value is a promise; it may
   * resolve to a plain result or to an async generator (streaming result).
   *
   * @type {{ [key: string]: Promise<unknown | AsyncGenerator> }}
   */
  const cachedGenerators = {};

  const streamingCache = createStreamingCache();

  /**
   * Assigned at the start of every `runGenerators` call.
   *
   * @type {import('piscina').Piscina}
   */
  let pool;

  /**
   * Gets the collected input from a dependency generator.
   *
   * @param {string | undefined} dependsOn - Dependency generator name
   * @returns {Promise<unknown>} The dependency's result, or `undefined` when
   *   there is no dependency
   */
  const getDependencyInput = async dependsOn => {
    if (!dependsOn) {
      return undefined;
    }

    const result = await cachedGenerators[dependsOn];

    // Streaming dependencies are collected through the streaming cache (keyed
    // by the dependency name) instead of being handed over as a raw generator
    if (isAsyncGenerator(result)) {
      return streamingCache.getOrCollect(dependsOn, result);
    }

    return result;
  };

  /**
   * Schedules a generator and its dependencies for execution.
   *
   * The generator's work is started immediately and stored (un-awaited) in
   * `cachedGenerators`; this function only awaits the *scheduling* of the
   * dependency chain, not the generators' results.
   *
   * @param {string} generatorName - Generator to schedule
   * @param {import('./utils/configuration/types').Configuration} configuration - Runtime options
   * @returns {Promise<void>}
   */
  const scheduleGenerator = async (generatorName, configuration) => {
    // Already scheduled (by an earlier target or as someone's dependency)
    if (generatorName in cachedGenerators) {
      return;
    }

    const { dependsOn, generate, hasParallelProcessor } =
      allGenerators[generatorName];

    // Schedule dependency first
    if (dependsOn && !(dependsOn in cachedGenerators)) {
      await scheduleGenerator(dependsOn, configuration);
    }

    generatorsLogger.debug(`Scheduling "${generatorName}"`, {
      dependsOn: dependsOn || 'none',
      streaming: hasParallelProcessor,
    });

    // Schedule the generator
    cachedGenerators[generatorName] = (async () => {
      const dependencyInput = await getDependencyInput(dependsOn);

      generatorsLogger.debug(`Starting "${generatorName}"`);

      // Create parallel worker for streaming generators
      const worker = hasParallelProcessor
        ? createParallelWorker(generatorName, pool, configuration)
        : Promise.resolve(null);

      const result = await generate(dependencyInput, await worker);

      // For streaming generators, "Completed" is logged when collection finishes
      // (in streamingCache.getOrCollect), not here when the generator returns
      if (!isAsyncGenerator(result)) {
        generatorsLogger.debug(`Completed "${generatorName}"`);
      }

      return result;
    })();
  };

  /**
   * Runs all requested generators with their dependencies.
   *
   * The worker pool created for the run is destroyed before this function
   * settles, whether the run succeeds or fails.
   *
   * @param {import('./utils/configuration/types').Configuration} configuration - Runtime options
   * @returns {Promise<unknown[]>} Results of all requested generators, in the
   *   same order as `configuration.target`
   */
  const runGenerators = async configuration => {
    const { target: generators, threads } = configuration;

    generatorsLogger.debug(`Starting pipeline`, {
      generators: generators.join(', '),
      threads,
    });

    // Create worker pool
    pool = createWorkerPool(threads);

    try {
      // Schedule all generators
      for (const name of generators) {
        await scheduleGenerator(name, configuration);
      }

      // Start all collections in parallel (don't await sequentially)
      const resultPromises = generators.map(async name => {
        let result = await cachedGenerators[name];

        if (isAsyncGenerator(result)) {
          result = await streamingCache.getOrCollect(name, result);
        }

        return result;
      });

      return await Promise.all(resultPromises);
    } finally {
      // Always release the pool: without this, a throwing/rejecting generator
      // would skip the teardown and leak the pool
      await pool.destroy();
    }
  };

  return { runGenerators };
};
export default createGenerator;