
# TODO: cherry-pick from BAP

# plugins:
#   my-cool-plugin:  # the name to reference in a stage's `module` field
#     path: name.js  # relative paths are resolved relative to this config's directory

templates:
  my-templates:          # the namespace for stages in this template file
    path: templates.yml  # relative paths are resolved relative to this config's directory

pipeline:

  stdin:
    module: stdin  # technically not needed since the module defaults to the stage name

  lines:
    inChannels:
      - stdin

  json-in:
    inChannels:
      - lines

  # Drop all fields except for the ones I'm interested in.
  # https://www.freedesktop.org/software/systemd/man/systemd.journal-fields.html
  clean-journal:
    module: keep
    inChannels:
      - json-in
    fields:
      hostname: ['_HOSTNAME', '_MACHINE_ID']
      severity: PRIORITY
      process: ['CONTAINER_NAME', 'SYSLOG_IDENTIFIER', 'UNIT', '_SYSTEMD_UNIT']
      transport: _TRANSPORT,
      message: MESSAGE
      ts: ['_SOURCE_REALTIME_TIMESTAMP', '__REALTIME_TIMESTAMP']

  # Only interested in msgs of higher severity.
  higher-severity:
    module: js
    inChannels:
      - clean-journal
    function: !!js/function >-
      function(data) {
        return data.severity < 5
      }

  # Split the "clean-journal" stream into two: aggregated by specific processes and not.
  unaggregated:
    module: js
    inChannels:
      - higher-severity
    aggregated:
      foo.service: true
      bar.service: true
    function: !!js/function >-
      function(data) {
        return this.config.aggregated[data.process] === undefined
      }

  # Group msgs from service as a single msg.
  aggregated:
    module: sql
    inChannels:
      - clean-journal
    # Will aggregate when either interval reached or buffer reaches max size.
    interval: 3 # seconds
    bufferSize: 10000
    query: >-
      SELECT
        `process`,
        ARRAY(`message`) AS `message`,
        ARRAY(DISTINCT `hostname`) AS `hostname`,
        MIN(`ts`) AS `ts`,
        MAX(`ts`) AS `end`,
        (MAX(`ts`) - MIN(`ts`)) / 1000000 AS `duration`
      FROM ?
      WHERE
        `process` IN ("foo.service", "bar.service")
      GROUP BY `process`

  # Handle some transormations couldn't figure out how to do in sql.
  post-aggregation:
    module: js
    inChannels:
      - aggregated
    function: !!js/function >-
      function(data) {
        if (data.message !== undefined) {
          data.message = data.message.join('\n')
          data.hostname = data.hostname[0]
          return true
        }
      }

  # A simple plugin to handle common task of casting to desired types.
  cast:
    inChannels:
      - unaggregated
      - post-aggregation
    fields:
      severity: int
      ts: ts-usec
      end: ts-usec

  # Specify a deterministic _id so logs can be re-processed without duplicating msgs.
  elasticify:
    module: js
    inChannels:
      - cast
    function: !!js/function >-
      function(data) {
        if (data._type === undefined) {
          data._type = 'syslog'
          data.type = data._type
        }
        if (data._id === undefined) {
          data._id = this.util.format('%s::%s', data.hostname, data.ts.toISOString())
        }
        return true
      }

  es-logs:
    module: elasticsearch
    bufferSize: 100
    inChannels:
      - elasticify
    index: !!js/function >-
      function (data) {
        return this.util.format('logs-%s', data.ts.format('YYYY.MM'))
      }
    api: '2.3'
    # See https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/host-reference.html
    # hosts:
    #   - host: elasticsearch.foo.com
    #     protocol: https:
    #     port: 443
    #     auth: user:pass
    # ssl:
    #   rejectUnauthorized: true
    #   ca: >-
    #     sldkfsdlfkj

  # Example showing how stats & error handling could be handled in a
  # user-defined way. By default, a stage will emit('stats', {...}) and
  # emit('errors', err, {...}) but those channels could be overridden on a
  # per-stage basis via `errChannel` & `statsChannel`.

  # Default outChannels for a stage is [STAGENAME], so calling it something
  # other than `stats` to avoid loop.
  log-stats:
    template: my-templates.log-stats
    interval: 30  # override template's interval

  # Store pipeline stats to elasticsearch.
  es-stats:
    module: elasticsearch
    inChannels:
      - log-stats
    index: stats
    type: sample
    api: '2.3'

  # Capture errors.
  log-errors:
    template: my-templates.log-errors

  # Emit stats & errors to the conole.
  console-log:
    module: log
    inChannels:
      - log-stats
      - log-errors
    level: INFO
    extra:
      foo: bar
