Skip to main content
Version: 2.0.0

Activity Development

What is an Activity

An Activity is a normal function or method that executes a single, well-defined action (either short or long-running), such as calling another service, transcoding a media file, or sending an email message. Activity code should be deterministic.

It is highly recommended to read the Official Activity Documentation from Temporal to familiarize yourself with the concept of an activity.

Activity Sample

Before we start writing our own activity, let's take a look at the sample activity implementation to get a better understanding of how it works.

Follow the steps below to run the sample helloworld activity:

  1. First clone the repository
git clone https://github.com/mssfoobar/wfe-activity
  1. Go to the sample/helloworld folder
cd sample/helloworld
important

Temporal server must be running on your local machine with its port serving at localhost:7233 and web UI at http://wf-admin.{DEV_DOMAIN}. If you don't have it running, refer to Quick Start.

  1. Run the worker and let it run
go run worker/main.go
  1. Run the starter in another terminal. You will see the following output
go run starter/main.go
2024/11/03 20:38:30 INFO No logger configured for temporal client. Created default one.
2024/11/03 20:38:30 Started workflow WorkflowID hello_world_workflowID RunID 11274584-4776-448e-b301-f9b8649e4cac
2024/11/03 20:38:30 Workflow result: HelloWorld! Hi!
  1. Visit the Temporal web ui at http://wf-admin.{DEV_DOMAIN} to see the sample activity in action.

If you look at the activity code in helloworld/activity.go, you will see that it is a simple method with Activities receiver that returns "Hello World!" with additional input as an argument.

sample/helloworld/activity.go
type Activities struct{}

func (a *Activities) HelloWorld(ctx context.Context, input string) (string, error) {
return "HelloWorld! " + input, nil
}

Inside the helloworld/workflow.go, you will see how to call the activity inside the workflow. Here we call the HelloWorld activity with an input of Hi.

sample/helloworld/workflow.go
func Workflow(ctx workflow.Context) (string, error) {
ao := workflow.ActivityOptions{
StartToCloseTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, ao)

logger := workflow.GetLogger(ctx)
logger.Info("Workflow started")

var result string
var activities Activities
err := workflow.ExecuteActivity(ctx, activities.HelloWorld, "Hi!").Get(ctx, &result)
if err != nil {
logger.Error("Activity failed.", "Error", err)
return "", err
}

logger.Info("Workflow completed.", "result", result)

return result, nil
}
tip

Using this sample activity, you can quickly write your own activity and call it inside workflow definition as shown above to test functionality.

Activity Development

  1. Create a new repository using the Activity Template

  2. Write your own activity inside the internal/worker/activities folder

internal/worker/activities/activities.go
type Activities struct{}

// Add your own activities here with Activities as the receiver
//
// for example
// func (a *Activities) ActivityName(ctx context.Context, input string) (string, error) {
// }
  1. After writing your own activity, you can build an image using the docker build command.
docker build -f ./docker/worker.Dockerfile .

Cancellable Activities

important

If your activity takes longer than a minute to execute, it must be cancellable. This is because the workflow engine defaults the heartbeat timeout to 1 minute. If your activity does not send a heartbeat signal within this interval, the workflow engine will deem it timed out and will retry the execution until it reaches the default maximum of 10 attempts.

If you want your activity to be cancellable, you need to heartbeat at regular interval. Below is an example HttpCall activity. Note that before and after executing long-running processes, it will heart beat with activity.RecordHeartbeat(ctx, "status-report-to-workflow") and catch cancellation from the workflow by checking ctx.Done().

import "go.temporal.io/sdk/activity"

// HttpCall take in 'method', 'endpoint' and 'payload' as input
// Successful execution return http response
func (a *Activities) HttpCall(ctx context.Context, endpoint, method, payload string) (any, error) {
client := &http.Client{}
req, err := http.NewRequest(method, endpoint, bytes.NewReader([]byte(payload)))
if err != nil {
return nil, err
}

// heartbeat report before long-running process
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
// catch cancellation from workflow by checking ctx.Done
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

res, err := client.Do(req)
if err != nil {
return nil, err
}

// heartbeat report after long-running process
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
// catch cancellation from workflow by checking ctx.Done
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}


body, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}
defer func() {
_ = res.Body.Close()
}()

var result interface{}
if err := json.Unmarshal(body, &result); err != nil {
return nil, err
}

return result, nil
}

Another example of cancellable activity is the listener type which has for loop that will keep listening to the external event.

import "go.temporal.io/sdk/activity"

// Listener check for external event every 30 seconds
func (a *Activities) Listener(ctx context.Context) (error) {
for {
activity.RecordHeartbeat(ctx, "status-report-to-workflow")
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if hasEvent() {
return nil
}
time.Sleep(30 * time.Second)
}
}

Activity Registration

Workflow Designer doesn't have visibility of the activities registered in the Temporal server. To make Workflow Designer show the activities, you need to insert the rows representing the activities function declarations in the database. Using the same HttpCall example, its record inside the service_activity table will look like this:

HttpCall

NameDescription
service_namethe name of the service where the activity is registered. this should be the name of the container
activity_typethe name of the activity function
activity_iconthe icon of the activity. refer to enum_activity_icon table for available icons
activity_paramthe parameter of the activity. must be the same as the function argument. here endpoint, method and payload are the arguments of the HttpCall activity. ordering must be the same as function arguments.
activity_result
  • {"object": {}} for returning json object
  • "string" for returning string
  • true or false for returning boolean
  • 0 for returning number
  • null for no return value
  • timeout_in_secondsthe timeout of the activity in seconds
    danger

    When setting activity_result to json object make sure the value is {"object": {}} not just {} which is considered as empty object and will result in validation error in Workflow Designer.

    Retrieving Workflow Metadata

    info

    Refer to workflow execution guide for how to pass metadata when starting a workflow.

    In your activity, you can retrieve the workflow metadata using temporal.ContextValue(ctx) from aoh-golib/temporal package. Here is an example of how to get the workflow metadata.

    This retrieve the metadata key trigger_rule_id.

    import "github.com/mssfoobar/aoh-golib/temporal"

    func (a *Activities) GetTriggerRuleId(ctx context.Context) (string, error) {
    metadata := temporal.ContextValue(ctx)
    return fmt.Sprintf("Hello, %s", metadata["trigger_rule_id"]), nil
    }