Skip to content

Latest commit

 

History

History
529 lines (415 loc) · 18.6 KB

File metadata and controls

529 lines (415 loc) · 18.6 KB

Workflow Builder Comparison: Python SDK vs JavaScript SDK

1. Executive Summary

The Python SDK provides a rich, object-oriented workflow DSL with operator overloading (>>), fluent chainable methods, dynamic output references, inline sub-workflows, and direct execution. The JavaScript SDK provides stateless factory functions that return plain typed objects -- functional and type-safe, but lacking the compositional power and developer experience of the Python approach.

This document compares both approaches in detail and identifies opportunities for a JavaScript workflow builder that could be even more expressive than Python by leveraging TypeScript's type system, tagged template literals, and Proxy-based APIs.


2. Architecture Comparison

Python: Object-Oriented DSL with Operator Overloading

from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_task import HttpTask
from conductor.client.workflow.task.switch_task import SwitchTask

workflow = ConductorWorkflow(executor=executor, name='order_flow', version=1)
workflow.timeout_seconds(300).owner_email('team@company.com')

http = HttpTask('fetch_order', HttpInput(method=HttpMethod.GET, uri='${workflow.input.url}'))
check = SwitchTask('check_status', '${fetch_order.output.response.statusCode}')
check.switch_case('200', [process_task])
check.default_case([error_task])

workflow >> http >> check

workflow.output_parameter('result', '${process_task.output.data}')
workflow.register(overwrite=True)
result = workflow(order_id='123')  # Direct execution via __call__

Architecture:

  • ConductorWorkflow - Mutable builder holding tasks list, config, and executor reference
  • TaskInterface (base class) - Each task is an object with state, methods, and operator overloads
  • Tasks compose via >> operator or .add() method
  • Output references via .output('field') method or dynamic attribute access
  • Workflow is both a definition AND an executor (has .execute(), .register(), .__call__())

JavaScript: Stateless Factory Functions

import { workflow, simpleTask, httpTask, switchTask } from '@io-orkes/conductor-javascript';

const wf = workflow('order_flow', [
  httpTask('fetch_order', {
    method: 'GET',
    uri: '${workflow.input.url}',
  }),
  switchTask('check_status', '${fetch_order.output.response.statusCode}', {
    '200': [simpleTask('process', 'process_order', {})],
  }, [simpleTask('error', 'handle_error', {})]),
]);

// Registration and execution are separate concerns
await executor.registerWorkflow(true, wf);
const id = await executor.startWorkflow({ name: 'order_flow', input: { order_id: '123' } });

Architecture:

  • workflow() - Pure function returning WorkflowDef (plain object)
  • Task builders (simpleTask(), httpTask(), etc.) - Pure functions returning typed task definition objects
  • No mutable state, no classes, no operator overloading
  • Output references: manual string interpolation '${task_ref.output.field}'
  • Workflow definition is separate from execution (WorkflowExecutor is a separate class)

3. Detailed Feature Comparison

3.1 Workflow Definition

Feature Python JavaScript
Constructor ConductorWorkflow(executor, name, version, description) workflow(name, tasks)
Version Configurable property Hardcoded to 1
Schema version SCHEMA_VERSION = 2 Not set
Input parameters .input_parameters(['key1', 'key2']) Not configurable (empty array)
Input template .input_template({'key': 'default'}) Not supported
Output parameters .output_parameter('key', '${task.output}') Not configurable
Timeout policy .timeout_policy(TimeoutPolicy.TIME_OUT_WORKFLOW) Not configurable
Timeout seconds .timeout_seconds(300) Hardcoded to 0
Owner email .owner_email('team@co.com') Not configurable
Failure workflow .failure_workflow('compensation_wf') Not supported
Restartable .restartable(True) Not configurable
Status listener .enable_status_listener('sink') Not supported
Variables .variables({'counter': 0}) Not supported
Fluent API All methods return Self N/A (pure function)
Direct execution workflow(key=value) via __call__ Not supported
Register workflow.register(overwrite=True) Separate: executor.registerWorkflow()
Inline sub-workflow parent >> child_workflow Not supported

3.2 Task Composition

Pattern Python JavaScript
Sequential workflow >> task1 >> task2 workflow('wf', [task1, task2])
Add single workflow.add(task) Include in tasks array
Add multiple workflow.add([task1, task2]) Include in tasks array
Parallel fork workflow >> [[branch1], [branch2]] forkTask('ref', [task1, task2])
Auto-join Automatic with >> on list-of-lists Manual: forkTaskJoin() returns tuple
Inline sub-workflow workflow >> child_workflow subWorkflowTask('ref', 'name', ver)
Operator chaining wf >> t1 >> t2 >> t3 Not available

