The Python SDK provides a rich, object-oriented workflow DSL with operator overloading (>>), fluent chainable methods, dynamic output references, inline sub-workflows, and direct execution. The JavaScript SDK provides stateless factory functions that return plain typed objects -- functional and type-safe, but lacking the compositional power and developer experience of the Python approach.
This document compares both approaches in detail and identifies opportunities for a JavaScript workflow builder that could be even more expressive than Python by leveraging TypeScript's type system, tagged template literals, and Proxy-based APIs.
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.switch_task import SwitchTask
workflow = ConductorWorkflow(executor=executor, name='order_flow', version=1)
workflow.timeout_seconds(300).owner_email('team@company.com')
http = HttpTask('fetch_order', HttpInput(method=HttpMethod.GET, uri='${workflow.input.url}'))
check = SwitchTask('check_status', '${fetch_order.output.response.statusCode}')
check.switch_case('200', [process_task])
check.default_case([error_task])
workflow >> http >> check
workflow.output_parameter('result', '${process_task.output.data}')
workflow.register(overwrite=True)
result = workflow(order_id='123') # Direct execution via __call__Architecture:
ConductorWorkflow- Mutable builder holding tasks list, config, and executor referenceTaskInterface(base class) - Each task is an object with state, methods, and operator overloads- Tasks compose via
>>operator or.add()method - Output references via
.output('field')method or dynamic attribute access - Workflow is both a definition AND an executor (has
.execute(),.register(),.__call__())
import { workflow, simpleTask, httpTask, switchTask } from '@io-orkes/conductor-javascript';
const wf = workflow('order_flow', [
httpTask('fetch_order', {
method: 'GET',
uri: '${workflow.input.url}',
}),
switchTask('check_status', '${fetch_order.output.response.statusCode}', {
'200': [simpleTask('process', 'process_order', {})],
}, [simpleTask('error', 'handle_error', {})]),
]);
// Registration and execution are separate concerns
await executor.registerWorkflow(true, wf);
const id = await executor.startWorkflow({ name: 'order_flow', input: { order_id: '123' } });Architecture:
workflow()- Pure function returningWorkflowDef(plain object)- Task builders (
simpleTask(),httpTask(), etc.) - Pure functions returning typed task definition objects - No mutable state, no classes, no operator overloading
- Output references: manual string interpolation
'${task_ref.output.field}' - Workflow definition is separate from execution (WorkflowExecutor is a separate class)
| Feature | Python | JavaScript |
|---|---|---|
| Constructor | ConductorWorkflow(executor, name, version, description) |
workflow(name, tasks) |
| Version | Configurable property | Hardcoded to 1 |
| Schema version | SCHEMA_VERSION = 2 |
Not set |
| Input parameters | .input_parameters(['key1', 'key2']) |
Not configurable (empty array) |
| Input template | .input_template({'key': 'default'}) |
Not supported |
| Output parameters | .output_parameter('key', '${task.output}') |
Not configurable |
| Timeout policy | .timeout_policy(TimeoutPolicy.TIME_OUT_WORKFLOW) |
Not configurable |
| Timeout seconds | .timeout_seconds(300) |
Hardcoded to 0 |
| Owner email | .owner_email('team@co.com') |
Not configurable |
| Failure workflow | .failure_workflow('compensation_wf') |
Not supported |
| Restartable | .restartable(True) |
Not configurable |
| Status listener | .enable_status_listener('sink') |
Not supported |
| Variables | .variables({'counter': 0}) |
Not supported |
| Fluent API | All methods return Self |
N/A (pure function) |
| Direct execution | workflow(key=value) via __call__ |
Not supported |
| Register | workflow.register(overwrite=True) |
Separate: executor.registerWorkflow() |
| Inline sub-workflow | parent >> child_workflow |
Not supported |
| Pattern | Python | JavaScript |
|---|---|---|
| Sequential | workflow >> task1 >> task2 |
workflow('wf', [task1, task2]) |
| Add single | workflow.add(task) |
Include in tasks array |
| Add multiple | workflow.add([task1, task2]) |
Include in tasks array |
| Parallel fork | workflow >> [[branch1], [branch2]] |
forkTask('ref', [task1, task2]) |
| Auto-join | Automatic with >> on list-of-lists |
Manual: forkTaskJoin() returns tuple |
| Inline sub-workflow | workflow >> child_workflow |
subWorkflowTask('ref', 'name', ver) |
| Operator chaining | wf >> t1 >> t2 >> t3 |
Not available |
| Feature | Python | JavaScript |
|---|---|---|
| Task output | task.output('field') → '${ref.output.field}' |
Manual string: '${ref.output.field}' |
| Task output (all) | task.output() → '${ref.output}' |
Manual string: '${ref.output}' |
| Dynamic access | task.my_field → '${ref.output.my_field}' |
Not available |
| HTTP body | http.body('data') → '${ref.output.response.body.data}' |
Manual string |
| HTTP status | http.status_code() → '${ref.output.response.statusCode}' |
Manual string |
| HTTP headers | http.headers('content-type') → '${ref.output.response.headers.content-type}' |
Manual string |
| Workflow input | workflow.input('key') → '${workflow.input.key}' |
Manual string: '${workflow.input.key}' |
| Workflow output | workflow.output('key') → '${workflow.output.key}' |
Manual string |
| Feature | Python | JavaScript |
|---|---|---|
| Via constructor | SimpleTask('name', 'ref') + .input_parameter('k', 'v') |
simpleTask('ref', 'name', { k: 'v' }) |
| Fluent input | task.input_parameter('k', 'v').input_parameter('k2', 'v2') |
Not available (all at construction) |
| Input from other task | task.input_parameter('data', other_task.output('result')) |
simpleTask('ref', 'name', { data: '${other.output.result}' }) |
| Input method | task.input(key='k', value='v') or task.input('json_path') |
Not available |
Python:
switch = SwitchTask('evaluate', '${task.output.status}', use_javascript=False)
switch.switch_case('SUCCESS', [task1, task2])
switch.switch_case('FAILURE', [error_task])
switch.default_case([fallback])
workflow >> switch- Mutable: add cases after construction
- Chainable:
switch.switch_case('A', [...]).switch_case('B', [...]) - Supports both
value-paramandjavascriptevaluator types
JavaScript:
switchTask('evaluate', '${task.output.status}', {
'SUCCESS': [simpleTask(...)],
'FAILURE': [simpleTask(...)],
}, [simpleTask(...)]) // default case- Immutable: all cases at construction time
- Evaluator type hardcoded to
"value-param" - Expression hardcoded to
"switchCaseValue"
Python:
# Via operator (auto-join)
workflow >> [[task1, task2], [task3, task4]]
# Explicit
fork = ForkTask('parallel', [[task1, task2], [task3, task4]], join_on=['task2', 'task4'])
workflow >> fork
# Custom join logic
join = JoinTask('custom', join_on_script='$.fork.output.condition')- Auto-join when using
>>with list-of-lists - Custom
join_onlist for selective joining join_on_scriptfor JavaScript-based join conditions
JavaScript:
// Basic fork (no auto-join)
forkTask('parallel', [task1, task2])
// Fork with auto-join (returns tuple)
const [fork, join] = forkTaskJoin('parallel', [task1, task2])
// Explicit join
joinTask('custom_join', ['task1_ref', 'task2_ref'])forkTaskwraps tasks in[forkTasks](single branch)forkTaskJoinreturns[ForkJoinTaskDef, JoinTaskDef]tuple- No
join_on_scriptsupport
Python:
# Conditional loop
loop = DoWhileTask('retry', '$.retry.iteration < 5', [task1, task2])
# Fixed iterations
repeat = LoopTask('repeat_5', 5, [task])
# For-each
foreach = ForEachTask('process', [task], '${workflow.input.items}')JavaScript:
// Conditional loop
doWhileTask('retry', 'if ($.retry.iteration < 5) { true; } else { false; }', [task1, task2])
// Fixed iterations
newLoopTask('repeat_5', 5, [task])
// No ForEachTask| Feature | Python | JavaScript |
|---|---|---|
| Do-while | DoWhileTask |
doWhileTask() |
| Fixed loop | LoopTask |
newLoopTask() |
| For-each | ForEachTask |
Missing |
| Loop condition syntax | $.ref.iteration < N |
if ($.ref['iteration'] < $.value) { true; } else { false; } |
Python:
task.cache(cache_key='${workflow.input.user_id}', cache_ttl_second=3600)JavaScript: Not supported in builders.
Python:
chat = LlmChatComplete('ask', 'openai', 'gpt-4',
messages=[ChatMessage(Role.USER, 'Hello')],
tools=[ToolSpec('search', input_schema={...})])
chat.prompt_variable('context', '${task.output.data}')
embed = LlmGenerateEmbeddings('embed', 'openai', 'ada-002', text='...')
search = LlmSearchIndex('search', 'pinecone', 'ns', 'idx', 'openai', 'ada-002', 'query')
img = GenerateImage('gen', 'openai', 'dall-e-3', 'prompt', width=1024)Supported LLM task types: LlmChatComplete, LlmTextComplete, LlmGenerateEmbeddings, LlmSearchIndex, LlmIndexDocument, GenerateImage, GenerateAudio
JavaScript: No LLM task builders. Would need manual task definition construction.
Python:
# Direct call
result = workflow(order_id='123')
# Synchronous execution (waits for completion)
result = workflow.execute(
workflow_input={'order_id': '123'},
wait_until_task_ref='final_task',
wait_for_seconds=30
)
# Async start (returns ID immediately)
workflow.register(overwrite=True)
exec_id = workflow.start_workflow_with_input(
workflow_input={'order_id': '123'},
correlation_id='user-123',
idempotency_key='unique-key',
priority=5
)JavaScript:
// Separate concerns
const wf = workflow('order_flow', [...tasks]);
await executor.registerWorkflow(true, wf);
const id = await executor.startWorkflow({ name: 'order_flow', input: { order_id: '123' } });
// Synchronous execution
const run = await executor.executeWorkflow(request, 'order_flow', 1, 'req-id', 'final_task');| # | Feature | Category | Impact |
|---|---|---|---|
| 1 | >> operator for task chaining |
Composition | High (DX) |
| 2 | task.output('field') reference helpers |
Composition | High (DX) |
| 3 | Dynamic attribute access for outputs (task.my_field) |
Composition | Medium (DX) |
| 4 | Fluent/chainable configuration methods | Configuration | High (DX) |
| 5 | Configurable workflow properties (timeout, email, failure_wf, etc.) | Configuration | High |
| 6 | Inline sub-workflow embedding | Composition | Medium |
| 7 | __call__ for direct execution |
Execution | Medium (DX) |
| 8 | Workflow as both definition AND executor | Architecture | Medium |
| 9 | ForEachTask |
Task type | Medium |
| 10 | Task caching (.cache()) |
Feature | Low |
| 11 | LLM task types (7 types) | Task type | Medium |
| 12 | SwitchTask with JavaScript evaluator |
Feature | Low |
| 13 | JoinTask with join_on_script |
Feature | Low |
| 14 | Idempotency key/strategy on execution | Feature | Medium |
| 15 | Correlation ID support on execution | Feature | Medium |
| 16 | HumanTask builder |
Task type | Low |
| 17 | Input template / workflow variables | Configuration | Medium |
JavaScript/TypeScript offers unique language features that could make the workflow builder more powerful than Python's:
Instead of string interpolation, use Proxy objects for type-safe, autocomplete-friendly references:
const http = httpTask('fetch_order', { method: 'GET', uri: '...' });
// Instead of: '${fetch_order.output.response.body.order_id}'
// Could be:
http.output.response.body.order_id // Returns typed reference string
http.output.response.statusCode // Returns typed reference stringImplementation concept:
function createRef(basePath: string): any {
return new Proxy({}, {
get(_, prop: string) {
if (prop === 'toString' || prop === Symbol.toPrimitive)
return () => `\${${basePath}}`;
return createRef(`${basePath}.${prop}`);
}
});
}
// Usage:
const task = simpleTask('my_ref', 'my_task', {});
task.ref.output.someField // → '${my_ref.output.someField}'// Instead of: '${workflow.input.url}/api/${fetch_order.output.response.body.id}'
// Could be:
const url = ref`${workflow.input.url}/api/${fetchOrder.output.id}`;
// Or for switch conditions:
const condition = ref`${httpTask.output.response.statusCode}`;const wf = new WorkflowBuilder('order_flow')
.version(2)
.timeout(300)
.ownerEmail('team@company.com')
.failureWorkflow('compensation')
.add(httpTask('fetch', { method: 'GET', uri: '...' }))
.add(switchTask('check', '${fetch.output.status}', {
'200': [processTask],
}, [errorTask]))
.outputParam('result', '${process.output.data}')
.build();
// Register and execute
await wf.register(executor, { overwrite: true });
const result = await wf.execute(executor, { order_id: '123' });// Using pipe operator (TC39 proposal, or polyfill)
const wf = pipe(
workflow('order_flow'),
addTask(httpTask('fetch', { ... })),
addTask(switchTask('check', '${fetch.output.status}', { ... })),
withTimeout(300),
withOwner('team@company.com'),
);// Pythonic fork via array syntax
const wf = workflow('parallel_flow', [
httpTask('fetch1', { ... }),
// Fork: array of arrays = parallel branches
fork('parallel', [
[processA, transformA], // Branch 1
[processB, transformB], // Branch 2
]),
// Auto-join implied
simpleTask('merge', 'merge_results', { ... }),
]);// Define input/output types
interface OrderInput { orderId: string; amount: number }
interface OrderOutput { status: string; receipt: string }
const wf = workflow<OrderInput, OrderOutput>('order_flow', [
simpleTask('validate', 'validate_order', {
orderId: ref.input.orderId, // TypeScript autocomplete!
amount: ref.input.amount, // Type checked!
}),
]);
// Type-safe execution
const result = await wf.execute(executor, {
orderId: '123', // ✅ TypeScript validates
amount: 99.99, // ✅ TypeScript validates
invalid: true, // ❌ TypeScript error!
});// Workers define their interface
@worker({ taskDefName: 'process_order' })
async function processOrder(task: Task): Promise<TaskResult> { ... }
// Workflow builder can reference workers directly
const wf = workflow('order_flow', [
processOrder.asTask('process_ref', {
orderId: '${workflow.input.orderId}'
}),
]);
// Task name, type info, and input schema all derived from the workerThis bridges the gap between worker definition and workflow definition, similar to Python's dual-mode @worker_task decorator.
// Express workflow logic as async function (compiled to Conductor definition)
const orderFlow = defineWorkflow('order_flow', async (input: OrderInput) => {
const order = await httpTask.call({ method: 'GET', uri: `/orders/${input.orderId}` });
if (order.status === 'valid') {
const payment = await simpleTask.call('charge', { amount: input.amount });
return { receipt: payment.receiptId };
} else {
throw new TerminateError('Invalid order');
}
});This would compile the async function into a Conductor workflow definition with appropriate switch/fork/terminate tasks, giving developers a natural imperative programming model while generating the declarative workflow behind the scenes.
Based on the analysis, here's a recommended phased approach:
Enhance existing factory functions to support all Python workflow configuration options:
const wf = workflow('order_flow', [task1, task2], {
version: 2,
timeoutSeconds: 300,
timeoutPolicy: 'TIME_OUT_WF',
ownerEmail: 'team@company.com',
failureWorkflow: 'compensation_wf',
outputParameters: { result: '${task2.output.data}' },
inputParameters: ['orderId', 'amount'],
variables: { counter: 0 },
restartable: true,
});Add missing task types: ForEachTask, HumanTask, LLM tasks.
A WorkflowBuilder class with fluent chaining:
const wf = new WorkflowBuilder('order_flow')
.version(2)
.timeout(300)
.add(httpTask('fetch', { ... }))
.add(switchTask('check', '${fetch.output.status}', { ... }))
.output('result', '${process.output.data}')
.build();Includes .register() and .execute() convenience methods.
Proxy-based output references that eliminate string interpolation errors:
const fetch = httpTask('fetch_order', { ... });
const ref = taskRef(fetch); // Returns Proxy
simpleTask('process', 'process_order', {
statusCode: ref.output.response.statusCode, // Type-safe!
body: ref.output.response.body, // Autocomplete!
});Enable @worker decorated functions to generate workflow tasks:
@worker({ taskDefName: 'process_order' })
function processOrder(task: Task): Promise<TaskResult> { ... }
// Use in workflow
processOrder.task('ref', { orderId: '${workflow.input.orderId}' });This is the JavaScript equivalent of Python's dual-mode @worker_task decorator.