Freym Streams SDK documentation

Installation

To install the SDK, run the command for your language:

TypeScript:

npm i --save @fraym/streams

Go:

go get -u github.com/fraym/freym-api/go

Configuration

Using ENV Variables

You can configure the SDK by setting the following environment variables:

Variable Name              Default   Description
STREAMS_CLIENT_ADDRESS     (none)    Address of the streams service
STREAMS_CLIENT_GROUP_ID    ""        The group ID that identifies the application

Then use the configuration like this:

TypeScript:

import { getEnvConfig } from "@fraym/streams";

...

const config = getEnvConfig();

Go:

import (
    "github.com/Becklyn/go-wire-core/env"
    "github.com/fraym/freym-api/go/streams/config"
)

...

_ = env.New() // this reads your environment variables from a `.env` or `.env.local` file
conf := config.NewEnvConfig()
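
To make the two variables above concrete, here is a minimal sketch of how they could be resolved by hand. It is not the SDK's `getEnvConfig` implementation. The `StreamsEnvConfig` shape and the `readStreamsEnv` helper are hypothetical, and treating the address as required is an assumption based on the table listing no default for it.

```typescript
// Hypothetical shape for illustration; the real SDK config may contain more fields.
interface StreamsEnvConfig {
    serverAddress: string;
    groupId: string;
}

// Resolves the two documented variables from a plain key/value map
// (in a Node.js service you would pass `process.env`).
function readStreamsEnv(env: Record<string, string | undefined>): StreamsEnvConfig {
    const serverAddress = env["STREAMS_CLIENT_ADDRESS"];
    if (!serverAddress) {
        // Assumption: no default is documented, so this sketch treats the address as required.
        throw new Error("STREAMS_CLIENT_ADDRESS is not set");
    }
    return {
        serverAddress,
        // The documented default for the group ID is the empty string.
        groupId: env["STREAMS_CLIENT_GROUP_ID"] ?? "",
    };
}
```

Failing early on a missing address surfaces configuration mistakes at startup instead of at the first request.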

Using Code Configuration

TypeScript:

import { useConfigDefaults } from "@fraym/streams";

...

const config = useConfigDefaults({
    serverAddress: "my.streams.service",
    appPrefix: "my-app",
    deploymentId: 0, // optional, used for filtering events by deployment
    // 0 = no deployment filter
    // > 0 = filter by deployment: you will get events with deploymentIds <= this value
});

Go:

import "github.com/fraym/freym-api/go/streams/config"

...

conf := config.NewDefaultConfig()
conf.Address = "my.streams.service"
conf.AppPrefix = "my-app"
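
The `deploymentId` rule described in the comments above can be illustrated with a small in-memory sketch. The `DeployedEvent` type and `filterByDeployment` function are hypothetical and only model the documented rule; the actual filtering happens in the Streams service.

```typescript
// Hypothetical event shape: only the fields needed to show the filter.
interface DeployedEvent {
    id: string;
    deploymentId: number;
}

// Mirrors the documented rule: 0 disables the filter, a positive value keeps
// events whose deploymentId is less than or equal to it.
function filterByDeployment(events: DeployedEvent[], deploymentId: number): DeployedEvent[] {
    if (deploymentId === 0) {
        return events;
    }
    return events.filter(e => e.deploymentId <= deploymentId);
}

const sampleEvents: DeployedEvent[] = [
    { id: "a", deploymentId: 1 },
    { id: "b", deploymentId: 2 },
    { id: "c", deploymentId: 3 },
];

const unfiltered = filterByDeployment(sampleEvents, 0); // events a, b and c
const upToTwo = filterByDeployment(sampleEvents, 2);    // events a and b
```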

Usage

Initialize the Client

TypeScript:

import { newClient } from "@fraym/streams";

...

const client = await newClient({
    serverAddress: "my.streams.service",
    appPrefix: "my-app",
});

...

// use your client here

...

// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
await client.close();

Go:

import (
    "github.com/fraym/freym-api/go/streams"
    "github.com/fraym/golog"
)

logger := golog.NewZerologLogger()
client, err := streams.NewClient(logger, conf)
if err != nil {
    logger.Fatal().WithError(err).Write()
}

// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
defer client.Close()

// use your client here

Publishing Events

The Streams service distinguishes between two types of events: Persistent Events and Messages.

  • Persistent events are stored in the event store and can be retrieved later.
  • Messages are not stored in the event store and are only used for communication between services. We also refer to messages as broadcast events.

Use messages for real-time communication between services where the event history is not important. Use persistent events to store business-critical data where you want to keep track of the event history.

The Streams service allows you to publish multiple events in a transactional manner. If an error occurs while publishing one of the events, none of the events will be published.

While an event must be associated with a topic, it can also be associated with a stream. This enables event streaming functionality for the event.

TypeScript:

await client.publish("topic", [
    {
        tenantId: "tenant-id",
        type: "event-type",
        payload: {
            key: "value",
            otherKey: 123,
        },
    },
]);

Go:

import (
    "context"

    "github.com/fraym/freym-api/go/streams/domain/dto"
)

// ...

if err := client.Publish(
    context.TODO(),
    "topic",
    []*dto.PublishEvent{
        {
            TenantId: "tenant-id",
            Type:     "event-type",
            Payload: dto.PublishEventPayloadMap{
                "key":      {Value: "value"},
                "otherKey": {Value: 123},
            },
        },
    },
); err != nil {
    panic(err)
}
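
The all-or-nothing behavior of a multi-event publish can be pictured with a simplified in-memory model. This is only a sketch of the guarantee and not how the Streams service implements it. The `InMemoryTopic` class and its validation rule (non-empty `tenantId` and `type`) are assumptions made for illustration.

```typescript
interface DraftEvent {
    tenantId: string;
    type: string;
    payload: Record<string, unknown>;
}

// In-memory stand-in for a topic; NOT the service's real storage.
class InMemoryTopic {
    private readonly stored: DraftEvent[] = [];

    // Validates the whole batch first and only then appends it,
    // so a single invalid event leaves the topic unchanged.
    publish(batch: DraftEvent[]): void {
        for (const event of batch) {
            if (event.tenantId === "" || event.type === "") {
                throw new Error("event requires tenantId and type");
            }
        }
        this.stored.push(...batch);
    }

    get size(): number {
        return this.stored.length;
    }
}

const demoTopic = new InMemoryTopic();
demoTopic.publish([{ tenantId: "tenant-id", type: "event-type", payload: { key: "value" } }]);

try {
    demoTopic.publish([
        { tenantId: "tenant-id", type: "event-type", payload: {} },
        { tenantId: "tenant-id", type: "", payload: {} }, // invalid: empty type
    ]);
} catch {
    // The whole second batch was rejected; demoTopic.size is still 1.
}
```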

Subscribing to Events

The Streams service allows subscriptions to both persistent events and messages. It is possible to create a subscription that listens to multiple topics at once.

If the subscription handler fails to process a persistent event, the event is retried until it is successfully processed. Messages are not retried.

TypeScript:

const subscription = client.subscribe(["test"]);
subscription.useHandlerForAllTypes(async e => {
    // handle the event here, throw in case of error
});

subscription.start();

// stop the connection when you are done
subscription.stop();

Go:

import (
    "context"

    "github.com/fraym/freym-api/go/streams/domain/dto"
)

// ...

subscription := client.NewSubscription([]string{"topic"}, false, 0)
subscription.UseHandlerForAllEventTypes(
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // handle the event here, return an error in case of failure
        return nil
    },
)

subscription.Start()

// stop the connection when you are done
subscription.Stop()
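
The retry rule described above (persistent events are retried, messages are not) can be sketched as a small delivery loop. This is a local model and not the SDK's internals. The `deliver` and `flakyHandler` helpers are hypothetical, and `maxAttempts` is a safety cap that exists only in this sketch.

```typescript
type Handler = (eventId: string) => void;

// Delivers one event to a handler and returns the number of attempts made.
// Persistent events are retried until the handler stops throwing;
// messages get a single attempt.
function deliver(eventId: string, persistent: boolean, handler: Handler, maxAttempts = 10): number {
    let attempts = 0;
    while (attempts < maxAttempts) {
        attempts++;
        try {
            handler(eventId);
            return attempts;
        } catch {
            if (!persistent) {
                return attempts; // messages are dropped after the first failure
            }
        }
    }
    return attempts;
}

// Builds a handler that fails the given number of times before succeeding,
// which makes the retries visible.
function flakyHandler(failures: number): Handler {
    let remaining = failures;
    return () => {
        if (remaining > 0) {
            remaining--;
            throw new Error("temporary failure");
        }
    };
}

const persistentAttempts = deliver("event-1", true, flakyHandler(2)); // 3 attempts
const messageAttempts = deliver("message-1", false, flakyHandler(2)); // 1 attempt
```

Because a failing handler is invoked again for the same persistent event, it is usually worth making handlers safe to run more than once for the same event.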

Querying Events

The Streams service provides several APIs to query events:

Get an event by its ID

TypeScript:

const event = await client.getEvent("tenant-id", "topic", "event-id");

Go:

event, err := client.GetEvent(context.TODO(), "tenant-id", "topic", "event-id")
if err != nil {
    panic(err)
}

Get the latest event within a topic

TypeScript:

const lastEvent = await client.getLastEvent("tenant-id", "topic");

Go:

lastEvent, err := client.GetLastEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
    panic(err)
}

Get the last event within a topic that belongs to a given list of event types

TypeScript:

const lastEvent = await client.getLastEventByTypes("tenant-id", "topic", ["event-type"]);

Go:

lastEvent, err := client.GetLastEventByTypes(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
)
if err != nil {
    panic(err)
}

Paginate through all events in a topic that belong to a given list of event types

TypeScript:

const pageSize = 100; // number of events to fetch per request
await client.iterateAllEvents("tenant-id", "topic", ["event-type"], pageSize, event => {
    // process the event here
});

Go:

pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory

isEmpty, err := client.IterateAllEvents(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
    pageSize,
    bufferSize,
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    },
)
if err != nil {
    panic(err)
}
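
To show what `pageSize` controls, here is a minimal sketch of page-wise iteration over an in-memory list. The `iterateAll` helper is hypothetical. In this model each slice stands in for one request to the service, while the handler is still called once per event.

```typescript
// Calls `handle` once per event while "fetching" at most `pageSize` events
// at a time. Returns the number of simulated requests.
function iterateAll<T>(all: T[], pageSize: number, handle: (event: T) => void): number {
    if (pageSize <= 0) {
        throw new Error("pageSize must be positive");
    }
    let requests = 0;
    for (let offset = 0; offset < all.length; offset += pageSize) {
        const page = all.slice(offset, offset + pageSize); // one simulated request
        requests++;
        page.forEach(e => handle(e));
    }
    return requests;
}

// 250 events with a page size of 100 are fetched in three simulated requests.
const demoEvents: number[] = [];
for (let i = 0; i < 250; i++) {
    demoEvents.push(i);
}
let handled = 0;
const requestCount = iterateAll(demoEvents, 100, () => {
    handled++;
});
```

A larger page size means fewer round trips at the cost of more memory per request.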

Paginate through all events in a topic that belong to a given list of event types that come after a specific event

TypeScript:

const pageSize = 100; // number of events to fetch per request
await client.iterateAllEventsAfterEvent(
    "tenant-id",
    "topic",
    ["event-type"],
    "event-id",
    pageSize,
    event => {
        // process the event here
    }
);

Go:

pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory

isEmpty, err := client.IterateAllEventsAfterEvent(
    context.TODO(),
    "tenant-id",
    "topic",
    []string{"event-type"},
    "event-id",
    pageSize,
    bufferSize,
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    },
)
if err != nil {
    panic(err)
}
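
Iterating "after a specific event" is a natural fit for checkpointing: remember the ID of the last processed event and resume from there. The sketch below models this on an in-memory list. The `StoredEvent` type and `eventsAfter` helper are hypothetical, and returning nothing for an unknown ID is a choice made for this sketch that may differ from the service's behavior.

```typescript
interface StoredEvent {
    id: string;
    type: string;
}

// Returns the events that come strictly after `afterEventId`.
// Assumption for this sketch: an unknown ID yields an empty result.
function eventsAfter(all: StoredEvent[], afterEventId: string): StoredEvent[] {
    const index = all.map(e => e.id).indexOf(afterEventId);
    return index === -1 ? [] : all.slice(index + 1);
}

const eventLog: StoredEvent[] = [
    { id: "e1", type: "event-type" },
    { id: "e2", type: "event-type" },
    { id: "e3", type: "event-type" },
];

// A consumer that last processed "e1" resumes with "e2" and "e3".
let checkpoint = "e1";
eventsAfter(eventLog, checkpoint).forEach(e => {
    checkpoint = e.id; // advance the checkpoint after each processed event
});
```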

Querying Streams

The Streams service provides several APIs to query streams:

Check if a stream is empty

TypeScript:

const pageSize = 100; // number of events to fetch per request
const streamIterator = await client.getStreamIterator("topic", "tenant-id", "stream", pageSize);

const isEmpty = await streamIterator.isEmpty();

Go:

isEmpty, err := client.IsStreamEmpty(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
)
if err != nil {
    panic(err)
}

Paginate through all events of a stream

TypeScript:

const streamIterator = await client.getStreamIterator(
    "topic",
    "tenant-id",
    "stream",
    100, // number of events to fetch per request
);

await streamIterator.forEach(event => {
    // process the event here
});

Go:

isEmpty, err := client.IterateStream(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    0, // id of the deployment, 0 ignores the deployment filter
    100, // number of events to fetch per request
    1000, // number of events to buffer in memory
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    },
)
if err != nil {
    panic(err)
}

Paginate through all events of a stream that come after a specific event

TypeScript:

const streamIterator = await client.getStreamIterator(
    "topic",
    "tenant-id",
    "stream",
    100, // number of events to fetch per request
);

await streamIterator.forEachAfterEvent("event-id", event => {
    // process the event here
});

Go:

isEmpty, err := client.IterateStreamAfterEvent(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    "event-id",
    0, // id of the deployment, 0 ignores the deployment filter
    100, // number of events to fetch per request
    1000, // number of events to buffer in memory
    func(ctx context.Context, event *dto.SubscriptionEvent) error {
        // process the event here
        return nil
    },
)
if err != nil {
    panic(err)
}

Snapshotting

Snapshotting can be used to optimize performance when reading event streams. When a snapshot event is added to the event stream, all APIs that paginate the stream will not return any events that occurred before the snapshot. Instead, they return the snapshot event and any events that come after the snapshot.

TypeScript:

await client.createStreamSnapshot(
    "tenant-id",
    "topic",
    "stream",
    "last-snapshotted-event-id",
    {
        tenantId: "tenant-id",
        type: "snapshot-event-type",
        payload: {
            field: "value",
        },
    }
);

Go:

if err := client.CreateStreamSnapshot(
    context.TODO(),
    "tenant-id",
    "topic",
    "stream",
    "last-snapshotted-event-id",
    &dto.PublishEvent{
        TenantId: "tenant-id",
        Type:     "snapshot-event-type",
        Payload: dto.PublishEventPayloadMap{
            "field": {
                Value: "value",
            },
        },
    },
); err != nil {
    panic(err)
}
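
The effect of a snapshot on readers can be modeled in memory. In this sketch a stream tracks an account balance: reading returns only the latest snapshot and the events after it, and rebuilding the balance from that shorter list gives the same result as replaying everything. The `StreamEvent` shape, the `readStream` and `balance` helpers, and the single numeric `amount` payload are all assumptions made for illustration.

```typescript
interface StreamEvent {
    id: string;
    type: string;
    amount: number; // simplified payload: a single number
}

// Models what a reader sees: if the stream contains a snapshot event,
// only the latest snapshot and the events after it are returned.
function readStream(events: StreamEvent[], snapshotType: string): StreamEvent[] {
    let lastSnapshot = -1;
    events.forEach((e, i) => {
        if (e.type === snapshotType) {
            lastSnapshot = i;
        }
    });
    return lastSnapshot === -1 ? events : events.slice(lastSnapshot);
}

// Rebuilds a balance: a snapshot replaces the state, other events add to it.
function balance(events: StreamEvent[], snapshotType: string): number {
    return readStream(events, snapshotType).reduce(
        (sum, e) => (e.type === snapshotType ? e.amount : sum + e.amount),
        0,
    );
}

const withoutSnapshot: StreamEvent[] = [
    { id: "e1", type: "deposited", amount: 10 },
    { id: "e2", type: "deposited", amount: 5 },
    { id: "e3", type: "deposited", amount: 7 },
];

// The snapshot summarizes e1 and e2 (10 + 5 = 15) and sits right after e2.
const withSnapshot: StreamEvent[] = [
    { id: "e1", type: "deposited", amount: 10 },
    { id: "e2", type: "deposited", amount: 5 },
    { id: "s1", type: "snapshot", amount: 15 },
    { id: "e3", type: "deposited", amount: 7 },
];
```

With these inputs, `readStream(withSnapshot, "snapshot")` returns only `s1` and `e3`, while `balance` yields 22 for both lists.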

GDPR

When publishing an event, a payload field can be associated with GDPR logic by specifying a default value for the field. The default will be applied if the field is invalidated.

TypeScript:

await client.publish("topic", [
    {
        tenantId: "tenant-id",
        type: "event-type",
        payload: {
            field: {
                value: "value",
                gdprDefault: "default-value",
            },
        },
    },
]);

Go:

import (
    "context"

    "github.com/fraym/freym-api/go/streams/domain/dto"
)

// ...

if err := client.Publish(
    context.TODO(),
    "topic",
    []*dto.PublishEvent{
        {
            TenantId: "tenant-id",
            Type:     "event-type",
            Payload: dto.PublishEventPayloadMap{
                "field": {
                    Value: "value",
                    Gdpr: &dto.PublishGdprEventPayload{
                        Default: "default-value",
                    },
                },
            },
        },
    },
); err != nil {
    panic(err)
}

GDPR logic can also be introduced on a field of an event that has already been published:

TypeScript:

await client.introduceGdprOnEventField(
    "tenant-id",
    "default-value",
    "topic",
    "event-id",
    "field"
);

Go:

if err := client.IntroduceGdprOnEventField(
    context.TODO(),
    "tenant-id",
    "topic",
    "event-id",
    "field",
    "default-value",
); err != nil {
    panic(err)
}

When GDPR-relevant data is published, an event of type gdpr-data-recorded is published to the gdpr topic. This event contains all the information needed to build the logic that invalidates the data when necessary. The CRUD service automatically invalidates GDPR-relevant CRUD data. For any events used by your own business logic, you need to implement the GDPR logic yourself, based on the events in the gdpr topic.

The invalidation API ensures that the GDPR-relevant data is deleted and replaced with the default value. After the invalidation, the original data cannot be recovered because every trace of it is removed.

TypeScript:

await client.invalidateGdprData("tenant-id", "topic", "gdpr-id");

Go:

if err := client.InvalidateGdprData(
    context.TODO(),
    "tenant-id",
    "topic",
    "gdpr-id",
); err != nil {
    panic(err)
}
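
What invalidation does to a payload can be pictured with a small in-memory model. The `GdprField` and `Payload` types and the `invalidate` helper are hypothetical. They reuse the `{ value, gdprDefault }` shape from the publishing example and do not describe how the service stores or erases data.

```typescript
// A field published with GDPR logic carries its value and a default.
interface GdprField {
    value: string;
    gdprDefault: string;
}

// Payload fields are either plain strings or GDPR-protected fields.
type Payload = Record<string, string | GdprField>;

// Returns a copy of the payload in which every GDPR-protected value is
// replaced by its default. Plain fields are copied unchanged.
function invalidate(payload: Payload): Payload {
    const result: Payload = {};
    Object.keys(payload).forEach(key => {
        const field = payload[key];
        if (typeof field === "string") {
            result[key] = field;
        } else if (field !== undefined) {
            result[key] = { value: field.gdprDefault, gdprDefault: field.gdprDefault };
        }
    });
    return result;
}

const original: Payload = {
    name: { value: "Alice", gdprDefault: "anonymous" },
    plan: "pro",
};

// After invalidation, `name` holds "anonymous" and `plan` is still "pro".
const invalidated = invalidate(original);
```

Unlike this model, which leaves `original` in memory, the service removes the original value permanently.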

Renaming of event types

If you need to rename an event type, you can use the renameEventType API. No data is lost in this operation, as the event type is simply renamed.

TypeScript:

await client.renameEventType("topic", "old-event-type", "new-event-type");

Go:

if err := client.RenameEventType(
    context.TODO(),
    "topic",
    "old-event-type",
    "new-event-type",
); err != nil {
    panic(err)
}
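
As a rough in-memory picture of why renaming loses no data: only the type label changes, while event IDs and payloads stay as they were. The `TypedEvent` shape and `renameType` helper are hypothetical and do not reflect how the service performs the rename.

```typescript
interface TypedEvent {
    id: string;
    type: string;
    payload: Record<string, unknown>;
}

// Returns a copy of the log in which every event of `oldType` carries `newType`.
// IDs and payloads are carried over unchanged.
function renameType(events: TypedEvent[], oldType: string, newType: string): TypedEvent[] {
    return events.map(e => (e.type === oldType ? { ...e, type: newType } : e));
}

const beforeRename: TypedEvent[] = [
    { id: "e1", type: "old-event-type", payload: { key: "value" } },
    { id: "e2", type: "other-event-type", payload: {} },
];

// e1 is now of type "new-event-type"; e2 is untouched.
const afterRename = renameType(beforeRename, "old-event-type", "new-event-type");
```

Handlers or queries that still reference the old type name would presumably need to be updated as well.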