SDK
Freym Streams SDK documentation
Installation
To install the SDK, run the following command:
npm i --save @fraym/streams
go get -u github.com/freym/freym-api/go
Configuration
Using ENV Variables
You can configure the SDK by setting the following environment variables:
Variable Name | Default | Description |
---|---|---|
STREAMS_CLIENT_ADDRESS | Address of the streams service | |
STREAMS_CLIENT_GROUP_ID | "" | The group ID that identifies the application |
Then use the configuration like this:
import { getEnvConfig } from "@fraym/streams";
...
const config = getEnvConfig();
import (
"github.com/Becklyn/go-wire-core/env"
"github.com/fraym/freym-api/go/streams/config"
)
...
_ = env.New() // this reads your environment variables from a `.env` or `.env.local` file
conf := config.NewEnvConfig()
Using Code Configuration
import { useConfigDefaults } from "@fraym/streams";
...
const config = useConfigDefaults({
serverAddress: "my.streams.service",
appPrefix: "my-app",
deploymentId: 0, // optional, used for filtering events by deployment
// 0 = no deployment filter
// > 0 = filter by deployment: you will get events of deploymentIds <= this value>
});
import "github.com/fraym/freym-api/go/streams/config"
...
conf := config.NewDefaultConfig()
conf.Address = "my.streams.service"
conf.AppPrefix = "my-app"
Usage
Initialize the Client
import { newClient } from "@fraym/streams";
...
const client = await newClient({
serverAddress: "my.sync.service",
appPrefix: "my-app",
});
...
// use your client here
...
// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
await client.close();
import (
"github.com/fraym/freym-api/go/streams"
"github.com/fraym/golog"
)
logger := golog.NewZerologLogger()
client, err := streams.NewClient(logger, conf)
if err != nil {
logger.Fatal().WithError(err).Write()
}
// this will gracefully end the connection to the streams service
// ensuring all subscriptions that are still active are closed without needing to wait for a connection timeout
// call this before your service terminates
defer client.Close()
// use your client here
Publishing Events
The Streams service distinguishes between two types of events: Persistent Events and Messages.
- Persistent events are stored in the event store and can be retrieved later.
- Messages are not stored in the event store and are only used for communication between services. We also refer to messages as broadcast events.
Use messages for real-time communication between services where the event history is not important. Use persistent events to store business-critical data where you want to keep track of the event history.
The Streams service allows you to publish multiple events in a transactional manner. If an error occurs while publishing one of the events, none of the events will be published.
While an event must be associated with a topic, it can also be associated with a stream. This enables event streaming functionality for the event.
await client.publish("topic", [
{
tenantId: "tenant-id",
type: "event-type",
payload: {
key: "value",
otherKey: 123,
},
},
]);
import "github.com/fraym/freym-api/go/streams/domain/dto"
// ...
if err := client.Publish(
context.TODO(),
"topic",
[]*dto.PublishEvent{
{
TenantId: "tenant-id",
Type: "event-type",
Payload: dto.PublishEventPayloadMap{
"key": {Value: "value"},
"otherKey": {Value: 123},
},
}
}
); err != nil {
panic(err)
}
Subscribing to Events
The Streams service allows subscriptions to both persistent events and messages. It is possible to create a subscription that listens to multiple topics at once.
If the subscription handler fails to process a persistent event, the event is retried until it is successfully processed. Messages are not retried.
const subscription = client.subscribe(["test"]);
subscription.useHandlerForAllTypes(async e => {
// handle the event here, throw in case of error
});
subscription.start();
// stop the connection when you are done
subscription.stop();
import "github.com/fraym/freym-api/go/streams/domain/dto"
// ...
subscription := client.NewSubscription([]string{"topic"}, false, 0)
subscription.UseHandlerForAllEventTypes(
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// handle the event here
return nil
}
)
subscription.Start()
// stop the connection when you are done
subscription.Stop()
Querying Events
The Streams service provides several APIs to query events:
Get an event by its ID
const event = await client.getEvent( "tenant-id", "topic", "event-id");
event, err := client.GetEvent(context.TODO(), "tenant-id", "topic", "event-id")
if err != nil {
panic(err)
}
Get the latest event within a topic
const lastEvent = await client.getLastEvent("tenant-id", "topic");
lastEvent, err := client.GetLastEvent(context.TODO(), "tenant-id", "topic")
if err != nil {
panic(err)
}
Get the last event within a topic that belongs to a given list of event types
const lastEvent = await client.getLastEventByTypes("tenant-id", "topic", ["event-type"]);
lastEvent, err := client.GetLastEventByTypes(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"}
)
if err != nil {
panic(err)
}
Paginate through all events in a topic that belong to a given list of event types
const pageSize = 100; // number of events to fetch per request
await client.iterateAllEvents("tenant-id", "topic", ["event-type"], pageSize, event => {
// process the event here
});
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory
isEmpty, err := client.IterateAllEvents(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
pageSize,
bufferSize,
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
}
)
if err != nil {
panic(err)
}
Paginate through all events in a topic that belong to a given list of event types that come after a specific event
const pageSize = 100; // number of events to fetch per request
await client.iterateAllEventsAfterEvent(
"tenant-id",
"topic",
["event-type"],
"event-id",
pageSize,
event => {
// process the event here
}
);
pageSize := 100 // number of events to fetch per request
bufferSize := 1000 // number of events to buffer in memory
isEmpty, err := client.IterateAllEventsAfterEvent(
context.TODO(),
"tenant-id",
"topic",
[]string{"event-type"},
"event-id",
pageSize,
bufferSize,
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
}
)
if err != nil {
panic(err)
}
Querying Streams
The Streams service provides several APIs to query streams:
Check if a stream is empty
const pageSize = 100; // number of events to fetch per request
const streamIterator = await client.getStreamIterator("topic", "tenant-id", "stream", pageSize);
const isEmpty = await streamIterator.isEmpty();
isEmpty, err := client.IsStreamEmpty(
context.TODO(),
"tenant-id",
"topic",
"stream",
)
if err != nil {
panic(err)
}
Paginate through all events of a stream
const streamIterator = await client.getStreamIterator(
"topic",
"tenant-id",
"stream",
100,// number of events to fetch per request
);
await streamIterator.forEach(event => {
// process the event here
});
isEmpty, err := client.IterateStream(
context.TODO(),
"tenant-id",
"topic",
"stream",
0, // id of the deployment, 0 ignores the deployment filter
100, // number of events to fetch per request
1000, // number of events to buffer in memory
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
}
)
if err != nil {
panic(err)
}
Paginate through all events of a stream that come after a specific event
const streamIterator = await client.getStreamIterator(
"topic",
"tenant-id",
"stream",
100, // number of events to fetch per request
);
await streamIterator.forEachAfterEvent("event-id", event => {
// process the event here
});
isEmpty, err := client.IterateStreamAfterEvent(
context.TODO(),
"tenant-id",
"topic",
"stream",
"event-id",
0, // id of the deployment, 0 ignores the deployment filter
100, // number of events to fetch per request
1000, // number of events to buffer in memory
func(ctx context.Context, event *dto.SubscriptionEvent) error {
// process the event here
return nil
}
)
if err != nil {
panic(err)
}
Snapshotting
Snapshotting can be used to optimize performance when reading event streams. When a snapshot event is added to the event stream, all APIs that paginate the stream will not return any events that occurred before the snapshot. Instead, they return the snapshot event and any events that come after the snapshot.
await client.createStreamSnapshot(
"tenant-id",
"topic",
"stream",
"last-snapshotted-event-id",
{
tenantId: "tenant-id",
type: "snapshot-event-type",
payload: {
field: "value",
},
}
);
if err := client.CreateStreamSnapshot(
context.TODO(),
"tenant-id",
"topic",
"stream",
"last-snapshotted-event-id",
&dto.PublishEvent{
TenantId: "tenant-id",
Type: "snapshot-event-type",
Payload: dto.PublishEventPayloadMap{
"field": {
Value: "value",
},
},
}
); err != nil {
panic(err)
}
GDPR
When publishing an event, a payload field can be associated with GDPR logic by specifying a default value for the field. The default will be applied if the field is invalidated.
await client.publish("topic", [
{
tenantId: "tenant-id",
type: "event-type",
payload: {
field: {
value: "value",
gdprDefault: "default-value",
},
},
},
]);
import "github.com/fraym/streams-go/v10/domain/dto"
// ...
if err := client.Publish(
context.TODO(),
"topic",
[]*dto.PublishEvent{
{
TenantId: "tenant-id",
Type: "event-type",
Payload: dto.PublishEventPayloadMap{
"field": {
Value: "value",
Gdpr: &dto.PublishGdprEventPayload{
Default: "default-value",
}
},
},
}
}
); err != nil {
panic(err)
}
await client.introduceGdprOnEventField(
"tenant-id",
"default-value",
"topic",
"event-id",
"field"
);
if err := client.IntroduceGdprOnEventField(
context.TODO(),
"tenant-id",
"topic",
"event-id",
"field",
"default-value"
); err != nil {
panic(err)
}
When GDPR-relevant data is published, an event of type gdpr-data-recorded
is published to the gdpr
topic.
This event contains all the information needed to build the logic to invalidate the data if necessary.
While the CRUD service takes care of automatically invalidating GDPR relevant CRUD data,
you need to implement GDPR logic using the events in the gdpr
topic for any events that your business logic uses.
The invalidation API ensures that the GDPR relevant data is deleted and replaced with the default value. After the invalidation, there is no way to recover the original data as any trace of it is removed.
await client.invalidateGdprData("tenant-id","topic", "gdpr-id");
if err := client.InvalidateGdprData(
context.TODO(),
"tenant-id",
"topic",
"gdpr-id"
); err != nil {
panic(err)
}
Renaming of event types
If you need to rename an event type, you can use the renameEventType
API.
This is an operation where no data is getting lost as the the event type is simply renamed.
await client.renameEventType("topic", "old-event-type", "new-event-type");
if err := client.RenameEventType(
context.TODO(),
"topic",
"old-event-type",
"new-event-type",
); err != nil {
panic(err)
}