Activity Development
What is an Activity
An Activity is a normal function or method that executes a single, well-defined action (either short or long-running), such as calling another service, transcoding a media file, or sending an email message. Activity code should be deterministic.
It is highly recommended to read the Official Activity Documentation from Temporal to familiarize yourself with the concept of an activity.
Activity Sample
Before we start writing our own activity, let's take a look at the sample activity implementation to get a better understanding of how it works.
Follow the steps below to run the sample helloworld
activity:
- First clone the repository
git clone https://github.com/mssfoobar/wfe-activity
- Go to the
sample/helloworld
folder
cd sample/helloworld
Temporal server must be running on your local machine with its port serving at localhost:7233
and web UI at
http://wf-admin.{DEV_DOMAIN}
. If you don't have it running, refer to
Quick Start.
- Run the worker and let it run
go run worker/main.go
- Run the starter in another terminal. You will see the following output
go run starter/main.go
2024/11/03 20:38:30 INFO No logger configured for temporal client. Created default one.
2024/11/03 20:38:30 Started workflow WorkflowID hello_world_workflowID RunID 11274584-4776-448e-b301-f9b8649e4cac
2024/11/03 20:38:30 Workflow result: HelloWorld! Hi!
- Visit the Temporal web ui at
http://wf-admin.{DEV_DOMAIN}
to see the sample activity in action.
If you look at the activity code in helloworld/activity.go
, you will see that it is a simple method with
Activities receiver that returns "Hello World!" with additional input as an argument.
type Activities struct{}
func (a *Activities) HelloWorld(ctx context.Context, input string) (string, error) {
return "HelloWorld! " + input, nil
}
Inside the helloworld/workflow.go
, you will see how to call the activity inside the workflow. Here we call the
HelloWorld
activity with an input of Hi
.
func Workflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)
logger := workflow.GetLogger(ctx)
logger.Info("Workflow started")
var result string
var activities Activities
err := workflow.ExecuteActivity(ctx, activities.HelloWorld, "Hi!").Get(ctx, &result)
if err != nil {
logger.Error("Activity failed.", "Error", err)
return "", err
}
logger.Info("Workflow completed.", "result", result)
return result, nil
}
Using this sample activity, you can quickly write your own activity and call it inside workflow definition as shown above to test functionality.
Activity Development
-
Create a new repository using the Activity Template
-
Write your own activity inside the
internal/worker/activities
folder
type Activities struct{}
// Add your own activities here with Activities as the receiver
//
// for example
// func (a *Activities) ActivityName(ctx context.Context, input string) (string, error) {
// }
- After writing your own activity, you can build an image using the docker build command.
docker build -f ./docker/worker.Dockerfile .
Cancellable Activities
If your activity takes longer than a minute to execute, it must be cancellable. This is because the workflow engine defaults the heartbeat timeout to 1 minute. If your activity does not send a heartbeat signal within this interval, the workflow engine will deem it timed out and will retry the execution until it reaches the default maximum of 10 attempts.
If you want your activity to be cancellable, you need to heartbeat at regular interval.
Below is an example HttpCall activity. Note that before and after executing long-running processes, it will
heart beat with activity.RecordHeartbeat(ctx, "status-report-to-workflow")
and catch cancellation from the workflow
by checking ctx.Done()
.
import "go.temporal.io/sdk/activity"
// HttpCall take in 'method', 'endpoint' and 'payload' as input
// Successful execution return http response
func (a *Activities) HttpCall(ctx context.Context, endpoint, method, payload string) (any, error) {
client := &http.Client{}
req, err := http.NewRequest(method, endpoint, bytes.NewReader([]byte(payload)))
if err != nil {
return nil, err
}
// heartbeat report before long-running process
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
// catch cancellation from workflow by checking ctx.Done
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
res, err := client.Do(req)
if err != nil {
return nil, err
}
// heartbeat report after long-running process
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
// catch cancellation from workflow by checking ctx.Done
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
defer func() {
_ = res.Body.Close()
}()
var result interface{}
if err := json.Unmarshal(body, &result); err != nil {
return nil, err
}
return result, nil
}
Another example of cancellable activity is the listener type which has for loop that will keep listening to the external event.
import "go.temporal.io/sdk/activity"
// Listener check for external event every 30 seconds
func (a *Activities) Listener(ctx context.Context) (error) {
for {
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if hasEvent() {
return nil
}
time.Sleep(30 * time.Second)
}
}
Activity Registration
Workflow Designer doesn't have visibility of the activities registered in the Temporal server. To make Workflow
Designer show the activities, you need to insert the rows representing the activities function declarations in the
database. Using the same HttpCall
example, its record inside the service_activity
table will look like this:
Name | Description |
---|---|
service_name | the name of the service where the activity is registered. this should be the name of the container |
activity_type | the name of the activity function |
activity_icon | the icon of the activity. refer to enum_activity_icon table for available icons |
activity_param | the parameter of the activity. must be the same as the function argument. here endpoint , method and
payload are the arguments of the HttpCall activity. ordering must be the same as function arguments. |
activity_result | {"object": {}} for returning json object"string" for returning stringtrue or false for returning boolean0 for returning numbernull for no return value |
timeout_in_seconds | the timeout of the activity in seconds |
When setting activity_result
to json object make sure the value is {"object": {}}
not just {}
which is
considered as empty object and will result in validation error in Workflow Designer.
Retrieving Workflow Metadata
Refer to workflow execution guide for how to pass metadata when starting a workflow.
In your activity, you can retrieve the workflow metadata using temporal.ContextValue(ctx)
from aoh-golib/temporal
package. Here is an example of how to get the workflow metadata.
This retrieve the metadata key trigger_rule_id
.
import "github.com/mssfoobar/aoh-golib/temporal"
func (a *Activities) GetTriggerRuleId(ctx context.Context) (string, error) {
metadata := temporal.ContextValue(ctx)
return fmt.Sprintf("Hello, %s", metadata["trigger_rule_id"]), nil
}