---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core.md'
---

# @refluxo/core

## Classes

* [WorkflowEngine](classes/WorkflowEngine.md)

## Interfaces

* [Context](interfaces/Context.md)
* [Edge](interfaces/Edge.md)
* [MiddlewareContext](interfaces/MiddlewareContext.md)
* [Node](interfaces/Node.md)
* [NodeDefinition](interfaces/NodeDefinition.md)
* [NodeResult](interfaces/NodeResult.md)
* [RetryPolicy](interfaces/RetryPolicy.md)
* [Snapshot](interfaces/Snapshot.md)
* [WorkflowDefinition](interfaces/WorkflowDefinition.md)

## Type Aliases

* [Middleware](type-aliases/Middleware.md)
* [NextFunction](type-aliases/NextFunction.md)
* [NodesDefinition](type-aliases/NodesDefinition.md)

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/jexl-middleware.md
---

# @refluxo/jexl-middleware

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/classes/WorkflowEngine.md
description: 'It is stateless and operates step-by-step, producing immutable snapshots.'
---

# Class: WorkflowEngine<T>

Defined in: index.ts:154

## Type Parameters

### T

`T` *extends* [`NodesDefinition`](../type-aliases/NodesDefinition.md) = [`NodesDefinition`](../type-aliases/NodesDefinition.md)

The type of NodesDefinition used.

## Constructors

### Constructor

> **new WorkflowEngine**<`T`>(`options`): `WorkflowEngine`<`T`>

Defined in: index.ts:171

#### Parameters

##### options

Configuration options.

###### middlewares?

[`Middleware`](../type-aliases/Middleware.md)[] = `[]`

Array of middlewares to use.

###### nodeDefinitions

`T`

The definitions for the nodes used in the workflow.

###### workflow

[`WorkflowDefinition`](../interfaces/WorkflowDefinition.md)<`T`>

The workflow definition.

#### Returns

`WorkflowEngine`<`T`>

## Properties

### nodeDefinitions

> **nodeDefinitions**: `T`

Defined in: index.ts:156

***

### workflow

> **workflow**: `Workflow`

Defined in: index.ts:155

## Methods

### execute()

#### Call Signature

> **execute**(`args`): `Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

Defined in: index.ts:241

##### Parameters

###### args

Execution arguments.

###### externalPayload?

`unknown`

External data to pass to the execution.

###### globals?

`unknown`

Global variables to pass to transformers and executors.

###### snapshot

[`Snapshot`](../interfaces/Snapshot.md)

The snapshot to resume from.

###### stepLimit?

`number`

Maximum number of steps to execute (default: 100).

##### Returns

`Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

The resulting snapshot after execution.

#### Call Signature

> **execute**(`args`): `Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

Defined in: index.ts:247

##### Parameters

###### args

Execution arguments.

###### externalPayload?

`unknown`

External data to pass to the execution.

###### globals?

`unknown`

Global variables to pass to transformers and executors.

###### initialNodeId

`string`

The ID of the node to start from (if starting new).

###### stepLimit?

`number`

Maximum number of steps to execute (default: 100).

###### workflowId?

`string`

The ID of the workflow (if starting new).

##### Returns

`Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

The resulting snapshot after execution.

***

### executeStep()

> **executeStep**(`snapshot`, `externalPayload?`, `globals?`): `Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

Defined in: index.ts:348

#### Parameters

##### snapshot

[`Snapshot`](../interfaces/Snapshot.md)

The current snapshot.

##### externalPayload?

`unknown`

External payload for the step.

##### globals?
`unknown`

Global variables.

#### Returns

`Promise`<[`Snapshot`](../interfaces/Snapshot.md)>

The new snapshot after the step execution.

***

### use()

> **use**(`middleware`): `this`

Defined in: index.ts:194

#### Parameters

##### middleware

[`Middleware`](../type-aliases/Middleware.md)

The middleware function to add.

#### Returns

`this`

The engine instance for chaining.

***

### validateWorkflow()

> **validateWorkflow**(): `Promise`<`void`>

Defined in: index.ts:205

#### Returns

`Promise`<`void`>

#### Throws

Error if a node type is missing from the definitions.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/conditionals.md'
description: >-
  Workflows often require conditional logic to execute different branches
  based on certain criteria. The Refluxo engine handles this not by having a
  complex "if-node", but by using output handles on nodes and edges.
---

# Conditionals and Branching

Workflows often require conditional logic to execute different branches based on certain criteria. The Refluxo engine handles this not by having a complex "if-node", but by using output handles on nodes and edges. This approach keeps the engine simple and makes the workflow's branching logic visible and explicit in its definition.

## How it Works: `sourceHandle`

1. **The Node Decides**: A node's `executor` function can return a `nextHandle` property in its result. This string acts as a label for the "exit port" that the execution should take.
2. **The Edge Listens**: An `Edge` definition can include a `sourceHandle` property.
3. **The Engine Connects**: After a node executes, the engine looks for an edge whose `source` is the current node and whose `sourceHandle` matches the `nextHandle` returned by the executor. If no `nextHandle` is returned, the engine will look for an edge that also has no `sourceHandle` defined (or has it set to `undefined`), which acts as the default path.

## Example: A Simple "If" Condition

Here's a visual representation of a conditional workflow:

```mermaid
graph TD
    A[Start] --> B{Check Condition}
    B -- True --> C[Path for True]
    B -- False --> D[Path for False]
    C --> E(End)
    D --> E
```

Let's create a workflow that checks if a number is positive or negative.

### 1. The Conditional Node Definition

We'll create a `check-number` node. Its executor will check the input and return a different `nextHandle` based on the result.

```typescript
const nodeDefinitions: NodesDefinition = {
  "check-number": {
    input: { type: "object", properties: { value: { type: "number" } } },
    output: { type: "object" }, // This node only directs flow, doesn't output much
    executor: async (data) => {
      const { value } = data as { value: number };
      if (value > 0) {
        return { data: { check: "positive" }, nextHandle: "positive" };
      } else {
        return { data: { check: "non-positive" }, nextHandle: "negative" };
      }
    },
  },
  "log-positive": { /* ... executor that logs a positive message ... */ },
  "log-negative": { /* ... executor that logs a negative message ... */ },
};
```

### 2. The Workflow Definition

In the workflow, we define two edges originating from our `check-number` node. Each edge listens for a specific `sourceHandle`.
```typescript
const workflow: WorkflowDefinition = {
  nodes: [
    { id: "start", type: "check-number", data: { value: -10 } },
    { id: "positive_branch", type: "log-positive", data: {} },
    { id: "negative_branch", type: "log-negative", data: {} },
  ],
  edges: [
    // Edge for the "positive" branch
    {
      id: "e1",
      source: "start",
      target: "positive_branch",
      sourceHandle: "positive", // Listens for the "positive" handle
    },
    // Edge for the "negative" branch
    {
      id: "e2",
      source: "start",
      target: "negative_branch",
      sourceHandle: "negative", // Listens for the "negative" handle
    },
  ],
};
```

When this workflow runs:

1. The `start` node (`check-number`) executes with `value: -10`.
2. The executor logic finds that the value is not positive and returns `{ data: { ... }, nextHandle: "negative" }`.
3. The engine searches for an edge from `source: "start"` with `sourceHandle: "negative"`.
4. It finds edge `e2` and sets the next node to `negative_branch`.

This pattern can be used to build everything from simple if/else branches to complex multi-path switches.
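For reference, the edge-selection rule described above boils down to a single lookup. The following is an illustrative sketch of that rule, not the engine's actual source (it assumes the `Edge` interface is exported from the package entry point):

```typescript
import { Edge } from "refluxo-engine";

// Sketch: pick the edge leaving `currentNodeId` whose sourceHandle matches
// the executor's nextHandle. When nextHandle is undefined, this naturally
// matches the edge with no sourceHandle — the default path.
function findNextEdge(
  edges: Edge[],
  currentNodeId: string,
  nextHandle?: string
): Edge | undefined {
  return edges
    .filter((edge) => edge.source === currentNodeId)
    .find((edge) => edge.sourceHandle === nextHandle);
}
```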
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/cookbook/api-aggregation.md'
description: >-
  A common use case for a workflow engine is to orchestrate calls to multiple
  microservices or third-party APIs and then aggregate the results into a
  single, combined output.
---

# Cookbook: API Data Aggregation

A common use case for a workflow engine is to orchestrate calls to multiple microservices or third-party APIs and then aggregate the results into a single, combined output.

This recipe demonstrates a workflow that fetches user data from one API and their recent orders from another, then combines them into a single object.

### The Scenario

1. A workflow is triggered with a `userId`.
2. **In parallel**, two nodes are executed:
   * One fetches user profile details from `/api/users/{userId}`.
   * One fetches a list of orders from `/api/orders/{userId}`.
3. A final node waits for both API calls to complete, then uses the expression engine to combine their outputs into a single `customerSummary` object.

*Note: The Refluxo engine executes nodes sequentially. True parallelism would require running multiple engine instances, or is a feature that could be built on top of the core engine. For this recipe, we will simulate parallelism by running the nodes one after another, as the order does not matter.*

### Visualizing the Workflow

```mermaid
graph TD
    A[Start] --> B(Fetch User Profile)
    A --> C(Fetch User Orders)
    B --> D{Combine Data}
    C --> D
    D --> E((End))
```

*(While the diagram shows parallel paths, remember the engine executes them sequentially. The "Combine Data" node will simply be the last one to run.)*

### 1. Node Definitions

We need a reusable node for making HTTP requests and a final node for the aggregation.

```typescript
import { object, string, array, url } from "valibot";

const nodeDefinitions = {
  "fetch-api": {
    input: object({ url: string([url()]) }),
    executor: async (data) => {
      // In a real-world scenario, handle errors properly
      const response = await fetch(data.url);
      return { data: await response.json() };
    },
  },
  "combine-data": {
    // This node's executor doesn't do much.
    // The real work is done by the expression engine on its data property.
    input: object({ profile: object({}), orders: array(object({})) }),
    executor: async (data) => {
      // The resolved data is simply returned
      return { data };
    },
  },
};
```

### 2. Workflow Definition

This is where the magic happens. The `combine-data` node uses expressions to pull data from the outputs of the two `fetch-api` nodes.

```typescript
const workflow: WorkflowDefinition = {
  nodes: [
    {
      id: "trigger",
      type: "webhook-trigger", // Receives { userId: "user-123" }
      data: {},
    },
    {
      id: "fetchProfile",
      type: "fetch-api",
      data: {
        // URL is constructed dynamically from the trigger payload
        url: "https://my-api.com/api/users/{{ trigger.last.data.userId }}",
      },
    },
    {
      id: "fetchOrders",
      type: "fetch-api",
      data: {
        url: "https://my-api.com/api/orders/{{ trigger.last.data.userId }}",
      },
    },
    {
      id: "combine",
      type: "combine-data",
      data: {
        // This object is constructed by the expression engine before
        // the 'combine-data' executor is even called.
        profile: "{{ fetchProfile.last.data }}",
        orders: "{{ fetchOrders.last.data.orders }}",
      },
    },
  ],
  edges: [
    { id: "e1", source: "trigger", target: "fetchProfile" },
    { id: "e2", source: "fetchProfile", target: "fetchOrders" },
    { id: "e3", source: "fetchOrders", target: "combine" },
  ],
};
```

### How it Works

::: v-pre
When the engine prepares to execute the `combine` node:

1. It looks at its `data` definition.
2. It finds the expression `{{ fetchProfile.last.data }}`. It looks into the `Context`, finds the last result of the `fetchProfile` node, and gets its output.
3. It does the same for `{{ fetchOrders.last.data.orders }}`, getting the output from the `fetchOrders` node.
4. It constructs a new object: `{ profile: { ... }, orders: [ ... ] }`.
5. This fully resolved object is then passed as the `data` argument to the `combine-data` executor.
:::

This pattern is incredibly powerful. It allows you to create generic, reusable nodes (`fetch-api`, `combine-data`) and then perform complex, specific business logic declaratively within the workflow definition itself.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/cookbook/content-approval.md'
description: >-
  This recipe demonstrates how to build a practical, multi-step content
  approval workflow. A new blog post is submitted, a manager is notified to
  approve it, and depending on the decision, the post is either published or
  sent back to the author.
---

# Cookbook: Content Approval Workflow

This recipe demonstrates how to build a practical, multi-step content approval workflow. A new blog post is submitted, a manager is notified to approve it, and depending on the decision, the post is either published or sent back to the author.

This example combines several core concepts:

* A webhook trigger
* Custom nodes for business logic
* Pausing for human-in-the-loop
* Conditional branching

### The Scenario

1. A user submits a new blog post via a frontend application.
2. The frontend sends a webhook to our application to start the workflow.
3. The workflow saves the draft post to a database, marking its status as `pending_approval`.
4. It then pauses, waiting for a manager's decision.
5. A manager, via a separate UI, approves or rejects the post. This action resumes the workflow, providing the decision as a payload.
6. If approved, a node updates the post's status to `published`.
7. If rejected, a different node sends an email/notification back to the author.

### Visualizing the Workflow

```mermaid
graph TD
    A[Webhook: New Post] --> B(Save Draft to DB)
    B --> C{Wait for Approval}
    C -- Approved --> D[Update Status to 'Published']
    C -- Rejected --> E[Notify Author]
    D --> F((End))
    E --> F((End))
```

### 1. Node Definitions

We'll need several custom nodes for this.
```typescript
import { object, string } from "valibot";

// Assume `db` and `email` are your own imported service clients.
const nodeDefinitions = {
  "webhook-trigger": {
    // A generic trigger node
    executor: async (_, __, payload) => ({ data: payload || {} }),
  },
  "save-draft": {
    input: object({ authorId: string(), title: string(), content: string() }),
    executor: async (data) => {
      const post = await db.posts.create({ ...data, status: "pending_approval" });
      return { data: { postId: post.id } };
    },
  },
  "wait-for-approval": {
    // This node pauses if no decision is provided in the payload
    executor: async (_, __, payload) => {
      if (!payload || typeof (payload as any).approved !== 'boolean') {
        return { data: {}, __pause: true };
      }
      const { approved } = payload as { approved: boolean };
      return {
        data: { decision: approved ? "approved" : "rejected" },
        nextHandle: approved ? "approved" : "rejected",
      };
    },
  },
  "publish-post": {
    input: object({ postId: string() }),
    executor: async (data) => {
      await db.posts.update({ where: { id: data.postId }, data: { status: "published" } });
      return { data: { published: true } };
    },
  },
  "notify-author": {
    input: object({ authorId: string() }),
    executor: async (data) => {
      await email.send({ to: data.authorId, message: "Your post was rejected." });
      return { data: { notified: true } };
    },
  },
};
```

### 2. Workflow Definition

The workflow wires these nodes together, using expressions to pass data between them.

```typescript
const workflow: WorkflowDefinition = {
  nodes: [
    {
      id: "trigger",
      type: "webhook-trigger",
      data: {},
    },
    {
      id: "save",
      type: "save-draft",
      // Data comes from the initial webhook payload
      data: {
        authorId: "{{ trigger.last.data.authorId }}",
        title: "{{ trigger.last.data.title }}",
        content: "{{ trigger.last.data.content }}",
      },
    },
    {
      id: "approval",
      type: "wait-for-approval",
      data: {},
    },
    {
      id: "publish",
      type: "publish-post",
      // The postId comes from the 'save' node's output
      data: { postId: "{{ save.last.data.postId }}" },
    },
    {
      id: "notify",
      type: "notify-author",
      // The authorId also comes from the initial payload
      data: { authorId: "{{ trigger.last.data.authorId }}" },
    },
  ],
  edges: [
    { id: "e1", source: "trigger", target: "save" },
    { id: "e2", source: "save", target: "approval" },
    { id: "e3", source: "approval", target: "publish", sourceHandle: "approved" },
    { id: "e4", source: "approval", target: "notify", sourceHandle: "rejected" },
  ],
};
```

This recipe shows how you can combine simple, single-purpose nodes into a sophisticated, resilient, and long-running business process.
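One step the recipe leaves implicit is how the manager's decision actually resumes the paused run. Here is a hedged sketch of the approval endpoint, assuming `loadSnapshot`/`saveSnapshot` are your own persistence helpers and following the re-activation pattern from the serverless deployment guide:

```typescript
// Called by the approval UI's backend when a manager decides.
// `loadSnapshot`/`saveSnapshot` are hypothetical persistence helpers.
async function submitDecision(workflowId: string, approved: boolean) {
  const snapshot = await loadSnapshot(workflowId); // status should be "paused"

  // Re-activate the paused snapshot, then resume with the decision payload.
  // This time `wait-for-approval` sees `approved` and branches instead of pausing.
  if (snapshot.status === "paused") snapshot.status = "active";
  const next = await engine.execute({ snapshot, externalPayload: { approved } });

  await saveSnapshot(next);
  return next.status; // "completed" once publish/notify has run
}
```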
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/custom-nodes.md'
description: >-
  The power of the Refluxo engine comes from its extensibility. You can define
  your own custom nodes to perform any action you need, from calling an API to
  processing data or interacting with a database. This is done by creating a
  `NodeDefinition`.
---

# Custom Nodes

The power of the Refluxo engine comes from its extensibility. You can define your own custom nodes to perform any action you need, from calling an API to processing data or interacting with a database. This is done by creating a `NodeDefinition`.

## The `NodeDefinition` Object

A `NodeDefinition` is a JavaScript object that defines the behavior and contract of a node type. Let's look at the structure of a `NodeDefinition` for a node that fetches data from an API, using **Valibot** for schema definition.

```typescript
import { NodeDefinition } from "refluxo-engine";
import { object, string, number, union, literal, optional, url } from "valibot";

const fetchApiNode: NodeDefinition = {
  // 1. Input Schema (optional)
  input: object({
    url: string([url("Please provide a valid URL")]),
    method: optional(union([literal("GET"), literal("POST")]), "GET"),
  }),

  // 2. Output Schema (optional)
  output: object({
    status: number(),
    body: object({}), // You can define a more specific schema for the body
  }),

  // 3. Retry Policy (optional)
  retryPolicy: {
    maxAttempts: 3,
    interval: 1000, // 1 second
    backoff: "exponential",
  },

  // 4. The Executor
  executor: async (data, context, externalPayload) => {
    // The `data` type is inferred from the `input` schema
    const { url, method } = data;

    try {
      const response = await fetch(url, { method });
      if (!response.ok) {
        throw new Error(`API request failed with status ${response.status}`);
      }
      const body = await response.json();

      // The `data` returned here will be the node's output
      return {
        data: {
          status: response.status,
          body,
        },
      };
    } catch (error: any) {
      // It's good practice to re-throw the error to let the engine's
      // retry mechanism handle it.
      throw new Error(error.message);
    }
  },
};
```

### 1. Input Schema

The `input` property defines a schema using a **Standard Schema** compatible library like Valibot. The engine uses this schema to validate the resolved `data` of the node *before* executing it. This provides a safety layer and ensures your node receives data in the correct format.

### 2. Output Schema

Similarly, the `output` property defines a schema for the data that the `executor` function is expected to return. The engine validates the return value of the executor against this schema, ensuring that the node produces a consistent and predictable output for other nodes to consume.

### 3. Retry Policy

This optional property defines how the engine should handle failures in this node's executor. For more details, see the [Error Handling guide](./error-handling.md).

### 4. The Executor

This is the core logic of your node. It's an `async` function that receives the following parameters:

* `data`: The resolved input data for the node, already validated against the `input` schema. All expressions from the `WorkflowDefinition` have been processed at this point.
* `context`: The full execution `Context` object. You should avoid using this directly if possible, relying on the resolved `data` instead. However, it can be useful for advanced scenarios where a node needs to inspect the history of the workflow.
* `externalPayload`: If the workflow was started or resumed with an external payload, that data is available here for the first step of the execution.
* `globals`: Global variables passed to the execution (e.g., secrets), if any were provided to `execute`.

The executor must return an object. This object can contain:

* `data`: (Required) The output of the node. This will be stored in the context and made available to subsequent nodes.
* `nextHandle`: (Optional) A string to specify which output handle to follow, enabling [conditional logic](./conditionals.md).
* `__pause`: (Optional) A boolean flag. If `true`, the engine will pause the workflow. See the [Human in the Loop guide](./external-events.md) for more.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/deployment/security.md'
description: >-
  When deploying an application built with the Refluxo engine, it's important
  to consider the security implications of executing workflows, especially if
  those workflows can be defined by end-users.
---

# Deployment: Security Best Practices

When deploying an application built with the Refluxo engine, it's important to consider the security implications of executing workflows, especially if those workflows can be defined by end-users.

This guide provides a checklist of security best practices.

## 1. Never Store Secrets in Workflow Definitions

A `WorkflowDefinition` is a JSON object that you will likely store in a database. It should be considered non-secure. **Never** store sensitive information like API keys, passwords, or tokens directly in a node's `data` property.

```typescript
// ❌ Bad Practice: Storing a secret directly
const workflow = {
  nodes: [
    {
      id: "n1",
      type: "api-call",
      data: {
        apiKey: "sk_live_very_secret_key" // 😱
      }
    }
  ]
  //...
}
```

### Solution: Use a Secure Secret Management System

The recommended approach is to store secrets in a secure, external system and have your custom nodes fetch them at runtime.

* **Environment Variables**: For secrets that are static for the entire application, use environment variables (e.g., `process.env.STRIPE_API_KEY`). Your node executor can then access `process.env`.
* **Secret Management Services**: For user-specific secrets or more complex scenarios, use a dedicated service like AWS Secrets Manager, Google Secret Manager, or HashiCorp Vault. Your node executor would receive an ID or a name, and then make a call to the secret manager to retrieve the sensitive value.

```typescript
// ✅ Good Practice: The executor fetches the secret
const secureApiNode: NodeDefinition = {
  //...
  executor: async (data) => {
    // The executor is trusted server-side code
    const apiKey = process.env.STRIPE_API_KEY;
    // or: const apiKey = await getSecretFromVault(data.secretId);

    const response = await fetch("...", {
      headers: { "Authorization": `Bearer ${apiKey}` }
    });
    //...
  }
};
```

## 2. Sanitize and Validate All Inputs

The engine provides robust validation via `StandardSchema` for the `input` of each node. Use it diligently.

* **Always Define Schemas**: Even if a node takes simple input, define a schema. This prevents unexpected data types from causing runtime errors in your executor.
* **Sanitize `externalPayload`**: Data coming from triggers (like webhooks) is untrusted. The very first node in your workflow should have a strict input schema to validate this `externalPayload` and ensure it only contains expected data in the correct format.
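As a concrete example, a trigger node can validate the raw webhook payload before anything else runs. A minimal sketch using Valibot — the schema fields are illustrative, so adapt them to your own payload:

```typescript
import { NodeDefinition } from "refluxo-engine";
import { object, string, maxLength, parse } from "valibot";

// Strict schema for the untrusted webhook payload (illustrative fields).
const payloadSchema = object({
  authorId: string([maxLength(64)]),
  title: string([maxLength(200)]),
});

const webhookTriggerNode: NodeDefinition = {
  executor: async (_data, _context, externalPayload) => {
    // `parse` throws on malformed input, failing the step early instead of
    // letting untrusted data flow into later nodes.
    const payload = parse(payloadSchema, externalPayload);
    return { data: payload };
  },
};
```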
## 3. Isolate the Expression Engine Context

The Jexl expression engine evaluates expressions only against the context object you give it; it exposes no access to the file system or `process`. However, it has access to all of the data you provide in that context.

The engine already prepares a "flattened" context containing only the outputs of previous nodes. This is a good security measure, as it prevents expressions from accessing the entire `Snapshot` object or other internal properties of the engine.

Be mindful if you ever customize the context provided to the expression engine. Only expose the data that the user absolutely needs to reference.

## 4. Protect the Executor Code

The `executor` functions are the heart of your application's logic and have access to your application's environment. In a platform where users can define workflows but not custom nodes, your security boundary is clear: the `executor` code is trusted, while the `WorkflowDefinition` (the JSON) is not.

If you ever build a system where users can provide their own `executor` code (e.g., an online code editor), you **must** run that code in a sandboxed environment (e.g., using `isolated-vm`, a Docker container, or a micro-VM service like Firecracker). Executing arbitrary user-provided code in your main process is a major security vulnerability.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/deployment/serverless.md'
description: >-
  The Refluxo engine's stateless, step-by-step execution model is a perfect
  match for serverless environments like AWS Lambda, Google Cloud Functions,
  or Cloudflare Workers. These platforms are designed for short-lived,
  event-driven computations, which is exactly how the engine operates.
---

# Deployment: Serverless Patterns

The Refluxo engine's stateless, step-by-step execution model is a perfect match for serverless environments like AWS Lambda, Google Cloud Functions, or Cloudflare Workers. These platforms are designed for short-lived, event-driven computations, which is exactly how the engine operates.

This guide outlines a common and effective pattern for deploying a Refluxo-based application in a serverless architecture.

## The Core Pattern: The "Dispatcher" Function

Instead of having one long-running process, you have a single, stateless serverless function (let's call it the "Dispatcher") that is responsible for executing **one step** of a workflow. The architecture relies on a persistent database (for state) and a message queue (for scheduling).

Here's the flow:

1. **Trigger**: An initial event (e.g., an HTTP request to an API Gateway, a new file in an S3 bucket) invokes the Dispatcher function with an initial payload.
2. **Create or Load State**: The Dispatcher creates a new `Snapshot` or loads an existing one from the database (e.g., DynamoDB).
3. **Execute One Step**: It calls `engine.executeStep()` with the snapshot. This is a crucial difference from `engine.execute()`, as we only want to process a single node to keep the function's execution time short.
4. **Process the Result**: The Dispatcher inspects the new snapshot returned by `executeStep()`.
5. **Save State**: It saves the new snapshot to the database.
6. **Schedule Next Step**:
   * If the new snapshot's status is `active`, the Dispatcher sends a message **to itself** via a message queue (e.g., SQS) to trigger the next step immediately. The message payload is simply the `workflowId`.
   * If the status is `error` (pending a retry), it sends a message to itself with a **delay**, calculated from the `retryState.nextRetryAt` timestamp.
   * If the status is `paused`, `completed`, or `failed`, it does nothing. The workflow execution stops until another external event (like a webhook) triggers it again.

### Visual Representation

```mermaid
graph TD
    subgraph "External World"
        A[API Call / Event]
    end
    subgraph "Serverless Environment"
        B(Dispatcher Function)
        C[Snapshot in DB]
        D{Message Queue}
    end
    A -- triggers --> B
    B -- reads/writes --> C
    B -- inspects status --> B
    B -- sends message --> D
    D -- triggers --> B
```

## Example: AWS Lambda + SQS + DynamoDB

Here is a simplified example of what a Dispatcher function might look like on AWS.
```typescript
import { SQSClient, SendMessageCommand } from "@aws-sdk/client-sqs";
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
// Assume you have service files for interacting with DynamoDB
import { getSnapshot, saveSnapshot } from "./database";
import { engine } from "./engine"; // Your configured engine instance

const sqsClient = new SQSClient({});
const queueUrl = process.env.QUEUE_URL;

export async function handler(event) {
  // The event could come from API Gateway, SQS, etc.
  // `parseEvent` is your own helper that normalizes these event shapes.
  const { workflowId, externalPayload, isNew } = parseEvent(event);

  let snapshot;
  if (isNew) {
    snapshot = engine.createInitialSnapshot(workflowId, "start-node");
  } else {
    snapshot = await getSnapshot(workflowId);
    // If the workflow is already finished, do nothing.
    if (["completed", "failed"].includes(snapshot.status)) return;
  }

  // Set status to active if it was paused or in a retry state
  if (["paused", "error"].includes(snapshot.status)) {
    snapshot.status = "active";
  }

  // Execute just one step
  const newSnapshot = await engine.executeStep(snapshot, externalPayload);

  // Save the new state
  await saveSnapshot(newSnapshot);

  // Schedule the next action
  if (newSnapshot.status === "active") {
    // Trigger next step immediately
    await sqsClient.send(new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify({ workflowId: newSnapshot.workflowId })
    }));
  } else if (newSnapshot.status === "error" && newSnapshot.retryState) {
    // Trigger next step with a delay for the retry
    const delaySeconds = Math.ceil(
      (newSnapshot.retryState.nextRetryAt - Date.now()) / 1000
    );
    await sqsClient.send(new SendMessageCommand({
      QueueUrl: queueUrl,
      MessageBody: JSON.stringify({ workflowId: newSnapshot.workflowId }),
      DelaySeconds: Math.max(0, Math.min(delaySeconds, 900)), // SQS max delay is 15 mins
    }));
  }
  // If paused, completed, or failed, the process stops here.
}
```

This pattern is highly scalable, resilient, and cost-effective, as you only pay for the brief moments when your workflow is actively processing a step.

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/deployment/state-management.md
description: >-
  The stateless nature of the Refluxo engine means that it does not manage
  state persistence itself. Your application is responsible for saving and
  loading the `Snapshot` object. This design choice gives you complete freedom
  to choose the persistence strategy that best fits your infrastructure.
---

# Deployment: State Management

The stateless nature of the Refluxo engine means that it does not manage state persistence itself. Your application is responsible for saving and loading the `Snapshot` object. This design choice gives you complete freedom to choose the persistence strategy that best fits your infrastructure.

This guide covers best practices for managing workflow state in a production environment.

## Where to Store Snapshots?

You can store snapshots in any database or storage system that can handle JSON objects. Common choices include:

* **Relational Databases (e.g., PostgreSQL, MySQL):** A good choice for many applications. You can store the snapshot in a `JSONB` or `JSON` column. This allows you to query metadata or context properties if needed.
* **NoSQL Databases (e.g., MongoDB, DynamoDB):** Excellent for storing document-like objects such as snapshots. They often provide high performance for reads and writes.
* **In-Memory Caches (e.g., Redis):** Suitable for workflows that are performance-critical but where long-term persistence is less of a concern.
  You can combine Redis for active workflows with a persistent database for completed or long-paused ones.

### Example Schema (PostgreSQL)

A simple table for storing workflow executions might look like this:

```sql
CREATE TABLE workflow_executions (
    id VARCHAR(255) PRIMARY KEY, -- Corresponds to workflowId
    snapshot JSONB NOT NULL,
    status VARCHAR(50) NOT NULL,
    version INT NOT NULL DEFAULT 1,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index for efficient lookup of active workflows
CREATE INDEX idx_active_workflows ON workflow_executions (status);
```

Here, `id` would be your unique `workflowId`, and the entire `Snapshot` object is stored in the `snapshot` column. The `status` and `version` columns are duplicated from the snapshot for easier querying and for the locking check below.

## Handling Concurrency: Optimistic Locking

In a distributed system, it's possible for two separate workers (e.g., two serverless function instances) to try to process the same snapshot at the same time. This can lead to race conditions and corrupted state.

The `Snapshot` object includes a `version` number specifically to prevent this. You can use this for **optimistic locking**. The flow is as follows:

1. **Read and Lock**: When a worker fetches a snapshot from the database, it reads both the snapshot data and its `version` number.
2. **Execute Step**: The worker executes the next step of the workflow. The Refluxo engine produces a new snapshot with an incremented `version` number.
3. **Conditional Write**: The worker attempts to save the new snapshot back to the database with a conditional `UPDATE` statement. The update only succeeds if the `version` number in the database is the same as it was when the worker first read it.

### Example (SQL-like pseudocode)

```sql
-- `readVersion` is the version number the worker initially read.
-- `newSnapshot` is the snapshot object with the incremented version.
-- `workflowId` is the ID of the execution.

UPDATE workflow_executions
SET
    snapshot = :newSnapshot,
    status = :newSnapshotStatus,
    version = :newSnapshot.version, -- This is the incremented version
    updated_at = NOW()
WHERE
    id = :workflowId
    AND version = :readVersion;
```

If this `UPDATE` statement affects 0 rows, it means another worker has already processed this step and updated the record. The current worker should then discard its result and terminate gracefully, preventing a double-execution. This mechanism is crucial for building reliable distributed systems.
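In application code, the same flow looks roughly like this. A sketch only — `getExecution` and `db.query` stand in for your own data layer, and `engine` is your configured engine instance:

```typescript
// Optimistic-locking loop around a single workflow step.
async function processOneStep(workflowId: string) {
  // 1. Read the snapshot and remember the version we saw.
  const { snapshot, version: readVersion } = await getExecution(workflowId);

  // 2. Execute one step; the engine returns a snapshot with version + 1.
  const next = await engine.executeStep(snapshot);

  // 3. Conditional write: succeeds only if nobody else advanced the row.
  const result = await db.query(
    `UPDATE workflow_executions
        SET snapshot = $1, status = $2, version = $3, updated_at = NOW()
      WHERE id = $4 AND version = $5`,
    [JSON.stringify(next), next.status, next.version, workflowId, readVersion]
  );

  if (result.rowCount === 0) {
    // Lost the race: another worker already processed this step.
    // Discard our result and terminate gracefully.
  }
}
```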
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/api/packages.md'
---

# Documentation

## Packages

* [@refluxo/core](@refluxo/core/index.md)
* [@refluxo/jexl-middleware](@refluxo/jexl-middleware/index.md)

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/dynamic-schemas.md'
description: >-
  When building a platform on top of the Refluxo engine, you might want to
  store your `NodeDefinition` schemas (`input` and `output`) in an external
  source, like a database. Typically, these would be stored in a standard
  format like **JSON Schema**.
---

# Dynamic Schemas from External Sources

When building a platform on top of the Refluxo engine, you might want to store your `NodeDefinition` schemas (`input` and `output`) in an external source, like a database. Typically, these would be stored in a standard format like **JSON Schema**.

However, the engine's constructor expects a `StandardSchema` compatible object, not a raw JSON object. This is to ensure type inference and interoperability between different validation libraries.

This presents a challenge: how do you convert a JSON Schema object fetched from a database at runtime into a `StandardSchema` object that the engine can use?

## The Adapter Solution

The solution is to create a small runtime adapter. This function takes a JSON Schema object and wraps it in a `StandardSchema` compatible interface. The example below uses `ajv` to compile the JSON Schema and produce the validation logic.

First, ensure you have `ajv` installed:

```bash
pnpm add ajv
```

### Creating the Adapter

This function takes a JSON Schema and returns an object that fulfills the `StandardSchema` contract. The `validate` method uses the compiled `ajv` validator to check the data.

```typescript
// /utils/create-json-validator.ts
import Ajv from "ajv";

const ajv = new Ajv();

export function createJsonValidator(jsonSchema: any) {
  const validate = ajv.compile(jsonSchema);

  // Returns a Standard Schema V1 compatible object
  return {
    "~standard": {
      version: 1,
      vendor: "refluxo-ajv-adapter",
      validate: (value: any) => {
        const valid = validate(value);
        if (valid) {
          return { value };
        }
        return {
          issues: validate.errors?.map(err => ({
            message: err.message || "Invalid input",
            path: [err.instancePath],
          }))
        };
      }
    }
  };
}
```

## Using the Adapter

Now, when you are constructing your `NodeDefinition`s map (perhaps after fetching the definitions from your database), you can use this adapter to prepare the schemas.

```typescript
import { createJsonValidator } from "./utils/create-json-validator";
import { httpNodeExecutor } from "./executors/http";

// 1. Fetch raw node definitions from your database
// const rawDefinitions = await db.getNodeDefinitions();
const rawDefinitions = [
  {
    type: "http-request",
    input: {
      type: "object",
      properties: { url: { type: "string" } },
      required: ["url"]
    },
    output: { type: "object" }
  }
];

// 2. Process the raw definitions into a format the engine understands
const nodeDefinitions = Object.fromEntries(
  rawDefinitions.map(def => [
    def.type,
    {
      // Use the adapter to convert the JSON schemas
      input: createJsonValidator(def.input),
      output: createJsonValidator(def.output),
      // The executor can be stored separately or loaded by name
      executor: httpNodeExecutor,
    }
  ])
);

// 3. The resulting nodeDefinitions object can now be passed to the engine
// const engine = new WorkflowEngine({ workflow, nodeDefinitions });
```

This adapter pattern provides a powerful bridge between externally stored, static JSON Schema definitions and the dynamic, code-first approach of `StandardSchema`, giving you the flexibility to build dynamic, user-configurable platforms.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/error-handling.md'
description: >-
  A robust workflow must be able to handle transient failures, such as network
  issues or temporary API unavailability. The Refluxo engine provides a
  powerful, declarative `RetryPolicy` that you can attach to any
  `NodeDefinition` to control its behavior on failure.
---

# Error Handling and Retries

A robust workflow must be able to handle transient failures, such as network issues or temporary API unavailability. The Refluxo engine provides a powerful, declarative `RetryPolicy` that you can attach to any `NodeDefinition` to control its behavior on failure.

## The Retry Policy

The `RetryPolicy` is an object that specifies how many times to retry a failing node and how long to wait between attempts.
```typescript
export interface RetryPolicy {
  maxAttempts: string | number;
  interval: string | number;
  backoff: "fixed" | "exponential";
}
```

* `maxAttempts`: The maximum number of times to attempt execution (including the first attempt).
* `interval`: The base wait time in milliseconds.
* `backoff`: The strategy for increasing the wait time on subsequent retries.
  * `"fixed"`: The wait time is always equal to `interval`.
  * `"exponential"`: The wait time doubles with each attempt (`interval * 2 ^ (attempt - 1)`).

## Example: A Node with a Retry Policy

Let's add a `RetryPolicy` to a node that might fail.

```typescript
const fallibleNode: NodeDefinition = {
  input: { type: "object" },
  output: { type: "object" },
  retryPolicy: {
    maxAttempts: 3,
    interval: 1000, // 1 second
    backoff: "exponential",
  },
  executor: async (data) => {
    // Simulate a failing API call
    if (Math.random() > 0.3) { // 70% chance of failure
      throw new Error("API is currently unavailable");
    }
    return { data: { success: true } };
  },
};
```

## The Error Handling Cycle

When the `executor` of `fallibleNode` throws an error:

1. The `WorkflowEngine` catches the error.
2. It checks if the `NodeDefinition` has a `retryPolicy`.
3. It determines if the number of `maxAttempts` has been exceeded.
4. **If a retry is possible**:
   * The engine calculates the `delay` based on the `backoff` strategy.
   * It returns a `Snapshot` with `status: "error"`.
   * The `Snapshot` includes a `retryState` object with `nodeId`, `attempts`, and `nextRetryAt` (a timestamp indicating when the next attempt should run).
5. **If no retries are left** (or there's no policy):
   * The engine returns a `Snapshot` with `status: "failed"`.
   * The error is recorded in the `Context` for that node.

### The Role of the Runner

The Refluxo engine **does not wait**. It is stateless. It simply returns a snapshot indicating that a retry is needed and when it should happen. It is the responsibility of the **execution environment** (the "runner") to respect the `nextRetryAt` timestamp.

* In a long-running process, you might use `setTimeout`.
* In a serverless environment like AWS Lambda, you might use a message queue (like SQS) with a `DelaySeconds` property.
* In a runtime like [Trigger.dev](https://trigger.dev/), you can use `await io.sleep()`, which handles this scheduling for you.

When the delay has passed, the runner simply calls `engine.execute({ snapshot })` with the snapshot that has the `"error"` status. The engine will then automatically re-activate it and retry the failing node.

## Advanced Error Handling

### Conditional Retries

Not all errors are created equal. A `401 Unauthorized` error from an API is a permanent failure that should not be retried, while a `503 Service Unavailable` error is transient and a perfect candidate for a retry.

The engine's `RetryPolicy` is only triggered when an `executor` **throws an error**. You can leverage this to create sophisticated error handling logic. Instead of throwing an error, you can catch it and redirect the workflow down a different path using a `nextHandle`.

**The Pattern:**

* For **retriable errors** (e.g., 5xx status codes, network timeouts), **throw an error** from your executor to trigger the `RetryPolicy`.
* For **non-retriable, logical errors** (e.g., 4xx status codes), **catch the error** and return a specific `nextHandle` to branch the workflow.

#### Example: Handling HTTP Status Codes

Here is an HTTP request node that retries on server errors but follows a separate path for authentication errors.
```typescript
const httpRequestNode: NodeDefinition = {
  // ... input, output schemas ...
  retryPolicy: {
    maxAttempts: 3,
    backoff: "exponential",
    interval: 2000,
  },
  executor: async (data) => {
    try {
      const response = await fetch(data.url);

      if (response.status === 401 || response.status === 403) {
        // Don't throw, this is a logical path
        return {
          data: { error: "Authentication failed" },
          nextHandle: "auth-error",
        };
      }

      if (!response.ok) {
        // For other errors (like 5xx), throw to trigger a retry
        throw new Error(`Request failed with status ${response.status}`);
      }

      return { data: await response.json() };
    } catch (error) {
      // Re-throw network errors or 5xx errors to trigger the retry policy
      throw error;
    }
  },
};
```

Your workflow would then have a separate edge for the `auth-error` handle:

```typescript
const workflow: WorkflowDefinition = {
  nodes: [
    { id: "request", type: "http-request-node", data: { ... } },
    { id: "handle_success", type: "process-data", data: { ... } },
    { id: "handle_auth_error", type: "notify-admin", data: { ... } },
  ],
  edges: [
    // Success path (default handle)
    { id: "e1", source: "request", target: "handle_success" },
    // Auth error path
    { id: "e2", source: "request", target: "handle_auth_error", sourceHandle: "auth-error" },
  ],
};
```

### User-Configurable Retries

You can make this behavior even more flexible by allowing the user of your platform to define what constitutes a retriable error. This can be done by passing the configuration as part of the node's `data`.

```typescript
import { object, string, number, array, optional, url } from "valibot";

// Input schema now includes a field for retriable status codes
const inputSchema = object({
  url: string([url()]),
  retriableCodes: optional(array(number()), [500, 502, 503, 504]),
});

// Executor logic
const executor = async (data) => {
  const { url, retriableCodes } = data;
  const response = await fetch(url);

  if (!response.ok) {
    // Check if the user wants to retry on this status code
    if (retriableCodes.includes(response.status)) {
      // Throw to trigger retry
      throw new Error(`Retriable error: ${response.status}`);
    } else {
      // Otherwise, it's a logical failure path
      return {
        data: { status: response.status, body: await response.text() },
        nextHandle: 'fail'
      };
    }
  }
  //...
}
```

This empowers the end-user of your platform to customize the workflow's resilience without needing to change the node's core code.
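To make the runner's side of the contract concrete, here is a sketch of the scheduling logic, assuming `RetryPolicy`, `Snapshot`, and `WorkflowEngine` are exported from the package entry point:

```typescript
import { RetryPolicy, Snapshot, WorkflowEngine } from "refluxo-engine";

// The documented backoff formula: interval * 2 ^ (attempt - 1).
function computeRetryDelay(policy: RetryPolicy, attempt: number): number {
  const interval = Number(policy.interval);
  return policy.backoff === "exponential"
    ? interval * 2 ** (attempt - 1)
    : interval;
}

// A long-running runner honoring `nextRetryAt` before resuming (sketch).
async function scheduleRetry(engine: WorkflowEngine, snapshot: Snapshot) {
  const waitMs = Math.max(0, (snapshot.retryState?.nextRetryAt ?? 0) - Date.now());
  await new Promise((resolve) => setTimeout(resolve, waitMs));
  // The engine re-activates the "error" snapshot and retries the node.
  return engine.execute({ snapshot });
}
```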
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/expressions.md'
description: >-
  To make workflows truly dynamic, the engine includes a powerful expression
  engine powered by **Jexl**. Expressions allow you to reference and
  manipulate data from previous nodes, enabling you to pass data between
  nodes, make dynamic configurations, and implement complex logic without
  writing custom code for every scenario.
---

# Expressions

To make workflows truly dynamic, the engine includes a powerful expression engine powered by **[Jexl](https://github.com/TomFrost/jexl)**. This is implemented as a default **Transformer** within the engine.

Expressions allow you to reference and manipulate data from previous nodes, enabling you to pass data between nodes, make dynamic configurations, and implement complex logic without writing custom code for every scenario.

## Syntax

::: v-pre
Expressions are embedded within strings in your node's `data` object using the syntax `{{ ... }}`.
:::

```javascript v-pre
{
  id: "n2",
  type: "send-message",
  data: {
    // Simple property access
    message: "{{ `n1`.last.data.text }}",
    // Manipulate data with Jexl operators
    subject: "{{ `n1`.last.data.title + ' - Priority' }}",
    // Use conditional logic
    channel: "{{ `n1`.last.data.isUrgent ? 'sms' : 'email' }}"
  }
}
```

The engine recursively walks through the `data` object of a node before execution and resolves any expressions it finds.

### Resolving Types

* If an expression is the **only thing** in a string (e.g., "{{ `n1`.last.data.userObject }}"), the engine will resolve it to its original type (e.g., an `Object`, `Number`, or `Boolean`).
* If an expression is **part of a string** (e.g., "User ID: {{ `n1`.last.data.id }}"), the result of the expression will be converted to a string.

## The Expression Context

When an expression is evaluated, it has access to a "flattened" context object that makes it easy to reference other nodes' results. For a node with the ID `my_node`, you can use the following properties:

* `my_node.last.data`: (Most common) The output data from the **last** execution of `my_node`.
* `my_node.last.timestamp`: The timestamp of the last execution.
* `my_node.last.error`: The error message if the last execution failed.
* `my_node.all`: An array containing the full history of executions for `my_node`. Each element in the array is an object with `data`, `timestamp`, and `error` properties.
* `my_node.all[0].data`: Accessing a specific execution's data from the history.

## Jexl Features

Jexl supports a rich set of features that you can use inside your expressions:

* **Mathematical operators**: `+`, `-`, `*`, `/`, `%`
* **Comparison operators**: `==`, `!=`, `>`, `<`, `>=`, `<=`
* **Logical operators**: `&&`, `||`, `!`
* **Conditional expressions**: `condition ? value_if_true : value_if_false`
* **Array and object literals**: `[1, 2, 3]`, `{ a: 1 }`
* **Filters/Transforms**: `my_array | join(',')` (Jexl can be extended with custom transforms; see the sketch below)

This powerful combination allows for sophisticated data manipulation directly within your workflow definition, keeping your `executor` functions clean and focused on their core task.
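As a quick illustration of such a transform, here is how one is registered with the Jexl library itself. This is standalone Jexl usage; how a customized Jexl instance is wired into the engine's middleware depends on your setup:

```typescript
import jexl from "jexl";

// Register a custom transform; it becomes usable as `value | join(',')`.
jexl.addTransform("join", (val: unknown[], sep: string) => val.join(sep));

async function demo() {
  // Evaluate an expression against a context object.
  const result = await jexl.eval("items | join(', ')", { items: ["a", "b", "c"] });
  console.log(result); // "a, b, c"
}

demo();
```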
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/getting-started.md'
description: >-
  This guide will walk you through setting up and running your first workflow
  with the Refluxo engine. We'll create a simple workflow that takes a name as
  input, greets the person, and outputs the greeting.
---

# Getting Started

This guide will walk you through setting up and running your first workflow with the Refluxo engine. We'll create a simple workflow that takes a name as input, greets the person, and outputs the greeting.

## 1. Installation

First, add the engine to your project:

```bash
npm install refluxo-engine
# or
yarn add refluxo-engine
# or
pnpm add refluxo-engine
```

## 2. Defining the Nodes

We need two types of nodes: one to start the workflow and process the input, and another to generate the greeting. Let's define their behaviors using Valibot for our schemas.

```typescript
import { NodesDefinition } from "refluxo-engine";
import { object, string } from "valibot";

const nodeDefinitions: NodesDefinition = {
  // A simple node to receive and forward data
  "process-input": {
    input: object({ name: string() }),
    output: object({ name: string() }),
    executor: async (data) => {
      // The resolved data from the node's `data` property is passed here.
      // We'll see how to provide it in the workflow definition.
      return { data };
    },
  },

  // A node that constructs a greeting message
  "create-greeting": {
    input: object({ name: string() }),
    output: object({ greeting: string() }),
    executor: async (data) => {
      // Here, `data.name` will be dynamically supplied from the previous node.
      const name = data.name;
      return {
        data: {
          greeting: `Hello, ${name}! Welcome to Refluxo.`,
        },
      };
    },
  },
};
```

## 3. Defining the Workflow

Now, let's wire up the nodes in a `WorkflowDefinition`. We'll configure the `process-input` node to get its name from an expression, and the `create-greeting` node to get its data from the output of the first node.

```typescript
import { WorkflowDefinition } from "refluxo-engine";

const workflow: WorkflowDefinition = {
  nodes: [
    {
      id: "inputNode",
      type: "process-input",
      // We'll get the name from the external payload when we start the execution.
      data: { name: "{{ `trigger`.last.data.name }}" },
    },
    {
      id: "greetingNode",
      type: "create-greeting",
      // We use an expression to get the output from the previous node.
      data: { name: "{{ `inputNode`.last.data.name }}" },
    },
  ],
  edges: [
    // A simple, unconditional connection between the two nodes.
    { id: "e1", source: "inputNode", target: "greetingNode" },
  ],
};
```

*Note: We are using a special node id `trigger` in the expression. The engine doesn't have a real node with this ID; we will provide its data via `externalPayload` when we call `execute`.*

## 4. Executing the Engine

Finally, let's instantiate the `WorkflowEngine` and run our workflow.

```typescript
import { WorkflowEngine } from "refluxo-engine";

async function main() {
  const engine = new WorkflowEngine({
    workflow,
    nodeDefinitions,
  });

  console.log("Starting workflow...");

  const finalSnapshot = await engine.execute({
    // We need to tell the engine where to start.
    initialNodeId: "inputNode",
    // This payload will be available to the first node.
    // Our expression `{{ trigger.last.data.name }}` will resolve to "World".
    externalPayload: { name: "World" },
  });

  if (finalSnapshot.status === "completed") {
    console.log("Workflow completed successfully!");
    // You can inspect the context to see the final output.
    const finalOutput = finalSnapshot.context.greetingNode[0].output;
    console.log("Final Output:", finalOutput);
    // Expected Output: { greeting: 'Hello, World! Welcome to Refluxo.' }
  } else {
    console.error("Workflow failed with status:", finalSnapshot.status);
  }
}

main();
```

And that's it! You have successfully defined and executed a workflow. From here, you can explore more advanced topics like creating [custom nodes](./custom-nodes.md), using [conditionals](./conditionals.md), and [handling errors](./error-handling.md).

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/loops.md'
description: >-
  The Refluxo engine supports loops by simply having an edge point to a node
  that has already been executed. The engine's design, which stores execution
  history in the `Context`, is built to handle this scenario gracefully
  without losing state.
---

# Handling Loops

The Refluxo engine supports loops by simply having an edge point to a node that has already been executed. The engine's design, which stores execution history in the `Context`, is built to handle this scenario gracefully without losing state.

## The Strategy: Iterator Node

The most common and manageable way to implement a loop is by creating an "Iterator" node. This node takes an array as input and processes one item at a time, looping back to itself until all items are processed.
An iterator node typically has two output handles:

* `"loop"`: Followed for each item in the array. This handle connects to the part of the workflow that processes the item.
* `"done"`: Followed when all items have been processed.

Here's a visual representation of a loop:

```mermaid
graph TD
    A[Start] --> B(Iterator Node)
    B -- Loop --> C[Process Item]
    C --> B
    B -- Done --> D[End]
```

## Example: Processing an Array

Let's create a workflow that iterates over an array of numbers and logs each one.

### 1. The Iterator Node Definition

The iterator's logic relies on the `Context` to know which item to process next. It checks the history of its own executions to determine the current index.

```typescript
const nodeDefinitions: NodesDefinition = {
  "iterator": {
    input: {
      type: "object",
      properties: { items: { type: "array" } },
      required: ["items"],
    },
    output: { type: "object" },
    executor: async (data, context) => {
      // Get the number of times this node has already run
      const executionCount = context["iteratorNode"]?.length || 0;
      const items = (data as { items: any[] }).items;

      if (executionCount < items.length) {
        // There are more items to process
        return {
          data: {
            // Output the current item and its index
            currentItem: items[executionCount],
            currentIndex: executionCount,
          },
          nextHandle: "loop", // Follow the "loop" path
        };
      } else {
        // All items have been processed
        return {
          data: {
            totalItems: items.length,
          },
          nextHandle: "done", // Follow the "done" path
        };
      }
    },
  },
  "log-item": {
    /* ... executor that receives and logs an item ... */
    input: { type: "object" },
    output: { type: "object" },
    executor: async (data) => {
      console.log("Processing Item:", data);
      return { data };
    },
  },
};
```

*Note: We use the node's own ID (`iteratorNode`) to check its execution history in the context.*

### 2. The Workflow Definition

The workflow definition shows the circular data flow. The `log-item` node connects back to the `iterator` node, creating the loop.

```typescript
const workflow: WorkflowDefinition = {
  nodes: [
    {
      id: "iteratorNode",
      type: "iterator",
      data: { items: [10, 20, 30] }, // The array to iterate over
    },
    {
      id: "logNode",
      type: "log-item",
      // Get the item to log from the iterator's output
      data: { item: "{{ `iteratorNode`.last.data.currentItem }}" },
    },
    { id: "endNode", type: "log-final", data: {} }, // Some node to execute after the loop
  ],
  edges: [
    // 1. The "loop" path: iterator -> log -> iterator
    {
      id: "e1",
      source: "iteratorNode",
      target: "logNode",
      sourceHandle: "loop",
    },
    {
      id: "e2",
      source: "logNode",
      target: "iteratorNode", // Connects back to the iterator
    },
    // 2. The "done" path: iterator -> end
    {
      id: "e3",
      source: "iteratorNode",
      target: "endNode",
      sourceHandle: "done",
    },
  ],
};
```

When this workflow runs, the `iteratorNode` will execute 4 times. The first 3 times, it will follow the `loop` path. On the 4th execution, it will find that all items have been processed and will follow the `done` path, breaking the loop.
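One practical caveat: `execute` stops after `stepLimit` steps (100 by default), and each loop iteration here costs two steps (iterator + log). For large arrays, raise the limit when starting the run — a sketch:

```typescript
// Give a bounded loop enough headroom to reach its "done" path.
const finalSnapshot = await engine.execute({
  initialNodeId: "iteratorNode",
  stepLimit: 500,
});
```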
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/handling-secrets.md'
---

# Handling Secrets

In many workflows, you need to access sensitive information like API keys, database credentials, or other secrets. Refluxo provides a secure way to handle secrets using `globals` and custom transformers, ensuring that secrets are not stored in snapshots.

## The `globals` Object

The `WorkflowEngine.execute` method accepts a `globals` object. This object is a secure, read-only container for data that you want to make available to the execution context without persisting it in the snapshot. It's the ideal place to store secrets.

```typescript
import { WorkflowEngine } from "refluxo-engine";

const engine = new WorkflowEngine({ workflow, nodeDefinitions });

await engine.execute({
  initialNodeId: "start",
  globals: {
    secrets: {
      STRIPE_KEY: process.env.STRIPE_KEY,
      AWS_KEY: process.env.AWS_KEY
    }
  }
});
```

## Accessing Secrets with Transformers

To access these secrets within your workflow, you can use a custom **Transformer** that reads from the `globals` object passed to `transformInput`.

### Example: Secret Resolution Transformer

This transformer looks for strings starting with `SECRET:` and resolves them using the `globals` object.

```typescript
import { ITransformEngine } from "refluxo-engine";

class SecretResolver implements ITransformEngine {
  async transformInput(data: unknown, context: unknown, globals: unknown) {
    if (typeof data === 'string' && data.startsWith('SECRET:')) {
      const secretName = data.replace('SECRET:', '');
      const secrets = (globals as any)?.secrets || {};
      return secrets[secretName];
    }
    return data;
  }
}

const engine = new WorkflowEngine({
  workflow,
  nodeDefinitions,
  transformers: [new SecretResolver(), new JexlEngine()]
});
```

Now, you can reference secrets in your node data:

```typescript
// Node data
{
  apiKey: "SECRET:STRIPE_KEY"
}
```

### Advanced: Dynamic Secret Resolution

For more complex scenarios, you might want to resolve secrets dynamically without exposing them all to the context. For example, you might want to fetch a secret from a vault (like AWS Secrets Manager) only when requested.

For reference, the `ITransformEngine` contract looks like this:

```typescript
export interface ITransformEngine {
  transformInput?(
    data: unknown,
    context: unknown,
    globals?: unknown,
    metadata?: unknown
  ): Promise<unknown>;
  transformOutput?(
    data: unknown,
    context: unknown,
    globals?: unknown,
    metadata?: unknown
  ): Promise<unknown>;
}
```

```typescript
class VaultSecretResolver implements ITransformEngine {
  async transformInput(data: unknown, context: unknown, globals: unknown) {
    if (typeof data === 'string' && data.startsWith('VAULT:')) {
      const secretId = data.replace('VAULT:', '');
      // You can use globals to pass configuration for the vault client
      const vaultConfig = (globals as any)?.vaultConfig;
      // Assume fetchSecretFromVault is a function available in your environment
      return await fetchSecretFromVault(secretId, vaultConfig);
    }
    return data;
  }
}
```

In your workflow, you would use the prefix:

```typescript
{
  apiKey: "VAULT:my-production-stripe-key"
}
```

This approach is highly secure because the actual secret value is never present in the `WorkflowDefinition` and is only resolved momentarily during execution.

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/Context.md
---

# Interface: Context

Defined in: index.ts:107

## Indexable

\[`nodeId`: `string`]: [`NodeResult`](NodeResult.md)\[]

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/Edge.md
---

# Interface: Edge

Defined in: index.ts:4

## Properties

### id

> **id**: `string`

Defined in: index.ts:6

#### Description

Unique identifier for the edge.

***

### source

> **source**: `string`

Defined in: index.ts:8

#### Description

The ID of the source node where the edge originates.

***

### sourceHandle?

> `optional` **sourceHandle**: `string`

Defined in: index.ts:12

#### Description

Optional handle ID on the source node, used for conditional branching.
*** ### target > **target**: `string` Defined in: index.ts:10 #### Description The ID of the target node where the edge ends. --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/MiddlewareContext.md --- # Interface: MiddlewareContext Defined in: index.ts:131 ## Properties ### definition > **definition**: [`NodeDefinition`](NodeDefinition.md) Defined in: index.ts:133 *** ### error? > `optional` **error**: `unknown` Defined in: index.ts:139 *** ### externalPayload? > `optional` **externalPayload**: `unknown` Defined in: index.ts:136 *** ### globals? > `optional` **globals**: `unknown` Defined in: index.ts:135 *** ### input > **input**: `unknown` Defined in: index.ts:137 *** ### node > **node**: [`Node`](Node.md) Defined in: index.ts:132 *** ### output? > `optional` **output**: `unknown` Defined in: index.ts:138 *** ### snapshot > **snapshot**: [`Snapshot`](Snapshot.md) Defined in: index.ts:134 *** ### state > **state**: `Record`<`string`, `unknown`> Defined in: index.ts:140 --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/Node.md --- # Interface: Node\ Defined in: index.ts:69 ## Type Parameters ### TType `TType` = `string` The type identifier of the node. ## Properties ### data > **data**: `unknown` Defined in: index.ts:75 #### Description Static configuration data for the node. Can contain expressions. *** ### id > **id**: `string` Defined in: index.ts:71 #### Description Unique identifier for the node within the workflow. *** ### metadata? > `optional` **metadata**: `unknown` Defined in: index.ts:77 #### Description Metadata for the node, useful for transformers or UI. *** ### type > **type**: `TType` Defined in: index.ts:73 #### Description The type of the node, corresponding to a key in NodesDefinition. --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/NodeDefinition.md --- # Interface: NodeDefinition\ Defined in: index.ts:33 ## Type Parameters ### TInput `TInput` = `unknown` The schema or validator for the input data. ### TOutput `TOutput` = `unknown` The schema or validator for the output data. ## Properties ### executor() > **executor**: (`data`, `context`, `externalPayload?`, `globals?`) => `Promise`<{ `__pause?`: `true`; `data`: `TOutput`; `nextHandle?`: `string`; }> Defined in: index.ts:45 #### Parameters ##### data `TInput` The validated input data. ##### context [`Context`](Context.md) The execution context containing results from previous nodes. ##### externalPayload? `unknown` Optional payload passed to the execution. ##### globals? `unknown` Global variables passed to the execution (e.g., secrets). #### Returns `Promise`<{ `__pause?`: `true`; `data`: `TOutput`; `nextHandle?`: `string`; }> A promise resolving to the node's result, including data and optional control flags. *** ### retryPolicy? > `optional` **retryPolicy**: [`RetryPolicy`](RetryPolicy.md) Defined in: index.ts:35 #### Description Policy for handling errors and retrying execution. --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/NodeResult.md --- # Interface: NodeResult Defined in: index.ts:100 ## Properties ### attempt > **attempt**: `number` Defined in: index.ts:104 *** ### error? 
> `optional` **error**: `string` Defined in: index.ts:103 *** ### output > **output**: `unknown` Defined in: index.ts:101 *** ### timestamp > **timestamp**: `number` Defined in: index.ts:102 --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/RetryPolicy.md --- # Interface: RetryPolicy Defined in: index.ts:18 ## Properties ### backoff > **backoff**: `string` Defined in: index.ts:24 #### Description Backoff strategy: 'fixed' for constant interval, 'exponential' for increasing interval. *** ### interval > **interval**: `string` | `number` Defined in: index.ts:22 #### Description Interval between retries in milliseconds. Can be a number or an expression string. *** ### maxAttempts > **maxAttempts**: `string` | `number` Defined in: index.ts:20 #### Description Maximum number of retry attempts. Can be a number or an expression string. --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/Snapshot.md --- # Interface: Snapshot Defined in: index.ts:111 ## Properties ### context > **context**: [`Context`](Context.md) Defined in: index.ts:115 *** ### currentNodeId > **currentNodeId**: `string` | `null` Defined in: index.ts:114 *** ### lastStartedAt? > `optional` **lastStartedAt**: `number` Defined in: index.ts:117 *** ### metadata > **metadata**: `object` Defined in: index.ts:119 #### Index Signature \[`key`: `string`]: `unknown` *** ### retryState? > `optional` **retryState**: `object` Defined in: index.ts:122 #### attempts > **attempts**: `number` #### nextRetryAt? > `optional` **nextRetryAt**: `number` #### nodeId > **nodeId**: `string` *** ### status > **status**: `"active"` | `"paused"` | `"error"` | `"completed"` | `"failed"` Defined in: index.ts:113 *** ### totalExecutionTime? > `optional` **totalExecutionTime**: `number` Defined in: index.ts:118 *** ### version > **version**: `number` Defined in: index.ts:116 *** ### workflowId > **workflowId**: `string` Defined in: index.ts:112 --- --- url: >- https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/interfaces/WorkflowDefinition.md description: Contains the nodes and edges that define the process. --- # Interface: WorkflowDefinition\ Defined in: index.ts:91 ## Type Parameters ### T `T` *extends* [`NodesDefinition`](../type-aliases/NodesDefinition.md) = [`NodesDefinition`](../type-aliases/NodesDefinition.md) The type of NodesDefinition used in this workflow. ## Properties ### edges > **edges**: [`Edge`](Edge.md)\[] Defined in: index.ts:97 #### Description Array of edges connecting the nodes. *** ### nodes > **nodes**: [`Node`](Node.md)\\[] Defined in: index.ts:95 #### Description Array of nodes in the workflow. --- --- url: 'https://refluxo-engine.vitor036daniel.workers.dev/introduction.md' description: >- Welcome to the documentation for Refluxo, a stateless, serverless-first workflow engine for JavaScript and TypeScript. --- # Introduction Welcome to the documentation for Refluxo, a stateless, serverless-first workflow engine for JavaScript and TypeScript. This library was born out of the need for a modern, lightweight, and highly flexible orchestration tool that is not tied to a specific platform or runtime. Our goal is to provide a powerful engine that allows you to define and execute complex workflows while giving you full control over state management and execution environment. ## Core Principles * **Stateless Execution**: The engine itself holds no state. 
Everything needed to resume a workflow is contained within a serializable `Snapshot` object.

* **Step-by-Step Transition**: Workflows are not long-running processes. The engine executes one node at a time, making it a perfect fit for serverless functions with short execution limits.
* **Declarative & Extensible**: Workflows are defined as simple JSON objects. The behavior of each node is implemented via pluggable `executor` functions, making it easy to extend the engine's capabilities.
* **Resilience First**: With built-in, declarative retry policies and detailed context history, building robust and fault-tolerant workflows is simple and intuitive.

## How to Use These Docs

* **Core Concepts**: If you want a deep dive into the architecture, start with the [Core Concepts](./concepts/engine.md) section to understand the fundamentals like the Engine, Snapshot, and Context.
* **Guides**: To see practical examples and learn how to implement specific patterns, check out the [Guides](./guides/getting-started.md).
* **API Reference**: For a detailed look at the available classes, types, and interfaces, head to the [API Reference](./api/).

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/metadata.md'
---

# Node Metadata

Every node in a `WorkflowDefinition` has an optional `metadata` property. This is a free-form object (key-value pairs) that allows you to attach auxiliary information to a node. Crucially, **metadata is not processed by the node's executor**. It is intended for the "environment" around the engine, such as the UI, the runner, or custom Transformers.

```typescript
interface Node {
  id: string;
  type: string;
  data: unknown; // Processed by the executor
  metadata?: Record<string, unknown>; // Processed by the environment/transformers
}
```

## Use Case 1: UI Configuration

When building a visual workflow editor (like a low-code platform), you need to store information about how the node should be displayed. This data has no effect on the execution logic but is vital for the user experience.

```typescript
{
  id: "node-1",
  type: "email-sender",
  data: { ... },
  metadata: {
    // Visual coordinates for the canvas
    position: { x: 100, y: 200 },
    // Custom label set by the user
    label: "Send Welcome Email",
    // UI-specific settings
    color: "#ff0000",
    icon: "mail-outline",
    isLocked: true
  }
}
```

Your frontend application reads this metadata to render the node correctly on the canvas.

## Use Case 2: Execution Hints

You can use metadata to pass "hints" or configuration to the system running the engine (the Runner).

* **Timeouts**: Tell the runner to kill the process if this specific node takes longer than 5 seconds.
* **Runner Tags**: Specify that a node requires a specific environment (e.g., "requires-gpu", "region-us-east").

```typescript
metadata: {
  timeoutMs: 5000,
  runnerTag: "high-memory"
}
```

## Use Case 3: Transformer Configuration

As detailed in the [Transformers](./transformers.md) guide, metadata is the perfect place to store configuration for your custom transformers. Since `metadata` is passed as the fourth argument to `transformInput` and `transformOutput`, you can use it to control how data is processed without polluting the node's actual `data` input.
```typescript
// In the workflow definition
metadata: {
  // Instructs a custom transformer to mask this node's output in logs
  logPrivacy: "sensitive",
  // Instructs a custom transformer to filter the output before saving to snapshot
  resultFilter: "response.data.id"
}
```

This separation of concerns (`data` for the business logic, `metadata` for the infrastructure logic) keeps your workflows clean and maintainable.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/guides/external-events.md'
description: >-
  The Refluxo engine offers a fundamental capability for managing long-running
  processes: the ability to pause a workflow and wait for external input. This
  allows workflows to react to the outside world, making them highly flexible
  for various asynchronous scenarios.
---

# Pausing, Resuming, and Triggers

The Refluxo engine offers a fundamental capability for managing long-running processes: the ability to pause a workflow and wait for external input. This allows workflows to react to the outside world, making them highly flexible for various asynchronous scenarios.

**Common Use Cases:**

* **Human-in-the-Loop (e.g., Manual Approval)**: Waiting for a user to approve or deny a request.
* **Webhook Callbacks**: Pausing after making a request to an external service (e.g., a payment gateway) and waiting for that service to call your webhook with the result.
* **Scheduled Delays**: Waiting for a specific amount of time (e.g., "wait 1 hour").
* **Message Queues**: Waiting for a message to arrive from a queue (e.g., SQS, RabbitMQ, Kafka).

This is all achieved by returning a special `__pause` flag from a node's `executor`.

## The Pause & Resume Cycle

1. **The Node Signals a Pause**: A node's executor returns `{ __pause: true }`.
2. **The Engine Pauses**: The engine stops and returns a `Snapshot` with `status: "paused"`.
3. **The Application Waits**: Your application saves this snapshot. The workflow is dormant, consuming no resources. The "waiting" logic lives outside the engine (e.g., in a database, a scheduled task runner, or a message queue's visibility timeout).
4. **The Workflow Resumes**: When the external event occurs, your application calls `engine.execute()` again, providing the `paused` snapshot and the event's data in the `externalPayload` property.
5. **Execution Continues**: The engine re-executes the *same node* that paused, but this time passes the `externalPayload` to it. The node's logic can then process the payload and continue the workflow.
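In application code the cycle looks roughly like this (a sketch, assuming an approval node with id `request-approval` and a hypothetical `db` persistence layer):

```typescript
// Steps 1-3: run until the node pauses, then persist the snapshot
let snapshot = await engine.execute({
  initialNodeId: "request-approval",
  workflowId: "expense-42",
});

if (snapshot.status === "paused") {
  await db.saveSnapshot(snapshot); // the workflow is now dormant
}

// Steps 4-5: later, when the external event arrives (e.g., via an HTTP endpoint),
// reload the snapshot and resume with the event's data
const saved = await db.loadSnapshot("expense-42");
const resumed = await engine.execute({
  snapshot: saved,
  externalPayload: { approved: true, approvedBy: "manager-7" },
});
```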
## Implementing Triggers

While the Refluxo engine is responsible for *orchestrating* a workflow, it does not handle *triggering* it. Your application is responsible for listening for events (like an HTTP request) and starting a workflow in response. A "trigger" is simply the **first node** in a workflow, and its job is to process the initial `externalPayload`.

### Example: A Webhook Trigger

Let's imagine you want to start a workflow whenever your application receives a POST request to `/webhooks/github`.

**1. The Application Server (e.g., with Express.js)**

This code lives in your application, not inside the engine. It listens for HTTP requests.

```typescript
import express from "express";
import { WorkflowEngine } from "refluxo-engine";
import { workflow, nodeDefinitions } from "./workflow"; // Your definitions

const app = express();
app.use(express.json());

const engine = new WorkflowEngine({ workflow, nodeDefinitions });

app.post("/webhooks/github", async (req, res) => {
  console.log("GitHub webhook received. Starting workflow...");

  // Acknowledge the webhook immediately
  res.status(202).send("Accepted");

  // Then run the workflow and handle the final snapshot asynchronously
  const finalSnapshot = await engine.execute({
    initialNodeId: "github-trigger-node", // The ID of our trigger node
    workflowId: `github-event-${Date.now()}`,
    // The request body is passed as the external payload
    externalPayload: req.body,
  });

  console.log(`Workflow finished with status: ${finalSnapshot.status}`);
});

app.listen(3000, () => console.log("Listening for webhooks..."));
```

**2. The Trigger Node Definition**

The `github-trigger` node itself is extremely simple. Its only job is to take the data from the `externalPayload` and pass it on as its own output.

```typescript
import { object } from "valibot";

const nodeDefinitions: NodesDefinition = {
  "github-trigger": {
    input: object({}), // No static input needed
    output: object({}), // The output will be the dynamic webhook body
    executor: async (data, context, externalPayload) => {
      // The trigger node's main purpose is to inject the external
      // payload into the workflow's context.
      console.log("Processing trigger data...");
      return { data: externalPayload || {} };
    },
  },
  // ... other nodes in your workflow that process the GitHub event
};
```

By separating the trigger mechanism (the web server) from the orchestration logic (Refluxo), you gain immense flexibility. You could easily add more triggers (e.g., a cron job, a message queue consumer) that start the same workflow, simply by calling `engine.execute()` with the appropriate payload.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/api.md'
---

# Refluxo 🔀⚙️🔁🛠️

[![Commitizen friendly](https://img.shields.io/badge/commitizen-friendly-brightgreen.svg)](http://commitizen.github.io/cz-cli/) ![NPM Version](https://img.shields.io/npm/v/refluxo-engine) ![License](https://img.shields.io/github/license/vadolasi/refluxo-engine) ![Bundle Size](https://img.shields.io/bundlephobia/minzip/refluxo-engine) ![NPM Downloads](https://img.shields.io/npm/dm/refluxo-engine) [![codecov](https://codecov.io/gh/vadolasi/refluxo-engine/graph/badge.svg)](https://codecov.io/gh/vadolasi/refluxo-engine)

A stateless, snapshot‑based, and serverless‑ready workflow engine for JavaScript.

Refluxo is a lightweight engine designed for modern distributed environments. Unlike traditional engines that "run" a process, Refluxo transitions states. It takes a snapshot, processes a step, and returns a new snapshot. This makes it the perfect backbone for "Human-in-the-loop" systems, long-running processes, and serverless architectures.

> "Build your own n8n or Zapier-like automation platform with ease."

## ✨ Features

### 🌐 Runs anywhere

Refluxo is isomorphic. It runs wherever JavaScript runs:

* **Serverless:** Designed to run on AWS Lambda, Cloudflare Workers, Vercel, etc. No background processes required.
* **Runtime:** Node.js, Bun, Deno.
* **Browser:** React, Vue, Svelte, or vanilla JS.

### 📸 Snapshot-Based

Every execution step is serializable. Pause a flow, save it to a database, and resume it days later.

### 🛡️ Validation & Type Safety

Refluxo supports any validation library that implements [Standard Schema](https://standardschema.dev). You can use the popular [Zod](https://zod.dev), [Valibot](https://valibot.dev) for a small bundle size, [TypeBox](https://github.com/sinclairzx81/typebox) or [ArkType](https://arktype.io) for better performance, or any other. If you store your schema outside code, like in a database, you can write logic to convert it to a Standard Schema object; for an example, [read the docs](https://refluxo-engine.vitor036daniel.workers.dev/guides/dynamic-schemas).
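As a rough sketch of that idea, the snippet below converts a (hypothetical) schema description loaded from a database into a Standard Schema object, using the `StandardSchemaV1` type from `@standard-schema/spec`:

```typescript
import type { StandardSchemaV1 } from "@standard-schema/spec";

// Converts a hypothetical stored format like { fields: { name: "string", age: "number" } }
// into a Standard Schema object the engine can use for `input`/`output`.
function schemaFromDatabase(stored: { fields: Record<string, string> }): StandardSchemaV1 {
  return {
    "~standard": {
      version: 1,
      vendor: "my-app",
      validate(value) {
        const record = value as Record<string, unknown> | null;
        const issues: { message: string }[] = [];
        for (const [field, expected] of Object.entries(stored.fields)) {
          if (typeof record?.[field] !== expected) {
            issues.push({ message: `${field} must be a ${expected}` });
          }
        }
        return issues.length > 0 ? { issues } : { value };
      },
    },
  };
}
```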
### 🧠 Powerful Expressions

Uses JEXL to allow dynamic data mapping similar to n8n.

### ⏸ Human-in-the-loop

Built-in support for external triggers and manual approvals via `externalPayload`.

### 🔁 Smart Retries

Define dynamic retry policies (fixed or exponential backoff) using expressions.

## 📦 Installation

```bash
pnpm add refluxo-engine
```

## 💡 Usage Examples

### 🔧 1. Defining Node Executors

Executors are pure logic. They receive resolved data and return an output.

```typescript
import { NodeDefinition } from 'refluxo-engine';
import * as v from 'valibot'; // or any other

const httpRequest: NodeDefinition = {
  input: v.object({ url: v.pipe(v.string(), v.url()) }),
  async executor(data, context) {
    const response = await fetch((data as { url: string }).url);
    const json = await response.json();
    return { data: json };
  },
};
```

### 🏗️ 2. Workflow Definition

Workflows are plain JSON objects, making them easy to store and fetch from a frontend or database.

```typescript
const workflow = {
  nodes: [
    { id: 'start', type: 'http_request', data: { url: 'https://api.example.com/data' } },
    { id: 'check_status', type: 'condition', data: { check: '{{ nodes.start.last.data.status === "ok" }}' } }
  ],
  edges: [
    { id: 'e1', source: 'start', target: 'check_status' },
    { id: 'e2', source: 'check_status', target: 'notify_node', sourceHandle: 'true' }
  ]
};
```

### ⏸ 3. Pause & Resume (Human-in-the-loop)

You can pause execution to wait for an event or manual approval.

```typescript
// In your executor
async executor(data, context, externalPayload) {
  if (!externalPayload) {
    return { data: {}, __pause: true }; // Engine will stop here
  }
  return { data: externalPayload }; // Resumes with the payload
}

// Executing
const engine = new WorkflowEngine({ workflow, nodeDefinitions });
let snapshot = await engine.execute({ initialNodeId: 'start' });

// ... later, when the user approves ...
snapshot = await engine.execute({
  snapshot,
  externalPayload: { approvedBy: 'admin_id' }
});
```

### 🔄 Smart Retry Policies

Retries can be static or dynamic, driven by expressions.

```typescript
const apiNode = {
  type: 'api_call',
  retryPolicy: {
    maxAttempts: 'nodes.config.last.data.retryCount',
    interval: 5000,
    backoff: 'exponential'
  },
  // ...
};
```

## 🚀 Why Refluxo?

* Stateless by Design: No need for a persistent event loop. The state is in your database, not in memory.
* Highly Extensible: Replace the Expression Engine or the Validation logic easily.
* Traceable: Metadata tracks totalExecutionTime, attempts, and every node output, making debugging a breeze.
* Developer Friendly: Built with TypeScript for full type safety.

## 📝 Use Cases

* Automation Platforms: Build a custom Zapier/n8n for your niche.
* Approval Workflows: Systems that require human intervention (e.g., Expense approval).
* Scheduled Tasks: Flows that wait for a specific date or time to continue.
* Complex Orchestration: Microservices coordination with automatic retries.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/context.md'
description: >-
  The `Context` is the memory of your workflow. It's a key-value store within
  the `Snapshot` that holds the results of every node that has been executed.
  This allows nodes to access data produced by previous nodes, enabling complex
  data flows.
---

# The Context

The `Context` is the memory of your workflow.
It's a key-value store within the `Snapshot` that holds the results of every node that has been executed. This allows nodes to access data produced by previous nodes, enabling complex data flows.

## Context Structure

The `Context` is an object where each key is a `nodeId` and the value is an **array** of `NodeResult` objects.

```typescript
interface Context {
  [nodeId: string]: NodeResult[];
}

interface NodeResult {
  output: unknown;
  timestamp: number;
  error?: string;
  attempt: number;
}
```

### Why an Array?

Storing results in an array is a crucial design decision to properly support loops and retries. If a node is executed multiple times (e.g., inside a loop), each execution will append a new `NodeResult` to the array. This preserves the complete history of the execution, preventing data from being overwritten and allowing for detailed inspection and debugging.

## Accessing Context Data

You don't access the `Context` object directly within your nodes' `executor` functions. Instead, you use the [Expression Engine](./expressions.md) in your `Node.data` configuration to declare what data your node needs. The engine prepares a simplified, "flattened" version of the context to make expressions clean and intuitive.

### Example

Imagine a node with `id: "fetch-user"` that outputs `{ "name": "John Doe" }`. In a subsequent node, you can configure its `data` property to access this output:

```javascript
// In your WorkflowDefinition
{
  id: "send-email",
  type: "email-sender",
  data: {
    // The expression engine will resolve this string
    recipientName: "{{ `fetch-user`.last.data.name }}"
  }
}
```

The expression engine provides helpers to access the results:

* **`my_node.last.data`**: Accesses the `output` of the most recent execution of the node with id `my_node`. This is the most common accessor.
* **`my_node.all`**: Accesses the full array of results for `my_node`, useful for aggregation after a loop.
* **`my_node.all[0].data`**: Accesses the output of the first execution of `my_node`.
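For instance, after a loop you could aggregate everything a node produced. A sketch with a hypothetical summary node:

```javascript
// In your WorkflowDefinition
{
  id: "summary",
  type: "report-builder",
  data: {
    latest: "{{ `my_node`.last.data }}",   // output of the most recent execution
    first: "{{ `my_node`.all[0].data }}",  // output of the first execution
    history: "{{ `my_node`.all }}"         // the full array of results
  }
}
```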
By using expressions, your node's logic (`executor`) remains decoupled from the structure of the workflow. It simply receives the data it needs, already resolved.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/engine.md'
description: >-
  The `WorkflowEngine` is the heart of the library. It is a stateless class
  responsible for orchestrating the execution of a workflow based on a given
  `WorkflowDefinition`. Its primary goal is to transition between states,
  generating a new, immutable `Snapshot` at every step.
---

# The Engine

The `WorkflowEngine` is the heart of the library. It is a stateless class responsible for orchestrating the execution of a workflow based on a given `WorkflowDefinition`. Its primary goal is to transition between states, generating a new, immutable `Snapshot` at every step.

## Execution Model: Step-by-Step

To ensure compatibility with serverless and other ephemeral environments, the engine does not run an entire workflow in a single, long-running process. Instead, it operates on a step-by-step basis. The main method, `execute`, takes the current state (either a `Snapshot` object or an `initialNodeId`) and runs the workflow step by step until a pause, error, or completion is encountered.

```typescript
import { WorkflowEngine } from "refluxo-engine";

const engine = new WorkflowEngine({ workflow, nodeDefinitions });

// Starting a new execution
let snapshot = await engine.execute({
  initialNodeId: "start-node",
  workflowId: "my-first-workflow"
});

// Resuming a paused execution
let resumedSnapshot = await engine.execute({
  snapshot: pausedSnapshot,
  externalPayload: { approved: true } // Data for the waiting node
});
```

### How it Works

1. **Initialization**: The `execute` method receives the initial state. If it's a new execution, it creates a fresh `Snapshot`. If it's resuming, it loads the provided snapshot and sets its status to `active`.
2. **Execution Loop**: It runs a `while` loop that continues as long as the `Snapshot.status` is `"active"`.
3. **`executeStep`**: Inside the loop, it calls `executeStep`, which is responsible for executing a single node.
   * It runs the input data through the configured **Transformers** (e.g., resolving expressions).
   * It validates the input against the node's schema.
   * It calls the node's `executor` function.
   * It validates the output.
   * It runs the output data through the configured **Transformers**.
   * It determines the next node to execute.
   * It returns a new `Snapshot` with the updated state.
4. **Completion**: The loop terminates when the `status` changes to `paused`, `completed`, `failed`, or `error`. The final `Snapshot` is then returned.

This model ensures that each step is an atomic transaction, making the entire process highly resilient and observable.

### The `globals` Object

The `execute` method also accepts an optional `globals` object. This data is passed directly to the `transformInput` and `transformOutput` methods of all transformers but is **not** stored in the `Snapshot`. This is the mechanism for injecting sensitive data (secrets) or environment-specific configuration into the workflow execution without persisting it.

```typescript
await engine.execute({
  snapshot,
  globals: {
    API_KEY: process.env.API_KEY,
    DB_CONNECTION: dbConnection
  }
});
```

## Transformers

The engine uses a pipeline of **Transformers** to process data before and after node execution. This allows for dynamic behavior, such as variable substitution, encryption/decryption, or custom data manipulation. The `WorkflowEngine` accepts an array of transformers in its constructor:

```typescript
const engine = new WorkflowEngine({
  workflow,
  nodeDefinitions,
  transformers: [new JexlEngine(), myCustomTransformer]
});
```

### The `ITransformEngine` Interface

A transformer implements the `ITransformEngine` interface, which has two optional methods:

1. **`transformInput(data, context, globals, metadata)`**: Called before a node is executed. It processes the node's input data (e.g., resolving `{{ expressions }}`).
2. **`transformOutput(data, context, globals, metadata)`**: Called after a node is executed. It processes the node's output data.

By default, the engine includes the `JexlEngine`, which handles expression resolution. You can add your own transformers to extend the engine's capabilities.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/snapshot.md'
description: >-
  The `Snapshot` is the most critical component for achieving statelessness. It
  is a serializable JSON object that captures the entire state of a workflow
  execution at any given moment. By saving and rehydrating this object, you can
  pause, resume, and retry workflows, even across different processes or
  machines.
---

# The Snapshot

The `Snapshot` is the most critical component for achieving statelessness. It is a serializable JSON object that captures the entire state of a workflow execution at any given moment. By saving and rehydrating this object, you can pause, resume, and retry workflows, even across different processes or machines.

## Anatomy of a Snapshot

```typescript
interface Snapshot {
  workflowId: string;
  status: "active" | "paused" | "error" | "completed" | "failed";
  currentNodeId: string | null;
  context: Context;
  version: number;
  lastStartedAt?: number;
  totalExecutionTime?: number;
  metadata: { [key: string]: unknown };
  retryState?: {
    nodeId: string;
    attempts: number;
    nextRetryAt?: number;
  };
}
```

* `workflowId`: The ID of the workflow being executed.
* `status`: The current status of the execution.
  * `active`: The workflow is currently running.
  * `paused`: The workflow is waiting for an external event (e.g., human input or a delay).
  * `error`: The workflow encountered a recoverable error and is waiting for a retry.
  * `completed`: The workflow finished successfully.
  * `failed`: The workflow encountered a non-recoverable error or exhausted its retries.
* `currentNodeId`: The ID of the node that is about to be executed or that has just been executed.
* `context`: A record of all data produced by the executed nodes. See [Context](./context.md) for more details.
* `version`: A number that increments with each step. This is crucial for implementing optimistic locking when persisting the snapshot in a database, preventing race conditions in distributed environments.
* `lastStartedAt` / `totalExecutionTime`: Timestamps for monitoring and performance tracking.
* `metadata`: An open object to store any custom data related to the execution.
* `retryState`: If the workflow is in an `error` status, this object contains information about the pending retry, such as the number of attempts and when the next attempt should be scheduled.

## The Role of the Snapshot in the Execution Cycle

1. **Start**: A new workflow begins by creating an initial snapshot with `status: "active"`.
2. **Execute Step**: The `WorkflowEngine` takes a snapshot, executes the `currentNodeId`, and produces a **new snapshot** with the updated state (`context`, `version`, `currentNodeId`, etc.).
3. **Pause**: If a node returns `__pause: true`, the engine returns a snapshot with `status: "paused"`. This snapshot can be saved to a database.
4. **Resume**: To resume, you pass the saved snapshot back to the `engine.execute()` method, optionally with an `externalPayload`. The engine sets the status back to `active` and continues from where it left off.
5. **Error & Retry**: If a node fails, the `handleError` method checks its `RetryPolicy`. If a retry is warranted, it returns a snapshot with `status: "error"` and the `retryState`. An external system can then decide when to re-execute based on the `nextRetryAt` timestamp.
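Since `version` increments on every step, your persistence layer can use it as an optimistic lock to prevent two workers from advancing the same workflow concurrently. A minimal sketch, assuming the `Snapshot` type is exported from `refluxo-engine` and a Postgres-style `snapshots(workflow_id, version, data)` table behind a hypothetical `query` helper:

```typescript
import type { Snapshot } from "refluxo-engine";

// Hypothetical parameterized-query helper from your database client
declare function query(sql: string, params: unknown[]): Promise<{ rowCount: number }>;

// Persist only if no other worker has advanced the workflow in the meantime
async function saveSnapshot(snapshot: Snapshot): Promise<void> {
  const result = await query(
    `UPDATE snapshots SET data = $1, version = $2
      WHERE workflow_id = $3 AND version = $4`,
    [JSON.stringify(snapshot), snapshot.version, snapshot.workflowId, snapshot.version - 1]
  );

  if (result.rowCount === 0) {
    // The stored version didn't match: another process won the race
    throw new Error(`Stale snapshot for ${snapshot.workflowId} (v${snapshot.version})`);
  }
}
```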
---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/transformers.md'
---

# Transformers

Transformers are a powerful architectural feature of the Refluxo engine that allows you to intercept and modify data at key points in the execution lifecycle. They provide a layer of flexibility that sits "outside" the core node logic, enabling cross-cutting concerns like data sanitization, optimization, and dynamic resolution.

## The `ITransformEngine` Interface

Any class that implements the `ITransformEngine` interface can be injected into the `WorkflowEngine`.

```typescript
export interface ITransformEngine {
  transformInput?(
    data: unknown,
    context: unknown,
    globals?: unknown,
    metadata?: unknown
  ): Promise<unknown>
  transformOutput?(
    data: unknown,
    context: unknown,
    globals?: unknown,
    metadata?: unknown
  ): Promise<unknown>
}
```

### Lifecycle Hooks

1. **`transformInput`**: Runs before a node's `executor` is called. Used to resolve expressions (like Jexl) or decrypt incoming data.
2. **`transformOutput`**: Runs after a node's `executor` finishes. Used to filter results, encrypt data, or optimize storage.

## Accessing Globals and Secrets

The `WorkflowEngine.execute` method accepts a `globals` object. This is a read-only container for data that should be available to the engine but **not persisted in the snapshot**. This is the ideal place for secrets, API keys, or environment-specific configuration. Transformers receive this `globals` object directly in the `transformInput` and `transformOutput` methods, allowing them to resolve values securely.

### Example: Secure Secret Resolution

Instead of passing a secret directly to a node (which would expose it in the snapshot), you can use a transformer to resolve a "secret reference" at runtime.

```typescript
// 1. Define a transformer that looks for a specific prefix
class SecretResolver implements ITransformEngine {
  async transformInput(data: unknown, context: unknown, globals: unknown): Promise<unknown> {
    if (typeof data === 'string' && data.startsWith('SECRET:')) {
      const secretName = data.replace('SECRET:', '');
      // Access the secrets directly from the globals object
      const secrets = (globals as any)?.secrets || {};
      return secrets[secretName];
    }
    return data;
  }
}

// 2. Use it in the workflow
const workflow = {
  nodes: [
    {
      id: "api-call",
      type: "http-request",
      data: {
        apiKey: "SECRET:STRIPE_KEY" // This string is safe to store in the DB
      }
    }
  ],
  edges: []
};
```

## Use Case: Snapshot Optimization

One of the most valuable uses of Transformers is to reduce the size of the `Snapshot`. In serverless environments or when using databases with size limits (like DynamoDB), storing the full output of every HTTP request can be costly and inefficient.

### The Scenario

Imagine an `http-request` node that fetches a large JSON payload from an external API (e.g., a list of 1000 users), but your workflow only needs the ID of the first user. Storing the entire 1000-user list in the `Snapshot` history is wasteful. By combining **Node Metadata** with a custom **Transformer**, we can filter this data *before* it gets saved to the state.

### Implementation

First, we define a custom transformer that looks for a specific metadata field (e.g., `resultFilter`) and uses it to transform the output.

```typescript
import { ITransformEngine, JexlEngine } from "refluxo-engine";

export class OutputFilterTransformer implements ITransformEngine {
  private jexl: JexlEngine;

  constructor() {
    this.jexl = new JexlEngine();
  }

  async transformOutput(data: unknown, context: unknown, globals?: unknown, metadata?: any): Promise<unknown> {
    // Check if the node has a filter defined in its metadata
    if (metadata?.resultFilter) {
      // Use Jexl to evaluate the filter expression against the data
      // Example: metadata.resultFilter = "data.users[0].id"
      return this.jexl.resolve(metadata.resultFilter, { data });
    }
    // If no filter is defined, return data as is
    return data;
  }
}
```

### Usage in Workflow

Now, when defining the workflow, we can attach the `resultFilter` to the node's metadata.
```typescript
const workflow = {
  nodes: [
    {
      id: "fetch-users",
      type: "http-request",
      data: { url: "https://api.example.com/users" },
      // This metadata instructs our custom transformer
      metadata: {
        resultFilter: "data.users[0].id"
      }
    }
  ],
  edges: []
};

// Inject the transformer
const engine = new WorkflowEngine({
  workflow,
  nodeDefinitions,
  transformers: [new JexlEngine(), new OutputFilterTransformer()]
});
```

**The Result:** The `http-request` executor fetches the full list, but the `OutputFilterTransformer` intercepts the result. Only the single ID is stored in the `Snapshot` context.

## Other Possibilities

The flexibility of Transformers opens up many other patterns:

* **Security**: Automatically encrypt sensitive fields in `transformOutput` and decrypt them in `transformInput`.
* **Logging/Auditing**: Intercept every input/output to send telemetry to an external observability platform.
* **Legacy Compatibility**: Transform data formats from old node versions to match new schema requirements on the fly.

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/type-aliases/Middleware.md
---

# Type Alias: Middleware()

> **Middleware** = (`context`, `next`) => `Promise`<`void`>

Defined in: index.ts:143

## Parameters

### context

[`MiddlewareContext`](../interfaces/MiddlewareContext.md)

### next

[`NextFunction`](NextFunction.md)

## Returns

`Promise`<`void`>
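For illustration (this is a sketch, not part of the generated reference, and it assumes the `Middleware` type is re-exported by `refluxo-engine`), a simple timing middleware matching this signature could look like:

```typescript
import type { Middleware } from "refluxo-engine";

// Logs how long each node takes, plus any error captured in the context
const timing: Middleware = async (context, next) => {
  const startedAt = Date.now();
  await next(); // continue the chain; the node executes downstream
  console.log(
    `${context.node.id} took ${Date.now() - startedAt}ms`,
    context.error ? `(error: ${String(context.error)})` : ""
  );
};

// Register it via the constructor's `middlewares` option or `engine.use(timing)`
```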
---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/type-aliases/NextFunction.md
---

# Type Alias: NextFunction()

> **NextFunction** = () => `Promise`<`void`>

Defined in: index.ts:129

## Returns

`Promise`<`void`>

---

---
url: >-
  https://refluxo-engine.vitor036daniel.workers.dev/api/@refluxo/core/type-aliases/NodesDefinition.md
---

# Type Alias: NodesDefinition

> **NodesDefinition** = `Record`<`string`, [`NodeDefinition`](../interfaces/NodeDefinition.md)>

Defined in: index.ts:62

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/use-cases.md'
description: >-
  While the Refluxo engine is a powerful tool for orchestrating workflows
  directly in your code, one of its primary design goals is to serve as a
  foundational layer for building higher-level platforms, such as
---

# Use Cases: Building Platforms with Refluxo

While the Refluxo engine is a powerful tool for orchestrating workflows directly in your code, one of its primary design goals is to serve as a foundational layer for building higher-level platforms, such as:

* **Low-code/No-code platforms** similar to N8N or Zapier.
* **Internal Business Process Management (BPM)** tools.
* **CI/CD pipeline orchestrators**.
* **IoT (Internet of Things)** automation systems.

The core architectural model of Refluxo provides several key advantages that make it exceptionally well-suited for these use cases.

## The Flexibility Advantage

### 1. Decoupled State (The Snapshot)

The engine is stateless. The entire state of a workflow is encapsulated in a single, serializable JSON object: the **`Snapshot`**.

* **Benefit for Platforms**: This is a game-changer. A backend service can execute a single step of a workflow, save the resulting snapshot to any database (like PostgreSQL, MongoDB, or Redis), and shut down. A separate frontend application can then read that same snapshot to render a real-time visualization of the workflow's progress, inspect the output of each node, and display execution history. There is no need for complex state synchronization between the backend executor and the frontend UI.

### 2. Frontend-Friendly Definitions

A `WorkflowDefinition` is a declarative JSON object.

* **Benefit for Platforms**: This structure can be easily generated and manipulated by a visual interface. You can build a drag-and-drop editor using libraries like **React Flow** or **Svelte Flow** that outputs a `WorkflowDefinition` JSON. This JSON is then sent to the backend to be executed by the Refluxo engine. The engine becomes the invisible power behind your visual automation platform.

### 3. Pluggable and Dynamic Nodes

Node behaviors are not hardcoded into the engine. They are provided as `NodeDefinition` objects.

* **Benefit for Platforms**: This allows you to build a plug-in architecture. Your platform could dynamically load new node definitions from different files or even from a database. Users or developers could contribute new nodes (e.g., "Send a Tweet", "Add a row to Google Sheets") by simply providing a new `NodeDefinition`, making your platform highly extensible.

### 4. Embeddable and Portable

Refluxo is a library, not a standalone service.

* **Benefit for Platforms**: This gives you ultimate flexibility. You can embed the engine within a multi-tenant SaaS application, a desktop app built with Electron, an internal command-line tool, or a network of distributed edge workers. The core logic remains the same, allowing you to build a consistent automation experience across different products and environments.

By leveraging these principles, you can focus on building your unique user experience and business logic, while relying on Refluxo to provide the robust, scalable, and flexible orchestration core.

---

---
url: 'https://refluxo-engine.vitor036daniel.workers.dev/concepts/workflow.md'
description: >-
  A workflow is the blueprint for an execution. It's a declarative JSON
  structure that defines the tasks (Nodes) and the connections between them
  (Edges).
---

# Workflow & Node Definition

A workflow is the blueprint for an execution. It's a declarative JSON structure that defines the tasks (Nodes) and the connections between them (Edges).

## WorkflowDefinition

This is the main object that you provide to the `WorkflowEngine`. It consists of two key parts:

* `nodes`: An array of `Node` objects.
* `edges`: An array of `Edge` objects that connect the nodes.

```typescript
const workflowDefinition: WorkflowDefinition = {
  nodes: [
    { id: "n1", type: "my-custom-node", data: { message: "Hello" } },
    { id: "n2", type: "another-node", data: { value: 123 } }
  ],
  edges: [
    { id: "e1", source: "n1", target: "n2" }
  ]
};
```

Here's a visual representation of a simple workflow:

```mermaid
graph TD
    A[Start Node] --> B(Intermediate Node)
    B --> C[End Node]
```

### Node

A `Node` is a single unit of work in your workflow. It has:

* `id`: A unique identifier for the node within the workflow.
* `type`: A string that maps to a `NodeDefinition`.
* `data`: A JSON object containing the static configuration for this node instance. This data can be made dynamic using [expressions](./expressions.md).

### Edge

An `Edge` defines the flow of execution between nodes. It has:

* `id`: A unique identifier for the edge.
* `source`: The ID of the node where the edge originates.
* `target`: The ID of the node where the edge terminates.
* `sourceHandle` (optional): A key for implementing conditional logic. For more details, see the [Conditionals guide](../guides/conditionals.md).

## NodeDefinition

While `WorkflowDefinition` describes the structure, `NodeDefinition` describes the behavior.
It's a JavaScript object that tells the engine how a certain `type` of node should operate. You provide an object containing all your `NodeDefinition`s when you instantiate the `WorkflowEngine`. Schemas are defined with any library compatible with **Standard Schema** (like Valibot or Zod).

```typescript
import { object, string } from "valibot"; // Using valibot syntax

const nodeDefinitions = {
  "my-custom-node": {
    // Schema for input validation
    input: object({ message: string() }),
    // Schema for output validation
    output: object({ status: string() }),
    // The execution logic
    executor: async (data) => {
      // The 'data' type is inferred from the 'input' schema
      console.log(data.message); // "Hello"
      return { data: { status: "ok" } };
    }
  },
  // ... other node definitions
};
```

A `NodeDefinition` contains:

* `input`: A Standard Schema to validate the node's resolved input data.
* `output`: A Standard Schema to validate the data returned by the executor.
* `retryPolicy`: An optional policy for [error handling](../guides/error-handling.md).
* `executor`: An `async` function that contains the node's business logic. It receives the resolved `data` and the full `context`, and must return an object containing the `data` to be passed on.
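To tie these pieces together, here's a sketch of a full definition combining validation, a retry policy, and conditional branching. The `tryCharge` helper is hypothetical:

```typescript
import { NodeDefinition } from "refluxo-engine";
import { object, number, string } from "valibot";

// Hypothetical payment helper
declare function tryCharge(amount: number): Promise<boolean>;

const chargeCard: NodeDefinition = {
  input: object({ amount: number() }),
  output: object({ status: string() }),
  // Retry transient failures up to 3 times, doubling the wait each time
  retryPolicy: { maxAttempts: 3, interval: 1000, backoff: "exponential" },
  executor: async (data, context) => {
    const ok = await tryCharge((data as { amount: number }).amount);
    return {
      data: { status: ok ? "charged" : "declined" },
      nextHandle: ok ? "success" : "failure", // drives conditional edges
    };
  },
};
```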