Skip to main content
SubscriptionOnCreate is an experimental hook. Its signature and behaviour may change without prior notice.
The SubscriptionOnCreate handler is a custom module hook that runs once per subscription, before the subscription is registered with the message broker. It gives you access to the subscription event configuration and lets you modify it in place. This handler is useful for:
  • Dynamic subject routing: Override NATS subjects, Redis channels or Kafka topics based on custom logic
  • Per-request configuration: Override provider-level defaults for individual subscriptions

Limitations

Compared to SubscriptionOnStart, this handler:
  • Cannot emit events to the client
  • Only applies to Cosmo Streams / EDFS subscriptions, not to regular GraphQL subscriptions

Handler Interface

Implement SubscriptionOnCreateHandler in a Custom Module to use this hook.
type SubscriptionOnCreateHandler interface {
    // SubscriptionOnCreate is called once before the subscription is registered.
    // Mutations to the SubscriptionEventConfiguration take effect immediately.
    // Returning a non-nil error aborts the subscription.
    //
    // This method is currently EXPERIMENTAL.
    // The signature and behaviour might change without prior notice.
    SubscriptionOnCreate(ctx SubscriptionOnCreateHandlerContext) error
}

type SubscriptionOnCreateHandlerContext interface {
    // Request is the original request received by the router.
    Request() *http.Request
    // Logger is the logger for the request.
    Logger() *zap.Logger
    // Operation is the GraphQL operation.
    Operation() OperationContext
    // Authentication is the authentication for the request.
    Authentication() authentication.Authentication
    // SubscriptionEventConfiguration returns the current subscription event configuration.
    // The returned value is a pointer; mutating it directly takes effect without any additional call.
    //
    // This method is currently EXPERIMENTAL.
    // The signature and behaviour might change without prior notice.
    SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
}

Modifying the Configuration

SubscriptionEventConfiguration is an interface. To access provider-specific fields you must type-assert to the concrete configuration type for your message broker. For NATS subscriptions:
import natspubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"

func (m *MyModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
    cfg, ok := ctx.SubscriptionEventConfiguration().(*natspubsub.SubscriptionEventConfiguration)
    if !ok {
        return nil
    }
    // cfg.Subjects and cfg.StreamConfiguration are now directly mutable.
    return nil
}
For Redis subscriptions:
import redispubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/redis"

func (m *MyModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
    cfg, ok := ctx.SubscriptionEventConfiguration().(*redispubsub.SubscriptionEventConfiguration)
    if !ok {
        return nil
    }
    // cfg fields are now directly mutable.
    return nil
}
Use ctx.SubscriptionEventConfiguration().ProviderType() to branch by provider type when your module handles multiple providers.

Usage Example

Dynamic NATS subject routing

The following example rewrites NATS subjects based on a tenant identifier from the request header. Each tenant is isolated to its own subject namespace.
package module

import (
    "fmt"

    "github.com/wundergraph/cosmo/router/core"
    natspubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
)

func init() {
    core.RegisterModule(&TenantRoutingModule{})
}

const ModuleID = "tenantRoutingModule"

type TenantRoutingModule struct{}

func (m *TenantRoutingModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
    // Only rewrite subjects for the "employeeUpdated" subscription.
    if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
        return nil
    }

    cfg, ok := ctx.SubscriptionEventConfiguration().(*natspubsub.SubscriptionEventConfiguration)
    if !ok {
        return nil
    }

    tenantID := ctx.Request().Header.Get("X-Tenant-ID")
    if tenantID == "" {
        return nil
    }

    // Prefix every subject with the tenant ID.
    for i, subject := range cfg.Subjects {
        cfg.Subjects[i] = fmt.Sprintf("%s.%s", tenantID, subject)
    }
    return nil
}

func (m *TenantRoutingModule) Module() core.ModuleInfo {
    return core.ModuleInfo{
        ID: ModuleID,
        New: func() core.Module {
            return &TenantRoutingModule{}
        },
    }
}

var _ core.SubscriptionOnCreateHandler = (*TenantRoutingModule)(nil)

Dynamic Redis channel routing

The following example rewrites Redis channels based on a user ID extracted from the authenticated request. Each user receives events only from their own channel.
package module

import (
    "fmt"

    "github.com/wundergraph/cosmo/router/core"
    redispubsub "github.com/wundergraph/cosmo/router/pkg/pubsub/redis"
)

func init() {
    core.RegisterModule(&UserChannelRoutingModule{})
}

const ModuleID = "userChannelRoutingModule"

type UserChannelRoutingModule struct{}

func (m *UserChannelRoutingModule) SubscriptionOnCreate(ctx core.SubscriptionOnCreateHandlerContext) error {
    // Only rewrite channels for the "orderStatusUpdated" subscription.
    if ctx.SubscriptionEventConfiguration().RootFieldName() != "orderStatusUpdated" {
        return nil
    }

    cfg, ok := ctx.SubscriptionEventConfiguration().(*redispubsub.SubscriptionEventConfiguration)
    if !ok {
        return nil
    }

    auth := ctx.Authentication()
    if auth == nil {
        return nil
    }

    userID, ok := auth.Claims()["sub"].(string)
    if !ok || userID == "" {
        return nil
    }

    // Route each user to their own channel.
    for i, channel := range cfg.Channels {
        cfg.Channels[i] = fmt.Sprintf("%s.%s", channel, userID)
    }
    return nil
}

func (m *UserChannelRoutingModule) Module() core.ModuleInfo {
    return core.ModuleInfo{
        ID: ModuleID,
        New: func() core.Module {
            return &UserChannelRoutingModule{}
        },
    }
}

var _ core.SubscriptionOnCreateHandler = (*UserChannelRoutingModule)(nil)