3.3 Output References

Feature Python JavaScript
Task output task.output('field')'${ref.output.field}' Manual string: '${ref.output.field}'
Task output (all) task.output()'${ref.output}' Manual string: '${ref.output}'
Dynamic access task.my_field'${ref.output.my_field}' Not available
HTTP body http.body('data')'${ref.output.response.body.data}' Manual string
HTTP status http.status_code()'${ref.output.response.statusCode}' Manual string
HTTP headers http.headers('content-type')'${ref.output.response.headers.content-type}' Manual string
Workflow input workflow.input('key')'${workflow.input.key}' Manual string: '${workflow.input.key}'
Workflow output workflow.output('key')'${workflow.output.key}' Manual string

3.4 Task Input Configuration

Feature Python JavaScript
Via constructor SimpleTask('name', 'ref') + .input_parameter('k', 'v') simpleTask('ref', 'name', { k: 'v' })
Fluent input task.input_parameter('k', 'v').input_parameter('k2', 'v2') Not available (all at construction)
Input from other task task.input_parameter('data', other_task.output('result')) simpleTask('ref', 'name', { data: '${other.output.result}' })
Input method task.input(key='k', value='v') or task.input('json_path') Not available

3.5 Conditional (Switch) Tasks

Python:

switch = SwitchTask('evaluate', '${task.output.status}', use_javascript=False)
switch.switch_case('SUCCESS', [task1, task2])
switch.switch_case('FAILURE', [error_task])
switch.default_case([fallback])
workflow >> switch
  • Mutable: add cases after construction
  • Chainable: switch.switch_case('A', [...]).switch_case('B', [...])
  • Supports both value-param and javascript evaluator types

JavaScript:

switchTask('evaluate', '${task.output.status}', {
  'SUCCESS': [simpleTask(...)],
  'FAILURE': [simpleTask(...)],
}, [simpleTask(...)])  // default case
  • Immutable: all cases at construction time
  • Evaluator type hardcoded to "value-param"
  • Expression hardcoded to "switchCaseValue"

3.6 Fork/Join Tasks

Python:

# Via operator (auto-join)
workflow >> [[task1, task2], [task3, task4]]

# Explicit
fork = ForkTask('parallel', [[task1, task2], [task3, task4]], join_on=['task2', 'task4'])
workflow >> fork

# Custom join logic
join = JoinTask('custom', join_on_script='$.fork.output.condition')
  • Auto-join when using >> with list-of-lists
  • Custom join_on list for selective joining
  • join_on_script for JavaScript-based join conditions

JavaScript:

// Basic fork (no auto-join)
forkTask('parallel', [task1, task2])

// Fork with auto-join (returns tuple)
const [fork, join] = forkTaskJoin('parallel', [task1, task2])

// Explicit join
joinTask('custom_join', ['task1_ref', 'task2_ref'])
  • forkTask wraps tasks in [forkTasks] (single branch)
  • forkTaskJoin returns [ForkJoinTaskDef, JoinTaskDef] tuple
  • No join_on_script support

3.7 Loop Tasks

Python:

# Conditional loop
loop = DoWhileTask('retry', '$.retry.iteration < 5', [task1, task2])

# Fixed iterations
repeat = LoopTask('repeat_5', 5, [task])

# For-each
foreach = ForEachTask('process', [task], '${workflow.input.items}')

JavaScript:

// Conditional loop
doWhileTask('retry', 'if ($.retry.iteration < 5) { true; } else { false; }', [task1, task2])

// Fixed iterations
newLoopTask('repeat_5', 5, [task])

// No ForEachTask
Feature Python JavaScript
Do-while DoWhileTask doWhileTask()
Fixed loop LoopTask newLoopTask()
For-each ForEachTask Missing
Loop condition syntax $.ref.iteration < N if ($.ref['iteration'] < $.value) { true; } else { false; }

3.8 Task Caching

Python:

task.cache(cache_key='${workflow.input.user_id}', cache_ttl_second=3600)

JavaScript: Not supported in builders.

3.9 LLM / AI Tasks

Python:

chat = LlmChatComplete('ask', 'openai', 'gpt-4',
    messages=[ChatMessage(Role.USER, 'Hello')],
    tools=[ToolSpec('search', input_schema={...})])
chat.prompt_variable('context', '${task.output.data}')

