Class LiftbridgeClient

Create a client for working with a Liftbridge cluster.

example

Insecure connection (default).

import LiftbridgeClient from 'liftbridge';

const client = new LiftbridgeClient('localhost:9292');
await client.connect();

example

Secure TLS connection (recommended in production).

import LiftbridgeClient from 'liftbridge';

const client = new LiftbridgeClient('localhost:9292', {
     rootCertificateFile: './credentials/ca.crt',
     privateKeyFile: './credentials/private.key',
     certificateChainFile: './credentials/chain.crt'
});
await client.connect();

Hierarchy

  • LiftbridgeClient

Constructors

constructor

  • new LiftbridgeClient(addresses: string[] | string, serverCredentials?: ICredentials | undefined, options?: undefined | object): LiftbridgeClient
  • A client for use with a Liftbridge cluster.

    Parameters

    • addresses: string[] | string

      String or array of strings of Liftbridge server addresses to connect to.

    • Default value serverCredentials: ICredentials | undefined = undefined

      TLS credentials to use. Defaults to insecure context.

    • Optional options: undefined | object

      Additional options to pass on to low-level gRPC client for channel creation.

    Returns LiftbridgeClient
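
Because `addresses` accepts either a single string or an array of strings, code that assembles the address list dynamically may want to normalise it before constructing the client. The helper below is only a sketch: `normalizeAddresses` is a hypothetical name and is not part of the liftbridge package.

```typescript
// Hypothetical helper (not part of the liftbridge package): turn the
// `string[] | string` shape accepted by the constructor into a clean array.
function normalizeAddresses(addresses: string[] | string): string[] {
  const list = Array.isArray(addresses) ? addresses : [addresses];
  // Trim whitespace and drop empty entries.
  const cleaned = list.map((a) => a.trim()).filter((a) => a.length > 0);
  // Remove duplicates while keeping the original order.
  return Array.from(new Set(cleaned));
}
```

The result could then be passed as the first constructor argument, for example `new LiftbridgeClient(normalizeAddresses(input))`.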

Events

subscribe

  • Create an ephemeral subscription for the given stream. The subscription begins receiving messages at the configured start position and, once it reaches the end of the stream, waits for new messages. The default start position is the end of the stream. Throws NoSuchPartitionError if the given stream does not exist. Call close() on the returned subscription to end it.

    example

    Subscribing to a subject.

    import LiftbridgeClient from 'liftbridge';
    import LiftbridgeStream, { StartPosition } from 'liftbridge/stream';
    import LiftbridgeMessage from 'liftbridge/message';
    
    const client = new LiftbridgeClient('localhost:9292');
    await client.connect();
    const subscription = client.subscribe(new LiftbridgeStream({
         subject: 'my-subject',
         name: 'stream-name',
         startPosition: StartPosition.EARLIEST
    }));
    
    // Log each message as it arrives.
    subscription.on('data', (data) => {
         console.log('subscribe on data = ', LiftbridgeMessage.toJSON(data));
    });
    
    // When ready to finish the subscription:
    subscription.close();

    Parameters

    Returns ClientReadableStream<Message>

    ReadableStream of messages.
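
The effect of the start position can be illustrated with a toy in-memory model. This is a sketch for intuition only: `ToyStream`, `Start`, and the two position names are invented here and say nothing about how the library or the server implements subscriptions.

```typescript
// Toy in-memory model of start positions; none of these names come from the library.
type Start = 'earliest' | 'new-only';
type Handler = (value: string) => void;

class ToyStream {
  private log: string[] = [];
  private handlers: Handler[] = [];

  append(value: string): void {
    this.log.push(value);
    // Live subscribers see every message appended after they subscribed.
    this.handlers.forEach((h) => h(value));
  }

  subscribe(start: Start, handler: Handler): () => void {
    // 'earliest' replays the existing log first; 'new-only' starts at the end.
    if (start === 'earliest') this.log.forEach((v) => handler(v));
    this.handlers.push(handler);
    // The returned function plays the role of close().
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }
}

const stream = new ToyStream();
stream.append('a');

const fromStart: string[] = [];
const fromEnd: string[] = [];
const closeFromStart = stream.subscribe('earliest', (v) => fromStart.push(v));
stream.subscribe('new-only', (v) => fromEnd.push(v));

stream.append('b');
closeFromStart(); // this subscriber stops receiving messages
stream.append('c');
// fromStart is ['a', 'b']; fromEnd is ['b', 'c']
```

A subscriber that starts at the end of the stream never sees `'a'`, which was appended before it subscribed, while a closed subscriber never sees `'c'`.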

Methods

close

  • close(): void
  • Close the client connection to the Liftbridge cluster.

    Returns void

connect

  • connect(timeout?: undefined | number, retryOptions?: Partial<IBackOffOptions>): Promise<APIClient>
  • Establish a connection to the Liftbridge cluster.

    example

    Connecting to a Liftbridge cluster with a custom timeout and multiple retries.

    import LiftbridgeClient from 'liftbridge';
    
    const client = new LiftbridgeClient('localhost:9292');
    await client.connect(3000, {
         delayFirstAttempt: true,
         jitter: 'full',
         numOfAttempts: 10,
         timeMultiple: 1.5,
         startingDelay: 250
    });

    Parameters

    • Optional timeout: undefined | number

      Milliseconds before the connection attempt times out. This is set as the gRPC Deadline.

    • Optional retryOptions: Partial<IBackOffOptions>

      Retry & backoff options.

    Returns Promise<APIClient>

    Client instance.
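
To get a feel for what the retry options in the example imply, the sketch below computes the wait before each attempt under a conventional exponential backoff with jitter ignored. The interface and function names are local stand-ins, and the exact formula is an assumption; the library's actual schedule may differ.

```typescript
// Local stand-in for the subset of retry options used in this sketch.
// Field names mirror the example above; the semantics here are assumptions.
interface RetrySketchOptions {
  delayFirstAttempt: boolean;
  numOfAttempts: number;
  startingDelay: number; // milliseconds
  timeMultiple: number;
}

// Return the wait (in ms) applied before each attempt, ignoring jitter.
function backoffDelays(opts: RetrySketchOptions): number[] {
  const delays: number[] = [];
  for (let attempt = 0; attempt < opts.numOfAttempts; attempt++) {
    // The first attempt is immediate unless delayFirstAttempt is set,
    // so the exponent lags the attempt index by one in that case.
    const power = opts.delayFirstAttempt ? attempt : attempt - 1;
    delays.push(power < 0 ? 0 : opts.startingDelay * Math.pow(opts.timeMultiple, power));
  }
  return delays;
}

const schedule = backoffDelays({
  delayFirstAttempt: true,
  numOfAttempts: 4,
  startingDelay: 250,
  timeMultiple: 1.5,
});
console.log(schedule); // [ 250, 375, 562.5, 843.75 ]
```

With `jitter: 'full'`, each of these values would typically act as an upper bound on a randomised wait rather than a fixed one.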

createStream

  • Create a new stream attached to a NATS subject. Subject is the NATS subject the stream is attached to, and name is the stream identifier, unique per subject. It throws PartitionAlreadyExistsError if a stream with the given subject and name already exists.

    example

    Create a new stream on the Liftbridge cluster.

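    import LiftbridgeClient from 'liftbridge';
    import LiftbridgeStream from 'liftbridge/stream';
    // ErrorCodes must also be in scope; its import path is not shown here.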
    
    const client = new LiftbridgeClient('localhost:9292');
    await client.connect();
    await client.createStream(new LiftbridgeStream({
         subject: 'my-subject',
         name: 'stream-name',
         partitions: 5,
         maxReplication: true
    })).catch(err => {
         if (err.code !== ErrorCodes.ERR_PARTITION_ALREADY_EXISTS) {
             throw err;
         }
    });

    Parameters

    Returns Promise<CreateStreamResponse>

    CreateStreamResponse gRPC object.
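
The "unique per subject" rule and the catch-and-ignore pattern from the example can be modelled with a small in-memory registry. Everything here (`ToyStreamRegistry`, `ERR_ALREADY_EXISTS`, `ensure`) is hypothetical and only mirrors the documented behaviour; it is not the library's implementation.

```typescript
// Hypothetical error code used only inside this sketch.
const ERR_ALREADY_EXISTS = 'ALREADY_EXISTS';

interface CodedError extends Error {
  code?: string;
}

// Toy registry illustrating that a stream name is unique per subject.
class ToyStreamRegistry {
  private keys = new Set<string>();

  create(subject: string, name: string): void {
    // JSON-encode the pair so ('a.b', 'c') and ('a', 'b.c') cannot collide.
    const key = JSON.stringify([subject, name]);
    if (this.keys.has(key)) {
      const err: CodedError = new Error(`stream ${name} already exists on ${subject}`);
      err.code = ERR_ALREADY_EXISTS;
      throw err;
    }
    this.keys.add(key);
  }

  // Idempotent variant: returns true if created, false if it already existed.
  ensure(subject: string, name: string): boolean {
    try {
      this.create(subject, name);
      return true;
    } catch (err) {
      // Swallow only the "already exists" case; rethrow anything else.
      if ((err as CodedError).code === ERR_ALREADY_EXISTS) return false;
      throw err;
    }
  }
}
```

Filtering on the error code, rather than swallowing every failure, keeps genuine errors such as connectivity problems visible to the caller.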

publish

  • Publish a new message to the NATS subject. If the AckPolicy is not NONE and a deadline is provided, this call waits until the first ack is received. If the ack is not received in time, a DeadlineExceeded status code is returned. When both an AckPolicy and a deadline are configured, this returns the first Ack on success; otherwise it returns null.

    example

    Publish a message to a subject.

    import LiftbridgeClient from 'liftbridge';
    import LiftbridgeMessage, { AckPolicy } from 'liftbridge/message';
    
    const client = new LiftbridgeClient('localhost:9292');
    await client.connect();
    
    await client.publish(new LiftbridgeMessage({
         subject: 'my-subject',
         key: 'message-key',
         value: 'message-value',
         ackPolicy: AckPolicy.ALL,
         partitionStrategy: 'roundrobin',
         ackInbox: 'ack.my-subject',
         headers: { 'some-header': '123' }
    }));

    Parameters

    Returns Promise<PublishResponse>

    PublishResponse gRPC object.
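
The three outcomes described above (an ack, null, or a deadline error) can be summarised as a small decision function. This is a sketch of the documented rules only: the types and `publishOutcome` are local stand-ins, and `ackLatencyMs` is a hypothetical input representing how long the first ack takes to arrive.

```typescript
// Local stand-ins for illustration only; the real AckPolicy lives in the library.
type AckPolicySketch = 'NONE' | 'ALL';
type PublishOutcome = 'ack' | 'null' | 'deadline-exceeded';

// Model which of the three documented outcomes a publish call ends in.
function publishOutcome(
  policy: AckPolicySketch,
  deadlineMs: number | undefined,
  ackLatencyMs: number
): PublishOutcome {
  // Without an ack policy or without a deadline, the call does not wait for an ack.
  if (policy === 'NONE' || deadlineMs === undefined) return 'null';
  // Otherwise the first ack must arrive before the deadline expires.
  return ackLatencyMs <= deadlineMs ? 'ack' : 'deadline-exceeded';
}
```

For instance, `publishOutcome('ALL', 1000, 50)` yields `'ack'`, while the same call with no deadline yields `'null'`.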