> ## Documentation Index
> Fetch the complete documentation index at: https://cosmo-docs.wundergraph.com/llms.txt
> Use this file to discover all available pages before exploring further.

# SubscriptionOnStart Handler

> A Cosmo Streams Custom Module, which lets you customize subscription initialization behavior

The `SubscriptionOnStart` handler is a custom module hook that allows you to intercept and customize the initialization of GraphQL subscriptions.
This handler is called once when a subscription starts, giving you the opportunity to validate permissions, send initial events, or perform setup logic.

This handler is particularly useful for:

* **Subscription authentication**: Validate JWT tokens or user permissions before allowing subscriptions
* **Initial event delivery**: Send welcome messages or current state to new subscribers
* **Subscription logging**: Track subscription attempts and user behavior
* **Connection validation**: Ensure clients meet specific criteria before subscribing
* **Rate limiting**: Control subscription attempts per user or client
* **State initialization**: Initialize state used by other handlers such as `OnReceiveEvents` or `OnPublishEvents` of the same module

## Handler Interface

In order to use the `SubscriptionOnStart` handler you need to create a [Custom Module](../../custom-modules) which implements the `SubscriptionOnStartHandler` interface.

```go theme={"system"}
type SubscriptionOnStartHandler interface {
    // SubscriptionOnStart is called once at subscription start
    // The error is propagated to the client.
    SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
}

type SubscriptionOnStartHandlerContext interface {
	// Request is the original request received by the router.
	Request() *http.Request
	// Logger is the logger for the request
	Logger() *zap.Logger
	// Operation is the GraphQL operation
	Operation() OperationContext
	// Authentication is the authentication for the request
	Authentication() authentication.Authentication
	// SubscriptionEventConfiguration is the subscription event configuration (will return nil for engine subscription)
	SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
	// EmitLocalEvent sends an event directly to the subscription stream of the
	// currently connected client.
	//
	// This method triggers the router to resolve the client's operation and emit
	// the resulting data as a stream event. The event exists only within the
	// router; it is not forwarded to any message broker.
	//
	// The event is delivered exclusively to the client associated with the current
	// handler execution. No other subscriptions are affected.
	//
	// The method returns true if the event was successfully emitted, or false if
	// it was dropped.
	EmitLocalEvent(event datasource.StreamEvent) bool
	// NewEvent creates a new event that can be used in the subscription.
	//
	// The data parameter must contain valid JSON bytes. The format depends on the subscription type.
	//
	// For event-driven subscriptions (Cosmo Streams / EDFS), the data should contain:
	// __typename : The name of the schema entity, which is expected to be returned to the client.
	// {keyName} : The key of the entity as configured on the schema via @key directive.
	// Example usage: ctx.NewEvent([]byte(`{"__typename": "Employee", "id": 1}`))
	//
	// For normal subscriptions, you need to provide the complete GraphQL response structure.
	// Example usage: ctx.NewEvent([]byte(`{"data": {"fieldName": value}}`))
	//
	// You can use EmitLocalEvent to emit this event to subscriptions.
	NewEvent(data []byte) datasource.MutableStreamEvent
}
```

## Error Handling

When you return an error from the `SubscriptionOnStart` handler, the router responds to the client with an error event and closes the subscription.
You can choose to log a generic error or a custom error response with more details for the client.

```go theme={"system"}
return errors.New("my handler error")
```

This will result in an internal server error response to the client.

```json theme={"system"}
{
  "errors": [
    {
      "message": "Internal server error"
    }
  ]
}
```

Whereas you can return a custom error response with more details for the client.

```go theme={"system"}
return &core.StreamHandlerError{Message: "my graphql error"}
```

This will result in a error response with more details for the client.

```json theme={"system"}
{
  "errors": [
    {
      "message": "my graphql error",
    }
  ]
}
```

<Warning>
  Errors are not logged automatically by the router. If you need the error to be logged, you can use `ctx.Logger()` to log the error yourself.
</Warning>

## Usage Example

### Complete Custom Module with Event Bypass

The following example demonstrates how to register a passive `SubscriptionOnStart` handler that logs subscription attempts but allows all subscriptions to proceed normally.

```go theme={"system"}
package module

import (
	"github.com/wundergraph/cosmo/router/core"
	"go.uber.org/zap"
)

func init() {
	// Register your module with the router
	core.RegisterModule(&SubscriptionStartModule{})
}

const ModuleID = "subscriptionStartModule"

// SubscriptionStartModule demonstrates a passive subscription start handler
type SubscriptionStartModule struct{}

// SubscriptionOnStart logs subscription attempts and allows them to proceed
func (m *SubscriptionStartModule) SubscriptionOnStart(
	ctx core.SubscriptionOnStartHandlerContext,
) error {
	logger := ctx.Logger()
	config := ctx.SubscriptionEventConfiguration()

	// Log subscription details
	logger.Info("Subscription started",
		zap.String("field_name", config.RootFieldName()),
		zap.String("provider_id", config.ProviderID()),
		zap.String("provider_type", string(config.ProviderType())),
	)

	// Allow subscription to proceed
	return nil
}

// Module returns the module information for registration
func (m *SubscriptionStartModule) Module() core.ModuleInfo {
	return core.ModuleInfo{
		ID: ModuleID,
		New: func() core.Module {
			return &SubscriptionStartModule{}
		},
	}
}

// Interface guards to ensure we implement the required interfaces
var (
	_ core.SubscriptionOnStartHandler = (*SubscriptionStartModule)(nil)
)
```

### Return initial events

You can use `ctx.EmitLocalEvent()` to send initial or welcome events to subscribers immediately when they connect.
This is useful for providing current state or welcome messages.

```go theme={"system"}
func (m *SubscriptionStartModule) SubscriptionOnStart(
	ctx core.SubscriptionOnStartHandlerContext,
) error {
	// Bypass the handler on other subscriptions
	if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
		return nil
	}

	// Create an initial event with minimal required fields
	// The router will resolve all other fields requested by the subscriber
	initialEventData := `{ "__typename": "Employee", "id": 1 }`
	initialEvent := ctx.NewEvent([]byte(initialEventData))

	success := ctx.EmitLocalEvent(initialEvent)
	if !success {
		ctx.Logger().Warn("Failed to send initial event to subscriber")
	}

	return nil
}
```

The payload data used to create a new event has to follow a specific format.\
It has to be a valid JSON object that contains the `__typename` field to identify the entity type we want to return
for this subscription. The other field in this case is `id`, which represents the entity key of `Employee` types as defined in the schema.
The router will use this information to resolve all fields requested by the subscriber to generate a complete response.

### Prevent subscriptions on missing token claims

This example validates JWT tokens and blocks subscriptions for users without the required "role" claim, demonstrating proper authentication enforcement.

```go theme={"system"}
func (m *SubscriptionStartModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error {
	// Only check "employeeUpdated" subscription
	if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdated" {
		return nil
	}

	auth := ctx.Authentication()
	if auth == nil {
		return &core.StreamHandlerError{Message: "unauthorized"}
	}

	// Check for specific "admin" role
	roleValue, hasRole := auth.Claims()["role"]
	if !hasRole {
		return &core.StreamHandlerError{Message: "missing role claim"}
	}

	role, ok := roleValue.(string)
	if !ok || role != "admin" {
		return &core.StreamHandlerError{Message: "admin role required"}
	}

	return nil
}
```
