
pipeline:

  odd:
    module: kafka-in
    config:
      hosts:
        - 'localhost:9092'
      topics:
        - odd
      groupId: odd
      offset: earliest
      format: json
      compression: gzip
      debug: cgrp,topic,fetch
      exitAfterSeconds: 5

  even:
    module: kafka-in
    config:
      hosts:
        - 'localhost:9092'
      topics:
        - even
      groupId: even
      offset: earliest
      format: json
      compression: gzip
      debug: cgrp,topic,fetch
      exitAfterSeconds: 5

  # Remove variable & environment specific data and transform to format that is
  # easier for `jq` to produce deterministic results.
  testify:
    module: js
    inChannels:
      - odd
      - even
    config:
      function: !!js/function >-
        function(event, channel) {
          event.channel = channel
          return {key: event.ts, value: event}
        }

  json-out:
    inChannels:
      - testify

  test-output:
    module: file-out
    inChannels:
      - json-out
    config:
      path: test/kafka/out.json

  log-errors:
    module: errors
    inChannels:
      - errors
    config:
      intervalSeconds: 5
      stackDepth: 6

  log-stats:
    module: stats
    inChannels:
      - stats

  log:
    inChannels:
      - log
      - log-errors
      - log-stats