embed = LlmGenerateEmbeddings('embed', 'openai', 'ada-002', text='...')
search = LlmSearchIndex('search', 'pinecone', 'ns', 'idx', 'openai', 'ada-002', 'query')
img = GenerateImage('gen', 'openai', 'dall-e-3', 'prompt', width=1024)

Supported LLM task types: LlmChatComplete, LlmTextComplete, LlmGenerateEmbeddings, LlmSearchIndex, LlmIndexDocument, GenerateImage, GenerateAudio

JavaScript: No LLM task builders. Would need manual task definition construction.

3.10 Workflow Execution

Python:

# Direct call
result = workflow(order_id='123')

# Synchronous execution (waits for completion)
result = workflow.execute(
    workflow_input={'order_id': '123'},
    wait_until_task_ref='final_task',
    wait_for_seconds=30
)

# Async start (returns ID immediately)
workflow.register(overwrite=True)
exec_id = workflow.start_workflow_with_input(
    workflow_input={'order_id': '123'},
    correlation_id='user-123',
    idempotency_key='unique-key',
    priority=5
)

JavaScript:

// Separate concerns
const wf = workflow('order_flow', [...tasks]);
await executor.registerWorkflow(true, wf);
const id = await executor.startWorkflow({ name: 'order_flow', input: { order_id: '123' } });

// Synchronous execution
const run = await executor.executeWorkflow(request, 'order_flow', 1, 'req-id', 'final_task');

4. Feature Gap Summary

Features Python Has That JavaScript Lacks

# Feature Category Impact
1 >> operator for task chaining Composition High (DX)
2 task.output('field') reference helpers Composition High (DX)
3 Dynamic attribute access for outputs (task.my_field) Composition Medium (DX)
4 Fluent/chainable configuration methods Configuration High (DX)
5 Configurable workflow properties (timeout, email, failure_wf, etc.) Configuration High
6 Inline sub-workflow embedding Composition Medium
7 __call__ for direct execution Execution Medium (DX)
8 Workflow as both definition AND executor Architecture Medium
9 ForEachTask Task type Medium
10 Task caching (.cache()) Feature Low
11 LLM task types (7 types) Task type Medium
12 SwitchTask with JavaScript evaluator Feature Low
13 JoinTask with join_on_script Feature Low
14 Idempotency key/strategy on execution Feature Medium
15 Correlation ID support on execution Feature Medium
16 HumanTask builder Task type Low
17 Input template / workflow variables Configuration Medium

5. JavaScript Innovation Opportunities

JavaScript/TypeScript offers unique language features that could make the workflow builder more powerful than Python's:

5.1 TypeScript Type-Safe Output References via Proxy

Instead of string interpolation, use Proxy objects for type-safe, autocomplete-friendly references:

const http = httpTask('fetch_order', { method: 'GET', uri: '...' });

// Instead of: '${fetch_order.output.response.body.order_id}'
// Could be:
http.output.response.body.order_id  // Returns typed reference string
http.output.response.statusCode     // Returns typed reference string

Implementation concept:

function createRef(basePath: string): any {
  return new Proxy({}, {
    get(_, prop: string) {
      if (prop === 'toString' || prop === Symbol.toPrimitive)
        return () => `\${${basePath}}`;
      return createRef(`${basePath}.${prop}`);
    }
  });
}

// Usage:
const task = simpleTask('my_ref', 'my_task', {});
task.ref.output.someField  // → '${my_ref.output.someField}'

5.2 Tagged Template Literals for Expressions

// Instead of: '${workflow.input.url}/api/${fetch_order.output.response.body.id}'
// Could be:
const url = ref`${workflow.input.url}/api/${fetchOrder.output.id}`;

// Or for switch conditions:
const condition = ref`${httpTask.output.response.statusCode}`;

5.3 Builder Pattern with Method Chaining

const wf = new WorkflowBuilder('order_flow')
  .version(2)
  .timeout(300)
  .ownerEmail('team@company.com')
  .failureWorkflow('compensation')
  .add(httpTask('fetch', { method: 'GET', uri: '...' }))
  .add(switchTask('check', '${fetch.output.status}', {
    '200': [processTask],
  }, [errorTask]))
  .outputParam('result', '${process.output.data}')
  .build();

// Register and execute
await wf.register(executor, { overwrite: true });
const result = await wf.execute(executor, { order_id: '123' });

5.4 Functional Pipe/Compose Operators

