Kafka

Kafka event provider support for Event-Driven Federated Subscriptions (EDFS)

Minimum requirements

Package        Minimum version
controlplane   0.88.3
router         0.88.0
wgc            0.55.0

Full schema example

Below is a comprehensive example of how to use Kafka with EDFS, covering publish, subscribe, and the subscription filter directive. The custom fields and arguments can be adapted to your needs, but the schema directives and edfs__* types belong to the EDFS schema contract and must not be modified.

# EDFS

directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION

# OpenFederation

directive @openfed__subscriptionFilter(condition: openfed__SubscriptionFilterCondition!) on FIELD_DEFINITION

scalar openfed__SubscriptionFilterValue

input openfed__SubscriptionFieldCondition {
    fieldPath: String!
    values: [openfed__SubscriptionFilterValue]!
}

input openfed__SubscriptionFilterCondition {
    AND: [openfed__SubscriptionFilterCondition!]
    IN: openfed__SubscriptionFieldCondition
    NOT: openfed__SubscriptionFilterCondition
    OR: [openfed__SubscriptionFilterCondition!]
}

# Custom

input UpdateEmployeeInput {
    name: String
    email: String
}

type Mutation {
   updateEmployeeMyKafka(employeeID: Int!, update: UpdateEmployeeInput!): edfs__PublishResult! @edfs__kafkaPublish(topic: "employeeUpdated", providerId: "my-kafka")
}

type Subscription {
    filteredEmployeeUpdatedMyKafka(employeeID: ID!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: [1, 3, 4, 7, 11] } })
    filteredEmployeeUpdatedMyKafkaWithListFieldArguments(firstIds: [ID!]!, secondIds: [ID!]!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: { IN: { fieldPath: "id", values: ["{{ args.firstIds }}", "{{ args.secondIds }}"] } })
    filteredEmployeeUpdatedMyKafkaWithNestedListFieldArgument(input: KafkaInput!): Employee!
        @edfs__kafkaSubscribe(topics: ["employeeUpdated", "employeeUpdatedTwo"], providerId: "my-kafka")
        @openfed__subscriptionFilter(condition: {
            OR: [
                { IN: { fieldPath: "id", values: ["{{ args.input.ids }}"] } },
                { IN: { fieldPath: "id", values: [1] } },
            ],
        })
}

input KafkaInput {
    ids: [Int!]!
}

# Subgraph schema

type Employee @key(fields: "id", resolvable: false) {
  id: Int! @external
}

type edfs__PublishResult {
    success: Boolean!
}
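
As a usage sketch of the publish field defined above (the argument values are illustrative, not part of the example), the mutation is invoked like any other GraphQL mutation, and the router publishes the resulting event to the employeeUpdated topic:

mutation {
    updateEmployeeMyKafka(employeeID: 3, update: { name: "Jane Doe", email: "jane.doe@example.com" }) {
        success
    }
}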

You can create the abstract subgraph with the following wgc command:

wgc subgraph publish employee --namespace default --schema edfs-graph.graphqls --routing-url http://localhost:4004/graphql

The routing URL is still mandatory due to a limitation. You can specify any value to satisfy it.

Router config

Based on the example above, you will need a compatible router configuration.

config.yaml
events:
  providers:
    kafka:
      - id: my-kafka # Must match the providerId used in the directives
        tls: 
          enabled: true
        authentication: 
          sasl_plain: 
            password: "password"
            username: "username"
        brokers:
          - "localhost:9092"

Example Query

This query assumes that your employee subgraph implementation can resolve the requested fields (a sketch of such a subgraph follows the query).

subscription {
  filteredEmployeeUpdatedMyKafka(employeeID: 1) {
    id
    tag
    details {
      surname
    }
  }
}
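
For reference, a minimal employee subgraph that could resolve this selection set might look as follows. The tag and details fields and their types are assumptions derived from the query above; only the id key field is prescribed by the EDFS example.

type Employee @key(fields: "id") {
    id: Int!
    tag: String!
    details: Details
}

type Details {
    surname: String!
}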

System diagram
