External Storage - Go SDK
External Storage is in Pre-Release. APIs and configuration may change before the stable release. Join the #large-payloads Slack channel to provide feedback or ask for help.
The Temporal Service enforces a 2 MB per-payload limit by default. This limit is configurable on self-hosted deployments. When your Workflows or Activities handle data larger than the limit, you can offload payloads to external storage, such as Amazon S3, and pass a small reference token through the Event History instead. This page shows you how to set up External Storage with Amazon S3 and how to implement a custom storage driver.
For a conceptual overview of External Storage and its use cases, see External Storage.
Store and retrieve large payloads with Amazon S3
The Go SDK includes an S3 storage driver. Follow these steps to set it up:
Prerequisites
- An Amazon S3 bucket that you have read and write access to. Refer to lifecycle management to ensure that your payloads remain available for the entire lifetime of the Workflow.
- Install the S3 driver module:
go get go.temporal.io/sdk/contrib/aws/s3driver
Procedure
-
Load your AWS configuration and create the S3 storage driver. The driver uses your standard AWS credentials from the environment (environment variables, IAM role, or AWS config file):
import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    awss3 "github.com/aws/aws-sdk-go-v2/service/s3"
    "go.temporal.io/sdk/contrib/aws/s3driver"
    "go.temporal.io/sdk/contrib/aws/s3driver/awssdkv2"
)

cfg, err := config.LoadDefaultConfig(context.Background(),
    config.WithRegion("us-east-2"),
)
if err != nil {
    log.Fatalf("load AWS config: %v", err)
}
driver, err := s3driver.NewDriver(s3driver.Options{
    Client: awssdkv2.NewClient(awss3.NewFromConfig(cfg)),
    Bucket: s3driver.StaticBucket("my-temporal-payloads"),
})
if err != nil {
    log.Fatalf("create S3 driver: %v", err)
}
-
Configure the driver on ExternalStorage and pass it in your Client options:
import (
    "go.temporal.io/sdk/client"
    "go.temporal.io/sdk/converter"
    "go.temporal.io/sdk/worker"
)

c, err := client.Dial(client.Options{
    HostPort: "localhost:7233",
    ExternalStorage: converter.ExternalStorage{
        Drivers: []converter.StorageDriver{driver},
    },
})
if err != nil {
    log.Fatalf("connect to Temporal: %v", err)
}
defer c.Close()

w := worker.New(c, "my-task-queue", worker.Options{})
By default, payloads larger than 256 KiB are offloaded to external storage. You can adjust this with the PayloadSizeThreshold option, or set it to 0 to externalize all payloads regardless of size. Refer to Configure payload size threshold for more information.
All Workflows and Activities running on the Worker use the storage driver automatically, with no changes to your business logic. The driver uploads and downloads payloads concurrently and validates payload integrity on retrieval.
Implement a custom storage driver
If you need a storage backend that the built-in drivers don't support, you can implement your own storage driver. Store payloads durably so that they survive process crashes and remain available for debugging and auditing after the Workflow completes. Refer to lifecycle management for retention requirements.
The following example shows a custom driver that uses local disk as the backing store. This example is for local development and testing only. In production, use a durable storage system that is accessible to all Workers:
package main

import (
    "fmt"
    "os"
    "path/filepath"

    "github.com/google/uuid"
    commonpb "go.temporal.io/api/common/v1"
    "go.temporal.io/sdk/converter"
    "google.golang.org/protobuf/proto"
)

type LocalDiskStorageDriver struct {
    storeDir string
}

func NewLocalDiskStorageDriver(storeDir string) *LocalDiskStorageDriver {
    return &LocalDiskStorageDriver{storeDir: storeDir}
}

func (d *LocalDiskStorageDriver) Name() string {
    return "local-disk"
}

func (d *LocalDiskStorageDriver) Type() string {
    return "local-disk"
}

func (d *LocalDiskStorageDriver) Store(
    ctx converter.StorageDriverStoreContext,
    payloads []*commonpb.Payload,
) ([]converter.StorageDriverClaim, error) {
    dir := d.storeDir
    if info, ok := ctx.Target.(converter.StorageDriverWorkflowInfo); ok && info.WorkflowID != "" {
        dir = filepath.Join(d.storeDir, info.Namespace, info.WorkflowID)
    }
    if err := os.MkdirAll(dir, 0o755); err != nil {
        return nil, fmt.Errorf("create store directory: %w", err)
    }
    claims := make([]converter.StorageDriverClaim, len(payloads))
    for i, payload := range payloads {
        key := uuid.NewString() + ".bin"
        filePath := filepath.Join(dir, key)
        data, err := proto.Marshal(payload)
        if err != nil {
            return nil, fmt.Errorf("marshal payload: %w", err)
        }
        if err := os.WriteFile(filePath, data, 0o644); err != nil {
            return nil, fmt.Errorf("write payload: %w", err)
        }
        claims[i] = converter.StorageDriverClaim{
            ClaimData: map[string]string{"path": filePath},
        }
    }
    return claims, nil
}

