
Event-Driven Federated Subscriptions (EDFS) Architecture
Intro
- Subscriptions can only have a single root field
- Subgraphs should be stateless
- Maintaining 3 WebSocket Connections per client is a waste of resources
The employeeUpdated field marks the root of a Subscription (a minimal sketch follows below). With classic Federation, we’d be forced to implement this root Subscription field on a particular Subgraph. This is not ideal, because it ties the ownership of the field to one single Subgraph. If two Subgraphs contribute fields to the Employee type, which is usually the case in federated Graphs, we’d have to communicate across Subgraphs to trigger a Subscription.
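A minimal sketch of what such a root field looks like (the type and argument names are assumptions based on the examples below):

```graphql
type Subscription {
  # The root field clients subscribe to in order to receive employee updates.
  employeeUpdated(employeeID: Int!): Employee!
}
```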
In addition to the first problem, Subscriptions also make Subgraphs stateful. Each time a client connects to the Router via WebSockets, the Router has to open another WebSocket connection to the Subgraph. This means you’re not able to use Serverless infrastructure for your Subgraphs. Deployment and maintenance also become more complex, because you have to manage a large number of open connections.
Furthermore, classic Subscriptions with Federation are quite expensive in terms of memory usage. When a client wants to use Subscriptions, it opens a WebSocket connection to the Router. The Router then opens a second WebSocket connection to the Subgraph, and the Subgraph itself has to maintain its side of that connection as well. If we don’t count the client itself, that totals 3 WebSocket connections per client. Depending on the programming language and framework being used, one connection can cost multiple Megabytes of memory, making this solution not very scalable. If we had 10,000 clients connected and each WebSocket connection cost 1MB of memory, the resulting 30,000 connections would cost 30GB of memory.
Specification
Enter Cosmo Streams, a simple way to scale Federated Subscriptions in a resource-efficient manner. Cosmo Streams supports three event providers: NATS, Kafka, and Redis. Each provider offers at least Publish and Subscribe capabilities. For NATS, we also provide Request/Reply semantics, and streaming capabilities can be configured via an additional directive argument. This enables JetStream, which adds new functionality and higher qualities of service on top of the base “Core NATS” features.

Our goal is to integrate with various technologies rather than agree on a single unified interface. This approach allows us to leverage the strengths of each technology, and it is reflected in how we structure the directives: parameters are named and features are exposed as they appear in their respective ecosystems.

Here is an overview of all Cosmo Streams directives:

The @edfs__natsRequest directive is a NATS-specific directive to extend a Graph through an Event Source. It makes a request to a NATS subject and waits synchronously for the response. Under the hood, it uses NATS Request/Reply semantics.
The @edfs__natsPublish, @edfs__kafkaPublish, and @edfs__redisPublish directives allow you to publish an event through a Mutation.
Using the @edfs__natsSubscribe, @edfs__kafkaSubscribe and @edfs__redisSubscribe directives, you can create a Subscription to the corresponding message bus. By default, all the provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted. NATS allows you to create a consumer group, resulting in multiple independent streams of the subject, where each client can consume events at their own pace.
The @openfed__subscriptionFilter directive allows you to filter subscription messages based on specified conditions. For more information see Subscription Filter.
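For reference, the directive definitions look roughly as follows. This is a sketch based on the descriptions above; the providerId defaults and the NATS streamConfiguration argument are assumptions, so consult the EDFS specification for the authoritative signatures:

```graphql
# NATS: Request/Reply, Publish, and Subscribe (with optional JetStream configuration)
directive @edfs__natsRequest(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsPublish(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsSubscribe(
  subjects: [String!]!
  providerId: String! = "default"
  streamConfiguration: edfs__NatsStreamConfiguration
) on FIELD_DEFINITION

# Kafka: Publish and Subscribe
directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION

# Redis: Publish and Subscribe
directive @edfs__redisPublish(channel: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__redisSubscribe(channels: [String!]!, providerId: String! = "default") on FIELD_DEFINITION

# Filter subscription messages on the Router (see Subscription Filter below)
directive @openfed__subscriptionFilter(condition: openfed__SubscriptionFilterCondition!) on FIELD_DEFINITION
```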
An Event-Driven Subgraph does not need to be implemented. It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example.
Prerequisites
To use Cosmo Streams, you need to have an Event Source running and connected to the Router. Currently, we support NATS, Kafka, and Redis. For simplicity, NATS is used to explain the examples. To get started, run a NATS instance and add the following configuration to your config.yaml Router Configuration:
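A minimal sketch of such a provider configuration (the provider id and URL are illustrative):

```yaml
# config.yaml: register a NATS event provider with the Router
events:
  providers:
    nats:
      - id: default
        url: "nats://localhost:4222"
```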
Alternatively, if you run make edfs-demo in the Cosmo Monorepo, you’ll automatically get a NATS instance running on the default port (4222) using Docker.
Example Configuration
Below, you’ll find an example Schema that uses the NATS provider directives: the @edfs__natsRequest directive is connected to a Query root field (employeeFromEvent), a Mutation root field (updateEmployee) is connected to another subject using @edfs__natsPublish, and a Subscription root field (employeeUpdated) is connected via @edfs__natsSubscribe. Each of these fields is completely independent. It’s important to note that you don’t implement this Subgraph yourself: the engine implements the resolvers based on the Router configuration.
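Here is a sketch of such an Event-Driven Subgraph Schema, reconstructed to match the field names above. The exact subjects, argument types, and the Employee key are assumptions:

```graphql
type Query {
  employeeFromEvent(id: Int!): Employee! @edfs__natsRequest(subject: "getEmployee.{{ args.id }}")
}

type Mutation {
  updateEmployee(id: Int!, update: UpdateEmployeeInput!): PublishEventResult!
    @edfs__natsPublish(subject: "updateEmployee.{{ args.id }}")
}

type Subscription {
  employeeUpdated(employeeID: Int!): Employee!
    @edfs__natsSubscribe(subjects: ["employeeUpdated.{{ args.employeeID }}"])
}

# This Subgraph only references the Entity; its fields are resolved elsewhere.
type Employee @key(fields: "id", resolvable: false) {
  id: Int! @external
}

input UpdateEmployeeInput {
  name: String
}

# Publish root fields MUST return this result type (see Semantics below).
type PublishEventResult {
  success: Boolean!
}
```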
Semantics
The “subjects” Argument
The subjects/topics/channels argument of all event directives allows you to use templating syntax to render the subject from field arguments. In the example Schema above, @edfs__natsSubscribe(subjects: ["employeeUpdated.{{ args.employeeID }}"]) renders the subject employeeUpdated.1 when a client subscribes with employeeID: 1.

Request/Reply
The @edfs__natsRequest directive creates a response subject (internally) and sends the JSON representation of all arguments to the subject specified in the subject argument. The Router then waits synchronously on the response subject for the result. The Router expects the response to contain all fields that are defined on the Entity type in this Schema, as well as the __typename field to identify the Entity. In the example, the Employee Entity contains an `id` field, so a response of the following shape would be valid (the id value is illustrative):
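```json
{
  "__typename": "Employee",
  "id": 1
}
```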
Publish
The @edfs__natsPublish, @edfs__kafkaPublish, and @edfs__redisPublish directives send a JSON representation of all arguments, including the arguments used to render the subject/topic/channel, to the rendered subject/topic/channel. Fields using a publish directive MUST return the type PublishEventResult, which has a single field success of type Boolean!, indicating whether publishing the event was successful.
Given that we send the following Mutation, the Router renders the subject updateEmployee.1 from the id argument and publishes the JSON representation of all arguments to it.
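A sketch of what this looks like, assuming the example Schema above (the update payload is illustrative):

```graphql
mutation {
  updateEmployee(id: 1, update: { name: "Jannik" }) {
    success
  }
}
```

The resulting message on the subject updateEmployee.1:

```json
{
  "id": 1,
  "update": {
    "name": "Jannik"
  }
}
```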
Subscribe
Given the following Subscription, the Router subscribes to the subject employeeUpdated.1 and waits for the next message to be published. All fields that are defined on the response Entity MUST be sent to the subject for the message to be valid. Additional fields that are not part of this “Events Subgraph” will be resolved by the Router. In addition, it is required to send the __typename field to identify the Entity.
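A sketch of such a Subscription operation (the details selection is assumed to be resolved by another Subgraph):

```graphql
subscription {
  employeeUpdated(employeeID: 1) {
    id
    details {
      forename
      surname
    }
  }
}
```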
Here’s an example of a valid message (the id value is illustrative):
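```json
{
  "__typename": "Employee",
  "id": 1
}
```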
In contrast, the following message would be invalid because the __typename field is missing:
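```json
{
  "id": 1
}
```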
It is a hard requirement to send the __typename field because this allows Cosmo Streams to also work with Union and Interface types.
It’s worth noting that the Router will not send any responses before you publish a message on the topic. If you need the most recent result, first make a Query, and then subscribe to the Topic. The Router will send the first response only after a message is published on the rendered topic.
Subscription Filter
The @openfed__subscriptionFilter directive allows you to filter subscription messages based on specified conditions. These conditions are defined as an argument on the directive, and they can be nested for negations or OR queries. The directive was developed due to limitations in Kafka, which discourages dynamic topic creation at runtime. To segment the event stream into different pieces, a user can express their interest using input arguments, whose values are used to implement a dynamic filter at runtime.
Given the following subscription field definition (the subjects and filter values are illustrative):
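```graphql
# A sketch; see the directive overview above for the condition input type.
type Subscription {
  filteredEmployeeUpdated(id: Int!): Employee!
    @edfs__natsSubscribe(subjects: ["employeeUpdated", "employeeUpdatedTwo"])
    @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: [1, 3, 4, 7, 11] } })
}
```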
The Router subscribes to the subjects employeeUpdated and employeeUpdatedTwo, and forwards an event to the client only when the payload id matches one of the values of the IN filter condition.
Variable expansion
You can also use variable expansion to use input arguments in the filter conditions. To illustrate the use case, consider the following subscription definition: the Router merges firstIds and secondIds into a single array, which is then used for the IN check. (A matching client operation is shown under Limitations below.)
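A sketch of the definition, assuming that the templating syntax expands list arguments into the values array (the exact expansion syntax and the NATS wildcard subject are assumptions):

```graphql
type Subscription {
  filteredEmployeeUpdated(firstIds: [Int!]!, secondIds: [Int!]!): Employee!
    @edfs__natsSubscribe(subjects: ["employeeUpdated.>"])
    @openfed__subscriptionFilter(
      condition: { IN: { fieldPath: "id", values: ["{{ args.firstIds }}", "{{ args.secondIds }}"] } }
    )
}
```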
We do not validate whether a client is allowed to subscribe to specific events or topics. If this is a blocker for you, please contact us. We have ideas on how to address this issue.
Limitations
To use the current implementation of subscription filters with variables, you’ll have to disable variables remapping. By default, a subscription like the following won’t work, because filteredEmployeeUpdated uses a filter on firstIds and secondIds, whose values are resolved from variables:
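A sketch of such an operation (the operation name and selection set are illustrative):

```graphql
subscription FilteredEmployees($firstIds: [Int!]!, $secondIds: [Int!]!) {
  filteredEmployeeUpdated(firstIds: $firstIds, secondIds: $secondIds) {
    id
  }
}
```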