@fluent-org/logger

Fluent Forward Protocol implementation for Node.js. Built upon fluent-logger-node.

Install

$ npm install @fluent-org/logger

Client

@fluent-org/logger provides a fully functional client that implements the Forward protocol. It supports reconnection, acknowledgements, timeouts, event retries, and more, and exposes its functionality via a simple typed Promise interface.

For a full list of the client options and methods, see the FluentClient docs.

Prerequisites

The fluent daemon should be listening in forward mode.

A simple starting configuration for Fluentd is the following:

<source>
  @type forward
  port 24224
</source>

<match **.*>
  @type stdout
</match>

See the Fluentd docs for more info.

A similar starting configuration for Fluent Bit is the following:

[INPUT]
    Name              forward
    Listen            0.0.0.0
    Port              24224
    Buffer_Chunk_Size 1M
    Buffer_Max_Size   6M

[OUTPUT]
    Name   stdout
    Match  *

See the Fluent Bit docs for more info.

Sending an event record to an upstream Fluent server

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  }
});

The emit method has the following signatures:

emit(data: Record<string, any>): Promise<void>;
emit(data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;
emit(label: string, data: Record<string, any>): Promise<void>;
emit(label: string, data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;

The returned Promise is resolved once the event is written to the socket, or rejected if an error occurs.
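
The four overloads differ only in whether a leading label string and a trailing timestamp are present. As a rough illustration, the helper below (a hypothetical function, not part of @fluent-org/logger and not its internal code) shows how the argument shapes map to a label, a data record, and a timestamp:

```javascript
// Hypothetical helper: normalizes the four emit() call shapes into one object.
// It only illustrates the overloads; it is not the library's implementation.
function normalizeEmitArgs(a, b, c) {
  if (typeof a === "string") {
    // emit(label, data) or emit(label, data, timestamp)
    return { label: a, data: b, timestamp: c };
  }
  // emit(data) or emit(data, timestamp)
  return { label: undefined, data: a, timestamp: b };
}

console.log(normalizeEmitArgs({ message: "hi" }));
console.log(normalizeEmitArgs("access", { message: "hi" }, new Date(0)));
```

A leading string is always treated as the label, which is why the data record itself must be an object rather than a string.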

Acknowledgements

The Fluent forward protocol provides explicit support for acknowledgements, which allow the client to be sure that the event reached its destination.

Enabling acknowledgements means that the promise returned by emit will be resolved once the client receives an explicit acknowledgement from the server.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  ack: {}
});

Event modes

The Fluent forward protocol provides multiple message modes: Message, Forward, PackedForward (default), and CompressedPackedForward. The Fluent client supports all of them.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  eventMode: "PackedForward" // or "Message", "Forward", "CompressedPackedForward"
});

Disable automatic reconnect

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    disableReconnect: true
  }
});
// With reconnection disabled, the socket has to be connected manually.
// connect() returns a promise, which rejects on connection errors.
logger.connect().catch((err) => console.error("connection failed", err));

Shared key authentication

Logger configuration:

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  }
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

See also the Fluentd examples.
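
The client performs the shared key handshake internally. For readers curious about what is exchanged, the sketch below follows the PING digest as described in the Forward protocol specification, sha512_hex(shared_key_salt + client_hostname + nonce + shared_key). The function name is hypothetical, and the random nonce stands in for the value the server would send in its HELO message; this is not the library's own code.

```javascript
const crypto = require("crypto");

// Sketch of the PING digest from the Forward protocol spec:
// sha512_hex(shared_key_salt + client_hostname + nonce + shared_key).
// The function name is hypothetical.
function sharedKeyHexDigest(salt, clientHostname, nonce, sharedKey) {
  return crypto
    .createHash("sha512")
    .update(salt)
    .update(clientHostname)
    .update(nonce)
    .update(sharedKey)
    .digest("hex");
}

const nonce = crypto.randomBytes(16); // stand-in for the server-provided nonce
const salt = crypto.randomBytes(16); // chosen by the client
const digest = sharedKeyHexDigest(
  salt,
  "client.localdomain",
  nonce,
  "secure_communication_is_awesome"
);
console.log(digest.length); // 128 hex characters for SHA-512
```

Because the digest mixes in the client hostname, the shared key itself never crosses the wire.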

TLS/SSL encryption

Logger configuration:

const fs = require("fs");
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    tls: {
      ca: fs.readFileSync("/path/to/ca_cert.pem")
    },
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  },
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <transport tls>
    ca_cert_path /path/to/ca_cert.pem
    ca_private_key_path /path/to/ca_key.pem
    ca_private_key_passphrase very_secret_passphrase
  </transport>
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

FYI: You can generate certificates using the fluent-ca-generate command since Fluentd 1.1.0.

See also How to enable TLS/SSL encryption.

Mutual TLS Authentication

Logger configuration:

const fs = require("fs");
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
    tls: {
      ca: fs.readFileSync("/path/to/ca_cert.pem"),
      cert: fs.readFileSync("/path/to/client-cert.pem"),
      key: fs.readFileSync("/path/to/client-key.pem"),
      passphrase: "very-secret"
    },
  },
  security: {
    clientHostname: "client.localdomain",
    sharedKey: "secure_communication_is_awesome"
  }
});

Fluentd configuration:

<source>
  @type forward
  port 24224
  <transport tls>
    ca_path /path/to/ca-cert.pem
    cert_path /path/to/server-cert.pem
    private_key_path /path/to/server-key.pem
    private_key_passphrase very_secret_passphrase
    client_cert_auth true
  </transport>
  <security>
    self_hostname input.testing.local
    shared_key secure_communication_is_awesome
  </security>
</source>

<match dummy.*>
  @type stdout
</match>

EventTime support

We can also specify an EventTime as a timestamp. See the EventTime docs.

const FluentClient = require("@fluent-org/logger").FluentClient;
const EventTime = require("@fluent-org/logger").EventTime;
const eventTime = new EventTime(1489547207, 745003500); // 2017-03-15 12:06:47 +0900
const logger = new FluentClient("tag_prefix", {
  socket: {
    host: "localhost",
    port: 24224,
    timeout: 3000, // 3 seconds
  }
});
logger.emit("tag", { message: "This is a message" }, eventTime);
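
The example above passes epoch seconds and a nanosecond component to the EventTime constructor. A minimal sketch of deriving that pair from epoch milliseconds follows; the helper name is hypothetical and not part of the library.

```javascript
// Sketch: split epoch milliseconds into a (seconds, nanoseconds) pair,
// the two values the EventTime constructor takes in the example above.
// The helper name is hypothetical.
function splitEpochMillis(ms) {
  const seconds = Math.floor(ms / 1000);
  const nanoseconds = (ms - seconds * 1000) * 1e6;
  return { seconds, nanoseconds };
}

// 2017-03-15 03:06:47.745 UTC, i.e. 12:06:47.745 +0900
const parts = splitEpochMillis(Date.UTC(2017, 2, 15, 3, 6, 47, 745));
console.log(parts); // { seconds: 1489547207, nanoseconds: 745000000 }
```

A Date only carries millisecond precision, so the sub-millisecond digits of the nanosecond component are always zero when converting this way.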

Handling errors

The Fluent client manages errors internally and rejects promises when they occur. If you'd like to access the internal errors that are not otherwise surfaced to the user, pass an onSocketError handler:

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  onSocketError: (err) => {
    console.error("error!", err);
  }
});

Retrying events

Sometimes it makes sense to resubmit events if their initial submission failed. You can do this by specifying eventRetry.

const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
  eventRetry: {}
});

Server

@fluent-org/logger includes a fully functional forward server which can be used as a downstream Fluent sink.

const FluentServer = require("@fluent-org/logger").FluentServer;

const server = new FluentServer({ listenOptions: { port: 24224 }});

// Top-level await is not available in CommonJS, so wrap the call.
(async () => {
  await server.listen();
})();

Fluentd config:

<match pattern>
  @type forward
  send_timeout 60s
  recover_wait 10s
  hard_timeout 60s

  <server>
    name fluent_node 
    host 127.0.0.1
    port 24224
    weight 60
  </server>

  <secondary>
    @type file
    path /var/log/fluent/forward-failed
  </secondary>
</match>

See the Fluentd docs for more info.

Alternatively, see the Fluent Bit docs for info on setting up Fluent Bit.

For a full list of the server options and methods, see the FluentServer docs.

License

Apache License, Version 2.0.

About Node.js versions

This package is compatible with Node.js versions >= 12.

Type aliases

FluentAuthOptions

FluentAuthOptions: { clientHostname: string; sharedKey: string; username?: string; password?: string }

The authentication options for the client

Type declaration

  • clientHostname: string

    The client host name (required).

    Must be unique to this process

  • sharedKey: string

    The shared key with the server. (required)

  • Optional username?: string

    The username to authenticate with. (optional)

  • Optional password?: string

    The password to authenticate with. (optional)

EventModes

EventModes: "Message" | "Forward" | "PackedForward" | "CompressedPackedForward"

The set of accepted event modes. See the Forward protocol spec.

Message will send each event to Fluentd individually.

Forward will collect the events together by tag, and send them together to Fluentd in a single packet. This is more efficient with a flushInterval to batch these together.

PackedForward will behave the same as Forward, but will pack the events as part of entering the queue. This saves memory and bandwidth.

CompressedPackedForward will behave the same as PackedForward, but additionally compress the items before emission, saving more bandwidth.
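
To make the difference between Message and Forward concrete, here is a sketch of the array shapes each mode produces before serialization, following the Forward protocol spec. The function names and the event object shape ({ tag, time, record }) are assumptions made for illustration; the library handles this internally.

```javascript
// Sketch of the pre-serialization shapes for two event modes.
// Function names and the { tag, time, record } shape are hypothetical.

// Message mode: one [tag, time, record] array per event.
function toMessageMode(events) {
  return events.map((e) => [e.tag, e.time, e.record]);
}

// Forward mode: one [tag, [[time, record], ...]] array per tag.
function toForwardMode(events) {
  const byTag = new Map();
  for (const e of events) {
    if (!byTag.has(e.tag)) byTag.set(e.tag, []);
    byTag.get(e.tag).push([e.time, e.record]);
  }
  return Array.from(byTag, ([tag, entries]) => [tag, entries]);
}

const events = [
  { tag: "app.access", time: 1489547207, record: { path: "/" } },
  { tag: "app.access", time: 1489547208, record: { path: "/login" } },
  { tag: "app.error", time: 1489547209, record: { message: "boom" } },
];
console.log(toMessageMode(events).length); // 3
console.log(toForwardMode(events).length); // 2
```

Three events become three packets in Message mode but only two in Forward mode, one per tag, which is where the batching benefit comes from.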

Timestamp

Timestamp: number | Date | EventTime

The set of accepted Timestamp values

AckOptions

AckOptions: { ackTimeout: number }

Acknowledgement settings

Type declaration

  • ackTimeout: number

    How long to wait for an acknowledgement from the server

SendQueueLimit

SendQueueLimit: { size: number; length: number }

Type declaration

  • size: number

    The queue size limit (memory)

    This checks the memory size of the queue, which is only useful with PackedForward and CompressedPackedForward.

    Defaults to +Infinity

  • length: number

    The queue length limit (# of entries)

    This checks the number of events in the queue, which is useful with all event modes.

    Defaults to +Infinity
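
As a rough sketch of how a Partial<SendQueueLimit> could be evaluated, the helper below treats missing fields as +Infinity, matching the documented defaults. The helper name, the queue shape, and the choice of a strict "greater than" comparison are assumptions, not the library's confirmed behavior.

```javascript
// Sketch: check a queue against a Partial<SendQueueLimit>.
// Missing fields fall back to +Infinity (the documented default).
// The helper, the queue shape, and the strict ">" comparison are assumptions.
function exceedsLimit(queue, limit = {}) {
  const { size = Infinity, length = Infinity } = limit;
  return queue.size > size || queue.length > length;
}

const queue = { size: 2048, length: 3 }; // bytes held and number of entries
console.log(exceedsLimit(queue)); // false
console.log(exceedsLimit(queue, { length: 2 })); // true
console.log(exceedsLimit(queue, { size: 4096 })); // false
```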

DisconnectOptions

DisconnectOptions: { waitForPending: boolean; waitForPendingDelay: number; socketDisconnectDelay: number }

Type declaration

  • waitForPending: boolean

    Whether to wait for all pending events to finish sending to the fluent server before disconnecting

    Defaults to false (does not wait)

  • waitForPendingDelay: number

    The maximum amount of time to wait for pending events to finish sending to the fluent server before disconnecting

    Defaults to 0 (no maximum time)

  • socketDisconnectDelay: number

    The amount of time to wait before disconnecting the socket on disconnection

    Useful to wait for acknowledgements on final flush

    Defaults to 0

FluentClientOptions

FluentClientOptions: { eventMode?: EventModes; socket?: FluentSocketOptions; security?: FluentAuthOptions; ack?: Partial<AckOptions>; milliseconds?: boolean; flushInterval?: number; sendQueueIntervalFlushLimit?: Partial<SendQueueLimit>; sendQueueSyncFlushLimit?: Partial<SendQueueLimit>; sendQueueMaxLimit?: Partial<SendQueueLimit>; sendQueueNotFlushableLimit?: Partial<SendQueueLimit>; sendQueueNotFlushableLimitDelay?: number; eventRetry?: Partial<EventRetryOptions>; disconnect?: Partial<DisconnectOptions>; disableAutoconnect?: boolean }

The constructor options passed to the client

Type declaration

  • Optional eventMode?: EventModes

    The event mode to use. Defaults to PackedForward

  • Optional socket?: FluentSocketOptions

    The connection options. See subtype for defaults.

  • Optional security?: FluentAuthOptions

    The fluentd security options. See subtype for defaults.

  • Optional ack?: Partial<AckOptions>

    Acknowledgement settings.

  • Optional milliseconds?: boolean

    The timestamp resolution of events passed to Fluentd.

    Passing false here means that the timestamps will be emitted as numbers (unless you explicitly provide an EventTime). Passing true means that timestamps will always be emitted as EventTime object instances. This includes timestamps

    Defaults to false (seconds). If true, the resolution will be in milliseconds.

  • Optional flushInterval?: number

    How long to wait to flush the queued events

    If this is 0, we don't wait at all

    Defaults to 0

  • Optional sendQueueIntervalFlushLimit?: Partial<SendQueueLimit>

    The limit at which the queue needs to be flushed.

    Used when flushInterval is > 0 to say "flush after flushInterval ms, or when the queue reaches X size"

    See the subtype for defaults

  • Optional sendQueueSyncFlushLimit?: Partial<SendQueueLimit>

    The limit at which we flush synchronously. By default, we flush asynchronously, which can be bad if we're sending 30k+ events at a time.

    This sets a size limit at which we flush synchronously within emit(), which makes sure we're flushing as quickly as possible

    Defaults to null (no limit)

    See the subtype for defaults

  • Optional sendQueueMaxLimit?: Partial<SendQueueLimit>

    The limit at which we start dropping events

    Prevents the queue from growing to an unbounded size and exhausting memory.

    See the subtype for defaults

  • Optional sendQueueNotFlushableLimit?: Partial<SendQueueLimit>

    The limit at which we start dropping events when we're not writable

    Prevents the queue from growing too much when fluentd is down for an extended period

    Defaults to null (no limit)

    See the subtype for defaults

  • Optional sendQueueNotFlushableLimitDelay?: number

    How long the client must remain not writable before it starts dropping events under sendQueueNotFlushableLimit. Useful to make sure we don't drop events during short blips

    Defaults to 0 (no delay)

  • Optional eventRetry?: Partial<EventRetryOptions>

    Retry event submission on failure

    Warning: This effectively keeps the event in memory until it is successfully sent or retries are exhausted

    See subtype for defaults

  • Optional disconnect?: Partial<DisconnectOptions>

    Options to control disconnection behavior

    How many times to try to flush before disconnecting, wait times, etc

    See subtype for defaults

  • Optional disableAutoconnect?: boolean

    Disable connection on client creation. Expects the client to call .connect to start sending messages.

    Defaults to false
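
To show how several of these options fit together, here is an illustrative options object. The field names come from the type declaration above; the numeric values are arbitrary examples, and the unit of waitForPendingDelay is assumed to be milliseconds.

```javascript
// Illustrative FluentClientOptions value; the numbers are arbitrary examples.
const clientOptions = {
  eventMode: "PackedForward",
  // Flush queued events every 1000 ms...
  flushInterval: 1000,
  // ...or earlier, once 500 events are queued.
  sendQueueIntervalFlushLimit: { length: 500 },
  // Start dropping events beyond 10000 queued entries.
  sendQueueMaxLimit: { length: 10000 },
  // On disconnect, wait for pending events (delay unit assumed to be ms).
  disconnect: { waitForPending: true, waitForPendingDelay: 5000 },
};
// Would be passed as: new FluentClient("tag_prefix", clientOptions)
console.log(Object.keys(clientOptions));
```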

EventRetryOptions

EventRetryOptions: { attempts: number; backoff: number; delay: number; minDelay: number; maxDelay: number; onError: any }

Event retry settings

The parameters represent an exponential backoff formula: min(maxDelay, max(minDelay, backoff^attempts * delay))

Type declaration

  • attempts: number

    How many times we retry each event

    Defaults to 4

  • backoff: number

    The backoff factor for each attempt

    Defaults to 2

  • delay: number

    The delay factor for each attempt

    Defaults to 100

  • minDelay: number

    The global minimum delay

  • maxDelay: number

    The global maximum delay

  • onError: function
    • onError(err: Error): void
    • Called with each error

      Can be used for logging, or if the error is non-retryable, this callback can throw the error to short-circuit further retries.

      Parameters

      • err: Error

      Returns void
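
The backoff formula above can be sketched as a small function. The backoff and delay fallbacks use the documented defaults (2 and 100); the minDelay and maxDelay fallbacks, the function name, and the assumption that the attempt counter starts at 0 are illustrative only.

```javascript
// Sketch of min(maxDelay, max(minDelay, backoff^attempts * delay)).
// backoff and delay fall back to the documented defaults (2 and 100).
// The minDelay/maxDelay fallbacks are assumptions, not documented defaults.
function retryDelay(attempts, opts = {}) {
  const { backoff = 2, delay = 100, minDelay = -Infinity, maxDelay = Infinity } = opts;
  return Math.min(maxDelay, Math.max(minDelay, Math.pow(backoff, attempts) * delay));
}

// Delays for four attempts, capped at 500.
const schedule = [0, 1, 2, 3].map((n) => retryDelay(n, { maxDelay: 500 }));
console.log(schedule); // [ 100, 200, 400, 500 ]
```

The delay doubles with each attempt until maxDelay caps it, so the fourth attempt waits 500 rather than 800.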

FluentServerSecurityOptions

FluentServerSecurityOptions: { serverHostname: string; sharedKey: string; authorize: boolean; userDict: Record<string, string> }

The server security hardening options

Type declaration

  • serverHostname: string

    The hostname of the server. Should be unique to this process

  • sharedKey: string

    The shared key to authenticate clients with

  • authorize: boolean

    Whether to use user authentication

  • userDict: Record<string, string>

    A dict of users to their passwords

FluentServerOptions

FluentServerOptions: { security?: FluentServerSecurityOptions; keepalive?: boolean; tlsOptions?: tls.TlsOptions; listenOptions?: net.ListenOptions }

The server setup options

Type declaration

  • Optional security?: FluentServerSecurityOptions

    The security options.

    Defaults to undefined (no auth).

  • Optional keepalive?: boolean

    Whether or not to keep the sockets alive. Sent in HELO, but currently ignored

    Defaults to false

  • Optional tlsOptions?: tls.TlsOptions

    TLS setup options.

    See the Node.js docs for more info

    Defaults to undefined

  • Optional listenOptions?: net.ListenOptions

    Socket listen options

    See the Node.js docs for more info

    Defaults to {port: 0}
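
For illustration, a server options object that enables shared key and user authentication might look like the following. The field names come from the type declarations above; the hostname, key, and credentials are placeholders.

```javascript
// Illustrative FluentServerOptions value using the fields documented above.
// Hostname, key, and credentials are placeholders.
const serverOptions = {
  security: {
    serverHostname: "input.testing.local",
    sharedKey: "secure_communication_is_awesome",
    authorize: true, // also require a username and password
    userDict: { alice: "alice_password" },
  },
  listenOptions: { port: 24224 },
};
// Would be passed as: new FluentServer(serverOptions)
console.log(Object.keys(serverOptions.security));
```

A client connecting to such a server would need matching sharedKey, username, and password values in its FluentAuthOptions.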

ReconnectOptions

ReconnectOptions: { backoff: number; delay: number; minDelay: number; maxDelay: number }

Reconnection settings for the socket

The parameters represent an exponential backoff formula: min(maxDelay, max(minDelay, backoff^attempts * delay))

The attempt count is incremented each time a connection fails, and reset to zero each time a connection is successfully made. Note that the reset happens before handshaking.

Type declaration

  • backoff: number

    The backoff factor for each attempt

    Defaults to 2

  • delay: number

    The delay factor for each attempt

    Defaults to 500

  • minDelay: number

    The global minimum delay

  • maxDelay: number

    The global maximum delay
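
The attempt counter described above can be sketched as a small state holder. The class and method names are hypothetical; backoff and delay fall back to the documented defaults (2 and 500), while the minDelay/maxDelay fallbacks and the choice to compute the wait before incrementing the counter are assumptions.

```javascript
// Sketch of the reconnect attempt counter; names are hypothetical.
// The min/max fallbacks and computing the wait before incrementing
// are assumptions, not confirmed library behavior.
class ReconnectTracker {
  constructor({ backoff = 2, delay = 500, minDelay = -Infinity, maxDelay = Infinity } = {}) {
    this.opts = { backoff, delay, minDelay, maxDelay };
    this.attempts = 0;
  }

  // Call when a connection attempt fails; returns the wait before the next try.
  onFailure() {
    const { backoff, delay, minDelay, maxDelay } = this.opts;
    const wait = Math.min(maxDelay, Math.max(minDelay, backoff ** this.attempts * delay));
    this.attempts += 1;
    return wait;
  }

  // Call when the socket connects (before the handshake); resets the counter.
  onConnect() {
    this.attempts = 0;
  }
}

const tracker = new ReconnectTracker({ maxDelay: 60000 });
const waits = [tracker.onFailure(), tracker.onFailure(), tracker.onFailure()];
console.log(waits); // [ 500, 1000, 2000 ]
tracker.onConnect();
const afterReset = tracker.onFailure();
console.log(afterReset); // 500
```

Because the counter resets on connect rather than after the handshake, a server that accepts connections but fails authentication would keep seeing the shortest delay in this sketch.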

FluentSocketOptions

FluentSocketOptions: { path?: string; host?: string; port?: number; timeout?: number; tls?: tls.ConnectionOptions; reconnect?: Partial<ReconnectOptions>; disableReconnect?: boolean; notWritableWhenDraining?: boolean }

Type declaration

  • Optional path?: string

    If connecting to a unix domain socket, e.g. unix:///var/run/fluentd.sock, specify that here.

    This overrides host/port below. Defaults to undefined.

  • Optional host?: string

    The host (IP) to connect to

    Defaults to localhost.

  • Optional port?: number

    The port to connect to

    Defaults to 24224.

  • Optional timeout?: number

    The socket timeout to set. After timing out, the socket will be idle and reconnect once the client wants to write something.

    Defaults to 3000 (3 seconds)

    Set to -1 for no timeout

  • Optional tls?: tls.ConnectionOptions

    TLS connection options. See Node docs

    If provided, the socket will be a TLS socket.

  • Optional reconnect?: Partial<ReconnectOptions>

    Reconnection options. See subtype for defaults.

  • Optional disableReconnect?: boolean

    Disable reconnection on failure. This can be useful for one-offs, or if you'd like to manually manage the connection.

    Prevents the socket from being created on client create.

    Defaults to false

  • Optional notWritableWhenDraining?: boolean

    Treat the socket as not writable when draining.

    This can reduce memory use, but only when the client slows down emission. Otherwise either the send buffer fills up or the FluentClient.sendQueue fills up.

    Defaults to false
