
Flow Like Water
A TypeScript library for managing complex task workflows with grace and flexibility
While working on provisioning a Kubernetes cluster using Talos, I encountered a common challenge in infrastructure automation: managing a complex sequence of commands that needed to run both synchronously and asynchronously.
Flow Like Water emerged from this need. It is a TypeScript library that turns complex command sequences into manageable state machines: you define each command as a discrete state and specify how the states transition to one another, complete with conditional logic and retry mechanisms.
Key Capabilities
- Orchestrate complex sequences of tasks with dependencies
- Handle both synchronous and asynchronous operations seamlessly
- Automatically retry failed operations with configurable backoff
- Group related tasks together for collective execution
- Track progress through event listeners
- Persist state for long-running operations
Real World Example
import { Task, TaskGroup, FlowControl } from "flow-like-water";

// Example task execution logic
async function sampleTaskExecution() {
  console.log("Executing sample task...");
  // Add your task logic here
}

// Example condition check
async function sampleConditionCheck() {
  console.log("Checking condition for task execution...");
  return true; // Return true if condition is met
}

// Create tasks
const task1 = new Task({
  id: "task1",
  execute: sampleTaskExecution,
  checkCondition: sampleConditionCheck,
  retries: 2,
  waitTime: 1500,
});

const task2 = new Task({
  id: "task2",
  execute: sampleTaskExecution,
  checkCondition: sampleConditionCheck,
  retries: 1,
  waitTime: 1000,
});

// Create and configure task group
const group = new TaskGroup("group1");
group.addChild(task1);
group.addChild(task2);

// Initialize flow control
const flowControl = new FlowControl();
flowControl.addGroup(group);

// Add event listeners
flowControl.on("taskStarted", (task) => {
  console.log(`Task ${task.id} has started.`);
});
flowControl.on("taskCompleted", (task) => {
  console.log(`Task ${task.id} has completed.`);
});

// Execute all tasks
flowControl.run()
  .then(() => {
    console.log("All tasks completed successfully.");
  })
  .catch((error) => {
    console.error("Error during execution:", error);
  });
Design Philosophy
Flow Like Water's design rests on two ideas: treating tasks as state machines and organizing them hierarchically. Let's explore how this works in practice.
Tasks as State Machines
At the heart of Flow Like Water is the concept of tasks as state machines. Each task moves through distinct states: not_started → in_progress → completed (or failed/skipped). Here's how you might define a task that creates a Kubernetes namespace:
const createNamespace = new Task({
  id: 'create-namespace',
  execute: async () => {
    const result = await kubectl.createNamespace('my-app');
    return result.success ? 'deploy-app' : undefined;
  },
  checkCondition: async () => {
    const namespaces = await kubectl.listNamespaces();
    return !namespaces.includes('my-app');
  },
  retries: 3,
  waitTime: 2000
});
This task combines built-in retry logic with condition checking, and it can name the next task to run ('deploy-app') when it succeeds.
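To make the state-machine idea concrete, here is a minimal, self-contained sketch of the lifecycle described above. The state names come from the text; the transition table and the `canTransition` and `transition` helpers are assumptions made for illustration, not the library's internals.

```typescript
// States as listed in the text above.
type TaskState = "not_started" | "in_progress" | "completed" | "failed" | "skipped";

// Assumed transition table (illustrative only): a task either starts or is
// skipped, and a running task ends as completed or failed.
const allowedTransitions: Record<TaskState, readonly TaskState[]> = {
  not_started: ["in_progress", "skipped"],
  in_progress: ["completed", "failed"],
  completed: [],
  failed: [],
  skipped: [],
};

// True when moving from `from` to `to` is permitted by the table.
function canTransition(from: TaskState, to: TaskState): boolean {
  return allowedTransitions[from].indexOf(to) !== -1;
}

// Returns the new state, or throws on a move the table does not allow.
function transition(from: TaskState, to: TaskState): TaskState {
  if (!canTransition(from, to)) {
    throw new Error(`Invalid transition: ${from} -> ${to}`);
  }
  return to;
}

// Walk one task through a successful run.
let currentState: TaskState = "not_started";
currentState = transition(currentState, "in_progress");
currentState = transition(currentState, "completed");
console.log(currentState); // completed
```

Keeping the legal moves in one table means an illegal jump, such as going straight from not_started to completed, fails loudly instead of silently corrupting the workflow state.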
Hierarchical Task Organization
Real-world workflows often involve groups of related tasks. Flow Like Water uses TaskGroups to organize tasks hierarchically. Here's an example of setting up a complete application deployment:
const deploymentGroup = new TaskGroup('app-deployment');
// Add tasks in the order they should execute
deploymentGroup.addChild(createNamespace);
deploymentGroup.addChild(deployConfigMaps);
deploymentGroup.addChild(deploySecrets);
deploymentGroup.addChild(deployApplication);
deploymentGroup.addChild(validateDeployment);
// Create a flow controller and add the group
const flow = new FlowControl();
flow.addGroup(deploymentGroup);
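The hierarchy can be pictured as a tree whose leaves are tasks. The sketch below uses hypothetical `TaskNode` and `GroupNode` shapes (not the library's `Task` and `TaskGroup` classes) and assumes that groups may nest and that children run in the order they were added.

```typescript
// Hypothetical shapes for illustration; the real classes carry more behavior.
interface TaskNode {
  kind: "task";
  id: string;
}
interface GroupNode {
  kind: "group";
  id: string;
  children: WorkflowNode[];
}
type WorkflowNode = TaskNode | GroupNode;

// Depth-first walk that lists task ids in the order they would execute,
// assuming children run in insertion order.
function executionOrder(node: WorkflowNode): string[] {
  if (node.kind === "task") {
    return [node.id];
  }
  const order: string[] = [];
  for (const child of node.children) {
    order.push(...executionOrder(child));
  }
  return order;
}

const deployment: GroupNode = {
  kind: "group",
  id: "app-deployment",
  children: [
    { kind: "task", id: "create-namespace" },
    {
      kind: "group",
      id: "configuration",
      children: [
        { kind: "task", id: "deploy-config-maps" },
        { kind: "task", id: "deploy-secrets" },
      ],
    },
    { kind: "task", id: "deploy-application" },
    { kind: "task", id: "validate-deployment" },
  ],
};

const plannedOrder = executionOrder(deployment).join(" -> ");
console.log(plannedOrder);
```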
Event-Driven Progress Tracking
Flow Like Water emits events as tasks run, which lets you monitor execution in real time. Here's how you might use them to drive a progress indicator:
flow.on('taskStarted', (task) => {
  console.log(`Starting task: ${task.id}`);
  updateProgressBar(task.id, 'in-progress');
});
flow.on('taskCompleted', (task) => {
  console.log(`Completed task: ${task.id} in ${task.time}ms`);
  updateProgressBar(task.id, 'complete');
});
// Start the workflow
await flow.run();
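As a rough illustration of how event listeners can drive a progress readout, the sketch below uses a hypothetical `TinyEmitter` in place of the library's flow controller. The event names follow the first example in this document; `trackProgress` and the total task count are assumptions.

```typescript
// Hypothetical stand-in for the flow controller's event API.
type TaskEventName = "taskStarted" | "taskCompleted";
interface TaskInfo {
  id: string;
}
type TaskListener = (task: TaskInfo) => void;

class TinyEmitter {
  private listeners: Record<TaskEventName, TaskListener[]> = {
    taskStarted: [],
    taskCompleted: [],
  };

  // Register a listener for one event name.
  on(eventName: TaskEventName, listener: TaskListener): void {
    this.listeners[eventName].push(listener);
  }

  // Call every listener registered for the event, in registration order.
  emit(eventName: TaskEventName, task: TaskInfo): void {
    for (const listener of this.listeners[eventName]) {
      listener(task);
    }
  }
}

// Counts completion events and returns a function that formats the progress.
function trackProgress(emitter: TinyEmitter, totalTasks: number): () => string {
  let completedCount = 0;
  emitter.on("taskCompleted", () => {
    completedCount += 1;
  });
  return () => {
    const percent = Math.round((completedCount / totalTasks) * 100);
    return `${completedCount}/${totalTasks} (${percent}%)`;
  };
}

const emitter = new TinyEmitter();
const report = trackProgress(emitter, 4);

emitter.emit("taskStarted", { id: "create-namespace" });
emitter.emit("taskCompleted", { id: "create-namespace" });
console.log(report()); // 1/4 (25%)
```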
State Persistence and Recovery
Long-running workflows need to be resilient to interruptions. Flow Like Water supports serializing the entire state of your workflow:
import { promises as fs } from 'node:fs';

// Save the current state
const state = flow.getSerializedState();
await fs.writeFile('workflow-state.json', JSON.stringify(state));

// Later, you can use this state to determine which tasks need to be re-run
// (read the file as UTF-8 text so JSON.parse receives a string)
const savedState = JSON.parse(await fs.readFile('workflow-state.json', 'utf8'));
const incompleteTask = Object.entries(savedState)
  .find(([_, data]) => data.state !== 'completed');
if (incompleteTask) {
  await flow.runTask(incompleteTask[0]);
}
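The recovery step can also collect every unfinished task rather than only the first. The sketch below assumes the snapshot maps task ids to objects with a `state` field, as the `data.state` access above suggests. The `Snapshot` type, the `tasksToResume` helper, and the choice to treat skipped tasks as finished are all assumptions.

```typescript
// Assumed snapshot shape: task id -> { state }.
type PersistedState = "not_started" | "in_progress" | "completed" | "failed" | "skipped";
interface Snapshot {
  [taskId: string]: { state: PersistedState };
}

// Lists the ids of tasks that still need work. Completed tasks are done;
// skipped tasks are treated as done too (an assumption of this sketch).
function tasksToResume(snapshot: Snapshot): string[] {
  return Object.keys(snapshot).filter((taskId) => {
    const taskState = snapshot[taskId].state;
    return taskState !== "completed" && taskState !== "skipped";
  });
}

// Round-trip through JSON to mimic saving to and loading from disk.
const savedSnapshot: Snapshot = JSON.parse(
  JSON.stringify({
    "create-namespace": { state: "completed" },
    "deploy-secrets": { state: "skipped" },
    "deploy-application": { state: "failed" },
    "validate-deployment": { state: "not_started" },
  }),
);

const pendingIds = tasksToResume(savedSnapshot);
console.log(pendingIds.join(", ")); // deploy-application, validate-deployment
```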
Error Handling and Retries
Infrastructure operations often fail for transient reasons. Flow Like Water includes sophisticated retry logic with configurable backoff:
const deployPod = new Task({
  id: 'deploy-pod',
  execute: async () => {
    await kubectl.createPod(podSpec);
  },
  // Retry up to 5 times with exponential backoff
  retries: 5,
  waitTime: (attempt) => Math.min(1000 * Math.pow(2, attempt), 30000),
  // Ensure pod is actually running before considering task complete
  checkCondition: async () => {
    const status = await kubectl.getPodStatus('my-pod');
    return status === 'Running';
  }
});
This design allows for robust error handling while keeping the task definitions clean and declarative. The retry logic is handled automatically by the task runner, letting you focus on the actual business logic.
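To see what the backoff formula produces, and roughly how a runner might apply it, here is a standalone sketch. `backoffDelay` reproduces the `waitTime` expression from the example above. `runWithRetries` and its injectable `sleep` parameter are hypothetical and are not the library's task runner; attempts are assumed to be numbered from 0.

```typescript
// Delay before the retry that follows attempt number `attempt` (0-based):
// doubles from baseMs and is capped at maxMs.
function backoffDelay(attempt: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * Math.pow(2, attempt), maxMs);
}

// Hypothetical retry helper: one initial try plus up to `retries` retries.
// `sleep` is injectable so callers (and tests) can avoid real waiting.
async function runWithRetries<T>(
  operation: () => Promise<T>,
  retries: number,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error;
      // Wait only if another attempt will follow.
      if (attempt < retries) {
        await sleep(backoffDelay(attempt));
      }
    }
  }
  throw lastError;
}

// The delay schedule for attempts 0 through 5.
const delaySchedule: number[] = [];
for (let attempt = 0; attempt < 6; attempt++) {
  delaySchedule.push(backoffDelay(attempt));
}
console.log(delaySchedule.join(", ")); // 1000, 2000, 4000, 8000, 16000, 30000
```

The cap matters on the last step: without it the sixth delay would be 32000 ms, so the 30000 ms ceiling keeps long retry chains from waiting without bound.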