Your Own Web3 API

PARSIQ Data Lake SDK — Quick Introduction & Reference

Introduction

Data Lakes serve as a custom, application-specific data layer for filtering, aggregation, and analysis of blockchain-scale data contained in the Tsunami data store. This SDK provides the general infrastructure for querying Tsunami, maintaining historical data, and transparently handling blockchain reorganizations, along with various configuration options that let you adjust the runtime behavior to the specific needs of your application.

The Data Lake SDK supports two main types of Data Lakes — sequential and parallel.

Sequential Data Lakes are well-suited for low-latency computations with evolving state, especially when data volumes are not too large.

Parallel Data Lakes are tailored towards processing massive volumes of historical data by allowing multiple workers to process blocks independently and out-of-order. The trade-off is that they lack the capability to maintain state.

In some cases, a combination of parallel and sequential Data Lakes is the perfect solution to enable processing of both historical and real-time data, but additional processing or conversion may be needed when transitioning from historical to real-time mode.

Basics of sequential Data Lake implementation

Let's go over the steps involved in implementing a sequential Data Lake, using a simple Data Lake gathering some basic information on CNS registry contract as a running example. We will be monitoring associations of tokens with URIs and current token ownership only, without delving into name resolution.

The complete code for the Data Lake itself (let's call it src/datalake.ts) is listed below. We will analyze it in detail in the following sections. To make it run, follow these steps:

  • Initialize a new package.
  • Install the dependencies (@parsiq/datalake-sdk, @parsiq/tsunami-client, @parsiq/tsunami-client-sdk-http, @ethersproject/abi).
  • Put the implementation in src/datalake.ts.
  • Replace the TSUNAMI_API_KEY constant in src/datalake.ts with your actual Tsunami API key.
  • Implement the entry point (e.g., main.ts), which should simply call run().
  • Put together simple Dockerfile and docker-compose.yml.
  • Build and run.

We will provide simple examples of the files involved in the following sections but for now let's just focus on the Data Lake.

import { Interface } from '@ethersproject/abi'
import * as sdk from '@parsiq/datalake-sdk'
import { ChainId, TsunamiApiClient } from '@parsiq/tsunami-client'
import {
  DatalakeTsunamiApiClient,
  TsunamiEvent,
  TsunamiFilter
} from '@parsiq/tsunami-client-sdk-http'
// Import your ABIs here, in a format suitable for decoding using @ethersproject/abi.
import cnsRegistryAbi from './cns-registry.json'
 
// Put your Tsunami API key here.
const TSUNAMI_API_KEY = ''
// This is the chain ID for Ethereum mainnet Tsunami API. Change it if you want to work with a different net.
const TSUNAMI_API_NET = ChainId.ETH_MAINNET
 
// CNS Registry contract address, replace or drop if you intend to monitor something else.
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe'
 
// topic_0 hashes of our events of interest.
const EVENT_NEW_URI_TOPIC_0 =
  '0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952'
const EVENT_TRANSFER_TOPIC_0 =
  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
 
// Contract deployment block is used as a starting point for the Data Lake.
const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251
 
// This defines the layout of the type-safe K-V storage.
type DatalakeStorageLayout = {
  '': any
  domains: { uri: string }
  ownership: { owner: string }
}
 
type DatalakeStorageMetaLayout = {
  '': {}
  domains: {}
  ownership: {}
}
 
// Types for decoded events follow.
type NewUriEvent = {
  tokenId: {
    _hex: string
  }
  uri: string
}
 
type TransferEvent = {
  from: string
  to: string
  tokenId: {
    _hex: string
  }
}
 
class Datalake extends sdk.AbstractMultiStorageDataLakeBase<
  DatalakeStorageLayout,
  DatalakeStorageMetaLayout,
  TsunamiFilter,
  TsunamiEvent
> {
  private cnsRegistryDecoder: Interface
 
  // Construct ABI decoders here.
  constructor() {
    super()
    this.cnsRegistryDecoder = new Interface(cnsRegistryAbi)
  }
 
  public override getProperties(): sdk.DataLakeProperties {
    return {
      id: 'DATALAKE-TEMPLATE',
      initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER
    }
  }
 
  // This method generates the filter used to retrieve events from Tsunami. Filter may change from block to block.
  public async genTsunamiFilterForBlock(
    block: sdk.Block & sdk.DataLakeRunnerState,
    isNewBlock: boolean
  ): Promise<TsunamiFilter> {
    return {
      contract: [CONTRACT_ADDRESS],
      topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
    }
  }
 
  // Main event handler.
  public async processTsunamiEvent(
    event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState
  ): Promise<void | TsunamiFilter> {
    switch (event.topic_0) {
      case EVENT_NEW_URI_TOPIC_0:
        await this.processNewUriEvent(event)
        break
      case EVENT_TRANSFER_TOPIC_0:
        await this.processTransferEvent(event)
        break
    }
  }
 
  private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
    // Decodes the event...
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!)
    const decoded = this.cnsRegistryDecoder.decodeEventLog(
      fragment,
      event.log_data!,
      [event.topic_0!, event.topic_1!]
    ) as unknown as NewUriEvent
    // ...then writes to reorganization-aware K-V storage.
    await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri })
  }
 
  private async processTransferEvent(event: TsunamiEvent): Promise<void> {
    if (event.op_code !== 'LOG4') {
      return
    }
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!)
    const decoded = this.cnsRegistryDecoder.decodeEventLog(
      fragment,
      event.log_data!,
      [event.topic_0!, event.topic_1!, event.topic_2!, event.topic_3!]
    ) as unknown as TransferEvent
    await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to })
  }
 
  // The following event handlers should be no-ops under most circumstances.
  public async processEndOfBlockEvent(
    event: sdk.Block & sdk.DataLakeRunnerState
  ): Promise<void> {}
  public async processBeforeDropBlockEvent(
    event: sdk.DropBlockData & sdk.DataLakeRunnerState
  ): Promise<void> {}
  public async processAfterDropBlockEvent(
    event: sdk.DropBlockData & sdk.DataLakeRunnerState
  ): Promise<void> {}
}
 
export const run = async (): Promise<void> => {
  const logger = new sdk.ConsoleLogger()
  logger.log('DEBUG', 'Initializing datalake...')
  const datalake = new Datalake()
  logger.log('DEBUG', 'Initializing Tsunami API...')
  const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET)
  logger.log('DEBUG', 'Initializing SDK parsiq.js...')
  const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami)
  logger.log('DEBUG', 'Initializing runner...')
  const runner = new sdk.MultiStorageDataLakeRunner({
    storageConfig: {
      '': { meta: {} },
      domains: { meta: {} },
      ownership: { meta: {} }
    },
    datalake: datalake,
    tsunami: tsunamiSdk,
    log: logger
  })
  logger.log('DEBUG', 'Running...')
  await runner.run()
}

Key questions

Before beginning to implement your Data Lake, we recommend answering the following key questions regarding it.

What events are you interested in?

First of all, you need to determine what kind of information available in the Tsunami data store you want to process. The specific kinds of available events and filtering options depend in large part on the parsiq.js client you are using. The SDK itself makes only a few assumptions about the client, listed in detail in the reference section below, so as long as the Data Lake and the client agree on the filter and event formats, everything should be in order.

In our sample case, we want to monitor the CNS registry contract on Ethereum mainnet:

const TSUNAMI_API_NET = ChainId.ETH_MAINNET
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe'

We are only interested in two log event types, NewURI, which indicates the minting of a new token, and Transfer, which indicates the transfer of token ownership:

const EVENT_NEW_URI_TOPIC_0 =
  '0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952'
const EVENT_TRANSFER_TOPIC_0 =
  '0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'

Topic 0 hashes have been computed here from the event signatures in the CNS registry ABI.
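
If you want to recompute these hashes yourself, the Interface class from @ethersproject/abi can derive the topic hash from the ABI. A minimal sketch, assuming the cns-registry.json ABI from the section below:

import { Interface } from '@ethersproject/abi'
import cnsRegistryAbi from './cns-registry.json'

const iface = new Interface(cnsRegistryAbi)
// getEventTopic() returns the keccak256 hash of the event signature, i.e., its topic_0.
console.log(iface.getEventTopic('NewURI'))   // should equal EVENT_NEW_URI_TOPIC_0
console.log(iface.getEventTopic('Transfer')) // should equal EVENT_TRANSFER_TOPIC_0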

The CNS registry contract was deployed at block 9,082,251 so we will not need to look at any preceding blocks:

const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251

Since parsiq.js serves raw events, we will want to decode those using ABI, e.g., @ethersproject/abi:

import { Interface } from '@ethersproject/abi'
import cnsRegistryAbi from './cns-registry.json'

The sample ABI file can be found in the section below.

We also have to define the types for decoded log events:

type NewUriEvent = {
  tokenId: {
    _hex: string
  }
  uri: string
}
 
type TransferEvent = {
  from: string
  to: string
  tokenId: {
    _hex: string
  }
}

So far, we have been defining only types and constants. We will start to put all of this together once we have answered all the key questions about our Data Lake.

How do you want to process the data?

You can perform more or less arbitrary computations on the events you are processing; the limiting factor is performance, since your Data Lake must be able to keep up with the block rate of the chain in question.

In the case of our example Data Lake, we won't be doing much with the data; we just want to store information on new tokens and transfers of ownership.

Having determined that, we don't need to write any code just yet so we can move on to the next question.

How do you want to store the results?

Data Lakes can produce arbitrary side effects while processing events, but the main result of their work is supposed to be a store of processed data that is easy to query for consumer apps.

The SDK supports either plain old PostgreSQL as a storage backend or PostgreSQL-based Citus, which allows for a substantial degree of horizontal scalability. The storage is organized into any number of independent, type-safe key-value storages that maintain both the current state and the complete history of changes. The SDK manages the state rollbacks automatically in case of blockchain reorganizations. While the default storage model is document-based, the SDK supports inclusion of additional relational fields and indices in the storage tables. This is mostly needed for advanced use cases so we will not be using this capability in our example Data Lake.

As we have already mentioned, we want to keep track of mappings of tokens to URIs, and of current ownership information for each token. Hence, we are going to need two fairly simple storages:

type DatalakeStorageLayout = {
  '': any
  domains: { uri: string } // Keyed by token ID.
  ownership: { owner: string } // Keyed by token ID.
}

Here the domains storage maps token IDs (keys) to URIs, while the ownership storage maps token IDs to owner addresses. The SDK makes sure that the current state of these two storages is represented by the domains_entities and ownership_entities tables on the storage backend, while historical information is kept in the domains_mutations and ownership_mutations tables.
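
A consumer app can then read the current state directly from the entities tables. A minimal sketch using pg, assuming the default entity_key/entity_value column layout that also appears in the reference examples further below:

import { Pool } from 'pg'

const pool = new Pool()

// Looks up the current owner of a token in the ownership storage.
// Treat the table and column names as a sketch rather than a guaranteed contract.
const getOwner = async (tokenId: string): Promise<string | null> => {
  const res = await pool.query(
    'SELECT entity_value FROM ownership_entities WHERE entity_key = $1',
    [tokenId]
  )
  return res.rows[0]?.entity_value?.owner ?? null
}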

The SDK also requires us to always define a default storage with an empty string as a name but we are simply leaving it unused in our example.

While we won't be using custom fields in our Data Lake, we still have to define a type declaring just that:

type DatalakeStorageMetaLayout = {
  '': {}
  domains: {}
  ownership: {}
}

Implementing the Data Lake itself

Now we are ready to implement the Data Lake as follows:

class Datalake extends sdk.AbstractMultiStorageDataLakeBase<DatalakeStorageLayout, DatalakeStorageMetaLayout, TsunamiFilter, TsunamiEvent> {

We are extending a convenient base class exposed by the SDK here. The filter and event types for the parsiq.js client are imported from our client package. The types defining the storage layout are the ones we declared above while answering the key questions about the Data Lake.

We need to initialize the event decoder in the constructor:

    private cnsRegistryDecoder: Interface;
 
    constructor() {
        super();
        this.cnsRegistryDecoder = new Interface(cnsRegistryAbi);
    }

We also have to define the method providing the fundamental properties of our Data Lake, namely its name and starting block:

public override getProperties(): sdk.DataLakeProperties {
    return {
        id: 'DATALAKE-TEMPLATE',
        initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER,
    };
}

The next method specifies the filter that we want to use to retrieve events from Tsunami:

public async genTsunamiFilterForBlock(block: sdk.Block & sdk.DataLakeRunnerState, isNewBlock: boolean): Promise<TsunamiFilter> {
    return {
        contract: [CONTRACT_ADDRESS],
        topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
    };
}

This method may generate the filter dynamically as it can both inspect and update the state of the Data Lake, doubling as the event handler for new blocks.

Note that in case your implementation of genTsunamiFilterForBlock() updates the state, you MUST NOT alter the state if the isNewBlock parameter is false. genTsunamiFilterForBlock() is also called when the events are pre-fetched in bulk for multiple blocks. Unless your state update logic is idempotent with respect to processing the same new_block event multiple times, failing to respect the value of isNewBlock may corrupt your state.
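
For illustration, here is a hedged sketch of a filter generator that also performs bookkeeping, guarded by isNewBlock. Our example Data Lake does not actually need this; the write to the default ('') storage, which is typed as any in this example, is purely illustrative:

public async genTsunamiFilterForBlock(
  block: sdk.Block & sdk.DataLakeRunnerState,
  isNewBlock: boolean
): Promise<TsunamiFilter> {
  if (isNewBlock) {
    // Only mutate state for genuinely new blocks; bulk pre-fetching calls
    // this method again with isNewBlock === false.
    await this.set('', 'bookkeeping', { lastSeenAt: Date.now() })
  }
  return {
    contract: [CONTRACT_ADDRESS],
    topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
  }
}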

The simple parsiq.js used in this example provides the capabilities to retrieve log events by contract address and topics, among other things.

We also have to define the event handler for Tsunami events:

public async processTsunamiEvent(event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | TsunamiFilter> {
    switch (event.topic_0) {
        case EVENT_NEW_URI_TOPIC_0:
            await this.processNewUriEvent(event);
            break;
        case EVENT_TRANSFER_TOPIC_0:
            await this.processTransferEvent(event);
            break;
    }
}

We will just dispatch to separate implementations depending on the topic_0 here.

Note that while genTsunamiFilterForBlock() cannot change the filter mid-block, processTsunamiEvent() can. However, if you generate many filter updates mid-block, the runner repeatedly calls the Tsunami API with fresh requests for the block's events, resulting in worst-case quadratic complexity in both time and network usage.
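
As an illustration, a mid-block filter update might look like the following hedged sketch. SOME_DEPLOYMENT_TOPIC_0, decodeNewContractAddress() and this.watchedContracts are hypothetical names, not part of the running example:

public async processTsunamiEvent(
  event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState
): Promise<void | TsunamiFilter> {
  if (event.topic_0 === SOME_DEPLOYMENT_TOPIC_0) {
    // Hypothetical scenario: an event announces another contract we want to watch.
    this.watchedContracts.push(decodeNewContractAddress(event))
    // Returning a filter here makes the runner re-query Tsunami for the rest
    // of the block with the updated filter, so do this sparingly.
    return {
      contract: this.watchedContracts,
      topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
    }
  }
}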

The handler for NewURI events is trivial as we just decode the raw event and update the domains K-V storage:

private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
    const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
        event.topic_0!,
        event.topic_1!
    ]) as unknown as NewUriEvent;
    await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri });
}

We are only writing to the storage here but the event handlers may also inspect it using the methods described in the reference section below.

It is important that you await on all the state-updating methods in your event handlers. Failing to do so may lead to undefined behavior.

The set() method is often convenient but not completely type-safe. Its fourth argument, providing the data to be written to separate fields associated with the specified key, is always optional. This is safe when we are not using those fields but will crash at run-time if we do and fail to provide the values. setRecord() is a more cumbersome but type-safe alternative.

The handler for Transfer events is pretty much the same:

private async processTransferEvent(event: TsunamiEvent): Promise<void> {
    if (event.op_code !== 'LOG4') {
        return;
    }
    const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
    const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
        event.topic_0!,
        event.topic_1!,
        event.topic_2!,
        event.topic_3!
    ]) as unknown as TransferEvent;
    await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to });
}

We are almost done. The SDK provides some additional event handlers for block boundaries and reorganizations but we can leave their implementations empty in this case:

public async processEndOfBlockEvent(event: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {}
public async processBeforeDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
public async processAfterDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
}

Our Data Lake is ready to roll.

Calling the Data Lake runner provided by the SDK

Invoking the runner is easy enough: we just have to construct the objects it needs, such as the Data Lake instance, the parsiq.js client, and so on.

The storageConfig argument is required for proper database schema initialization.

export const run = async (): Promise<void> => {
  const logger = new sdk.ConsoleLogger()
  logger.log('DEBUG', 'Initializing datalake...')
  const datalake = new Datalake()
  logger.log('DEBUG', 'Initializing Tsunami API...')
  const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET)
  logger.log('DEBUG', 'Initializing SDK parsiq.js...')
  const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami)
  logger.log('DEBUG', 'Initializing runner...')
  const runner = new sdk.MultiStorageDataLakeRunner({
    storageConfig: {
      '': { meta: {} },
      domains: { meta: {} },
      ownership: { meta: {} }
    },
    datalake: datalake,
    tsunami: tsunamiSdk,
    log: logger
  })
  logger.log('DEBUG', 'Running...')
  await runner.run()
}

Configuring the runner

Using the following docker-compose.yml will run the Data Lake with sensible settings. Take a look at the environment variables that it passes to the runner's container. The reference section below contains a detailed list of all the configuration options available.

version: '3.8'
 
services:
  datalake_runner:
    container_name: datalake_template_container
    environment:
      PGHOST: pg
      PGPORT: 5432
      PGDATABASE: datalake
      PGUSER: datalake
      PGPASSWORD: datalake
      DLSDK_SHOW_CONFIG: "true"
      # Set DLSDK_VERBOSE to "false" and DLSDK_NO_DEBUG to "true" for less console output.
      DLSDK_VERBOSE: "true"
      DLSDK_NO_DEBUG: "false"
      #DLSDK_VERBOSE: "false"
      #DLSDK_NO_DEBUG: "true"
      DLSDK_POLL_TSUNAMI: "true"
      DLSDK_NO_RMQ: "true"
      # DLSDK_RECOVERY_BLOCK_FRAME controls the number of blocks retrieved at once in historical/batch/recovery mode.
      DLSDK_RECOVERY_BLOCK_FRAME: 200
      DLSDK_FRAME_LEVEL_ISOLATION: "true"
      DLSDK_MEMORY_CACHE: "true"
      DLSDK_RECREC_SLEEP_MS: 100
      DLSDK_RECREC_ATTEMPTS: 5
      DLSDK_MODE_MUTEX_DELAY_MS: 10
      DLSDK_PULL_SLEEP_MS: 100
      DLSDK_PULL_AWAKEN_MS: 500
      DLSDK_FORCE_GC: 5
      DLSDK_LOG_MEMORY_USAGE: 5000
    depends_on:
      - pg
    image: "datalake-template-dev:latest"
    # Drop the '--reset' CLI option here if you don't want to reset the datalake's state on each run.
    entrypoint: /bin/bash ./run.sh --reset
    #entrypoint: /bin/bash ./run.sh
 
  pg:
    container_name: postgres_datalake_container
    image: postgres
    environment:
      POSTGRES_DB: datalake
      POSTGRES_USER: datalake
      POSTGRES_PASSWORD: datalake
    ports:
      - "54329:5432"
    volumes:
      - ./volumes/postgres/:/var/lib/postgresql/data
    stop_grace_period: 1m

Sample misc. files for the CNS registry Data Lake

This section contains samples for the rest of the files needed to create a complete Data Lake.

Dockerfile

FROM node:16-alpine AS datalake-template-dev
RUN apk add bash
RUN npm install --global wait-port
RUN mkdir -p /home/node/runner && chown -R node:node /home/node/runner
USER node
WORKDIR /home/node/runner
RUN mkdir -p /home/node/runner/dist
COPY --chown=node:node package.json package-lock.json ./
COPY --chown=node:node local_packages/ ./local_packages/
RUN npm ci --ignore-scripts
COPY --chown=node:node tsconfig.json run.sh main.ts ./
COPY --chown=node:node src/ ./src/
RUN npx tsc
RUN rm -rf package.json package-lock.json local_packages/ tsconfig.json main.ts src/

main.ts

import { run } from './src/datalake'
 
run()

run.sh

#!/bin/bash
wait-port pg:5432
node --expose-gc --optimize-for-size dist/main.js "$@"

src/cns-registry.json

[
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": true,
        "internalType": "uint256",
        "name": "tokenId",
        "type": "uint256"
      },
      {
        "indexed": false,
        "internalType": "string",
        "name": "uri",
        "type": "string"
      }
    ],
    "name": "NewURI",
    "type": "event"
  },
  {
    "anonymous": false,
    "inputs": [
      {
        "indexed": true,
        "internalType": "address",
        "name": "from",
        "type": "address"
      },
      {
        "indexed": true,
        "internalType": "address",
        "name": "to",
        "type": "address"
      },
      {
        "indexed": true,
        "internalType": "uint256",
        "name": "tokenId",
        "type": "uint256"
      }
    ],
    "name": "Transfer",
    "type": "event"
  }
]

Basics of parallel Data Lake implementation

Advantages and limitations of parallel Data Lakes

Parallel Data Lakes are designed to achieve high performance while churning through historical data by throwing multiple independent workers at a single task. This makes it possible to leverage the available hardware resources to increase throughput, with the storage backend usually becoming the limiting factor for performance gains.

Naturally, this comes at a price. Since blocks and events can and will be processed out of order and simultaneously by separate workers, no meaningful concept of evolving state can be computed at the parallel stage. In algebraic terms, the state updates effected by the Data Lake workers should be commutative and associative, so that any processing order produces the same result, up to equivalence from the eventual data consumer's standpoint. Ideally, the computations should also be idempotent, which improves data consistency guarantees and makes recovering from failures easier.

Once again, since blocks are processed out of order, history of state is not maintained and reorganizations are not supported, which limits parallel Data Lakes to operating on historical data. This means that if you need to continue in real-time mode, you will need to implement a separate sequential Data Lake and potentially a custom stage for converting the data collected at the parallel stage into a more suitable form. You can think of this as the 'reduce' step to the parallel Data Lake's 'map' step. The interfaces of parallel and sequential Data Lakes are somewhat dissimilar, but it is usually possible to achieve a high degree of code reuse by implementing a helper class employed by both.

Implementing a parallel Data Lake

While the fundamental concepts of parallel Data Lakes are very similar to those underlying sequential Data Lakes, there are important differences both in the running mode of parallel Data Lakes and in the capabilities exposed to them by the SDK.

We will be using a simplified version of the Uniswap V2 Data Lake as a running example here. The aim is to collect swap turnover data for all the deployed pairs.

Basic concepts of the parallel Data Lake runner

For the purposes of work distribution and sequencing, the parallel Data Lake SDK employs the following three concepts:

  • Tasks
  • Task groups
  • Filter groups

Tasks

A task is a small unit of work consisting of a range of blocks (typically a few hundred to a few thousand) and associated with a specific filter group. Tasks are distributed to workers, which are effectively sequential Data Lake runners operating in batch mode with a few performance-related tweaks.

Task groups

The primary purpose of task groups is to reduce the probability of cross-worker deadlocks when using plain old PostgreSQL as a storage backend. This is achieved by organizing the data into multiple sets of tables using the same schema, each of which is associated with a single task group. Workers never take more than one task from any task group at the same time, ensuring that every worker is writing to its own set of tables.

Filter groups

Filter groups serve to separate heterogeneous workloads into homogeneous tasks. A filter group is identified by a unique string and it associates block ranges with Tsunami filters to be employed by workers processing tasks belonging to that filter group. New filter groups can be created at run-time, causing the creation of new tasks. However, existing filter groups must not be altered because the runner's behavior is undefined in that case.

For example, the Uniswap Data Lake organizes its work into two filter groups: factory (responsible for monitoring the factory contract to identify the deployed pools) and pairs (responsible for monitoring Swap events).

Data Lake design

All the initial steps in implementing parallel Data Lakes, such as obtaining a parsiq.js client and designing the storage layout, are the same as for sequential Data Lakes. As far as the storage layout is concerned, remember that your state updates should be commutative and associative, and keep that in mind while designing the schema. It would also be advantageous to make your state updates:

  • Insert-only (never updating existing keys)
  • Free of any reads

For Uniswap, we will use a layout with two separate storages: one for keeping the turnover data and one for keeping information on individual pairs:

type StorageLayout = {
  '': [string, string, string, string] // amount0In, amount1In, amount0Out, amount1Out
  pairs: [string, string, string, string, number] // token0, token1, pair, index, block_number
}

The key for the default storage is a combination of contract address and block number. The key for the pairs storage is simply the contract address.
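
The snippets below also rely on a few helper types and aliases that are not part of the SDK. Plausible definitions might look as follows; the names match their usage below, while the exact shapes are assumptions:

type Address = string
type Amount = { _hex: string }
// [amount0In, amount1In, amount0Out, amount1Out], kept as decimal strings.
type Turnover = [string, string, string, string]

// Decoded event shapes, analogous to the CNS registry events above.
type PairCreatedEvent = [string, string, string, { _hex: string }] // token0, token1, pair, index
type SwapEvent = {
  amount0In: Amount
  amount1In: Amount
  amount0Out: Amount
  amount1Out: Amount
}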

Implement your Data Lake

You will need to implement a class (e.g., MyParallelDataLake) that conforms to the interface IParallelMultiStorageDataLake<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>. You may start from scratch but it is recommended to extend the abstract base class provided by the SDK: AbstractParallelMultiStorageDataLakeBase<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>.

You will need to implement the following methods.

getProperties(): ParallelDataLakeProperties

This method is mandatory for parallel Data Lakes as it specifies the fundamental Data Lake properties that have no reasonable defaults.

  public getProperties(): sdk.ParallelDataLakeProperties {
    return {
      id: 'UNISWAP-V2-DATALAKE',
      initialBlockNumber: 0, // ...or the deployment block of the factory contract for optimum performance.
      lastBlockNumber: 16000000, // ...or the actual block number up to which you want the data.
      groupSize: 6000, // Group and task size are important performance tuning parameters.
      taskSize: 2000,
      groups: new Set(['factory', 'pairs']),
      // Importantly, this will lock the processing of Swap events until all the pairs have been identified.
      lockedGroups: new Set(['pairs']),
    };
  }

You will want to choose the group size so that the resulting number of task groups is roughly twice the maximum number of workers you expect to use, while still leaving at least a couple of meaningfully sized tasks within each task group (i.e., tasks larger than or equal to the block frame size you intend to use). Group size becomes less important if you are using single-storage mode rather than the SDK's default custom sharding; however, it is still advisable to keep the number of task groups at least roughly equal to the number of workers, as this helps with smoother task distribution.
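
As a rough illustration of this guidance (the numbers below are arbitrary assumptions, not recommendations):

  // Back-of-the-envelope sizing: aim for roughly 2 task groups per worker,
  // with several tasks per group, each at least as large as the block frame.
  const workers = 16;
  const initialBlockNumber = 10_000_000;
  const lastBlockNumber = 16_000_000;
  const groupSize = Math.ceil((lastBlockNumber - initialBlockNumber) / (workers * 2)); // 187,500 blocks per group
  const taskSize = 25_000; // ~7-8 tasks per group, comfortably above a typical block frame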

genTsunamiFilters(): Promise<FilterGroups<TsunamiFilter>>

This method specifies the initial filter groups for your Data Lake.

For Uniswap:

  public async genTsunamiFilters(): Promise<sdk.FilterGroups<TsunamiFilter>> {
    const filters = new Map();
    filters.set('factory', { group: 'factory', filters: [{
      block: 0, filter: {
        topic_0: '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9', // The topic for PairCreated events.
        contract: '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f', // Uniswap V2 factory contract address.
      }
    }] });
    filters.set('pairs', { group: 'pairs', filters: [{
      block: 0, filter: {
        topic_0: '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', // The topic for Swap events.
      }
    }] });
    return filters;
  }

processNewBlockEvent(block: NewBlockData & DataLakeRunnerState): Promise<void>

This is the event handler for a new block event. Since the parallel version of the runner does not use this method to generate the filter, more often than not the implementation can be a no-op unless you need some specific start-of-block bookkeeping.
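
In our Uniswap example it can simply be a no-op; a sketch, with the parameter type following the signature above:

  public async processNewBlockEvent(
    block: sdk.NewBlockData & sdk.DataLakeRunnerState
  ): Promise<void> {
    // No start-of-block bookkeeping needed for this Data Lake.
  }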

processTsunamiEvent(filterGroup: string, event: TsunamiEvent & TimecodedEvent & DataLakeRunnerState): Promise<FilterGroups<TsunamiFilter> | void>

This method is similar to the analogous method for sequential Data Lakes and likewise serves as the Data Lake's main processing point. However, the filter group name is passed to it as the first argument, and the semantics of its return value are very different from the sequential case.

A non-void return value means that the worker wants to initialize a whole new filter group (or groups) and create fresh tasks as appropriate. This could be useful, for example, if we chose a different design for the Uniswap Data Lake in which every PairCreated event creates a fresh filter group for that specific contract address, together with associated tasks. It turns out that while that approach is viable, it is less performant than the one we have chosen here; in other applications, it might be the right way to go.
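
For instance, the per-pair design just described might return something along these lines from the PairCreated handler. This is a sketch only; the helper and the group naming are illustrative and not part of the example code below:

  // Hypothetical: create a dedicated filter group (and tasks) for a newly
  // discovered pair contract, starting from the block where it was created.
  private genPairFilterGroup(pair: string, fromBlock: number): sdk.FilterGroups<TsunamiFilter> {
    const groups = new Map();
    groups.set(`pair-${pair}`, {
      group: `pair-${pair}`,
      filters: [{
        block: fromBlock,
        filter: {
          contract: pair,
          topic_0: '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', // Swap
        },
      }],
    });
    return groups;
  }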

An implementation for the Uniswap Data Lake, along with auxiliary methods, would be as follows:

  public async processTsunamiEvent(filterGroup: string, evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | sdk.FilterGroups<TsunamiFilter>> {
    if (filterGroup === 'factory') {
      return await this.processPairCreatedEvent(evt);
    } else {
      await this.processPairEvent(evt);
    }
  }
 
  public async processPairCreatedEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    const fragment = this.uniswapFactoryInterface.getEvent(evt.topic_0!);
    const decoded = this.uniswapFactoryInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as PairCreatedEvent;
    await this.apis.pairs.set(decoded[2].toLowerCase(), [
      decoded[0],
      decoded[1],
      decoded[2],
      new BigNumber(decoded[3]._hex).toFixed(),
      evt.block_number!,
    ]);
  }
 
  public async processPairEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    if (this.pairs === undefined) {
      this.pairs = await this.getPairList();
    }
    if (!this.pairs.has(evt.contract.toLowerCase())) {
      return;
    }
    this.pair = evt.contract;
    this.block = evt._DLSDK.block_number;
    await this.processSwapEvent(evt);
  }
 
  private async processSwapEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
    const fragment = this.uniswapPairInterface.getEvent(evt.topic_0!);
    const decoded = this.uniswapPairInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as SwapEvent;
    await this.updateVolumes(decoded.amount0In, decoded.amount1In, decoded.amount0Out, decoded.amount1Out);
  }
 
  private async updateVolumes(amount0In: Amount, amount1In: Amount, amount0Out: Amount, amount1Out: Amount): Promise<void> {
    const current = this.turnover.get(this.pair) ?? ['0', '0', '0', '0'];
    const updated: Turnover = [
      new BigNumber(current[0]).plus(new BigNumber(amount0In._hex)).toFixed(),
      new BigNumber(current[1]).plus(new BigNumber(amount1In._hex)).toFixed(),
      new BigNumber(current[2]).plus(new BigNumber(amount0Out._hex)).toFixed(),
      new BigNumber(current[3]).plus(new BigNumber(amount1Out._hex)).toFixed(),
    ];
    this.turnover.set(this.pair, updated);
  }
 
  public async getPairList(): Promise<Set<Address>> {
    const result = new Set<Address>();
    const properties = this.getProperties();
    // This assumes that the datalake works in 'single_storage' mode.
    await this.apis.pairs.withPg(async (pg) => {
      const res = await pg.query(
        ' SELECT entity_key FROM ' + pg.escapeIdentifier('pairs_0_entities') + ' '
      );
      for (const row of res.rows) {
        result.add(row.entity_key);
      }
    });
    return result;
  }

Note that the event handler does not, in itself, change the Data Lake's state; turnover data is updated in bulk at the end of the block.

Note that the code above implies that the Data Lake object has some internal state, which should be initialized as follows:

  private apis: sdk.ISimpleStorageMapping<StorageLayout>;
  private readonly uniswapFactoryInterface;
  private readonly uniswapPairInterface;
  private pair: string;
  private block: number;
  private turnover: Map<Address, Turnover>;
  private pairs?: Set<Address>;
 
  constructor(apis: sdk.ISimpleStorageMapping<StorageLayout>) {
    this.apis = apis;
    this.uniswapFactoryInterface = new Interface(uniswapFactoryAbi);
    this.uniswapPairInterface = new Interface(uniswapPairAbi);
    this.pair = '';
    this.block = 0;
    this.turnover = new Map();
  }
 
  public setApis(apis: sdk.ISimpleStorageMapping<StorageLayout>): void {
    this.apis = apis;
  }

processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>

End of block event handler. This can be useful if you intend to accumulate some data within the block boundaries before dumping it in a single update at the end. For Uniswap:

  public async processEndOfBlockEvent(evt: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {
    for (const key of this.turnover.keys()) {
      await this.apis[''].set(this.genKey(key, this.block), this.turnover.get(key)!);
    }
    this.turnover = new Map(); // Reset the per-block turnover accumulator.
  }
 
  private genKey(pair: string, block: number): string {
    return pair + '-' + block.toString().padStart(10, '0');
  }

unlockGroups(finished: Set<string>, locked: Set<string>): Set<string>

If you don't need task ordering, the implementation can be as simple as return new Set();. Since our approach to the Uniswap Data Lake involves ordering between the factory and pairs filter groups, we have to implement this as follows:

  public unlockGroups(finished: Set<string>, locked: Set<string>): Set<string> {
    if (finished.has('factory') && locked.has('pairs')) {
        return new Set(['pairs']);
    }
    return new Set();
  }

Or, in simple terms, tasks belonging to the pairs filter group should be unlocked if and only if all tasks within the factory filter group are marked as done.

The interface for the parallel Data Lakes includes a few more methods but the abstract base class supplies reasonable implementations for those.

Running and configuring your Data Lake

Running a parallel Data Lake is similar to running a sequential one; you just need to use ParallelMultiStorageDataLakeRunner instead. Note that you can safely run multiple instances of the runner, and you typically should. The SDK will take care of safe schema initialization and task distribution across your workers.

SDK reference

This section covers various aspects of the SDK in greater detail.

SDK requirements for parsiq.js clients

The SDK makes relatively few assumptions about the Tsunami API client's interface, allowing it to interoperate smoothly with different implementations exposing different capabilities. The assumptions are as follows:

  • The client must be capable of event look-up by block hash, using the optional block_hash field in the TsunamiFilter.
  • The client must be capable of additional filtering on top of block_hash, by limiting the events returned to those after a specific timecode provided in the after_timecode field in the TsunamiFilter.
  • The client must be capable of event look-up by block number range, using the optional block_range field in the TsunamiFilter.
  • The client must be capable of grouping the events by block_hash.
  • The events returned by the client contain the timecode field, which imposes a total order on all events.
  • The events are returned in ascending timecode order.
  • The parsiq.js client must also be able to provide basic information on blocks, i.e., hashes, parents, and data on the current top block.
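
Put together, these assumptions suggest a rough shape for the filter and event types. The sketch below is an assumption only; the actual types are defined by the client package you use:

type Timecode = number // assumption: any totally ordered value would do

type AssumedTsunamiFilter = {
  block_hash?: string       // event look-up by block hash
  after_timecode?: Timecode // only events after the given timecode
  block_range?: unknown     // event look-up by block number range (shape is client-specific)
  // ...plus whatever application-level filtering the client supports
  // (contract addresses, topics, and so on).
}

type AssumedTsunamiEvent = {
  timecode: Timecode // imposes a total order on all events
  block_hash: string // used to group events by block
  // ...plus the client-specific payload (topics, log data, op codes, and so on).
}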

Advanced methods in the sequential Data Lake interface

processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>

Intended for advanced use cases, this method allows you to run some additional logic and effect extra state changes at the end of the block. The timecode used for state updates is the same as that of the last event processed. Unless you require this in your Data Lake, simply leave the implementation empty.

processBeforeDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void> and processAfterDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void>

These two methods are intended for advanced use cases where the Data Lake needs to update the state or do some auxiliary processing in case of blockchain reorganizations. Note that the runner takes care of keeping the state consistent without involving the Data Lake. So unless you have a very good reason to do otherwise, your implementations of these two methods should do nothing at all.

In case you do wish to perform state updates on reorganizations, keep in mind the following:

  • processBeforeDropBlockEvent() can inspect the pre-reorg state about to be destroyed but all changes it makes to it will be destroyed as well.
  • processAfterDropBlockEvent() can make changes to the state that will persist after the reorg's processing is finished, but it can't observe the pre-reorg state directly since it is already gone by the time this method is called.
  • Therefore, communication between these two methods is one of the rare cases when there is a good reason for your Data Lake to use its own object properties to keep the data.
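
A minimal sketch of that pattern might look as follows, assuming the multi-storage base from the example above; the droppedKeys property and the collectKeysToBeDropped() helper are purely illustrative:

private droppedKeys: string[] = []

public async processBeforeDropBlockEvent(
  event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {
  // The pre-reorg state is still visible here; stash whatever will be needed later.
  this.droppedKeys = await this.collectKeysToBeDropped() // hypothetical helper
}

public async processAfterDropBlockEvent(
  event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {
  // The pre-reorg state is gone by now; writes made here will persist.
  for (const key of this.droppedKeys) {
    await this.set('', `reorged-${key}`, { note: 'dropped by reorg' }) // illustrative bookkeeping
  }
  this.droppedKeys = []
}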

In addition to these methods, you may want to override the following methods as well, although this is optional.

getProperties(): DataLakeProperties

This method is supposed to return an object describing the fundamental, immutable properties of the Data Lake. Note that these should never change, whether at run-time or between runs; it only makes sense to change them if you intend to reset your Data Lake.

Except for id, all fields of DataLakeProperties are optional, and the runner will assume reasonable defaults if they are omitted. The fields are as follows:

id

id is mostly used for aesthetic purposes but it also serves as the primary key for the table keeping the Data Lake runner's internal state. Defaults to "UNNAMED-DATALAKE".

initialBlockNumber

This determines the starting point of your Data Lake and should be used for cases where you don't want to inspect the entire blockchain, e.g., if you are monitoring a specific contract address and don't want to process the blocks before the contract's original deployment. Defaults to 0, i.e., the genesis block of your specific blockchain.

extraSchemaInit

This allows you to make changes to the default Data Lake schema, although obviously you must keep it compatible with the SDK's expectations. The main intended purpose would be defining additional GIN indices on your Data Lake's value storages. Leaving it undefined means you do not need to make any alteration to the default Data Lake schema.

For example, the NFT ownership app defines the following additional indices on its default schema:

    CREATE INDEX idx_gin_entities_entity_value ON entities USING GIN ((entity_value -> 'owner'));
    CREATE INDEX idx_gin_mutations_entity_value ON mutations USING GIN ((entity_value -> 'owner'));

setTsunamiApiClient(tsunamiClient: ITsunamiApiClient<TsunamiFilter, TsunamiEvent>): void

If you need your Data Lake to query via the Tsunami API directly (e.g., to inspect historical data while processing fresh events), you will also need to implement the setTsunamiApiClient() method. Note that this is reserved for advanced use cases and should not be done unless you understand very well what you are doing and you are willing to accept the computational costs.

Inspecting and modifying your Data Lake's state

Both the IStateStorage interface and the AbstractDataLakeBase class expose a number of methods for inspecting and changing the state of your Data Lake. We have already seen .set() in the examples above, so let's examine the entire set of capabilities.

Inspecting the current state

get(key: string): Promise<ValueType | null>, getRecord(key: string): Promise<ValueRecord<ValueType, MetaType> | null>

Returns the current value of the entity specified by the key. getRecord() differs from get() in that it also fetches the meta fields in addition to the value itself.

has(key: string): Promise<boolean>

Returns false if the value of the entity is null, and true otherwise.

Modifying the state

set(key: string, value: ValueType | null, meta?: MetaType): Promise<void>, setRecord(key: string, value: ValueRecord<ValueType, MetaType> | null): Promise<void>

Sets the current value of the entity to value. The block number and timecode associated with this state change are automatically provided to the state storage by the Data Lake runner. set() is more convenient when you are not using meta fields, as its meta argument is optional, but for the same reason it is not fully type-safe. setRecord() is type-safe, but more cumbersome to use.

delete(key: string): Promise<void>

Equivalent to calling .set(key, null).

Inspecting the historical state

getEntityAtBlockNumber(key: string, block: BlockNumber): Promise<ValueType | null>, getEntityAtTimecode(key: string, timecode: Timecode): Promise<ValueType | null>, getEntityRecordAtBlockNumber(key: string, block: BlockNumber): Promise<ValueRecord<ValueType, MetaType> | null>, getEntityRecordAtTimecode(key: string, timecode: Timecode): Promise<ValueRecord<ValueType, MetaType> | null>

Similar to get() but returns the value of the entity at some point in the past. The point in the past is specified either as a block number or a timecode.

getStateAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueType>>, getStateAtTimecode(timecode: Timecode): Promise<Map<string, ValueType>>, getStateRecordsAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueRecord<ValueType, MetaType>>>, getStateRecordsAtTimecode(timecode: Timecode): Promise<Map<string, ValueRecord<ValueType, MetaType>>>

Returns the entire state of the Data Lake at some point in the past as a map from entity keys to values. Entities whose value is null are not included. The point in the past is specified as a block number or timecode.

getEntityHistory(key: string): Promise<Mutation<ValueType>[]>, getEntityHistoryRecords(key: string): Promise<Mutation<ValueRecord<ValueType, MetaType>>[]>

Returns the entire history of value changes of a given entity, together with associated block numbers and timecodes. Note that null values are also included.

Details on state storage organization

Every independent state storage uses two PostgreSQL tables to keep the data — entities and mutations (possibly prefixed by the name of the storage if used in the context of a multi-storage runner).

The entities table simply keeps the current, most up-to-date values of all entities and is used for fast querying of the current state.

The mutations table, on the other hand, keeps the entire history of updates for all the entities. This serves two purposes:

  • The mutations table is used for querying the historical state, e.g., what value did a given entity have at a certain point in the past?
  • It is also used for reverting the state to a certain point in the past in case of blockchain reorganizations. This allows the Data Lake runner to maintain the proper state after reorg without involving the Data Lake itself.

When using --memory-cache, similar data structures are also used in memory to keep the updates that have happened in the current block frame. These are dumped into the database at the frame boundary.

Alternate Data Lake interfaces and base classes

IDataLake<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}> and AbstractDataLakeBase<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>

This simplified interface and its associated Data Lake base class are meant for use with the basic DataLakeRunner, which is a thin wrapper over MultiStorageDataLakeRunner. Everything, including runtime configuration, remains the same as with standard sequential Data Lakes, but this set-up supports only one K-V storage. The upside is that typing and interacting with the storage are somewhat simpler and more convenient than in the multi-storage case.

AbstractEffectfulMultiStorageDataLakeBase<ValueTypes extends { '': unknown, 'effectstorage': { effects: EffectType[] } }, MetaTypes extends { [key in keyof ValueTypes]: {} }, EffectType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>

This base class can serve as a convenient starting point in case you want arbitrary reversible side effects in your Data Lake. You will need to define the type for your side effects and implement the methods for performing and reverting the side effects in question. The base class implements a standard sequential Data Lake in other respects.

Configuration of sequential Data Lakes

Configuring a Data Lake involves several components:

  • Environment variables are mostly used for things that are more convenient to specify in your docker-compose or k8s configuration files, and should rarely, if ever, change throughout the lifetime of your Data Lake.
  • Command-line options chiefly control the running modes of your Data Lake, and some of them are used when you are interacting with your Data Lake through the shell. These can also be overridden by passing a fixed configuration to the .run() method of DataLakeRunner.
  • constructor() arguments are intended for things that can only be passed to the runner at run-time, such as the database connection pool or the storage configuration for a multi-storage runner.

Where multiple methods of configuring the same value are available, command-line options take precedence over environment variables and run() arguments take precedence over command-line options.

We will now go over configuring all the vital aspects of your Data Lake, covering various options where available.

Configuration reference

This section contains reference tables for all the configuration options. Refer to the subsequent section for details.

Constructor argument | Type | Description
datalake | IMultiStorageDataLake<ValueTypes, TsunamiFilter, TsunamiEvent> | Data Lake instance to run.
dbPool | pg.Pool | DB connection pool to use (optional). If omitted, the runner will instantiate its own DB connection pool using the standard PG* environment variables.
log | ILogger | Logger instance to use (optional). NullLogger will be used by default.
rmq | amqplib.Connection | RabbitMQ connection to use (optional). If omitted, the runner will instantiate its own.
storageConfig | StorageOptionMapping<ValueTypes> | Only for MultiStorageDataLakeRunner. Describes the state storage configuration.
tsunami | ITsunamiApiClient<TsunamiFilter, TsunamiEvent> | Tsunami API client to use.

Note that for boolean environment variables, the values 1, yes, and true (in any case) are interpreted as true. All other values are interpreted as false.

run() argument | Environment variable | Command-line option (abbrev.) | Type | Default value | Description
N/A | DLSDK_DATALAKE_ID | N/A | string | UNNAMED-DATALAKE | Unique Data Lake ID.
N/A | DLSDK_GCL_PROJECT_ID | N/A | string | N/A | Project ID for GoogleCloudLogger.
N/A | DLSDK_GCL_LOG_NAME | N/A | string | (empty string) | Log name for GoogleCloudLogger.
N/A | DLSDK_GOOGLE_APPLICATION_CREDENTIALS | N/A | string | N/A | Filename of credentials for GoogleCloudLogger.
batch | N/A | --batch (-b) | number | (off) | Attempt to fetch and process the next X blocks, then exit.
current_block | N/A | N/A | number | (off) | Specify the expected current block for consistency checks when running as a worker process for the parallel SDK.
flush_cache | DLSDK_FLUSH_CACHE | --flush-cache | number | (turned off) | Causes the runner to flush the memory cache after every X processed blocks.
force_downgrade | N/A | --force-downgrade | boolean | false | Allows the runner to proceed if the package version is lower than the DB version.
force_gc | DLSDK_FORCE_GC | --force-gc (-f) | number | (turned off) | Force GC every X block frames.
frame_level_isolation | DLSDK_FRAME_LEVEL_ISOLATION | --frame-level-isolation (-f) | boolean | false | Use block frame-level transactions instead of block-level transactions.
insert_only | DLSDK_INSERT_ONLY | --insert-only | boolean | false | Indicates that the Data Lake never overwrites existing keys in the storage, allowing a simpler and more performant implementation of set()/dumpCache().
keep_last_blocks | DLSDK_KEEP_LAST_BLOCKS | --keep-last-blocks | number | 1000 | Specifies the number of last blocks to keep when purging data.
log_memory_usage | DLSDK_LOG_MEMORY_USAGE | --log-memory_usage (-o) | number | (turned off) | Log memory usage every X milliseconds.
max_block | DLSDK_MAX_BLOCK | --max-block | number | (off) | Specify the maximum block number to process.
memory_cache | DLSDK_MEMORY_CACHE | --memory-cache (-m) | boolean | false | Use memory caching for state updates.
metrics_server | DLSDK_METRICS_SERVER | --metrics-server | boolean | false | Indicates that the metrics server should be started.
metrics_server_port | DLSDK_METRICS_SERVER_PORT | --metrics-server-port | number | 8089 | Port number to use for the metrics server.
metrics_server_interval_sql | DLSDK_METRICS_SERVER_INTERVAL_SQL | --metrics-server-interval-sql | string | 1 minute | Timeframe for the metrics server as an SQL interval.
mode_mutex_delay_ms | DLSDK_MODE_MUTEX_DELAY_MS | N/A | number | 10 | Delay before attempting to grab the mode mutex again after failure.
mutations_only | DLSDK_MUTATIONS_ONLY | --mutations-only | boolean | false | Turns on the mutations_only mode for the K-V storages.
no_bdm | DLSDK_NO_BDM | --no-bdm | boolean | false | Allow the runner to work without BlockDataManager, thus ignoring blockhash ancestry checks.
no_consistency_check | DLSDK_NO_CONSISTENCY_CHECK | --no-consistency-check | boolean | false | Disables database consistency checks on start-up.
no_debug | DLSDK_NO_DEBUG | --no-debug (-d) | boolean | false | Discard all debug-level log records.
no_event_batching | DLSDK_NO_EVENT_BATCHING | --no-event-batching (-x) | boolean | false | Do not attempt to pre-fetch Tsunami events for multiple blocks.
no_recursive_recovery | DLSDK_NO_RECURSIVE_RECOVERY | --no-recursive-recovery (-y) | boolean | false | Do not attempt to recover from cascade reorgs.
no_rmq | DLSDK_NO_RMQ | --no-rmq (-n) | boolean | false | Do not listen to RabbitMQ for fresh blocks.
no_shutdown_handlers | N/A | N/A | boolean | false | Prevents the runner from installing its own signal handlers, e.g., when being managed by another process. Used when running as a worker process for the parallel SDK.
parallel | N/A | N/A | boolean | false | Engages a specialized runner mode for use with the parallel SDK. The semantics of this option are, unfortunately, very different from all other options. It should never be set manually, as doing so will either have no effect at all or cause strange and difficult-to-predict behavior of the runner. Strictly for internal use by the parallel SDK.
poll_tsunami | DLSDK_POLL_TSUNAMI | --poll-tsunami (-p) | boolean | false | Poll Tsunami in background for fresh blocks.
pull_awaken_ms | DLSDK_PULL_AWAKEN_MS | N/A | number | 500 | --poll-tsunami polling interval.
pull_sleep_ms | DLSDK_PULL_SLEEP_MS | N/A | number | 10 | --poll-tsunami sleep interval.
purge_block_data | DLSDK_PURGE_BLOCK_DATA | --purge-block-data | boolean | false | Indicates that the block data should be purged in addition to storage mutation records when purging old data.
purge_every_blocks | DLSDK_PURGE_EVERY_BLOCKS | --purge-every-blocks | number | (turned off) | Indicates that the old mutation records should be purged from the DB every X blocks.
queue_name | DLSDK_RMQ_QUEUE | --queue-name (-q) | string | (empty string) | RabbitMQ queue name to use.
recovery_block_frame | DLSDK_RECOVERY_BLOCK_FRAME | N/A | number | 3 | Block frame size when fast-forwarding.
recrec_attempts | DLSDK_RECREC_ATTEMPTS | N/A | number | 5 | Maximum number of cascade reorg recovery attempts.
recrec_sleep_ms | DLSDK_RECREC_SLEEP_MS | N/A | number | 100 | Cascade reorg recovery delay.
reset | N/A | --reset (-r) | boolean | false | Nuke the Data Lake and reset it to the initial state.
rmq_quorum_queue | DLSDK_RMQ_QUORUM_QUEUE | --rmq-quorum-queue (-k) | boolean | false | Use the quorum RabbitMQ queue type instead of classic.
rmq_sleep_ms | DLSDK_RMQ_SLEEP_MS | N/A | number | 10 | Delay before attempting to fetch the next message from RabbitMQ.
rmq_url | DLSDK_RMQ_URL | N/A | string | (empty string) | RabbitMQ URL.
set_last | N/A | --set-last (-l) | number | (off) | Reset the Data Lake to the specified block.
show_config | DLSDK_SHOW_CONFIG | --show-config | boolean | false | Causes the SDK runner to log the complete effective configuration on start-up.
sparse_blocks | DLSDK_SPARSE_BLOCKS | --sparse-blocks | boolean | false | Engages a different event retrieval mode for the Tsunami API client. Blocks with no relevant events are ignored, potentially speeding up the processing in some applications.
stats | N/A | --stats (-s) | boolean | false | Display block processing time stats and exit.
stats_from | N/A | --stats-from | number | first known block | Starting block for --stats.
stats_to | N/A | --stats-to | number | last known block | Ending block for --stats.
test_mode | N/A | N/A | boolean | false | For running internal test suites only.
tsunami_recovery_attempts | DLSDK_TSUNAMI_RECOVERY_ATTEMPTS | N/A | number | 20 | Maximum number of retries on Tsunami API failures.
tsunami_recovery_sleep_ms | DLSDK_TSUNAMI_RECOVERY_SLEEP_MS | N/A | number | 3000 | Delay between repeated attempts to query Tsunami on failures.
unsafe | N/A | --unsafe (-u) | boolean | false | Engages unsafe mode, which omits some important sanity checks in the runner in exchange for a minor performance boost.
use_citus | DLSDK_USE_CITUS | --use-citus | boolean | false | Makes StateStorage run in Citus-compatible mode.
verbose | DLSDK_VERBOSE | --verbose (-z) | boolean | false | Produce more detailed logs while running.

Database configuration

The Data Lake runner uses PostgreSQL as a storage backend by default to persist both its own state and the state of the Data Lake, and it cannot function without it. Configuring PostgreSQL is simple enough.

The runner will make use of all the standard PG* environment variables supported by the pg package since it invokes new pg.Pool() without any arguments. Unless you are using a particularly unusual set-up, you will need to configure PGHOST, PGPORT, PGUSER, PGPASSWORD and PGDATABASE.

As an alternative, you may skip configuration through environment variables entirely and pass a database connection pool you construct in your own code as an argument to the runner:

const runner = new sdk.DataLakeRunner({
  dbPool: new pg.Pool(),
  datalake: datalake,
  tsunami: tsunami,
  log: new sdk.ConsoleLogger()
})

Note that you don't need to initialize the database schema by hand; the runner will do that automatically.

When using Citus as a storage backend instead of plain old PostgreSQL, you need to make sure to turn on Citus-compatible mode either by passing a --use-citus command-line option to the runner or setting the DLSDK_USE_CITUS environment variable to true.

RabbitMQ configuration and alternatives for retrieving new blocks

RabbitMQ serves as the default transport for incoming new block notifications as well as notifications about blockchain reorganizations. If you are using RMQ, you will need to configure it using the environment variables:

  • DLSDK_RMQ_URL is used to specify the connection parameters.
  • DLSDK_RMQ_QUEUE should specify the name of the queue that the runner will listen to.
  • DLSDK_RMQ_SLEEP_MS specifies the queue polling interval for the runner. As the runner is stateful and processes incoming events synchronously, it polls the RMQ using .get(), and sleeps for the specified number of milliseconds before attempting to fetch the next message.

The RMQ queue name can also be specified through the --queue-name command-line option.

The SDK will default to classic RMQ queue type, but using the --rmq-quorum-queue command line option or the equivalent DLSDK_RMQ_QUORUM_QUEUE environment variable allows switching to the quorum queue type instead.

Alternatively, as with the database connection pool, you can construct your own amqplib connection and pass it directly to the runner (DLSDK_RMQ_URL will be ignored if you go with this option):

// Pass an existing amqplib connection to the runner via the rmq argument.
const runner = new sdk.DataLakeRunner({
  rmq: await amqplib.connect('...'),
  datalake: datalake,
  tsunami: tsunami,
  log: new sdk.ConsoleLogger()
})

Note that the runner can function without RabbitMQ by actively polling Tsunami for new blocks. To employ this mode, run your Data Lake with the --no-rmq and --poll-tsunami command-line options. You can also use --poll-tsunami without --no-rmq; in that case the runner will use both the RabbitMQ transport and active Tsunami polling simultaneously.

When using --poll-tsunami, you can use two environment variables to fine-tune its behavior:

  • DLSDK_PULL_SLEEP_MS controls how often the Tsunami polling thread checks whether it should try pulling new blocks.
  • DLSDK_PULL_AWAKEN_MS specifies the RabbitMQ timeout: if the runner hasn't seen any new blocks from RMQ for longer than the value of this variable, it will attempt to find new blocks in Tsunami.

When using --poll-tsunami without --no-rmq, another environment variable comes into play: a mutex prevents the RMQ and polling threads from trying to process new blocks at the same time, and DLSDK_MODE_MUTEX_DELAY_MS specifies the delay between attempts to obtain this mutex.
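
For example, a set-up that relies on polling alone might be started with the --no-rmq and --poll-tsunami options, with DLSDK_PULL_SLEEP_MS and DLSDK_PULL_AWAKEN_MS tuned to roughly match the block time of the chain you are monitoring.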

Performance tuning

Your Data Lake shouldn't require much performance tuning when running in real-time, as it should easily be able to keep up with any blockchain. When processing historical data, performance considerations become much more important, as you normally need to get through the preexisting blocks as quickly as possible.

Recovery block frame and related options

The recovery procedure of the runner is used in several contexts:

  • During the initial processing of historical data or while catching up after the Data Lake runner's been offline for a while.
  • During the --batch processing of blocks.
  • During the processing of blockchain reorganizations, especially important in case of major reorgs.

It consists of two separate phases:

  • The rollback phase attempts to deal with the consequences of a possible blockchain reorganization by reverting the runner state to the last known good block matching the current Tsunami blocks.
  • After that, the fast-forward phase quickly processes large numbers of blocks to catch up to the top of the blockchain.

Fast-forward recovery processes the blocks in 'frames', and the size of the frame is an important performance tuning parameter. It is controlled through the DLSDK_RECOVERY_BLOCK_FRAME environment variable. The effect of this value on performance is non-linear and non-monotonic, and depends on the following factors:

  • The specifics of the Data Lake you are running, in particular the volume and distribution of events you are monitoring as well as the utilization of the state storage.
  • The hardware you have available for your runner and database backend as well as the properties of the network connection between the two.
  • The specific performance-related options you are using for your runner.

As a result, tuning the block frame size is something of a dark art. We recommend trying values in the 5 to 200 block range and measuring the results.
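
For instance, you might start with something like DLSDK_RECOVERY_BLOCK_FRAME=50, measure the historical processing throughput, and then probe larger and smaller values from there; the 50-block starting point is just a suggestion, not a recommendation from the SDK itself.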

Among other things, the block frame determines the number of blocks the runner tries to fetch events for from Tsunami in one request. This event batching is turned on by default, but it can be counter-productive, especially in cases where the Tsunami filter your Data Lake uses changes very often. To turn off this behavior, use the --no-event-batching command-line option.

By default, the runner wraps the state updates for every block into a single transaction. This behavior can be changed so that the runner wraps the updates for the entire block frame into a single transaction. To switch to that mode, use the runner's --frame-level-isolation command-line option. Note that the effect on performance is likely to be fairly minor by itself but this command-line option is a prerequisite for employing the in-memory cache, which is covered in the next section of this document.

Configuring in-memory cache for state updates

The roundtrip time to the database backend can become an issue when processing historical data, especially if your Data Lake makes a lot of state updates. You can alleviate this issue by using the runner's --memory-cache option. This is a space/time trade-off: the runner accumulates the state updates during fast-forward recovery and dumps them to the database as a single query at each block frame boundary. The DLSDK_RECOVERY_BLOCK_FRAME setting becomes an even more important tuning parameter when using --memory-cache.

Note that using --memory-cache may lead to increased memory consumption, both on average and in terms of peak RAM usage. You may want to use the --force-gc FORCE_GC command-line option, which forces a garbage collection cycle after every FORCE_GC block frames, and configure Node.JS to reduce memory requirements, as covered in the next section.

The --log-memory-usage X command-line option can be used to dump basic RAM usage stats to the logs every X milliseconds. This may help in profiling the memory usage of your Data Lake.

The --flush-cache option, or the equivalent DLSDK_FLUSH_CACHE environment variable, may be used to set the maximum number of blocks to be processed before the cache is dumped into the database. Note that this only makes sense when using the parallel runner in sparse_blocks mode; otherwise, tuning the DLSDK_RECOVERY_BLOCK_FRAME variable is a saner way to achieve the same results.

Configuring Node.JS when using in-memory cache

If you are using the --force-gc option, you will need to run Node with the --expose-gc command-line option; otherwise --force-gc will have no effect.

Using Node's --optimize-for-size option is recommended when running with --memory-cache.

Lastly, make sure that --max-old-space-size, which limits the size of V8's old-generation heap, is set to around 80-90% of the RAM actually available.
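
For example, on a machine with roughly 4 GB of RAM available to the runner, you might launch it with something like node --expose-gc --optimize-for-size --max-old-space-size=3400 dist/main.js; the entry point path here is just a placeholder for your own build output.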

Miscellaneous performance-related options

The --unsafe command-line option turns off some important sanity checks and may result in a slight performance improvement. Using it is generally not recommended: the gain is minor, while your state becomes far more susceptible to corruption.

Recursive recovery may happen in case of a cascade of blockchain reorganizations or if the Tsunami database is corrupted. By default, the runner will attempt to recover by rolling back the current transaction, sleeping for a while, and restarting the recovery procedure; under normal circumstances it should be fairly successful in doing this. There are two environment variables controlling this procedure:

  • DLSDK_RECREC_SLEEP_MS controls the interval in milliseconds between repeated recovery attempts.
  • DLSDK_RECREC_ATTEMPTS specifies the maximum number of attempts at recursive recovery before terminating the runner.

In case you prefer to rely on your orchestration to recover from unexpected situations, you may turn off this behavior entirely by using the --no-recursive-recovery command-line option.

Similarly to the options above, DLSDK_TSUNAMI_RECOVERY_SLEEP_MS and DLSDK_TSUNAMI_RECOVERY_ATTEMPTS control the behavior of the runner in case of generic problems with the Tsunami API client, such as those caused by network connectivity issues or Tsunami being temporarily down. The default values are fairly generous and you should have little reason to change them.

The --stats command-line option, along with --stats-from and --stats-to to specify the block range, can be used to display some simple stats on block processing times. 'Raw' is the time interval between the moment a new_block event is received and the moment the block has been processed; it may therefore ignore processing overhead outside of processNewBlockEvent(), such as dumping the accumulated state cache to the database. 'Real' computes the time interval between new_block events and may be overly pessimistic, especially when processing real-time rather than historical data.

Google Cloud logger configuration

If you are using the simple Google Cloud logger class provided with the SDK, its configuration is specified by three environment variables:

  • DLSDK_GOOGLE_APPLICATION_CREDENTIALS is the filename for your app key.
  • DLSDK_GCL_PROJECT_ID specifies the project ID to be used.
  • DLSDK_GCL_LOG_NAME is used to specify your log name.

Options reserved for use by the parallel SDK runner

The following options are intended to be used by the parallel SDK runner and should not be used elsewhere:

  • parallel
  • current_block

Miscellaneous configuration options

Unless you explicitly override the getProperties() method in your Data Lake to specify the Data Lake ID, the runner will check the DLSDK_DATALAKE_ID environment variable to see if one is defined there.

There are also a few more command-line options the runner supports.

--batch can be used to process a fixed number of blocks instead of listening to events in the background. This should be mostly useful during development and/or testing of your Data Lake.

--reset is also mostly intended for development and testing as it completely nukes your Data Lake by reinitializing the database schema.

--set-last should be used with care as it may corrupt the state of the Data Lake. It is intended as a milder, but still dangerous, alternative to --reset, allowing you to roll back your Data Lake without resetting it completely.

--force-downgrade allows the runner to run after a package version downgrade by forcing the database version down to the current package version.

--no-debug forces the runner to discard all the DEBUG-level log records both from itself and from the Data Lake.

The --show-config option causes the runner to dump its complete effective configuration to the logs on start-up.

--verbose mode causes the runner to dump extra debugging information to logs.

--test-mode enables extra features needed for running integration tests and shouldn't ever be explicitly used in other contexts.

The no_shutdown_handlers option, only available as a run() argument, should be used when you are invoking the runner as part of a larger application and intend to install your own signal handlers that call the runner's graceful shutdown() procedure as needed.
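
A rough sketch of this set-up follows; it assumes the runner instance has been constructed as in the earlier examples and that run() accepts its options as an object, so treat it as an outline rather than exact API usage:

// Register your own signal handler first, then start the runner with the
// SDK's default handlers suppressed (the run() argument shape is assumed here).
process.on('SIGTERM', async () => {
  // Trigger the runner's graceful shutdown, then exit.
  await runner.shutdown()
  process.exit(0)
})

await runner.run({ no_shutdown_handlers: true })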

--version displays the current SDK version and --help displays the brief help page similar to the following:

usage: simple.ts [-h] [-v] [-b BATCH] [--flush-cache FLUSH_CACHE] [--force-downgrade] [-g FORCE_GC] [-f] [--insert-only] [-o LOG_MEMORY_USAGE] [-m] [-d] [-x] [-y] [-n] [-p] [-q QUEUE_NAME] [-r] [-k]
                 [-l SET_LAST] [--show-config] [-s] [--stats-from STATS_FROM] [--stats-to STATS_TO] [-t] [-u] [--use-citus] [-z]

PARSIQ DataLake SDK (@parsiq/datalake-sdk, version 0.17.33) running [UNNAMED-DATALAKE]

optional arguments:
  -h, --help            show this help message and exit
  -v, --version         display version
  -b BATCH, --batch BATCH
                        fetch (at most) the next BATCH blocks from Tsunami, process and exit
  --flush-cache FLUSH_CACHE
                        flush the memory cache at specified number of processed blocks
  --force-downgrade     forces database version downgrade to current package version
  -g FORCE_GC, --force-gc FORCE_GC
                        force garbage collection every FORCE_GC block frames (if available)
  -f, --frame-level-isolation
                        stuff the entire block frame into a single transaction, as opposed to a single block normally
  --insert-only         disallow storage updates by dropping the ON CONFLICT clause for storage INSERTs
  -o LOG_MEMORY_USAGE, --log-memory-usage LOG_MEMORY_USAGE
                        log memory usage to debug log every LOG_MEMORY_USAGE milliseconds
  -m, --memory-cache    store the state updates in memory and dump to db at block frame boundaries
  -d, --no-debug        forcibly discard all runner and datalake DEBUG-level log records
  -x, --no-event-batching
                        do not attempt to prefetch the events for multiple blocks while recovering
  -y, --no-recursive-recovery
                        do not attempt to fix recursive recovery
  -n, --no-rmq          do not listen to RMQ for block events
  -p, --poll-tsunami    poll Tsunami for new blocks if none were received through RMQ for a while
  -q QUEUE_NAME, --queue-name QUEUE_NAME
                        RMQ queue name to use
  -r, --reset           reset the datalake to a clean slate before running
  -k, --rmq-quorum-queue
                        use quorum RabbitMQ queue type instead of classic
  -l SET_LAST, --set-last SET_LAST
                        set the last processed block number to the specified value (nuking the states above that block)
  --show-config         log complete configuration on startup
  -s, --stats           display block processing stats and exit
  --stats-from STATS_FROM
                        starting block for --stats
  --stats-to STATS_TO   ending block for --stats
  -t, --test-mode       enable additional features intended strictly for testing
  -u, --unsafe          unsafe mode omits some important sanity checks and should only be used for performance critical tasks
  --use-citus           adapt SQL queries to Citus storage
  -z, --verbose         produce more detailed log records

Configuration of parallel Data Lakes

Running a parallel Data Lake is similar to running a sequential one; you just need to use the ParallelMultiStorageDataLakeRunner. Note that you can safely run multiple instances of the runner, and you should. The SDK will take care of safe schema initialization and task distribution across your workers. Configuration options for parallel Data Lakes are covered in the next section.

Configuration reference

This section contains reference tables for all the configuration options. Refer to the subsequent sections for details.

The constructor arguments for the ParallelMultiStorageDataLakeRunner are the same as for the MultiStorageDataLakeRunner. The only differences are a different type for the datalake argument and the absence of the rmq argument as the parallel Data Lakes run only in historical mode.

Constructor argument | Type | Description
datalake | IParallelMultiStorageDataLake<ValueTypes, TsunamiFilter, TsunamiEvent> | Data Lake instance to run.
dbPool | pg.Pool | DB connection pool to use. Optional. If omitted, the runner will instantiate its own DB connection pool using the standard PG* environment variables.
log | ILogger | Logger instance to use. Optional. NullLogger will be used by default.
storageConfig | StorageOptionMapping<ValueTypes> | Only for MultiStorageDataLakeRunner. Describes the state storage configuration.
tsunami | ITsunamiApiClient<TsunamiFilter, TsunamiEvent> | Tsunami API client to use.
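
For illustration, constructing and starting a parallel runner might look roughly like the following sketch. The datalake and tsunami values stand for your own IParallelMultiStorageDataLake implementation and Tsunami API client, and the exact exported names and run() signature should be checked against the SDK itself:

// A sketch only; the constructor arguments mirror the table above.
const runner = new sdk.ParallelMultiStorageDataLakeRunner({
  datalake: datalake,
  tsunami: tsunami,
  log: new sdk.ConsoleLogger()
  // dbPool is omitted, so the runner builds its own pool from the PG* variables
})
await runner.run()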

Note that for boolean environment variables, the values 1, yes and true (in any case) are interpreted as true. All other values are interpreted as false.

run() argument | Environment variable | Command-line option (abbrev.) | Type | Default value | Description
forced_unlock_chance | PDLSDK_FORCED_UNLOCK_CHANCE | N/A | number | 0 | A floating point number between 0 and 1, specifying the chance of forced task unlocking upon finishing a task even though not all tasks have been exhausted yet.
group_lock_attempts | PDLSDK_GROUP_LOCK_ATTEMPTS | N/A | number | 25 | Maximum number of repeated task group lock attempts on lock failure.
group_lock_sleep_ms | PDLSDK_GROUP_LOCK_SLEEP_MS | N/A | number | 200 | Delay in milliseconds before a repeated attempt to obtain a task group lock.
init_sleep_ms | PDLSDK_INIT_SLEEP_MS | N/A | number | 3000 | Delay in milliseconds before a repeated attempt to obtain the schema initialization lock.
init_timeout_sql | PDLSDK_INIT_TIMEOUT_SQL | N/A | string | "1 minute" | An SQL interval value for the schema initialization timeout.
metrics_server | PDLSDK_METRICS_SERVER | --metrics-server | boolean | false | Indicates that the metrics server should be started.
metrics_server_port | PDLSDK_METRICS_SERVER_PORT | --metrics-server-port | number | 8091 | Port number to use for the metrics server.
mutations_only | PDLSDK_MUTATIONS_ONLY | --mutations-only | boolean | false | Turns on the mutations_only mode for the K-V storages.
no_debug | PDLSDK_NO_DEBUG | --no-debug (-d) | boolean | false | Suppresses all debug-level log messages.
no_shutdown_handlers | N/A | N/A | boolean | false | Suppresses the installation of default signal handlers. May be used when running in the context of a larger application that provides its own signal-handling logic.
randomize_tasks | PDLSDK_RANDOMIZE_TASKS | --randomize-tasks | boolean | false | Randomizes task selection from a locked task group. Lowers the performance of task selection but is essential when running in single_storage mode to reduce task contention across workers.
reset | N/A | --reset (-r) | boolean | false | Forces schema reinitialization. In the context of the parallel SDK, it is recommended to trigger forced schema init by other means, specifically by manually dropping the schema table.
show_config | PDLSDK_SHOW_CONFIG | --show-config | boolean | false | Dumps the complete effective runner configuration to the log on start-up.
single_storage | PDLSDK_SINGLE_STORAGE | --single-storage | boolean | false | Prevents the use of custom sharding so that all workers use a single set of tables. Intended to be used when running with Citus as a storage backend.
sparse_blocks | PDLSDK_SPARSE_BLOCKS | --sparse-blocks (-s) | boolean | false | Allows sparse blocks in Tsunami responses and in event processing. Dispenses with the overhead of processing blocks with no relevant events at the cost of slightly weaker consistency guarantees.
unlock_stale_tasks_sql | PDLSDK_UNLOCK_STALE_TASKS_SQL | N/A | string | (turned off) | Specifies the SQL interval value for the stale-task timeout. After the timeout expires, tasks may be forcibly unlocked even if still marked as LOCKED. If omitted, stale tasks may remain stuck forever.
update_lock_time_per_block | PDLSDK_UPDATE_LOCK_TIME_PER_BLOCK | N/A | number | (turned off) | Specifies the number of blocks to process before updating the lock time on the current task. If unspecified, the lock time will never be updated, potentially leading to premature forced unlocking.
use_citus | PDLSDK_USE_CITUS | --use-citus | boolean | false | Switches on the Citus-compatible runner mode. The runner will generate queries, including schema definitions, compatible with a Citus storage backend.
verbose | PDLSDK_VERBOSE | --verbose (-z) | boolean | false | When switched on, causes the runner to produce additional log records for debugging purposes.
wait_timeout_sql | PDLSDK_WAIT_TIMEOUT_SQL | N/A | string | 5 minutes | Specifies the waiting-state timeout for schema initialization purposes as an SQL interval value.

Importantly, as the parallel runner uses sequential runners as worker processes, it is affected by most of the DLSDK_* environment variables listed above. The parallel runner overrides the following configuration settings of the worker:

  • batch is set to the current job size.
  • current_block and max_block are set to specify the job boundaries.
  • parallel, no_bdm, no_shutdown_handlers, no_recursive_recovery, frame_level_isolation and memory_cache are always set to true.
  • sparse_blocks, use_citus, no_debug and verbose are always set to match the parallel runner's own configuration.