Stream and consumer configuration

Configuring a stream and consumer for a NATS provider.

JetStream must be enabled on your NATS server before you configure a stream and consumer. See the NATS JetStream documentation for details.

By configuring a stream and consumer, messages that are sent while a subscription is inactive can be consumed at a later time, e.g., when the subscription is restarted. This can help to prevent the loss of messages.

Event-Driven Graph schema configuration

The @edfs__subscribe directive defines an optional streamConfiguration argument. Providing this input object allows the configuration of a stream and consumer on a NATS connection.

If the streamConfiguration argument is omitted, the subscription does not use a stream or consumer. If it is provided, all fields of the input object are required:

Input name      Type       Value
consumerName    String!    The name of the consumer
streamName      String!    The name of the stream
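The rule above (the argument is optional, but both fields are required once it is present) can be sketched in Python. This is only an illustration and not how the router validates a schema: StreamConfiguration and parse_stream_configuration are hypothetical names, and the argument is assumed to arrive as a plain mapping keyed by the input field names.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class StreamConfiguration:
    """Hypothetical mirror of the edfs__StreamConfiguration input object."""
    consumer_name: str
    stream_name: str


def parse_stream_configuration(
    arg: Optional[Mapping[str, str]],
) -> Optional[StreamConfiguration]:
    """Return None when no stream/consumer is requested; otherwise require both fields."""
    if arg is None:
        # Argument omitted: the subscription does not use a stream or consumer.
        return None
    # Both fields are non-null (String!), so a missing or null value is an error.
    missing = [key for key in ("consumerName", "streamName") if arg.get(key) is None]
    if missing:
        raise ValueError(
            "streamConfiguration is missing required field(s): " + ", ".join(missing)
        )
    return StreamConfiguration(
        consumer_name=arg["consumerName"],
        stream_name=arg["streamName"],
    )
```

Calling the helper with None models a subscription without a stream, while a mapping that lacks either field is rejected.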

NATS configuration

Stream

The stream named in the streamName input must already exist on your NATS server. If the stream for a stream-reliant subscription cannot be found, an error is returned.

Please consult the following documentation on creating a stream (and ensure JetStream is enabled on your NATS server): https://docs.nats.io/nats-concepts/jetstream/streams

Typically, this can be achieved by running the following NATS CLI command and following the prompts:

nats str add stream-name

Consumer

Provided the stream exists, when a subscription starts, the router first attempts to fetch a consumer with the name supplied to the consumerName input. If no such consumer is found, the router creates a new durable consumer with that name. Durable consumers persist and do not time out.
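The lookup order described above can be summarised in a short Python sketch. It is a simplified model under stated assumptions, not the router's code and not a NATS client API: InMemoryJetStream, Consumer, StreamNotFoundError and ensure_consumer are hypothetical names, and the server is replaced by an in-memory dictionary.

```python
from dataclasses import dataclass, field
from typing import Dict


class StreamNotFoundError(Exception):
    """Raised when the configured stream does not exist."""


@dataclass
class Consumer:
    name: str
    durable: bool = True  # durable consumers persist and do not time out


@dataclass
class InMemoryJetStream:
    """Hypothetical stand-in for a server: stream name -> consumers by name."""
    streams: Dict[str, Dict[str, Consumer]] = field(default_factory=dict)


def ensure_consumer(js: InMemoryJetStream, stream_name: str, consumer_name: str) -> Consumer:
    # The stream is never created implicitly; it must already exist.
    if stream_name not in js.streams:
        raise StreamNotFoundError(f'stream "{stream_name}" not found')
    consumers = js.streams[stream_name]

    # First, try to fetch an existing consumer with the configured name.
    existing = consumers.get(consumer_name)
    if existing is not None:
        return existing

    # Otherwise, create a new durable consumer under that name.
    created = Consumer(name=consumer_name, durable=True)
    consumers[consumer_name] = created
    return created
```

In this model, restarting a subscription with the same consumerName returns the consumer created earlier rather than a new one, which is what allows consumption to resume later.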

If you want your consumer to have a timeout threshold, configure it according to the following documentation:
https://docs.nats.io/nats-concepts/jetstream/consumers
https://docs.nats.io/nats-concepts/jetstream/consumers/example_configuration

Typically, this can be achieved by running the following NATS CLI command and following the prompts:

nats consumer add stream-name consumer-name

Example

In the example below, the subscription uses the NATS connection source "my-nats" and also defines a stream configuration. The streamName input is set to "myStream", and the consumerName input is set to "myConsumer".

directive @key(fields: openfed__FieldSet!, resolvable: Boolean = true) repeatable on INTERFACE | OBJECT
directive @edfs__subscribe(
    subjects: [String!]!,
    sourceName: String! = "default",
    streamConfiguration: edfs__StreamConfiguration
) on FIELD_DEFINITION

input edfs__StreamConfiguration {
    consumerName: String!
    streamName: String!
}

type Subscription {
    userUpdated(id: Int!): User! @edfs__subscribe(
        subjects: ["user.{{ args.id }}"],
        sourceName: "my-nats",
        streamConfiguration: { consumerName: "myConsumer", streamName: "myStream" },
    )
}

type User @key(fields: "id", resolvable: false) {
    id: Int! @external
}
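The subject in the schema above, "user.{{ args.id }}", contains a placeholder for the id field argument. As a rough illustration, and assuming the placeholder is simply replaced by the string form of the argument with the same name, the resolution could look like the Python sketch below. resolve_subject is a hypothetical helper, not part of the router or of any NATS library.

```python
import re
from typing import Any, Mapping

# Matches placeholders such as "{{ args.id }}", tolerating optional inner whitespace.
_PLACEHOLDER = re.compile(r"\{\{\s*args\.(\w+)\s*\}\}")


def resolve_subject(template: str, args: Mapping[str, Any]) -> str:
    """Replace each {{ args.NAME }} placeholder with the matching argument value."""

    def substitute(match):
        name = match.group(1)
        if name not in args:
            raise KeyError(f"no argument named {name!r} for subject template {template!r}")
        return str(args[name])

    return _PLACEHOLDER.sub(substitute, template)
```

Under that assumption, a subscription to userUpdated(id: 1) would listen on the subject user.1.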