func (d *LocalDiskStorageDriver) Retrieve(
    ctx converter.StorageDriverRetrieveContext,
    claims []converter.StorageDriverClaim,
) ([]*commonpb.Payload, error) {
    payloads := make([]*commonpb.Payload, len(claims))
    for i, claim := range claims {
        filePath := claim.ClaimData["path"]
        data, err := os.ReadFile(filePath)
        if err != nil {
            return nil, fmt.Errorf("read payload: %w", err)
        }
        payload := &commonpb.Payload{}
        if err := proto.Unmarshal(data, payload); err != nil {
            return nil, fmt.Errorf("unmarshal payload: %w", err)
        }
        payloads[i] = payload
    }
    return payloads, nil
}
The following sections walk through the key parts of the driver implementation.
1. Implement the StorageDriver interface
A custom driver implements the converter.StorageDriver interface with four methods:
- Name() returns a unique string that identifies the driver. The SDK stores this name in the claim check reference so it can route retrieval requests to the correct driver. Changing the name after payloads have been stored breaks retrieval.
- Type() returns a string that identifies the driver implementation. Unlike Name(), this must be the same across all instances of the same driver type regardless of configuration.
- Store() receives a slice of payloads and returns one StorageDriverClaim per payload. A claim is a set of string key-value pairs that the driver uses to locate the payload later.
- Retrieve() receives the claims that Store() produced and returns the original payloads.
2. Store payloads
In Store(), marshal each Payload protobuf message to bytes with proto.Marshal(payload) and write the bytes to
your storage system. The application data has already been serialized by the Payload Converter
and Payload Codec before it reaches the driver.
See the data conversion pipeline for more details.
Return a StorageDriverClaim for each payload with enough information to retrieve it later. The ctx.Target
provides identity information (namespace, Workflow ID) depending on the operation. Use a type switch on
StorageDriverWorkflowInfo and StorageDriverActivityInfo to access the concrete values. Consider structuring
your storage keys to include this information so that you can identify which Workflow owns each payload.
3. Retrieve payloads
In Retrieve(), download the bytes using the claim data, then reconstruct the Payload protobuf message with
proto.Unmarshal(data, payload). The Payload Converter handles deserializing the application data after the driver
returns the payload.
4. Configure the Client
Pass an ExternalStorage struct with your driver in the Client options:
c, err := client.Dial(client.Options{
    ExternalStorage: converter.ExternalStorage{
        Drivers: []converter.StorageDriver{NewLocalDiskStorageDriver("/tmp/temporal-payload-store")},
    },
})
Configure payload size threshold
You can configure the payload size threshold that triggers external storage. By default, payloads larger than 256 KiB
are offloaded to external storage. You can adjust this with the PayloadSizeThreshold option, or set it to 0 to
externalize all payloads regardless of size.
import "go.temporal.io/sdk/converter"

c, err := client.Dial(client.Options{
    ExternalStorage: converter.ExternalStorage{
        Drivers:              []converter.StorageDriver{driver},
        PayloadSizeThreshold: 0,
    },
})
Use multiple storage drivers
When you register multiple drivers, you must provide a DriverSelector that implements the StorageDriverSelector
interface. The SDK returns an error at client creation if multiple drivers are registered without a selector. The
selector chooses which driver stores each payload. Any driver in the list that is not selected for storing is still
available for retrieval, which is useful when migrating between storage backends. Return nil from the selector to
keep a specific payload inline in Event History.
Multiple drivers are useful in scenarios such as:
- Driver migration. Your Worker needs to retrieve payloads created by clients that use a different driver than the one you prefer. Register both drivers and use the selector to always pick your preferred driver for new payloads. The old driver remains available for retrieving existing claims.
- Latency vs. durability tradeoffs. Some Workflow types may benefit from a faster storage backend at the cost of durability, while others require a durable backend like S3. Use the selector to route based on Workflow context.
The following example registers two drivers but always selects preferredDriver for new payloads. The legacyDriver
is only registered so the Worker can retrieve payloads that were previously stored with it:
import (
    commonpb "go.temporal.io/api/common/v1"
    "go.temporal.io/sdk/converter"
)

type PreferredSelector struct {
    preferred converter.StorageDriver
}

func (s *PreferredSelector) SelectDriver(
    ctx converter.StorageDriverStoreContext,
    payload *commonpb.Payload,
) (converter.StorageDriver, error) {
    return s.preferred, nil
}

// Usage:
converter.ExternalStorage{
    Drivers:        []converter.StorageDriver{preferredDriver, legacyDriver},
    DriverSelector: &PreferredSelector{preferred: preferredDriver},
}