// Using pipe operator (TC39 proposal, or polyfill)
const wf = pipe(
  workflow('order_flow'),
  addTask(httpTask('fetch', { ... })),
  addTask(switchTask('check', '${fetch.output.status}', { ... })),
  withTimeout(300),
  withOwner('team@company.com'),
);

5.5 Array Spread for Fork/Join

// Pythonic fork via array syntax
const wf = workflow('parallel_flow', [
  httpTask('fetch1', { ... }),
  // Fork: array of arrays = parallel branches
  fork('parallel', [
    [processA, transformA],  // Branch 1
    [processB, transformB],  // Branch 2
  ]),
  // Auto-join implied
  simpleTask('merge', 'merge_results', { ... }),
]);

5.6 TypeScript Generics for Type-Safe Workflows

// Define input/output types
interface OrderInput { orderId: string; amount: number }
interface OrderOutput { status: string; receipt: string }

const wf = workflow<OrderInput, OrderOutput>('order_flow', [
  simpleTask('validate', 'validate_order', {
    orderId: ref.input.orderId,  // TypeScript autocomplete!
    amount: ref.input.amount,    // Type checked!
  }),
]);

// Type-safe execution
const result = await wf.execute(executor, {
  orderId: '123',   // ✅ TypeScript validates
  amount: 99.99,    // ✅ TypeScript validates
  invalid: true,    // ❌ TypeScript error!
});

5.7 Decorator Integration with Worker Tasks

// Workers define their interface
@worker({ taskDefName: 'process_order' })
async function processOrder(task: Task): Promise<TaskResult> { ... }

// Workflow builder can reference workers directly
const wf = workflow('order_flow', [
  processOrder.asTask('process_ref', {
    orderId: '${workflow.input.orderId}'
  }),
]);
// Task name, type info, and input schema all derived from the worker

This bridges the gap between worker definition and workflow definition, similar to Python's dual-mode @worker_task decorator.

5.8 Async/Await-Based Workflow Definition

// Express workflow logic as async function (compiled to Conductor definition)
const orderFlow = defineWorkflow('order_flow', async (input: OrderInput) => {
  const order = await httpTask.call({ method: 'GET', uri: `/orders/${input.orderId}` });

  if (order.status === 'valid') {
    const payment = await simpleTask.call('charge', { amount: input.amount });
    return { receipt: payment.receiptId };
  } else {
    throw new TerminateError('Invalid order');
  }
});

This would compile the async function into a Conductor workflow definition with appropriate switch/fork/terminate tasks, giving developers a natural imperative programming model while generating the declarative workflow behind the scenes.


6. Recommended JavaScript Workflow Builder Design

Based on the analysis, here's a recommended phased approach:

Phase 1: Enhanced Builders (Low effort, High impact)

Enhance existing factory functions to support all Python workflow configuration options:

const wf = workflow('order_flow', [task1, task2], {
  version: 2,
  timeoutSeconds: 300,
  timeoutPolicy: 'TIME_OUT_WF',
  ownerEmail: 'team@company.com',
  failureWorkflow: 'compensation_wf',
  outputParameters: { result: '${task2.output.data}' },
  inputParameters: ['orderId', 'amount'],
  variables: { counter: 0 },
  restartable: true,
});

Add missing task types: ForEachTask, HumanTask, LLM tasks.

Phase 2: Workflow Builder Class (Medium effort, High impact)

A WorkflowBuilder class with fluent chaining:

const wf = new WorkflowBuilder('order_flow')
  .version(2)
  .timeout(300)
  .add(httpTask('fetch', { ... }))
  .add(switchTask('check', '${fetch.output.status}', { ... }))
  .output('result', '${process.output.data}')
  .build();

Includes .register() and .execute() convenience methods.

Phase 3: Type-Safe References (Medium effort, Very High impact)

Proxy-based output references that eliminate string interpolation errors:

const fetch = httpTask('fetch_order', { ... });
const ref = taskRef(fetch);  // Returns Proxy

simpleTask('process', 'process_order', {
  statusCode: ref.output.response.statusCode,   // Type-safe!
  body: ref.output.response.body,               // Autocomplete!
});

Phase 4: Worker-Workflow Bridge (High effort, Very High impact)

Enable @worker decorated functions to generate workflow tasks:

@worker({ taskDefName: 'process_order' })
function processOrder(task: Task): Promise<TaskResult> { ... }

// Use in workflow
processOrder.task('ref', { orderId: '${workflow.input.orderId}' });

This is the JavaScript equivalent of Python's dual-mode @worker_task decorator.