Fluent Forward Protocol implementation for Node.js. Built upon fluent-logger-node.
$ npm install @fluent-org/logger
@fluent-org/logger provides a fully functional client that implements the Forward protocol. It supports reconnection, acknowledgements, timeouts, event retries, and more, and exposes its functionality via a simple typed Promise interface.
For a full list of the client options and methods, see the FluentClient docs.
The fluent daemon should be listening in forward mode.
A simple starting configuration for Fluentd is the following:
<source>
@type forward
port 24224
</source>
<match **.*>
@type stdout
</match>
See the Fluentd docs for more info.
A similar starting configuration for Fluent Bit is the following:
[INPUT]
Name forward
Listen 0.0.0.0
Port 24224
Buffer_Chunk_Size 1M
Buffer_Max_Size 6M
[OUTPUT]
Name stdout
Match *
See the Fluent Bit docs for more info.
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
}
});
The emit method has the following signatures:
emit(data: Record<string, any>): Promise<void>;
emit(data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;
emit(label: string, data: Record<string, any>): Promise<void>;
emit(label: string, data: Record<string, any>, timestamp: number | Date | EventTime): Promise<void>;
The returned Promise is resolved once the event is written to the socket, or rejected if an error occurs.
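As an illustration of how the four call shapes above relate, here is a minimal sketch that resolves them into a single object. The helper `normalizeEmitArgs` is hypothetical and not part of @fluent-org/logger, and joining the label to the tag prefix with a dot is an assumption.

```javascript
// Hypothetical helper, not part of @fluent-org/logger: resolves the four
// emit() call shapes into one { tag, data, timestamp } object.
function normalizeEmitArgs(tagPrefix, ...args) {
  let label = null;
  // A leading string argument is treated as the label.
  if (typeof args[0] === "string") {
    label = args.shift();
  }
  const [data, timestamp = null] = args;
  if (data === null || typeof data !== "object") {
    throw new TypeError("data must be an object");
  }
  // Assumption: the label is appended to the tag prefix with a dot.
  const tag = [tagPrefix, label].filter(Boolean).join(".");
  return { tag, data, timestamp };
}

console.log(normalizeEmitArgs("tag_prefix", { message: "hello" }));
console.log(normalizeEmitArgs("tag_prefix", "label", { message: "hello" }, new Date()));
```

A call without a label keeps the tag prefix as the full tag, and a missing timestamp is left as null so the caller can decide on a default.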
The Fluent forward protocol provides explicit support for acknowledgements, which allow the client to be sure that the event reached its destination.
Enabling acknowledgements means that the promise returned by emit will be resolved once the client receives an explicit acknowledgement from the server.
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
ack: {}
});
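To make the acknowledgement flow concrete, the following is a rough sketch of how a client could track pending acknowledgements. In the Forward protocol the client attaches a unique chunk ID to each request and the server echoes it back in an ack response. The `AckTracker` class and its method names are hypothetical and do not reflect the internals of @fluent-org/logger.

```javascript
const { randomBytes } = require("crypto");

// Hypothetical sketch: tracks chunks that are waiting for a server ack.
class AckTracker {
  constructor() {
    this.pending = new Map();
  }

  // Registers a new chunk and returns its ID plus a promise that resolves
  // on acknowledgement or rejects after timeoutMs.
  track(timeoutMs) {
    const chunk = randomBytes(16).toString("base64");
    const promise = new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(chunk);
        reject(new Error(`ack timeout for chunk ${chunk}`));
      }, timeoutMs);
      this.pending.set(chunk, { resolve, timer });
    });
    return { chunk, promise };
  }

  // Handles a decoded server response of the form { ack: "<chunk id>" }.
  // Returns false when the chunk ID is unknown.
  handleResponse(response) {
    const entry = this.pending.get(response.ack);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(response.ack);
    entry.resolve();
    return true;
  }
}
```

The timeout in this sketch plays the role of the acknowledgement wait time described in the client options.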
The Fluent forward protocol provides multiple message modes: Message, Forward, PackedForward (default), and CompressedPackedForward. The Fluent client supports all of them.
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
eventMode: "PackedForward" // or "Message", "Forward", "CompressedPackedForward"
});
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
disableReconnect: true
}
});
// If you disable reconnections, the socket has to be connected manually.
// connect() returns a promise, which rejects on connection errors.
logger.connect().catch((err) => console.error("connection failed", err));
Logger configuration:
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
},
security: {
clientHostname: "client.localdomain",
sharedKey: "secure_communication_is_awesome"
}
});
Fluentd configuration:
<source>
@type forward
port 24224
<security>
self_hostname input.testing.local
shared_key secure_communication_is_awesome
</security>
</source>
<match dummy.*>
@type stdout
</match>
See also the Fluentd examples.
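For background on what the shared key is used for, here is a sketch of the digest the Forward protocol handshake expects in the client's PING message: a SHA-512 hex digest over the salt, the client hostname, the server nonce, and the shared key. The function name `sharedKeyHexDigest` is hypothetical, and the client performs this step internally, so this is only an illustration.

```javascript
const { createHash, randomBytes } = require("crypto");

// Hypothetical sketch of the PING digest:
// sha512_hex(shared_key_salt + client_hostname + nonce + shared_key)
function sharedKeyHexDigest({ salt, clientHostname, nonce, sharedKey }) {
  return createHash("sha512")
    .update(salt)
    .update(clientHostname)
    .update(nonce)
    .update(sharedKey)
    .digest("hex");
}

const digest = sharedKeyHexDigest({
  salt: randomBytes(16), // chosen by the client
  clientHostname: "client.localdomain",
  nonce: randomBytes(16), // sent by the server in its HELO message
  sharedKey: "secure_communication_is_awesome",
});
console.log(digest.length); // a SHA-512 hex digest is 128 characters long
```

Because the nonce and salt change on every connection, the shared key itself never travels over the wire.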
Logger configuration:
const fs = require("fs");
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
tls: {
ca: fs.readFileSync("/path/to/ca_cert.pem")
},
},
security: {
clientHostname: "client.localdomain",
sharedKey: "secure_communication_is_awesome"
},
});
Fluentd configuration:
<source>
@type forward
port 24224
<transport tls>
ca_cert_path /path/to/ca_cert.pem
ca_private_key_path /path/to/ca_key.pem
ca_private_key_passphrase very_secret_passphrase
</transport>
<security>
self_hostname input.testing.local
shared_key secure_communication_is_awesome
</security>
</source>
<match dummy.*>
@type stdout
</match>
FYI: You can generate certificates using the fluent-ca-generate command since Fluentd 1.1.0.
See also How to enable TLS/SSL encryption.
Logger configuration:
const fs = require("fs");
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
tls: {
ca: fs.readFileSync("/path/to/ca_cert.pem"),
cert: fs.readFileSync("/path/to/client-cert.pem"),
key: fs.readFileSync("/path/to/client-key.pem"),
passphrase: "very-secret"
},
},
security: {
clientHostname: "client.localdomain",
sharedKey: "secure_communication_is_awesome"
}
});
Fluentd configuration:
<source>
@type forward
port 24224
<transport tls>
ca_path /path/to/ca-cert.pem
cert_path /path/to/server-cert.pem
private_key_path /path/to/server-key.pem
private_key_passphrase very_secret_passphrase
client_cert_auth true
</transport>
<security>
self_hostname input.testing.local
shared_key secure_communication_is_awesome
</security>
</source>
<match dummy.*>
@type stdout
</match>
You can also specify an EventTime as a timestamp. See the EventTime docs.
const FluentClient = require("@fluent-org/logger").FluentClient;
const EventTime = require("@fluent-org/logger").EventTime;
const eventTime = new EventTime(1489547207, 745003500); // 2017-03-15 12:06:47 +0900
const logger = new FluentClient("tag_prefix", {
socket: {
host: "localhost",
port: 24224,
timeout: 3000, // 3 seconds
}
});
logger.emit("tag", { message: "This is a message" }, eventTime);
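The two constructor arguments in the example above are whole seconds since the Unix epoch and the nanosecond remainder. As a small sketch, a JavaScript Date (millisecond precision) could be split into such a pair as follows. The helper name `toEpochAndNano` is hypothetical and not part of the library.

```javascript
// Hypothetical helper: splits a Date into whole seconds and nanoseconds.
function toEpochAndNano(date) {
  const ms = date.getTime();
  const epoch = Math.floor(ms / 1000);
  // The remaining milliseconds, converted to nanoseconds.
  const nano = (ms - epoch * 1000) * 1e6;
  return { epoch, nano };
}

const { epoch, nano } = toEpochAndNano(new Date(1489547207745));
console.log(epoch, nano); // 1489547207 745000000
```

The resulting pair has the same shape as the arguments passed to `new EventTime(...)` above, although a Date can only supply millisecond precision.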
The Fluent client manages errors internally and rejects promises when they occur. If you'd like to access the internal errors that are not user-facing, you can do so by passing onSocketError:
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
onSocketError: (err) => {
console.error("error!", err)
}
});
Sometimes it makes sense to resubmit events if their initial submission failed. You can do this by specifying eventRetry.
const FluentClient = require("@fluent-org/logger").FluentClient;
const logger = new FluentClient("tag_prefix", {
eventRetry: {}
});
@fluent-org/logger includes a fully functional forward server which can be used as a downstream Fluent sink.
const FluentServer = require("@fluent-org/logger").FluentServer;
const server = new FluentServer({ listenOptions: { port: 24224 }});
// Top-level await requires an ES module; in CommonJS, call this inside an async function.
await server.listen();
Fluentd config:
<match pattern>
@type forward
send_timeout 60s
recover_wait 10s
hard_timeout 60s
<server>
name fluent_node
host 127.0.0.1
port 24224
weight 60
</server>
<secondary>
@type file
path /var/log/fluent/forward-failed
</secondary>
</match>
See the Fluentd docs for more info.
Alternatively, see the Fluent Bit docs for info on setting up Fluent Bit.
For a full list of the server options and methods, see the FluentServer docs
Apache License, Version 2.0.
This package is compatible with Node.js versions >= 12.
The client host name (required).
Must be unique to this process.
The shared key with the server (required).
The username to authenticate with (optional).
The password to authenticate with (optional).
The set of accepted event modes. See the Forward protocol spec.
Message will send each event to Fluentd individually.
Forward will collect the events together by tag, and send them together to Fluentd in a single packet. This is more efficient with a flushInterval to batch these together.
PackedForward will behave the same as Forward, but will pack the events as part of entering the queue. This saves memory and bandwidth.
CompressedPackedForward will behave the same as PackedForward, but additionally compress the items before emission, saving more bandwidth.
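The difference between sending events individually and grouping them by tag can be sketched as follows. The array shapes follow the Forward protocol's Message and Forward modes, with the optional trailing option element and the MessagePack serialization left out. The function names are hypothetical and this is not the client's actual implementation.

```javascript
// Message mode: one [tag, time, record] entry per event.
function toMessageMode(events) {
  return events.map(({ tag, time, record }) => [tag, time, record]);
}

// Forward mode: one [tag, [[time, record], ...]] entry per distinct tag.
function toForwardMode(events) {
  const byTag = new Map();
  for (const { tag, time, record } of events) {
    if (!byTag.has(tag)) byTag.set(tag, []);
    byTag.get(tag).push([time, record]);
  }
  return [...byTag].map(([tag, entries]) => [tag, entries]);
}

const events = [
  { tag: "app.access", time: 1489547207, record: { path: "/" } },
  { tag: "app.error", time: 1489547208, record: { code: 500 } },
  { tag: "app.access", time: 1489547209, record: { path: "/about" } },
];
console.log(toMessageMode(events).length); // 3 packets, one per event
console.log(toForwardMode(events).length); // 2 packets, one per tag
```

PackedForward and CompressedPackedForward keep the same per-tag grouping but carry the entries as a single binary blob, optionally compressed.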
The set of accepted Timestamp values
Acknowledgement settings
How long to wait for an acknowledgement from the server
The queue size limit (memory)
This checks the memory size of the queue, which is only useful with PackedForward and CompressedPackedForward.
Defaults to +Infinity
The queue length limit (# of entries)
This checks the number of events in the queue, which is useful with all event modes.
Defaults to +Infinity
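A minimal sketch of how the two limits could be checked together is shown below. The property names `size` and `length`, the `byteSize` field on the queue, and the use of `>=` for the comparison are all assumptions made for illustration, not the library's confirmed behavior.

```javascript
// Hypothetical check: a queue reaches its limit when either the memory
// size or the number of entries hits the configured bound.
function reachedLimit(queue, limit = {}) {
  const { size = Infinity, length = Infinity } = limit;
  return queue.byteSize >= size || queue.length >= length;
}

const queue = { byteSize: 2048, length: 10 };
console.log(reachedLimit(queue, {})); // false: both limits default to +Infinity
console.log(reachedLimit(queue, { length: 10 })); // true: entry count reached
console.log(reachedLimit(queue, { size: 4096 })); // false: under the memory bound
```

With both limits left at +Infinity the check never triggers, which matches the documented defaults.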
Whether to wait for all pending events to finish sending to the fluent server before disconnecting
Defaults to false (does not wait)
The maximum amount of time to wait for pending events to finish sending to the fluent server before disconnecting
Defaults to 0 (no maximum time)
The amount of time to wait before disconnecting the socket on disconnection
Useful to wait for acknowledgements on final flush
Defaults to 0
The constructor options passed to the client
The event mode to use. Defaults to PackedForward
The connection options. See subtype for defaults.
The fluentd security options. See subtype for defaults.
Acknowledgement settings.
The timestamp resolution of events passed to Fluentd.
Passing false here means that the timestamps will be emitted as numbers (unless you explicitly provide an EventTime). Passing true means that timestamps will always be emitted as EventTime object instances. This includes timestamps
Defaults to false (seconds). If true, the resolution will be in milliseconds.
How long to wait to flush the queued events
If this is 0, we don't wait at all
Defaults to 0
The limit at which the queue needs to be flushed.
Used when flushInterval is > 0 to say "flush after flushInterval ms, or when the queue reaches X size"
See the subtype for defaults
The limit at which we flush synchronously. By default, we flush asynchronously, which can be bad if we're sending 30k+ events at a time.
This sets a size limit at which we flush synchronously within emit(), which makes sure we're flushing as quickly as possible
Defaults to null (no limit)
See the subtype for defaults
The limit at which we start dropping events
Prevents the queue from growing to an unbounded size and exhausting memory.
See the subtype for defaults
The limit at which we start dropping events when we're not writable
Prevents the queue from growing too much when fluentd is down for an extended period
Defaults to null (no limit)
See the subtype for defaults
The delay after which we're not writable to start flushing events. Useful to make sure we don't drop events during short blips
Defaults to 0 (no delay)
Retry event submission on failure
Warning: This effectively keeps the event in memory until it is successfully sent or retries exhausted
See subtype for defaults
Options to control disconnection behavior
How many times to try to flush before disconnecting, wait times, etc
See subtype for defaults
Disable connection on client creation. Expects the client to call .connect to start sending messages.
Defaults to false
Event retry settings
The parameters represent an exponential backoff formula: min(maxDelay, max(minDelay, backoff^attempts * delay))
How many times we retry each event
Defaults to 4
The backoff factor for each attempt
Defaults to 2
The delay factor for each attempt
Defaults to 100
The global minimum delay
The global maximum delay
Called with each error
Can be used for logging, or if the error is non-retryable, this callback can throw the error to short circuit the callback.
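To show what the backoff formula above produces, here is a small sketch that lists the wait before each retry. The defaults for attempts, backoff, and delay come from the descriptions above. The `minDelay` and `maxDelay` defaults used here, and counting attempts from zero, are assumptions, and `retrySchedule` is a hypothetical helper.

```javascript
// Hypothetical helper: wait time (ms) before each retry, using
// min(maxDelay, max(minDelay, backoff^attempt * delay)).
function retrySchedule({
  attempts = 4,
  backoff = 2,
  delay = 100,
  minDelay = 0, // assumed default
  maxDelay = Infinity, // assumed default
} = {}) {
  const waits = [];
  // Assumption: the first retry uses attempt = 0.
  for (let attempt = 0; attempt < attempts; attempt++) {
    waits.push(Math.min(maxDelay, Math.max(minDelay, backoff ** attempt * delay)));
  }
  return waits;
}

console.log(retrySchedule()); // [ 100, 200, 400, 800 ]
console.log(retrySchedule({ maxDelay: 500 })); // [ 100, 200, 400, 500 ]
```

The cap only takes effect once the exponential term exceeds `maxDelay`, so short retry sequences are unaffected by it.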
The server security hardening options
The hostname of the server. Should be unique to this process
The shared key to authenticate clients with
Whether to use user authentication
A dict of users to their passwords
The server setup options
The security options.
Defaults to undefined (no auth).
Whether or not to keep the sockets alive. Sent in HELO, but currently ignored
Defaults to false
Reconnection settings for the socket
The parameters represent an exponential backoff formula: min(maxDelay, max(minDelay, backoff^attempts * delay))
The attempt count is incremented each time a connection fails, and reset to zero each time a connection is successfully made. Note that this happens before handshaking.
The backoff factor for each attempt
Defaults to 2
The delay factor for each attempt
Defaults to 500
The global minimum delay
The global maximum delay
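The attempt counting described above can be sketched as a small stateful helper. The `ReconnectBackoff` class is hypothetical, the `minDelay` and `maxDelay` defaults are assumptions, and whether the counter is incremented before or after computing the delay is also an assumption.

```javascript
// Hypothetical sketch of reconnection backoff with a resettable counter.
class ReconnectBackoff {
  constructor({ backoff = 2, delay = 500, minDelay = 0, maxDelay = Infinity } = {}) {
    this.options = { backoff, delay, minDelay, maxDelay };
    this.attempts = 0;
  }

  // Call when a connection attempt fails; returns the wait (ms) before the next try.
  onFailure() {
    const { backoff, delay, minDelay, maxDelay } = this.options;
    const wait = Math.min(maxDelay, Math.max(minDelay, backoff ** this.attempts * delay));
    this.attempts += 1;
    return wait;
  }

  // Call when a connection is established (before handshaking).
  onSuccess() {
    this.attempts = 0;
  }
}

const reconnect = new ReconnectBackoff({ maxDelay: 3000 });
console.log(reconnect.onFailure(), reconnect.onFailure(), reconnect.onFailure()); // 500 1000 2000
reconnect.onSuccess();
console.log(reconnect.onFailure()); // 500
```

Resetting on a successful connection means a later outage starts again from the shortest delay.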
If connecting to a unix domain socket, e.g. unix:///var/run/fluentd.sock, then specify that here.
This overrides host/port below. Defaults to undefined.
The host (IP) to connect to
Defaults to localhost.
The port to connect to
Defaults to 24224.
The socket timeout to set. After timing out, the socket will be idle and reconnect once the client wants to write something.
Defaults to 3000 (3 seconds)
Set to -1 for no timeout
TLS connection options. See Node docs
If provided, the socket will be a TLS socket.
Reconnection options. See subtype for defaults.
Disable reconnection on failure. This can be useful for one-offs, or if you'd like to manually manage the connection.
Prevents the socket from being created on client create.
Defaults to false
Treat the socket as not writable when draining.
This can reduce memory use, but only when the client slows down emission. Otherwise either the send buffer fills up or the FluentClient.sendQueue fills up.
Defaults to false
The authentication options for the client