Your NATS server must have JetStream enabled before you use the stream and consumer configuration described here. Please see the documentation for more details.

By configuring a stream and consumer, messages that are sent while a subscription is inactive can be consumed at a later time, e.g., when the subscription is restarted. This can help to prevent the loss of messages.

Event-Driven Graph schema configuration

The @edfs__natsSubscribe directive defines an optional streamConfiguration argument. Providing this input object allows the configuration of a stream and consumer on a NATS connection.

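Note that if the streamConfiguration argument is undefined, the subscription will not use a stream or consumer. If it is defined, consumerName and streamName must be provided; all three fields are non-null, but consumerInactiveThreshold falls back to its default of 30 seconds when omitted: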

| Input name | Type | Value |
| --- | --- | --- |
| consumerInactiveThreshold | Int! | The inactive threshold of the consumer in seconds. The default value is 30 seconds. |
| consumerName | String! | The name of the consumer. |
| streamName | String! | The name of the stream. |
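
The rules above can be modelled in a few lines of Python. This is only an illustrative sketch, not the router's implementation: `StreamConfiguration` and `parse_stream_configuration` are hypothetical names, and the only facts taken from this page are the three field names, the default of 30 seconds, and the rule that an undefined argument means no stream or consumer is used.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional

DEFAULT_INACTIVE_THRESHOLD_SECONDS = 30  # default documented for consumerInactiveThreshold


@dataclass(frozen=True)
class StreamConfiguration:
    """Hypothetical in-memory form of the streamConfiguration input object."""

    consumer_name: str
    stream_name: str
    consumer_inactive_threshold: int = DEFAULT_INACTIVE_THRESHOLD_SECONDS


def parse_stream_configuration(
    raw: Optional[Mapping[str, Any]],
) -> Optional[StreamConfiguration]:
    """Model how the argument is interpreted (assumed helper, not a router API)."""
    if raw is None:
        # Argument undefined: the subscription uses no stream or consumer.
        return None
    missing = [key for key in ("consumerName", "streamName") if not raw.get(key)]
    if missing:
        raise ValueError(f"missing required field(s): {', '.join(missing)}")
    return StreamConfiguration(
        consumer_name=raw["consumerName"],
        stream_name=raw["streamName"],
        # Fall back to the documented default when the field is omitted.
        consumer_inactive_threshold=raw.get(
            "consumerInactiveThreshold", DEFAULT_INACTIVE_THRESHOLD_SECONDS
        ),
    )
```

Passing only `consumerName` and `streamName` yields a configuration whose threshold is 30, which matches a directive that omits consumerInactiveThreshold.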

NATS configuration

Stream

The stream name that is supplied to the streamName input must already exist on your NATS server. If the relevant stream for a stream-reliant subscription cannot be found, an error will be returned.

Please consult the following documentation on creating a stream (and ensure your NATS server has JetStream enabled):

https://docs.nats.io/nats-concepts/jetstream/streams

Typically, this can be achieved using the NATS CLI and following the prompts:

nats str add stream-name

Consumer

As long as the stream exists, the router attempts to find an existing consumer when a subscription is started. The consumer is identified by the name (supplied by the consumerName input), followed by a hash of the subjects, and the host name and listen address of the router.

If a consumer is not found, the router will create a new durable consumer with the same name.
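
To make the identification scheme more concrete, here is a rough Python sketch of one way such an identifier could be derived. Everything specific in it is an assumption made for illustration: the function name `consumer_identifier`, the use of SHA-256, the 16-character truncation, the `-` separator, and the choice to hash the subjects together with the host name and listen address. The router's actual hash function and format are not specified on this page and may differ.

```python
import hashlib
from typing import Sequence


def consumer_identifier(
    consumer_name: str,
    subjects: Sequence[str],
    host_name: str,
    listen_address: str,
) -> str:
    """Hypothetical derivation; the router's real hash and format may differ."""
    # Sort the subjects so the same set always yields the same digest (assumption).
    material = "\n".join([*sorted(subjects), host_name, listen_address])
    # SHA-256 truncated to 16 hex characters is an arbitrary illustrative choice.
    digest = hashlib.sha256(material.encode("utf-8")).hexdigest()[:16]
    return f"{consumer_name}-{digest}"
```

Under this sketch, a router that restarts with the same subjects, host name, and listen address resolves to the same consumer, while a router on a different host resolves to a different one even when consumerName is identical.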

By default, consumers will automatically be deleted after 30 seconds of inactivity. Inactivity is classed as the lack of a running subscription. You can change this value by specifying the value for the consumerInactiveThreshold input in the stream configuration. If you set the consumerInactiveThreshold to 0 (or any negative value), the consumer will never be deleted.
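
The deletion rule can be summarised as a small Python sketch. The function names are hypothetical and the code only models the behaviour described in the paragraph above (default of 30 seconds, zero or negative meaning the consumer is never deleted); it does not call NATS or the router, and treating an idle time exactly equal to the threshold as expired is an assumption.

```python
from typing import Optional


def deletion_threshold_seconds(consumer_inactive_threshold: int) -> Optional[int]:
    """Return the idle time after which the consumer is deleted, or None for never."""
    if consumer_inactive_threshold <= 0:
        # Zero or any negative value: the consumer is never deleted.
        return None
    return consumer_inactive_threshold


def is_consumer_expired(
    idle_seconds: float, consumer_inactive_threshold: int = 30
) -> bool:
    """Model whether a consumer without a running subscription would be deleted."""
    threshold = deletion_threshold_seconds(consumer_inactive_threshold)
    return threshold is not None and idle_seconds >= threshold
```

For instance, with the default threshold a consumer that has been idle for 45 seconds counts as expired, whereas with a threshold of 0 it is kept regardless of how long it has been idle.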

If you wish for your consumer to have a timeout threshold, please configure accordingly using the following documentation:

https://docs.nats.io/nats-concepts/jetstream/consumers

https://docs.nats.io/nats-concepts/jetstream/consumers/example_configuration

Typically, this can be achieved using the NATS CLI and following the prompts:

nats consumer add stream-name consumer-name

Example

In the example below, the subscription uses the NATS provider my-nats and also defines a stream configuration. The streamName input is set to myStream, and the consumerName input is set to myConsumer. Because consumerInactiveThreshold is omitted, its default of 30 seconds applies.

directive @edfs__natsSubscribe(
  subjects: [String!]!,
  providerId: String! = "default",
  streamConfiguration: edfs__NatsStreamConfiguration
) on FIELD_DEFINITION

input edfs__NatsStreamConfiguration {
    consumerInactiveThreshold: Int! = 30
    consumerName: String!
    streamName: String!
}

type Subscription {
    userUpdated(id: Int!): User! @edfs__natsSubscribe(
        subjects: ["user.{{ args.id }}"],
        providerId: "my-nats",
        streamConfiguration: { consumerName: "myConsumer", streamName: "myStream" },
    )
}

type User @key(fields: "id", resolvable: false) {
    id: Int! @external
}
