Event-Driven Federated Subscriptions (EDFS)

EDFS combines the power of GraphQL Federation and Event-Driven Architecture (Kafka, NATS, SQS) to update a client's GraphQL Subscription after an event occurs in your system.

Intro

Event-Driven Federated Subscriptions (EDFS) solves three major problems of GraphQL Federation and Subscriptions by connecting the Router directly to an event source like Kafka, NATS, or SQS and making it part of the Graph:

  1. Subscriptions can only have one single root field

  2. Subgraphs should be stateless

  3. Maintaining 3 WebSocket Connections per client is a waste of resources

Let me explain the three problems, starting with the following example Schema:

type Subscription {
    employeeUpdated(employeeID: ID!): Employee!
}

type Employee {
    id: ID!
    name: String!
}

As you can see, the employeeUpdated field marks the root of a Subscription. With classic Federation, we'd be forced to implement this root Subscription field in one particular Subgraph. This is not ideal, because it ties the ownership of the field to a single Subgraph. If two Subgraphs contribute fields to the Employee type, which is usually the case in federated Graphs, we'd have to communicate across Subgraphs to trigger a Subscription.

In addition to the first problem, Subscriptions also make Subgraphs stateful. Each time a client connects to the Router via WebSockets, the Router has to open another WebSocket connection to the Subgraph. This means that you're not able to use Serverless infrastructure for your Subgraphs. In addition, the deployment and maintenance simply become more complex as you have to manage a lot of open connections.

Furthermore, classic Subscriptions with Federation are quite expensive when it comes to Memory usage. When a client wants to use Subscriptions, it opens up a WebSocket connection to the Router. The Router then opens a second WebSocket Connection to the Subgraph. The Subgraph itself has to maintain another Connection. If we don't count the client itself, that totals 3 WebSocket connections per client. Depending on the programming language and framework being used, one connection can cost multiple Megabytes of Memory, making this solution not very scalable. Imagine we had 10k clients connected: at 1MB per WebSocket connection, those 30k connections would cost 30GB of memory.

Specification

Enter Event-Driven Federated Subscriptions, a simple way to scale Federated Subscriptions in a resource-efficient manner.

EDFS currently supports two event providers:

  1. NATS

  2. Kafka

Each provider offers at least Publish and Subscribe capabilities. For NATS, we also provide Request/Reply semantics, and streaming can be configured via the streamConfiguration argument of the subscribe directive. This enables JetStream, which adds new functionality and higher qualities of service on top of the base "Core NATS" features.

Our goal is to integrate with various technologies rather than agree on a single unified interface. This approach allows us to leverage the strengths of each technology. This philosophy is reflected in how we structure the directives, name parameters, and expose features as they appear in their respective ecosystems.

Here is an overview of all EDFS directives:

# Nats and JetStream integration
directive @edfs__natsRequest(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsPublish(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsSubscribe(subjects: [String!]!, providerId: String! = "default", streamConfiguration: edfs__NatsStreamConfiguration) on FIELD_DEFINITION
# Kafka integration
directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION
# OpenFederation directive to filter subscription events
directive @openfed__subscriptionFilter(condition: openfed__SubscriptionFilterCondition!) on FIELD_DEFINITION

Let's explain each directive in detail:

The @edfs__natsRequest directive is a NATS-specific directive to extend a Graph through an Event Source. It makes a request to a NATS subject and waits synchronously for the response. Under the hood, it uses NATS Request/Reply semantics.

The @edfs__natsPublish and @edfs__kafkaPublish directives allow you to publish an event through a Mutation.

Using the @edfs__natsSubscribe and @edfs__kafkaSubscribe directives, you can create a Subscription on the corresponding message bus. By default, both provider implementations are stateless, meaning every client receives the same events in a broadcast fashion. This behavior can be adjusted: NATS allows you to create a consumer group, resulting in multiple independent streams of the subject, where each client can consume events at its own pace.
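
For example, a NATS subscription can opt into JetStream by supplying the streamConfiguration argument. This is a sketch; it assumes a JetStream stream and durable consumer with these names already exist on the NATS server:

type Subscription {
    employeeUpdated(employeeID: ID!): Employee!
      @edfs__natsSubscribe(
        subjects: ["employeeUpdated.{{ args.employeeID }}"]
        streamConfiguration: { consumerName: "employeeConsumer", streamName: "employeeStream" }
      )
}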

The @openfed__subscriptionFilter directive allows you to filter subscription messages based on specified conditions. For more information see Subscription Filter.

An Event-Driven Subgraph does not need to be implemented. It is simply a Subgraph Schema that instructs the Router on how to connect specific root fields to the Event Source. Scroll down for an example.

Prerequisites

To use EDFS, you need an Event Source running and connected to the Router. Currently, we support NATS and Kafka. For simplicity, NATS is used in the examples.

To get started, run a NATS instance and add the following configuration to your config.yaml Router Configuration:

config.yaml
events:
  providers:
    nats:
      - id: default
        url: "nats://localhost:4222"
        authentication: 
          token: "my-token" # either use token authentication...
          user_info: # ...or username/password, but not both
            username: "my-username"
            password: "my-password"

We've intentionally moved this part of the configuration out of the Schema to keep the directives clean and for security reasons. Additionally, separating the implementation details of the Event Source from the Schema maintains a clear separation of concerns. Infrastructure teams can focus on Event Sources and their configuration, while API Developers can concentrate on using the Event Source and connecting Event Topics to their Subgraph Schema.

If you run make edfs-demo in the Cosmo Monorepo, you'll automatically get a NATS instance running on the default port (4222) using Docker.
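
If you don't use the Monorepo, any plain NATS instance works as well, for example via the official Docker image (the version tag here is illustrative):

docker run -p 4222:4222 nats:2.10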

Example Configuration

Below, you'll find an example Schema that uses the NATS provider directives: the @edfs__natsRequest directive is connected to a Query root field (employeeFromEvent), a Mutation root field (updateEmployee) is connected to another subject using @edfs__natsPublish, and a Subscription root field (employeeUpdated) is connected via @edfs__natsSubscribe. Each of these fields is completely independent. Note that you don't implement this Subgraph yourself; the engine implements the resolvers based on the Router configuration.

directive @edfs__natsRequest(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsPublish(subject: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__natsSubscribe(subjects: [String!]!, providerId: String! = "default", streamConfiguration: edfs__NatsStreamConfiguration) on FIELD_DEFINITION

input edfs__NatsStreamConfiguration {
    consumerName: String!
    streamName: String!
}

type PublishEventResult {
    success: Boolean!
}

type Query {
    employeeFromEvent(id: ID!): Employee! @edfs__natsRequest(subject: "getEmployee.{{ args.id }}")
}

input UpdateEmployeeInput {
    name: String
    email: String
}

type Mutation {
    updateEmployee(id: ID!, update: UpdateEmployeeInput!): PublishEventResult! @edfs__natsPublish(subject: "updateEmployee.{{ args.id }}")
}

type Subscription {
    employeeUpdated(employeeID: ID!): Employee! @edfs__natsSubscribe(subjects: ["employeeUpdated.{{ args.employeeID }}"])
}

type Employee @key(fields: "id", resolvable: false) {
    id: Int! @external
}

Semantics

The "subjects" Argument

The subjects (NATS) / topics (Kafka) argument of the event directives supports a templating syntax that lets you use field arguments to render the subject or topic.

Given the following Schema:

type Subscription {
    employeeUpdated(employeeID: ID!): Employee! @edfs__natsSubscribe(subjects: ["employeeUpdated.{{ args.employeeID }}"])
}

If we send a Subscription with the `employeeID` argument set to `1`, the subject renders as "employeeUpdated.1".

Note that variable expansion on the subject/topic argument is currently only supported for NATS.

Request/Reply

The @edfs__natsRequest directive creates a reply subject (internally) and sends the JSON representation of all arguments to the subject specified in the subject argument. The Router then waits synchronously on the reply subject for the result. The Router expects the response to contain all fields that are defined on the Entity type in this Schema, as well as the __typename field to identify the Entity. In the example, the Employee Entity contains an `id` field, so the following response would be valid:

valid.json
{"__typename": "Employee", "id":1}

The following response is invalid because the __typename field is missing:

invalid.json
{"id":1}

Once the initial result comes back from the "Event Subgraph", the Router is capable of extending the response with fields from other "regular" Subgraphs.
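
To make this concrete, here is a minimal sketch of a backend responder written in Go using the nats.go client. The subject and payload shape follow the example above; the file name, connection URL, and hardcoded reply are assumptions for illustration:

responder.go
package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    // Connect to the same NATS instance the Router is configured with.
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Drain()

    // Answer the requests the Router sends for the Query root field
    // employeeFromEvent(id: ID!), e.g. on the subject "getEmployee.1".
    _, err = nc.Subscribe("getEmployee.*", func(msg *nats.Msg) {
        // A real implementation would parse the id from msg.Subject and
        // look the employee up; the reply MUST contain __typename and
        // all Entity key fields.
        if err := msg.Respond([]byte(`{"__typename": "Employee", "id": 1}`)); err != nil {
            log.Println(err)
        }
    })
    if err != nil {
        log.Fatal(err)
    }

    select {} // block forever; handlers run on the NATS dispatch goroutine
}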

Publish

The @edfs__natsPublish and @edfs__kafkaPublish directives send a JSON representation of all arguments, including the arguments used to render the topic, to the rendered topic. Fields using a publish directive MUST return the type PublishEventResult with one single field success of type Boolean!, indicating whether publishing the event was successful or not.

Given that we send the following Mutation:

mutation UpdateEmployee($id: ID!, $update: UpdateEmployeeInput!) {
    updateEmployee(id: $id, update: $update){
        success
    }
}

...with the following Variables JSON:

{"id":1, "update": {"name": "Jannik", "email": "jannik@neuse.de"}}

...the Router would send the following JSON object to the topic updateEmployee.1:

{"id":1, "update": {"name": "Jannik", "email": "jannik@neuse.de"}}

Subscribe

Given the following Subscription:

subscription EmployeeUpdates($id: ID!){
    employeeUpdated(employeeID: $id){
        id
        details {
            forename
            surname
        }
    }
}

...with the following Variables JSON:

{"id": 1}

The Router connects to the topic employeeUpdated.1 and waits for the next message to be published. All fields that are defined on the Entity in this Schema MUST be sent in the message for it to be valid. Additional fields that are not part of this "Events Subgraph" will be resolved by the Router. In addition, the message is required to contain the __typename field to identify the Entity.

Here's an example of a valid message:

valid.json
{"__typename": "Employee", "id: 1}

Here's an invalid message as the __typename field is missing:

invalid.json
{"id: 1}

It's important to send the __typename field because this allows EDFS to also work for Union and Interface types.

It's worth noting that the Router will not send any responses before you publish a message on the topic. If you need the most recent result, first make a Query, and then subscribe to the Topic. The Router will send the first response only after a message is published on the rendered topic.
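
Conversely, any process that can publish to the broker can trigger this Subscription. A minimal Go sketch that publishes a valid entity event for employeeID 1 (file name and connection URL are assumptions):

publisher.go
package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // Every client subscribed via employeeUpdated(employeeID: 1)
    // receives a new result once this message is published.
    if err := nc.Publish("employeeUpdated.1", []byte(`{"__typename": "Employee", "id": 1}`)); err != nil {
        log.Fatal(err)
    }
    if err := nc.Flush(); err != nil { // ensure the message left the client buffer
        log.Fatal(err)
    }
}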

Subscription Filter

The @openfed__subscriptionFilter directive allows you to filter subscription messages based on specified conditions. These conditions are defined as an argument on the directive, and conditions can be nested for negations or OR queries. The directive was developed due to limitations in Kafka, where creating topics dynamically at runtime is not recommended. To segment the event stream, a user can express their interest through input arguments, whose values are used to implement a dynamic filter at runtime.

Given the following Subgraph Schema:

scalar openfed__SubscriptionFilterValue

input openfed__SubscriptionFieldCondition {
    fieldPath: String!
    values: [openfed__SubscriptionFilterValue]!
}

input openfed__SubscriptionFilterCondition {
    AND: [openfed__SubscriptionFilterCondition!]
    IN: openfed__SubscriptionFieldCondition
    NOT: openfed__SubscriptionFilterCondition
    OR: [openfed__SubscriptionFilterCondition!]
}

type Subscription {
    filteredEmployeeUpdated(id: Int!): Employee!
      @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
      @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: [1, 3, 4, 7, 11] } })
}

and a user who subscribes to the following:

subscription {
    filteredEmployeeUpdated(id: 1) {
        id
        details {
            forename
            surname
        }
    }
}

will receive Kafka events published on the topics employeeUpdated and employeeUpdatedTwo only when the payload's id matches one of the values of the IN filter condition.
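
For illustration, given the filter above, the first payload below is forwarded to the client, while the second is silently dropped (the payload values are assumed for the example):

forwarded.json
{"__typename": "Employee", "id": 3}

dropped.json
{"__typename": "Employee", "id": 2}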

Variable expansion

You can also use variable expansion to use input arguments in the filter conditions:

filteredEmployeeUpdated(firstIds: [ID!]!, secondIds: [ID!]!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: ["{{ args.firstIds }}", "{{ args.secondIds }}"] } })

To illustrate the use case, we run the following subscription:

subscription {
    filteredEmployeeUpdated(firstIds: [1, 12], secondIds: [2, 11]) {
        id
        details {
            forename
            surname
        }
    }
}

This will combine both arguments, firstIds and secondIds, into a single array ([1, 12, 2, 11] in this example), which is then used for the IN check.

We do not validate whether a client is allowed to subscribe to specific events or topics. If this is a blocker for you, please contact us. We have ideas on how to address this issue.

Implementation Details and Noteworthy Information

Deduplication of Subscriptions

The Cosmo Router deduplicates Subscriptions internally to save resources. If multiple Subscriptions use the same topic as a trigger, all Subscriptions share the same trigger. The trigger is shut down when all Subscriptions that depend on it are unsubscribed. Internal resources, such as clients, are also cleaned up when no subscriptions are being served anymore.

Statelessness of Subgraphs

With EDFS, the Router connects directly to the Event Source but doesn't require any stateful connections, e.g. WebSocket, to the Subgraphs. This makes the Subgraphs much simpler to reason about and easier to deploy. Serverless deployment options usually have limitations on request duration. With an Event Broker in the middle, Subgraphs can be stateless without having to give up on Subscriptions.

Efficiency, CPU & Memory Consumption (Epoll/Kqueue)

EDFS is built on top of Event-Driven principles, which means that the implementation is non-blocking, as CPU efficient as possible, and has a very low memory footprint.

We're using Epoll and Kqueue on Systems that support it (Linux, Darwin, etc.) to be as efficient as possible.

To give you some numbers: 10,000 clients connected to one Router consume ~150-200MB of Memory and cause 0% CPU usage when idle (not publishing any messages). In addition, these 10k idle clients require only ~40 goroutines, e.g. for thread pools.

The Router supports multi-core out of the box and is capable of scaling to many multiples of 10k published events per second.

Publish Events from any System, not just Subgraphs

It's worth noting that publishing Entity update Events is not limited to just Subgraphs. EDFS is designed to fully decouple the API Consumer from the implementation of the Event-Driven Architecture.

A client can create a Job via a Mutation and subscribe to the Job state via EDFS. Next, the Mutation can kick off a long-running process that will be handled by one or many systems in the background. At each step, e.g. when an increment of work is done, each subsystem can publish a message to indicate that the state of an Entity has changed.

Once the message is published by one of the sub-systems, the Router can Query all Subgraphs to resolve the current state of the Job.

With EDFS, each Subgraph can add fields to an Entity that it's responsible for and publish events to the Message Broker when a long-running Operation updates the overall state of an Entity.
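
As a sketch, such a setup could be wired up like this in the Event-Driven Subgraph (all type names and subjects are illustrative, not part of EDFS):

type Mutation {
    createJob(description: String!): PublishEventResult! @edfs__natsPublish(subject: "jobs.created")
}

type Subscription {
    jobStateChanged(jobID: ID!): Job! @edfs__natsSubscribe(subjects: ["jobState.{{ args.jobID }}"])
}

type Job @key(fields: "id", resolvable: false) {
    id: ID! @external
}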
