321 lines
11 KiB
Plaintext
321 lines
11 KiB
Plaintext
---
|
|
description:
|
|
globs:
|
|
alwaysApply: false
|
|
---
|
|
This file provides rules and context for generating or understanding Go code related to a custom Domain Specific Language (DSL) for defining Temporal workflows within this project.
|
|
|
|
**DSL Overview:**
|
|
|
|
This project uses a specific Go-based DSL to define Temporal workflows declaratively. The core idea is to represent workflow logic using nested structures rather than imperative Go code directly within the workflow function.
|
|
|
|
**Core DSL Structures (Defined in `dsl` package):**
|
|
|
|
* **`Workflow`**: The top-level structure representing the entire workflow definition.
|
|
* `Variables map[string]string`: Initial variables available to the workflow. These act as the initial state or input.
|
|
* `Root Statement`: The starting point of the workflow logic (can be a single activity, a sequence, or parallel steps).
|
|
* **`Statement`**: A building block representing a single step or a composite structure. A `Statement` can contain one of the following:
|
|
* `Activity *ActivityInvocation`: Represents a call to a Temporal Activity.
|
|
* `Sequence *Sequence`: Represents a series of statements executed sequentially.
|
|
* `Parallel *Parallel`: Represents a set of statements executed in parallel.
|
|
* **`Sequence`**: Contains a slice `Elements []*Statement` which are executed one after another.
|
|
* **`Parallel`**: Contains a slice `Branches []*Statement` which are executed concurrently. The parallel execution waits for all branches to complete. If one branch errors, others are cancelled.
|
|
* **`ActivityInvocation`**: Defines how to invoke a specific Temporal Activity.
|
|
* `Name string`: The registered name of the Temporal Activity to call.
|
|
* `Arguments []string`: A list of variable names (keys from the `bindings` map) whose values should be passed as arguments to the activity.
|
|
* `Result string`: The name of the variable (key in the `bindings` map) where the activity's result should be stored.
|
|
|
|
**Execution Flow (`SimpleDSLWorkflow`):**
|
|
|
|
* The entry point for workflows defined using this DSL is the `SimpleDSLWorkflow` function: `func SimpleDSLWorkflow(ctx workflow.Context, dslWorkflow Workflow) ([]byte, error)`.
|
|
* It initializes a `bindings` map from the `dslWorkflow.Variables`.
|
|
* It sets default `workflow.ActivityOptions` (e.g., `StartToCloseTimeout`).
|
|
* It uses `workflow.GetLogger(ctx)` for logging.
|
|
* It recursively calls the `execute` method on the `Root` statement, passing the `ctx` and `bindings` map.
|
|
|
|
**Data Flow (`bindings`):**
|
|
|
|
* The `bindings map[string]string` acts as the shared state or memory for the workflow execution.
|
|
* Initial values come from `Workflow.Variables`.
|
|
* `ActivityInvocation.Arguments` specifies which values from `bindings` to use as input for an activity.
|
|
* `ActivityInvocation.Result` specifies the key in `bindings` where the activity's return value should be stored.
|
|
* The `makeInput` helper function retrieves argument values from the `bindings` map based on the names listed in `ActivityInvocation.Arguments`.
|
|
|
|
**Concurrency (`Parallel` execution):**
|
|
|
|
* The `Parallel.execute` method uses `workflow.Go` to launch each branch concurrently.
|
|
* It uses `workflow.NewSelector` and `Future.Get` to wait for branches to complete.
|
|
* `workflow.WithCancel` is used to cancel pending branches if one branch encounters an error.
|
|
|
|
**Working with the DSL:**
|
|
|
|
* When asked to create or modify workflows, structure the logic using the `Workflow`, `Statement`, `Sequence`, `Parallel`, and `ActivityInvocation` types.
|
|
* Define the flow of execution by nesting `Sequence` and `Parallel` structures within `Statement`s.
|
|
* Specify activity calls using `ActivityInvocation`, ensuring `Name`, `Arguments`, and `Result` are correctly defined based on the available activities and the desired data flow through the `bindings` map.
|
|
* Remember that the actual activity implementation exists separately (see `activities.mdc`) and is invoked by name.
|
|
```go
|
|
package dsl
|
|
|
|
import (
|
|
"time"
|
|
|
|
"go.temporal.io/sdk/workflow"
|
|
)
|
|
|
|
type (
|
|
// Workflow is the type used to express the workflow definition. Variables are a map of valuables. Variables can be
|
|
// used as input to Activity.
|
|
Workflow struct {
|
|
Variables map[string]string
|
|
Root Statement
|
|
}
|
|
|
|
// Statement is the building block of dsl workflow. A Statement can be a simple ActivityInvocation or it
|
|
// could be a Sequence or Parallel.
|
|
Statement struct {
|
|
Activity *ActivityInvocation
|
|
Sequence *Sequence
|
|
Parallel *Parallel
|
|
}
|
|
|
|
// Sequence consist of a collection of Statements that runs in sequential.
|
|
Sequence struct {
|
|
Elements []*Statement
|
|
}
|
|
|
|
// Parallel can be a collection of Statements that runs in parallel.
|
|
Parallel struct {
|
|
Branches []*Statement
|
|
}
|
|
|
|
// ActivityInvocation is used to express invoking an Activity. The Arguments defined expected arguments as input to
|
|
// the Activity, the result specify the name of variable that it will store the result as which can then be used as
|
|
// arguments to subsequent ActivityInvocation.
|
|
ActivityInvocation struct {
|
|
Name string
|
|
Arguments []string
|
|
Result string
|
|
}
|
|
|
|
executable interface {
|
|
execute(ctx workflow.Context, bindings map[string]string) error
|
|
}
|
|
)
|
|
|
|
// SimpleDSLWorkflow workflow definition
|
|
func SimpleDSLWorkflow(ctx workflow.Context, dslWorkflow Workflow) ([]byte, error) {
|
|
bindings := make(map[string]string)
|
|
//workflowcheck:ignore Only iterates for building another map
|
|
for k, v := range dslWorkflow.Variables {
|
|
bindings[k] = v
|
|
}
|
|
|
|
ao := workflow.ActivityOptions{
|
|
StartToCloseTimeout: 10 * time.Second,
|
|
}
|
|
ctx = workflow.WithActivityOptions(ctx, ao)
|
|
logger := workflow.GetLogger(ctx)
|
|
|
|
err := dslWorkflow.Root.execute(ctx, bindings)
|
|
if err != nil {
|
|
logger.Error("DSL Workflow failed.", "Error", err)
|
|
return nil, err
|
|
}
|
|
|
|
logger.Info("DSL Workflow completed.")
|
|
return nil, err
|
|
}
|
|
|
|
func (b *Statement) execute(ctx workflow.Context, bindings map[string]string) error {
|
|
if b.Parallel != nil {
|
|
err := b.Parallel.execute(ctx, bindings)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if b.Sequence != nil {
|
|
err := b.Sequence.execute(ctx, bindings)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if b.Activity != nil {
|
|
err := b.Activity.execute(ctx, bindings)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a ActivityInvocation) execute(ctx workflow.Context, bindings map[string]string) error {
|
|
inputParam := makeInput(a.Arguments, bindings)
|
|
var result string
|
|
err := workflow.ExecuteActivity(ctx, a.Name, inputParam).Get(ctx, &result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if a.Result != "" {
|
|
bindings[a.Result] = result
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s Sequence) execute(ctx workflow.Context, bindings map[string]string) error {
|
|
for _, a := range s.Elements {
|
|
err := a.execute(ctx, bindings)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p Parallel) execute(ctx workflow.Context, bindings map[string]string) error {
|
|
//
|
|
// You can use the context passed in to activity as a way to cancel the activity like standard GO way.
|
|
// Cancelling a parent context will cancel all the derived contexts as well.
|
|
//
|
|
|
|
// In the parallel block, we want to execute all of them in parallel and wait for all of them.
|
|
// if one activity fails then we want to cancel all the rest of them as well.
|
|
childCtx, cancelHandler := workflow.WithCancel(ctx)
|
|
selector := workflow.NewSelector(ctx)
|
|
var activityErr error
|
|
for _, s := range p.Branches {
|
|
f := executeAsync(s, childCtx, bindings)
|
|
selector.AddFuture(f, func(f workflow.Future) {
|
|
err := f.Get(ctx, nil)
|
|
if err != nil {
|
|
// cancel all pending activities
|
|
cancelHandler()
|
|
activityErr = err
|
|
}
|
|
})
|
|
}
|
|
|
|
for i := 0; i < len(p.Branches); i++ {
|
|
selector.Select(ctx) // this will wait for one branch
|
|
if activityErr != nil {
|
|
return activityErr
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func executeAsync(exe executable, ctx workflow.Context, bindings map[string]string) workflow.Future {
|
|
future, settable := workflow.NewFuture(ctx)
|
|
workflow.Go(ctx, func(ctx workflow.Context) {
|
|
err := exe.execute(ctx, bindings)
|
|
settable.Set(nil, err)
|
|
})
|
|
return future
|
|
}
|
|
|
|
func makeInput(argNames []string, argsMap map[string]string) []string {
|
|
var args []string
|
|
for _, arg := range argNames {
|
|
args = append(args, argsMap[arg])
|
|
}
|
|
return args
|
|
}
|
|
```
|
|
|
|
**Example Workflows:**
|
|
|
|
Here are some example workflow implementations:
|
|
|
|
* Serial workflow:
|
|
|
|
```yaml
|
|
# This sample workflow execute 3 steps in sequence.
|
|
# 1) sampleActivity1, takes arg1 as input, and put result as result1.
|
|
# 2) sampleActivity2, takes result1 as input, and put result as result2.
|
|
# 3) sampleActivity3, takes args2 and result2 as input, and put result as result3.
|
|
variables:
|
|
arg1: value1
|
|
arg2: value2
|
|
|
|
root:
|
|
sequence:
|
|
elements:
|
|
- activity:
|
|
name: SampleActivity1
|
|
arguments:
|
|
- arg1
|
|
result: result1
|
|
- activity:
|
|
name: SampleActivity2
|
|
arguments:
|
|
- result1
|
|
result: result2
|
|
- activity:
|
|
name: SampleActivity3
|
|
arguments:
|
|
- arg2
|
|
- result2
|
|
result: result3
|
|
```
|
|
|
|
* Parallel workflow:
|
|
|
|
```yaml
|
|
# This sample workflow execute 3 steps in sequence.
|
|
# 1) activity1, takes arg1 as input, and put result as result1.
|
|
# 2) it runs a parallel block which runs below sequence branches in parallel
|
|
# 2.1) sequence 1
|
|
# 2.1.1) activity2, takes result1 as input, and put result as result2
|
|
# 2.1.2) activity3, takes arg2 and result2 as input, and put result as result3
|
|
# 2.2) sequence 2
|
|
# 2.2.1) activity4, takes result1 as input, and put result as result4
|
|
# 2.2.2) activity5, takes arg3 and result4 as input, and put result as result5
|
|
# 3) activity1, takes result3 and result5 as input, and put result as result6.
|
|
variables:
|
|
arg1: value1
|
|
arg2: value2
|
|
arg3: value3
|
|
|
|
root:
|
|
sequence:
|
|
elements:
|
|
- activity:
|
|
name: SampleActivity1
|
|
arguments:
|
|
- arg1
|
|
result: result1
|
|
- parallel:
|
|
branches:
|
|
- sequence:
|
|
elements:
|
|
- activity:
|
|
name: SampleActivity2
|
|
arguments:
|
|
- result1
|
|
result: result2
|
|
- activity:
|
|
name: SampleActivity3
|
|
arguments:
|
|
- arg2
|
|
- result2
|
|
result: result3
|
|
- sequence:
|
|
elements:
|
|
- activity:
|
|
name: SampleActivity4
|
|
arguments:
|
|
- result1
|
|
result: result4
|
|
- activity:
|
|
name: SampleActivity5
|
|
arguments:
|
|
- arg3
|
|
- result4
|
|
result: result5
|
|
- activity:
|
|
name: SampleActivity1
|
|
arguments:
|
|
- result3
|
|
- result5
|
|
result: result6
|
|
``` |