> ## Documentation Index
> Fetch the complete documentation index at: https://cosmo-docs.wundergraph.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka

<Frame caption="Kafka system connecting CMS, CRM, and ERP to routers via Cosmo Streams">
  <img src="https://mintcdn.com/wundergraphinc/xf0VUtB1vejbGgo8/images/router/event-driven-federated-subscriptions-edfs/kafka-system-linking-apps-and-routers.png?fit=max&auto=format&n=xf0VUtB1vejbGgo8&q=85&s=f6a0666519f240e87f36cb0c65b2a2be" alt="Kafka system with CMS, CRM, ERP feeding into Kafka, output to three routers/clients" width="1966" height="1018" data-path="images/router/event-driven-federated-subscriptions-edfs/kafka-system-linking-apps-and-routers.png" />
</Frame>

## Minimum requirements

| Package      | Minimum version |
| ------------ | --------------- |
| controlplane | 0.88.3          |
| router       | 0.88.0          |
| wgc          | 0.55.0          |

<Note>
  Kafka topic arguments support the same [argument template syntax](/router/cosmo-streams#the-“subjects”-argument) as the other event providers, for example `employeeUpdated.{{ args.employeeID }}`.
  When you use topic templates with Kafka, you must create the resulting name as a Kafka topic in your broker ahead of time. The Router does not create Kafka topics automatically.
</Note>

## Full schema example

Below is a comprehensive example of how to use Kafka with Cosmo Streams. This guide covers publish, subscribe, and the filter directive. All examples can be modified to suit your specific needs. The schema directives and `edfs__*` types belong to the Cosmo Streams schema contract and must not be modified.

```js theme={"system"}
# Cosmo Streams

directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION

# OpenFederation

directive @openfed__subscriptionFilter(condition: openfed__SubscriptionFilterCondition!) on FIELD_DEFINITION

scalar openfed__SubscriptionFilterValue

input openfed__SubscriptionFieldCondition {
    fieldPath: String!
    values: [openfed__SubscriptionFilterValue]!
}

input openfed__SubscriptionFilterCondition {
    AND: [openfed__SubscriptionFilterCondition!]
    IN: openfed__SubscriptionFieldCondition
    NOT: openfed__SubscriptionFilterCondition
    OR: [openfed__SubscriptionFilterCondition!]
}

# Custom

input UpdateEmployeeInput {
    name: String
    email: String
}

type Mutation {
   updateEmployeeMyKafka(employeeID: Int!, update: UpdateEmployeeInput!): edfs__PublishResult! @edfs__kafkaPublish(topic: "employeeUpdated", providerId: "my-kafka")
}

type Subscription {
    filteredEmployeeUpdatedMyKafka(employeeID: ID!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: [1, 3, 4, 7, 11] } })
    filteredEmployeeUpdatedMyKafkaWithListFieldArguments(firstIds: [ID!]!, secondIds: [ID!]!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
    filteredEmployeeUpdatedMyKafkaWithNestedListFieldArgument(input: KafkaInput!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: {
            OR: [
                { IN: { fieldPath: "id", values: ["{{ args.input.ids }}"] } },
                { IN: { fieldPath: "id", values: [1] } },
            ],
        })
}

input KafkaInput {
    ids: [Int!]!
}

# Subgraph schema

type Employee @key(fields: "id", resolvable: false) {
  id: Int! @external
}

type edfs__PublishResult {
    success: Boolean!
}
```

You can create the above Event-Driven Graph (EDG—an abstract subgraph) with the following [wgc](/cli/intro) command:

```js theme={"system"}
wgc subgraph publish edg --namespace default --schema eedg.graphqls
```

## Router configuration

Based on the example above, you will need a compatible router configuration.

<CodeGroup>
  ```yaml config.yaml theme={"system"}
  events:
    providers:
      kafka:
        - id: my-kafka # Needs to match with the providerID in the directive
          tls:
            enabled: true
          authentication:
            sasl_plain:
              password: "password"
              username: "username"
          brokers:
            - "localhost:9092"
  ```
</CodeGroup>

## Example Query

In the example query below, one or more subgraphs have been implemented alongside the Event-Driven Graph to resolve any other fields defined on `Employee`, e.g., `tag` and `details.surname`.

```js theme={"system"}
subscription {
  filteredEmployeeUpdatedMyKafka(employeeID: 1) {
    id # resolved by the Event-Driven Graph (through the event)
    tag # resolved by another subgraph
    details { # resolved by another subgraph
      surname
    }
  }
}
```

## System diagram

<Frame caption="Service–Kafka–Router interaction showing event flow and subscriptions">
  <img src="https://mintcdn.com/wundergraphinc/xf0VUtB1vejbGgo8/images/router/event-driven-federated-subscriptions-edfs/service-kafka-router-client-interaction.png?fit=max&auto=format&n=xf0VUtB1vejbGgo8&q=85&s=728131a037f997b9f36311864d3af4b3" alt="Service interacting with Kafka, router, and clients, showing event flow and subscriptions" width="1160" height="1092" data-path="images/router/event-driven-federated-subscriptions-edfs/service-kafka-router-client-interaction.png" />
</Frame>
