PARSIQ Data Lake SDK — Quick Introduction & Reference
Introduction
Data Lakes serve as a custom, application-specific data layer for filtering, aggregation, and analysis of blockchain-scale data contained in the Tsunami data store. This SDK provides the general infrastructure for querying Tsunami, maintaining historical data, and transparently handling blockchain reorganizations, as well as various configuration options that allow you to adjust the runtime behavior to the specific needs of your application.
The Data Lake SDK supports two main types of Data Lakes — sequential and parallel.
Sequential Data Lakes are well-suited for low-latency computations with evolving state, especially when data volumes are not too large.
Parallel Data Lakes are tailored towards processing massive volumes of historical data by allowing multiple workers to process blocks independently and out-of-order. The trade-off is that they lack the capability to maintain state.
In some cases, a combination of parallel and sequential Data Lakes is the perfect solution to enable processing of both historical and real-time data, but additional processing or conversion may be needed when transitioning from historical to real-time mode.
Basics of sequential Data Lake implementation
Let's go over the steps involved in implementing a sequential Data Lake, using a simple Data Lake that gathers basic information on the CNS registry contract as a running example. We will monitor only the association of tokens with URIs and current token ownership, without delving into name resolution.
The complete code for the Data Lake itself (let's call it src/datalake.ts) is listed below. We will analyze it in detail in the following sections. To make it run, follow these steps:
- Initialize a new package.
- Install the dependencies (@parsiq/datalake-sdk, @parsiq/tsunami-client, @parsiq/tsunami-client-sdk-http, @ethersproject/abi).
- Put the implementation in src/datalake.ts.
- Replace the TSUNAMI_API_KEY constant in src/datalake.ts with your actual Tsunami API key.
- Implement the entry point (e.g., main.ts), which should simply call run().
- Put together a simple Dockerfile and docker-compose.yml.
- Build and run.
We will provide simple examples of the files involved in the following sections but for now let's just focus on the Data Lake.
import { Interface } from '@ethersproject/abi'
import * as sdk from '@parsiq/datalake-sdk'
import { ChainId, TsunamiApiClient } from '@parsiq/tsunami-client'
import {
DatalakeTsunamiApiClient,
TsunamiEvent,
TsunamiFilter
} from '@parsiq/tsunami-client-sdk-http'
// Import your ABIs here, in a format suitable for decoding using @ethersproject/abi.
import cnsRegistryAbi from './cns-registry.json'
// Put your Tsunami API key here.
const TSUNAMI_API_KEY = ''
// This is the chain ID for Ethereum mainnet Tsunami API. Change it if you want to work with a different net.
const TSUNAMI_API_NET = ChainId.ETH_MAINNET
// CNS Registry contract address, replace or drop if you intend to monitor something else.
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe'
// topic_0 hashes of our events of interest.
const EVENT_NEW_URI_TOPIC_0 =
'0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952'
const EVENT_TRANSFER_TOPIC_0 =
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
// Contract deployment block is used as a starting point for the Data Lake.
const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251
// This defines the layout of the type-safe K-V storage.
type DatalakeStorageLayout = {
'': any
domains: { uri: string }
ownership: { owner: string }
}
type DatalakeStorageMetaLayout = {
'': {}
domains: {}
ownership: {}
}
// Types for decoded events follow.
type NewUriEvent = {
tokenId: {
_hex: string
}
uri: string
}
type TransferEvent = {
from: string
to: string
tokenId: {
_hex: string
}
}
class Datalake extends sdk.AbstractMultiStorageDataLakeBase<
DatalakeStorageLayout,
DatalakeStorageMetaLayout,
TsunamiFilter,
TsunamiEvent
> {
private cnsRegistryDecoder: Interface
// Construct ABI decoders here.
constructor() {
super()
this.cnsRegistryDecoder = new Interface(cnsRegistryAbi)
}
public override getProperties(): sdk.DataLakeProperties {
return {
id: 'DATALAKE-TEMPLATE',
initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER
}
}
// This method generates the filter used to retrieve events from Tsunami. Filter may change from block to block.
public async genTsunamiFilterForBlock(
block: sdk.Block & sdk.DataLakeRunnerState,
isNewBlock: boolean
): Promise<TsunamiFilter> {
return {
contract: [CONTRACT_ADDRESS],
topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
}
}
// Main event handler.
public async processTsunamiEvent(
event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState
): Promise<void | TsunamiFilter> {
switch (event.topic_0) {
case EVENT_NEW_URI_TOPIC_0:
await this.processNewUriEvent(event)
break
case EVENT_TRANSFER_TOPIC_0:
await this.processTransferEvent(event)
break
}
}
private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
// Decodes the event...
const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!)
const decoded = this.cnsRegistryDecoder.decodeEventLog(
fragment,
event.log_data!,
[event.topic_0!, event.topic_1!]
) as unknown as NewUriEvent
// ...then writes to reorganization-aware K-V storage.
await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri })
}
private async processTransferEvent(event: TsunamiEvent): Promise<void> {
if (event.op_code !== 'LOG4') {
return
}
const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!)
const decoded = this.cnsRegistryDecoder.decodeEventLog(
fragment,
event.log_data!,
[event.topic_0!, event.topic_1!, event.topic_2!, event.topic_3!]
) as unknown as TransferEvent
await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to })
}
// The following event handlers should be no-ops under most circumstances.
public async processEndOfBlockEvent(
event: sdk.Block & sdk.DataLakeRunnerState
): Promise<void> {}
public async processBeforeDropBlockEvent(
event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {}
public async processAfterDropBlockEvent(
event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {}
}
export const run = async (): Promise<void> => {
const logger = new sdk.ConsoleLogger()
logger.log('DEBUG', 'Initializing datalake...')
const datalake = new Datalake()
logger.log('DEBUG', 'Initializing Tsunami API...')
const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET)
logger.log('DEBUG', 'Initializing SDK parsiq.js...')
const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami)
logger.log('DEBUG', 'Initializing runner...')
const runner = new sdk.MultiStorageDataLakeRunner({
storageConfig: {
'': { meta: {} },
domains: { meta: {} },
ownership: { meta: {} }
},
datalake: datalake,
tsunami: tsunamiSdk,
log: logger
})
logger.log('DEBUG', 'Running...')
await runner.run()
}
Key questions
Before beginning to implement your Data Lake, we recommend answering the following key questions regarding it.
What events are you interested in?
First of all, you need to determine what kind of information available in the Tsunami data store you want to process. The specific kinds of events and filtering options available depend in large part on the parsiq.js client you are using. The SDK itself makes few assumptions about the client; these are listed in detail in the reference section below. Thus, as long as the Data Lake and the client agree on the filter and event formats, everything should be in order.
In our sample case, we want to monitor the CNS registry contract on Ethereum mainnet:
const TSUNAMI_API_NET = ChainId.ETH_MAINNET
const CONTRACT_ADDRESS = '0xD1E5b0FF1287aA9f9A268759062E4Ab08b9Dacbe'
We are only interested in two log event types: NewURI, which indicates the minting of a new token, and Transfer, which indicates the transfer of token ownership:
const EVENT_NEW_URI_TOPIC_0 =
'0xc5beef08f693b11c316c0c8394a377a0033c9cf701b8cd8afd79cecef60c3952'
const EVENT_TRANSFER_TOPIC_0 =
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
Topic 0 hashes have been computed here from the event signatures in the CNS registry ABI.
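If you would rather not hard-code these values, the same hashes can be derived from the ABI itself. A minimal sketch using the Interface class from @ethersproject/abi (its getEventTopic() helper returns the keccak256 hash of the canonical event signature):
import { Interface } from '@ethersproject/abi'
import cnsRegistryAbi from './cns-registry.json'

// getEventTopic() hashes the canonical signature, e.g. Transfer(address,address,uint256).
const abi = new Interface(cnsRegistryAbi)
console.log('NewURI topic_0:  ', abi.getEventTopic('NewURI'))
console.log('Transfer topic_0:', abi.getEventTopic('Transfer'))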
The CNS registry contract was deployed at block 9,082,251 so we will not need to look at any preceding blocks:
const CONTRACT_DEPLOYMENT_BLOCK_NUMBER = 9_082_251
Since parsiq.js serves raw events, we will want to decode them using the contract ABI, e.g., with @ethersproject/abi:
import { Interface } from '@ethersproject/abi'
import cnsRegistryAbi from './cns-registry.json'
The sample ABI file can be found in the section below.
We also have to define the types for decoded log events:
type NewUriEvent = {
tokenId: {
_hex: string
}
uri: string
}
type TransferEvent = {
from: string
to: string
tokenId: {
_hex: string
}
}
So far, we have been defining only types and constants. We will start to put all of this together once we have answered all the key questions about our Data Lake.
How do you want to process the data?
You can perform more or less arbitrary computations on the events you are processing; the limiting factor is that processing must be fast enough to keep up with the block rate of the chain in question.
In the case of our example Data Lake, we won't be doing much with the data; we just want to store information on new tokens and transfers of ownership.
Having determined that, we don't need to write any code just yet, so we can move on to the next question.
How do you want to store the results?
Data Lakes can produce arbitrary side effects while processing events, but the main result of their work is supposed to be a store of processed data that is easy to query for consumer apps.
The SDK supports either plain old PostgreSQL as a storage backend or PostgreSQL-based Citus, which allows for a substantial degree of horizontal scalability. The storage is organized into any number of independent, type-safe key-value storages that maintain both the current state and the complete history of changes. The SDK manages the state rollbacks automatically in case of blockchain reorganizations. While the default storage model is document-based, the SDK supports inclusion of additional relational fields and indices in the storage tables. This is mostly needed for advanced use cases so we will not be using this capability in our example Data Lake.
As we have already mentioned, we want to keep track of mappings of tokens to URIs, and of current ownership information for each token. Hence, we are going to need two fairly simple storages:
type DatalakeStorageLayout = {
'': any
domains: { uri: string } // Keyed by token ID.
ownership: { owner: string } // Keyed by token ID.
}
Here the domains storage maps token IDs (keys) to URIs, while the ownership storage maps token IDs to owner addresses. The SDK makes sure that the current state of these two storages is represented by the domains_entities and ownership_entities tables on the storage backend, while historical information is kept in the domains_mutations and ownership_mutations tables.
The SDK also requires us to define a default storage with an empty string as its name, but we simply leave it unused in our example.
While we won't be using custom fields in our Data Lake, we still have to define a type declaring just that:
type DatalakeStorageMetaLayout = {
'': {}
domains: {}
ownership: {}
}
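Once the Data Lake has populated these storages, a consumer app can query the backend directly. The sketch below looks up the current owner of a token; it assumes the entity_key and entity_value column names referenced later in this document, so verify them against your actual schema:
import { Pool } from 'pg'

const pool = new Pool() // Uses the standard PG* environment variables.

// Hypothetical consumer-side helper: reads the current owner of a token
// from the ownership storage's entities table.
export const getOwner = async (tokenIdHex: string): Promise<string | null> => {
  const res = await pool.query(
    'SELECT entity_value FROM ownership_entities WHERE entity_key = $1',
    [tokenIdHex]
  )
  return res.rowCount ? res.rows[0].entity_value.owner : null
}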
Implementing the Data Lake itself
Now we are ready to implement the Data Lake as follows:
class Datalake extends sdk.AbstractMultiStorageDataLakeBase<DatalakeStorageLayout, DatalakeStorageMetaLayout, TsunamiFilter, TsunamiEvent> {
We are extending a convenient base class exposed by the SDK here. The filter and event types are imported from our parsiq.js client package. The types defining the storage layout are the ones we declared above while answering the key questions about the Data Lake.
We need to initialize the event decoder in the constructor:
private cnsRegistryDecoder: Interface;
constructor() {
super();
this.cnsRegistryDecoder = new Interface(cnsRegistryAbi);
}
We also have to define the method providing the fundamental properties of our Data Lake - its name and starting block:
public override getProperties(): sdk.DataLakeProperties {
return {
id: 'EXAMPLE-DATALAKE',
initialBlockNumber: CONTRACT_DEPLOYMENT_BLOCK_NUMBER,
};
}
The next method specifies the filter that we want to use to retrieve events from Tsunami:
public async genTsunamiFilterForBlock(block: sdk.Block & sdk.DataLakeRunnerState, isNewBlock: boolean): Promise<TsunamiFilter> {
return {
contract: [CONTRACT_ADDRESS],
topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0],
};
}
This method may generate the filter dynamically as it can both inspect and update the state of the Data Lake, doubling as the event handler for new blocks.
Note that in case your implementation of genTsunamiFilterForBlock() updates the state, you MUST NOT alter the state if the isNewBlock parameter is false. genTsunamiFilterForBlock() is also called when the events are pre-fetched in bulk for multiple blocks. Unless your state update logic is idempotent with respect to processing the same new_block event multiple times, failing to respect the value of isNewBlock may corrupt your state.
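As a minimal sketch of that guard, assuming a hypothetical blocksSeen counter kept on the Data Lake instance purely for illustration:
private blocksSeen = 0 // hypothetical per-run counter, for illustration only

public async genTsunamiFilterForBlock(
  block: sdk.Block & sdk.DataLakeRunnerState,
  isNewBlock: boolean
): Promise<TsunamiFilter> {
  // Only mutate instance state when this is genuinely a new block,
  // never during bulk pre-fetch calls.
  if (isNewBlock) {
    this.blocksSeen += 1
  }
  return {
    contract: [CONTRACT_ADDRESS],
    topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
  }
}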
The simple parsiq.js client used in this example provides the capability to retrieve log events by contract address and topics, among other things.
We also have to define the event handler for Tsunami events:
public async processTsunamiEvent(event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | TsunamiFilter> {
switch (event.topic_0) {
case EVENT_NEW_URI_TOPIC_0:
await this.processNewUriEvent(event);
break;
case EVENT_TRANSFER_TOPIC_0:
await this.processTransferEvent(event);
break;
}
}
We will just dispatch to separate implementations depending on the topic_0 here.
Note that while genTsunamiFilterForBlock() cannot change the filter mid-block, processTsunamiEvent() can. However, if you generate a lot of filter updates mid-block, the runner repeatedly calls the Tsunami API with new requests for block events, resulting in worst-case quadratic complexity, both time- and network-wise.
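For illustration, here is a hedged sketch of a mid-block filter update. The watchedContracts array and extractNewContract() helper are hypothetical, and the exact merge semantics of the returned filter should be checked against your SDK version:
private watchedContracts: string[] = [CONTRACT_ADDRESS]

public async processTsunamiEvent(
  event: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState
): Promise<void | TsunamiFilter> {
  // Hypothetical: some events reveal a new contract we should start watching.
  const discovered = this.extractNewContract(event)
  if (discovered !== undefined) {
    this.watchedContracts.push(discovered)
    // Returning a filter asks the runner to use it for the rest of the block.
    return {
      contract: this.watchedContracts,
      topic_0: [EVENT_NEW_URI_TOPIC_0, EVENT_TRANSFER_TOPIC_0]
    }
  }
  // Otherwise, process the event as usual (omitted here).
}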
The handler for NewURI events is trivial as we just decode the raw event and update the domains K-V storage:
private async processNewUriEvent(event: TsunamiEvent): Promise<void> {
const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
event.topic_0!,
event.topic_1!
]) as unknown as NewUriEvent;
await this.set('domains', decoded.tokenId._hex, { uri: decoded.uri });
}
We are only writing to the storage here but the event handlers may also inspect it using the methods described in the reference section below.
It is important that you await all the state-updating methods in your event handlers. Failing to do so may lead to undefined behavior.
The set() method is often convenient but not completely type-safe. Its fourth argument, providing the data to be written to separate fields associated with the specified key, is always optional. This is safe when we are not using those fields but will crash at run-time if we do and fail to provide the values. setRecord() is a more cumbersome but type-safe alternative.
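To make the pitfall concrete, here is a hedged sketch assuming a hypothetical meta layout with a registeredAt field on the domains storage (our example uses an empty meta layout):
// With meta fields declared, e.g. domains: { registeredAt: number } in
// DatalakeStorageMetaLayout, the compiler will not force you to pass the
// meta argument to set() -- forgetting it only fails at run-time:
await this.set('domains', tokenId, { uri }, { registeredAt: blockNumber })

// With an empty meta layout, as in this example, the short form is safe:
await this.set('domains', tokenId, { uri })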
The handler for Transfer events is pretty much the same:
private async processTransferEvent(event: TsunamiEvent): Promise<void> {
if (event.op_code !== 'LOG4') {
return;
}
const fragment = this.cnsRegistryDecoder.getEvent(event.topic_0!);
const decoded = this.cnsRegistryDecoder.decodeEventLog(fragment, event.log_data!, [
event.topic_0!,
event.topic_1!,
event.topic_2!,
event.topic_3!
]) as unknown as TransferEvent;
await this.set('ownership', decoded.tokenId._hex, { owner: decoded.to });
}
We are almost done. The SDK provides some additional event handlers for block boundaries and reorganizations but we can leave their implementations empty in this case:
public async processEndOfBlockEvent(event: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {}
public async processBeforeDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
public async processAfterDropBlockEvent(event: sdk.DropBlockData & sdk.DataLakeRunnerState): Promise<void> {}
}
Our Data Lake is ready to roll.
Calling the Data Lake runner provided by the SDK
Invoking the runner is easy enough: we just have to construct the objects it needs, such as the Data Lake instance, the parsiq.js client, etc.
The storageConfig argument is required for proper database schema initialization.
export const run = async (): Promise<void> => {
const logger = new sdk.ConsoleLogger()
logger.log('DEBUG', 'Initializing datalake...')
const datalake = new Datalake()
logger.log('DEBUG', 'Initializing Tsunami API...')
const tsunami = new TsunamiApiClient(TSUNAMI_API_KEY, TSUNAMI_API_NET)
logger.log('DEBUG', 'Initializing SDK parsiq.js...')
const tsunamiSdk = new DatalakeTsunamiApiClient(tsunami)
logger.log('DEBUG', 'Initializing runner...')
const runner = new sdk.MultiStorageDataLakeRunner({
storageConfig: {
'': { meta: {} },
domains: { meta: {} },
ownership: { meta: {} }
},
datalake: datalake,
tsunami: tsunamiSdk,
log: logger
})
logger.log('DEBUG', 'Running...')
await runner.run()
}
Configuring the runner
Using the following docker-compose.yml will run the Data Lake with sensible settings. Take a look at the environment variables that it passes to the runner's container. The reference section below contains a detailed list of all the available configuration options.
version: '3.8'
services:
datalake_runner:
container_name: datalake_template_container
environment:
PGHOST: pg
PGPORT: 5432
PGDATABASE: datalake
PGUSER: datalake
PGPASSWORD: datalake
DLSDK_SHOW_CONFIG: "true"
# Set DLSDK_VERBOSE to "false" and DLSDK_NO_DEBUG to "true" for less console output.
DLSDK_VERBOSE: "true"
DLSDK_NO_DEBUG: "false"
#DLSDK_VERBOSE: "false"
#DLSDK_NO_DEBUG: "true"
DLSDK_POLL_TSUNAMI: "true"
DLSDK_NO_RMQ: "true"
# DLSDK_RECOVERY_BLOCK_FRAME controls the number of blocks retrieved at once in historical/batch/recovery mode.
DLSDK_RECOVERY_BLOCK_FRAME: 200
DLSDK_FRAME_LEVEL_ISOLATION: "true"
DLSDK_MEMORY_CACHE: "true"
DLSDK_RECREC_SLEEP_MS: 100
DLSDK_RECREC_ATTEMPTS: 5
DLSDK_MODE_MUTEX_DELAY_MS: 10
DLSDK_PULL_SLEEP_MS: 100
DLSDK_PULL_AWAKEN_MS: 500
DLSDK_FORCE_GC: 5
DLSDK_LOG_MEMORY_USAGE: 5000
depends_on:
- pg
image: "datalake-template-dev:latest"
# Drop the '--reset' CLI option here if you don't want to reset the datalake's state on each run.
entrypoint: /bin/bash ./run.sh --reset
#entrypoint: /bin/bash ./run.sh
pg:
container_name: postgres_datalake_container
image: postgres
environment:
POSTGRES_DB: datalake
POSTGRES_USER: datalake
POSTGRES_PASSWORD: datalake
ports:
- "54329:5432"
volumes:
- ./volumes/postgres/:/var/lib/postgresql/data
stop_grace_period: 1m
Sample misc. files for the CNS registry Data Lake
This section contains samples for the rest of the files needed to create a complete Data Lake.
Dockerfile
FROM node:16-alpine AS datalake-template-dev
RUN apk add bash
RUN npm install --global wait-port
RUN mkdir -p /home/node/runner && chown -R node:node /home/node/runner
USER node
WORKDIR /home/node/runner
RUN mkdir -p /home/node/runner/dist
COPY --chown=node:node package.json package-lock.json ./
COPY --chown=node:node local_packages/ ./local_packages/
RUN npm ci --ignore-scripts
COPY --chown=node:node tsconfig.json run.sh main.ts ./
COPY --chown=node:node src/ ./src/
RUN npx tsc
RUN rm -rf package.json package-lock.json local_packages/ tsconfig.json main.ts src/
main.ts
import { run } from './src/datalake'
run()
run.sh
#!/bin/bash
wait-port pg:5432
node --expose-gc --optimize-for-size dist/main.js "$@"
src/cns-registry.json
[
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
},
{
"indexed": false,
"internalType": "string",
"name": "uri",
"type": "string"
}
],
"name": "NewURI",
"type": "event"
},
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": true,
"internalType": "uint256",
"name": "tokenId",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
}
]
Basics of parallel Data Lake implementation
Advantages and limitations of parallel Data Lakes
Parallel Data Lakes are designed to achieve high performance while churning through historical data by throwing multiple independent workers at a single task. This allows you to leverage the available hardware resources to increase throughput, with the storage backend usually becoming the factor that limits further performance gains.
Naturally, this comes at a price. Since blocks and events can and will be processed out of order and simultaneously by separate workers, no meaningful concept of evolving state can be computed at the parallel stage. In algebraic terms, the state updates performed by the Data Lake workers should be commutative and associative, so that any computation order produces the same result, up to equivalence from the eventual data consumer's standpoint. Ideally, the computations should also be idempotent, which improves data consistency guarantees and makes it easier to recover from failures.
Once again, since blocks are processed out of order, history of state is not maintained and reorganizations are not supported, which limits parallel Data Lakes to operating on historical data. This means that in case you need to continue in real-time mode, you will need to implement a separate sequential Data Lake and potentially a custom stage for converting the data collected at the parallel stage to a more suitable form. You can think of this as the 'reduce' step to parallel Data Lake's 'map' step. The interfaces of parallel and sequential Data Lakes are somewhat dissimilar but it is usually possible to achieve a high degree of code reuse by implementing a helper class employed by both.
Implementing a parallel Data Lake
While the fundamental concepts of parallel Data Lakes are very similar to those underlying sequential Data Lakes, there are important differences both in the running mode of parallel Data Lakes and in the capabilities exposed to them by the SDK.
We will be using a simplified version of the Uniswap V2 Data Lake as a running example here. The aim is to collect swap turnover data for all the deployed pairs.
Basic concepts of the parallel Data Lake runner
For the purposes of work distribution and sequencing, the parallel Data Lake SDK employs the following three concepts:
- Tasks
- Task groups
- Filter groups
Tasks
A task is a small unit of work, consisting of a range of blocks (typically a few hundred or a few thousand) and associated with a specific filter group. Tasks are distributed to workers, which are effectively sequential Data Lake runners operating in batch mode with a few performance-related tweaks.
Task groups
The primary purpose of task groups is to reduce the probability of cross-worker deadlocks when using plain old PostgreSQL as a storage backend. This is achieved by organizing the data into multiple sets of tables using the same schema, each of which is associated with a single task group. Workers never take more than one task from any task group at the same time, ensuring that every worker is writing to its own set of tables.
Filter groups
Filter groups serve to separate heterogeneous workloads into homogeneous tasks. A filter group is identified by a unique string and it associates block ranges with Tsunami filters to be employed by workers processing tasks belonging to that filter group. New filter groups can be created at run-time, causing the creation of new tasks. However, existing filter groups must not be altered because the runner's behavior is undefined in that case.
For example, the Uniswap Data Lake organizes its work into two filter groups: factory (responsible for monitoring the factory contract to identify the deployed pools) and pairs (responsible for monitoring Swap events).
Data Lake design
All the initial steps in implementing parallel Data Lakes, such as obtaining a parsiq.js client and designing the storage layout, are the same as for sequential Data Lakes. As far as the storage layout is concerned, design your schema so that state updates are commutative and associative. It is also advantageous to make your state updates:
- Insert-only (never updating existing keys)
- Free of any reads
For Uniswap, we will use a layout with two separate storages: one for the turnover data and one for information on individual pairs:
type StorageLayout = {
'': [string, string, string, string] // amount0In, amount1In, amount0Out, amount1Out
pairs: [string, string, string, string, number] // token0, token1, pair, index, block_number
}
The key for the default storage is a combination of contract address and block number. The key for the pairs storage is simply the contract address.
Implement your Data Lake
You will need to implement a class (e.g., MyParallelDataLake) that conforms to the interface IParallelMultiStorageDataLake<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>.
You may start from scratch, but it is recommended to extend the abstract base class provided by the SDK: AbstractParallelMultiStorageDataLakeBase<StorageValueLayout, StorageMetaLayout, TsunamiFilter, TsunamiEvent>.
You will need to implement the following methods.
getProperties(): ParallelDataLakeProperties
This method is mandatory for parallel Data Lakes as it specifies the fundamental Data Lake properties that have no reasonable defaults.
public getProperties(): sdk.ParallelDataLakeProperties {
return {
id: 'UNISWAP-V2-DATALAKE',
initialBlockNumber: 0, // ...or the deployment block of the factory contract for optimum performance.
lastBlockNumber: 16000000, // ...or the actual block number up to which you want the data.
groupSize: 6000, // Group and task size are important performance tuning parameters.
taskSize: 2000,
groups: new Set(['factory', 'pairs']),
// Importantly, this will lock the processing of Swap events until all the pairs have been identified.
lockedGroups: new Set(['pairs']),
};
}
You will want to choose the group size so that the resulting number of task groups is roughly twice the maximum number of workers you expect to use, while keeping at least a couple of meaningfully sized tasks within each task group (i.e., tasks larger than or equal to the block frame size that you intend to use). Group size becomes less important if you are using single-storage mode rather than the SDK's default custom sharding; however, it is still advisable to keep the number of task groups at least roughly equal to the number of workers, as this helps with smoother task distribution.
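For intuition, and under the assumption that groupSize and taskSize are both measured in blocks (as the example values above suggest), the properties shown earlier translate into roughly the following task layout:
// Rough sizing arithmetic for the example above (all values in blocks).
const blockRange = 16_000_000 - 0                 // lastBlockNumber - initialBlockNumber
const taskGroups = Math.ceil(blockRange / 6_000)  // ~2,667 task groups
const tasksPerGroup = Math.ceil(6_000 / 2_000)    // 3 tasks per group
const totalTasks = taskGroups * tasksPerGroup     // ~8,000 tasks of 2,000 blocks each
console.log({ taskGroups, tasksPerGroup, totalTasks })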
genTsunamiFilters(): Promise<FilterGroups<TsunamiFilter>>
This method specifies the initial filter groups for your Data Lake.
For Uniswap:
public async genTsunamiFilters(): Promise<sdk.FilterGroups<TsunamiFilter>> {
const filters = new Map();
filters.set('factory', { group: 'factory', filters: [{
block: 0, filter: {
topic_0: '0x0d3648bd0f6ba80134a33ba9275ac585d9d315f0ad8355cddefde31afa28d0e9', // The topic for PairCreated events.
contract: '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f', // Uniswap V2 factory contract address.
}
}] });
filters.set('pairs', { group: 'pairs', filters: [{
block: 0, filter: {
topic_0: '0xd78ad95fa46c994b6551d0da85fc275fe613ce37657fb8d5e3d130840159d822', // The topic for Swap events.
}
}] });
return filters;
}
processNewBlockEvent(block: NewBlockData & DataLakeRunnerState): Promise<void>
This is the event handler for a new block event. Since the parallel version of the runner does not use this method to generate the filter, more often than not the implementation can be a no-op unless you need some specific start-of-block bookkeeping.
processTsunamiEvent(filterGroup: string, event: TsunamiEvent & TimecodedEvent & DataLakeRunnerState): Promise<FilterGroups<TsunamiFilter> | void>
This method is similar to the analogous method for sequential Data Lakes. It also serves as a key processing point for the Data Lake, but in addition to the filter group name being passed as the first argument, the semantics of this method's return value are very different from the sequential case.
A non-void return value means that the worker wants to initialize a whole new filter group (or groups) and create fresh tasks as appropriate. This could be useful, for example, if we chose a different approach to the Uniswap Data Lake's design in which every PairCreated event would create a fresh filter group for that specific contract address, together with associated tasks. As it turns out, while that approach is viable, it is less performant than the one we have chosen here. In other applications, it might be the right way to go.
An implementation for the Uniswap Data Lake, along with auxiliary methods, would be as follows:
public async processTsunamiEvent(filterGroup: string, evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void | sdk.FilterGroups<TsunamiFilter>> {
if (filterGroup === 'factory') {
return await this.processPairCreatedEvent(evt);
} else {
await this.processPairEvent(evt);
}
}
public async processPairCreatedEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
const fragment = this.uniswapFactoryInterface.getEvent(evt.topic_0!);
const decoded = this.uniswapFactoryInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as PairCreatedEvent;
await this.apis.pairs.set(decoded[2].toLowerCase(), [
decoded[0],
decoded[1],
decoded[2],
new BigNumber(decoded[3]._hex).toFixed(),
evt.block_number!,
]);
}
public async processPairEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
if (this.pairs === undefined) {
this.pairs = await this.getPairList();
}
if (!this.pairs.has(evt.contract.toLowerCase())) {
return;
}
this.pair = evt.contract;
this.block = evt._DLSDK.block_number;
await this.processSwapEvent(evt);
}
private async processSwapEvent(evt: TsunamiEvent & sdk.TimecodedEvent & sdk.DataLakeRunnerState): Promise<void> {
const fragment = this.uniswapPairInterface.getEvent(evt.topic_0!);
const decoded = this.uniswapPairInterface.decodeEventLog(fragment, evt.log_data!, [evt.topic_0!, evt.topic_1!, evt.topic_2!]) as unknown as SwapEvent;
await this.updateVolumes(decoded.amount0In, decoded.amount1In, decoded.amount0Out, decoded.amount1Out);
}
private async updateVolumes(amount0In: Amount, amount1In: Amount, amount0Out: Amount, amount1Out: Amount): Promise<void> {
const current = this.turnover.get(this.pair) ?? ['0', '0', '0', '0'];
const updated: Turnover = [
new BigNumber(current[0]).plus(new BigNumber(amount0In._hex)).toFixed(),
new BigNumber(current[1]).plus(new BigNumber(amount1In._hex)).toFixed(),
new BigNumber(current[2]).plus(new BigNumber(amount0Out._hex)).toFixed(),
new BigNumber(current[3]).plus(new BigNumber(amount1Out._hex)).toFixed(),
];
this.turnover.set(this.pair, updated);
}
public async getPairList(): Promise<Set<Address>> {
const result = new Set<Address>();
const properties = this.getProperties();
// This assumes that the datalake works in 'single_storage' mode.
await this.apis.pairs.withPg(async (pg) => {
const res = await pg.query(
' SELECT entity_key FROM ' + pg.escapeIdentifier('pairs_0_entities') + ' '
);
for (const row of res.rows) {
result.add(row.entity_key);
}
});
return result;
}
Note that the event handler does not, in itself, change the Data Lake's state; turnover data is updated in bulk at the end of the block.
Note that the code above implies that the Data Lake object has some internal state, which should be initialized as follows:
private apis: sdk.ISimpleStorageMapping<StorageLayout>;
private readonly uniswapFactoryInterface;
private readonly uniswapPairInterface;
private pair: string;
private block: number;
private turnover: Map<Address, Turnover>;
private pairs?: Set<Address>;
constructor(apis: sdk.ISimpleStorageMapping<StorageLayout>) {
this.apis = apis;
this.uniswapFactoryInterface = new Interface(uniswapFactoryAbi);
this.uniswapPairInterface = new Interface(uniswapPairAbi);
this.pair = '';
this.block = 0;
this.turnover = new Map();
}
public setApis(apis: sdk.ISimpleStorageMapping<StorageLayout>): void {
this.apis = apis;
}
processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>
End of block event handler. This can be useful if you intend to accumulate some data within the block boundaries before dumping it in a single update at the end. For Uniswap:
public async processEndOfBlockEvent(evt: sdk.Block & sdk.DataLakeRunnerState): Promise<void> {
for (const key of this.turnover.keys()) {
await this.apis[''].set(this.genKey(key, this.block), this.turnover.get(key)!);
}
this.turnover.clear(); // Reset the per-block turnover accumulator.
}
private genKey(pair: string, block: number): string {
return pair + '-' + block.toString().padStart(10, '0');
}
unlockGroups(finished: Set<string>, locked: Set<string>): Set<string>;
In case you don't need task ordering, the implementation can be as simple as return new Set();. Since our approach to the Uniswap Data Lake involves ordering between the factory and pairs filter groups, we have to implement this as follows:
public unlockGroups(finished: Set<string>, locked: Set<string>): Set<string> {
if (finished.has('factory') && locked.has('pairs')) {
return new Set(['pairs']);
}
return new Set();
}
Or, in simple terms, tasks belonging to the pairs filter group should be unlocked if and only if all tasks within the factory filter group are marked as done.
The interface for the parallel Data Lakes includes a few more methods but the abstract base class supplies reasonable implementations for those.
Running and configuring your Data Lake
Running a parallel Data Lake is similar to running a sequential one: you just need to use ParallelMultiStorageDataLakeRunner. Note that you can and should run multiple instances of the runner safely. The SDK will take care of safe schema initialization and task distribution across your workers.
SDK reference
This section covers various aspects of the SDK in greater detail.
SDK requirements for parsiq.js clients
The SDK makes relatively few assumptions about the Tsunami API client's interface, allowing it to interoperate smoothly with different implementations exposing different capabilities. The assumptions are as follows:
- The client must be capable of event look-up by block hash, using the optional block_hash field in the TsunamiFilter.
- The client must be capable of additional filtering on top of block_hash, by limiting the events returned to those after a specific timecode provided in the after_timecode field of the TsunamiFilter.
- The client must be capable of event look-up by block number range, using the optional block_range field in the TsunamiFilter.
- The client must be capable of grouping the events by block_hash.
- The events returned by the client contain the timecode field, which imposes a total order on all events.
- The events are returned in ascending timecode order.
- The parsiq.js client must also be able to provide basic information on blocks, i.e., hashes, parents and data on the current top block (see the type sketch below).
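To make these assumptions concrete, here is a minimal sketch of filter and event shapes that would satisfy them. The actual types ship with your client package, and the shape of block_range in particular is an assumption:
type SketchTsunamiFilter = {
  block_hash?: string                            // event look-up by block hash
  after_timecode?: number                        // filtering on top of block_hash
  block_range?: { start: number; end: number }   // look-up by block number range (shape assumed)
  // ...plus client-specific fields such as contract or topic_0.
}

type SketchTsunamiEvent = {
  timecode: number // imposes a total order on all events
  // ...plus client-specific payload such as topics and log data.
}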
Advanced methods in the sequential Data Lake interface
processEndOfBlockEvent(event: NewBlockData & DataLakeRunnerState): Promise<void>
Intended for advanced use cases, this method allows you to run some additional logic and effect extra state changes at the end of the block. The timecode used for state updates is the same as that of the last event processed. Unless you require this in your Data Lake, simply leave the implementation empty.
processBeforeDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void> and processAfterDropBlockEvent(event: DropBlockData & DataLakeRunnerState): Promise<void>
These two methods are intended for advanced use cases where the Data Lake needs to update the state or do some auxiliary processing in case of blockchain reorganizations. Note that the runner takes care of keeping the state consistent without involving the Data Lake. So unless you have a very good reason to do otherwise, your implementations of these two methods should do nothing at all.
In case you do wish to perform state updates on reorganizations, keep in mind the following:
- processBeforeDropBlockEvent() can inspect the pre-reorg state about to be destroyed, but all changes it makes to it will be destroyed as well.
- processAfterDropBlockEvent() can make changes to the state that will persist after the reorg's processing is finished, but it can't observe the pre-reorg state directly, since it is already gone by the time this method is called.
- Therefore, communication between these two methods is one of the rare cases when there is a good reason for your Data Lake to use its own object properties to keep the data (see the sketch below).
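A hedged sketch of that pattern follows. The droppedKeys property and collectAffectedKeys() helper are hypothetical and exist purely to pass information from the 'before' hook to the 'after' hook; the runner still rolls the K-V state back on its own:
// Hypothetical instance property used to carry data across the reorg hooks.
private droppedKeys: string[] = []

public async processBeforeDropBlockEvent(
  event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {
  // The pre-reorg state is still readable here; stash whatever you need.
  this.droppedKeys = await this.collectAffectedKeys(event) // hypothetical helper
}

public async processAfterDropBlockEvent(
  event: sdk.DropBlockData & sdk.DataLakeRunnerState
): Promise<void> {
  // The pre-reorg state is gone, but this.droppedKeys survives, so auxiliary
  // processing (logging, notifications, compensating writes) can use it.
  this.droppedKeys = []
}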
In addition to these methods, you may want to override the following methods as well, although this is optional.
getProperties(): DataLakeProperties
This method is supposed to return an object describing the fundamental, immutable properties of the Data Lake. Note that these should never change, whether at run-time or between runs. It only makes sense to ever change these values if you intend to reset your Data Lake.
Except for id, all fields of DataLakeProperties are optional, and the runner will assume reasonable defaults if they are omitted. The fields are as follows:
id
id is mostly used for aesthetic purposes, but it also serves as the primary key for the table keeping the Data Lake runner's internal state. Defaults to "UNNAMED-DATALAKE".
initialBlockNumber
This determines the starting point of your Data Lake and should be used for cases where you don't want to inspect the entire blockchain, e.g., if you are monitoring a specific contract address and don't want to process the blocks before the contract's original deployment. Defaults to 0, i.e., the genesis block of your specific blockchain.
extraSchemaInit
This allows you to make changes to the default Data Lake schema, although obviously you must keep it compatible with the SDK's expectations. The main intended purpose would be defining additional GIN indices on your Data Lake's value storages. Leaving it undefined means you do not need to make any alterations to the default Data Lake schema.
For example, the NFT ownership app defines the following additional indices on its default schema:
CREATE INDEX idx_gin_entities_entity_value ON entities USING GIN ((entity_value -> 'owner'));
CREATE INDEX idx_gin_mutations_entity_value ON mutations USING GIN ((entity_value -> 'owner'));
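A sketch of how such statements might be supplied through getProperties(), assuming extraSchemaInit accepts raw SQL as a string; check the DataLakeProperties typings of your SDK version:
public override getProperties(): sdk.DataLakeProperties {
  return {
    id: 'NFT-OWNERSHIP-DATALAKE', // hypothetical Data Lake ID
    initialBlockNumber: 0,        // or the relevant contract deployment block
    extraSchemaInit: `
      CREATE INDEX idx_gin_entities_entity_value ON entities USING GIN ((entity_value -> 'owner'));
      CREATE INDEX idx_gin_mutations_entity_value ON mutations USING GIN ((entity_value -> 'owner'));
    `
  }
}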
setTsunamiApiClient(tsunamiClient: ITsunamiApiClient<TsunamiFilter, TsunamiEvent>): void
If you need your Data Lake to query the Tsunami API directly (e.g., to inspect historical data while processing fresh events), you will also need to implement the setTsunamiApiClient() method. Note that this is reserved for advanced use cases and should not be done unless you understand very well what you are doing and are willing to accept the computational costs.
Inspecting and modifying your Data Lake's state
Both the IStateStorage interface and the AbstractDataLakeBase expose a number of methods for inspecting and changing the state of your Data Lake. We have already seen .get() and .set() in the examples above, so let's examine the entire set of capabilities.
Inspecting the current state
get(key: string): Promise<ValueType | null>, getRecord(key: string): Promise<ValueRecord<ValueType, MetaType> | null>
Returns the current value of the entity specified by the key. getRecord() differs from get() in that it supports fetching meta fields in addition to the value itself.
has(key: string): Promise<boolean>
Returns false if the value of the entity is null, or true otherwise.
Modifying the state
set(key: string, value: ValueType | null, meta?: MetaType): Promise<void>, setRecord(key: string, value: ValueRecord<ValueType, MetaType> | null): Promise<void>
Sets the current value of the entity to value. The block number and timecode associated with this state change are automatically provided to the state storage by the Data Lake runner. set() is more convenient when you are not using meta fields, as the meta argument is optional, but this also means it lacks type safety. setRecord() is type-safe, but more cumbersome to use.
delete(key: string): Promise<void>
Equivalent to calling .set(key, null).
Inspecting the historical state
getEntityAtBlockNumber(key: string, block: BlockNumber): Promise<ValueType | null>, getEntityAtTimecode(key: string, timecode: Timecode): Promise<ValueType | null>, getEntityRecordAtBlockNumber(key: string, block: BlockNumber): Promise<ValueRecord<ValueType, MetaType> | null>, getEntityRecordAtTimecode(key: string, timecode: Timecode): Promise<ValueRecord<ValueType, MetaType> | null>
Similar to get() but returns the value of the entity at some point in the past. The point in the past is specified either as a block number or a timecode.
getStateAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueType>>, getStateAtTimecode(timecode: Timecode): Promise<Map<string, ValueType>>, getStateRecordsAtBlockNumber(block: BlockNumber): Promise<Map<string, ValueRecord<ValueType, MetaType>>>, getStateRecordsAtTimecode(timecode: Timecode): Promise<Map<string, ValueRecord<ValueType, MetaType>>>
Returns the entire state of the Data Lake at some point in the past as a map from entity keys to values. Entities whose value is null are not included. The point in the past is specified as a block number or a timecode.
getEntityHistory(key: string): Promise<Mutation<ValueType>[]>, getEntityHistoryRecords(key: string): Promise<Mutation<ValueRecord<ValueType, MetaType>>[]>
Returns the entire history of value changes of a given entity, together with the associated block numbers and timecodes. Note that null values are also included.
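As a usage sketch, inside a Data Lake method and against the single-storage signatures documented above (the multi-storage base appears to take the storage name as an extra leading argument, as in set('domains', ...)); tokenIdHex is a hypothetical key:
// Print the full history of a single entity, oldest first.
const history = await this.getEntityHistory(tokenIdHex)
for (const mutation of history) {
  // Each mutation carries the value plus its block number and timecode;
  // the exact field names of Mutation<T> may differ between SDK versions.
  console.log(mutation)
}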
Details on state storage organization
Every independent state storage uses two PostgreSQL tables to keep the data: entities and mutations (possibly prefixed by the name of the storage if used in the context of a multi-storage runner).
The entities table simply keeps the current, most up-to-date values of all entities and is used for fast querying of the current state.
The mutations table, on the other hand, keeps the entire history of updates for all the entities. This serves two purposes:
- The mutations table is used for querying the historical state, e.g., what value did a given entity have at a certain point in the past?
- It is also used for reverting the state to a certain point in the past in case of blockchain reorganizations. This allows the Data Lake runner to maintain the proper state after a reorg without involving the Data Lake itself.
When using --memory-cache, similar data structures are also used in memory to keep the updates that have happened in the current block frame. These are dumped into the database at the frame boundary.
Alternate Data Lake interfaces and base classes
IDataLake<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}> and AbstractDataLakeBase<ValueType extends unknown, MetaType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>
This simplified interface and its associated Data Lake base class are meant for use with the basic DataLakeRunner, which is a thin wrapper over MultiStorageDataLakeRunner. Everything, including runtime configuration, remains the same as with standard sequential Data Lakes, but this set-up supports only one K-V storage. The upside is that typing and interacting with the storage are somewhat simpler and more convenient than in the multi-storage case.
AbstractEffectfulMultiStorageDataLakeBase<ValueTypes extends { '': unknown, 'effectstorage': { effects: EffectType[] } }, MetaTypes extends { [key in keyof ValueTypes]: {} }, EffectType extends {}, TsunamiFilter extends {}, TsunamiEvent extends {}>
This base class can serve as a convenient starting point in case you want arbitrary reversible side effects in your Data Lake. You will need to define the type for your side effects and implement the methods for performing and reverting the side effects in question. The base class implements a standard sequential Data Lake in other respects.
Configuration of sequential Data Lakes
Configuring a Data Lake involves several components:
- Environment variables are mostly used for things that are more convenient to specify in your docker-compose or k8s configuration files, and should rarely, if ever, change throughout the lifetime of your Data Lake.
- Command-line options chiefly control the running modes of your Data Lake, and some of these are used when you are interacting with your Data Lake through the shell. These can also be overridden by passing a fixed configuration to the .run() method of DataLakeRunner.
- constructor() arguments are intended for things that can only be passed to the runner at run-time, such as the database connection pool or the storage configuration for the multi-storage runner.
Where multiple methods of configuring the same value are available, command-line options take precedence over environment variables, and run() arguments take precedence over command-line options.
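For example, here is a hedged sketch of overriding a few options programmatically. The keys follow the run() argument column of the reference table below, though whether run() accepts them as a single options object should be verified against your SDK version:
await runner.run({
  memory_cache: true,           // equivalent to --memory-cache / DLSDK_MEMORY_CACHE
  frame_level_isolation: true,  // required for the memory cache
  poll_tsunami: true,           // poll Tsunami in the background for fresh blocks
  no_rmq: true                  // do not listen to RabbitMQ
})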
We will now go over configuring all the vital aspects of your Data Lake, covering various options where available.
Configuration reference
This section contains reference tables for all the configuration options. Refer to the subsequent section for details.
Constructor argument | Type | Description |
---|---|---|
datalake | IMultiStorageDataLake<ValueTypes, TsunamiFilter, TsunamiEvent> | Data Lake instance to run. |
dbPool | pg.Pool | DB connection pool to use (optional). If omitted, the runner will instantiate its own DB connection pool using the standard PG* environment variables. |
log | ILogger | Logger instance to use (optional). NullLogger will be used by default. |
rmq | amqplib.Connection | RabbitMQ connection to use (optional). If omitted, runner will instantiate its own. |
storageConfig | StorageOptionMapping<ValueTypes> | Only for MultiStorageDataLakeRunner . Describes state storage configuration. |
tsunami | ITsunamiApiClient<TsunamiFilter, TsunamiEvent> | Tsunami API client to use. |
Note that for boolean environment variables, the values 1, yes and true (in any case) are interpreted as true. All other values are interpreted as false.
run() argument | Environment variable | Command-line option (abbrev.) | Type | Default value | Description |
---|---|---|---|---|---|
N/A | DLSDK_DATALAKE_ID | N/A | string | UNNAMED-DATALAKE | Unique Data Lake ID. |
N/A | DLSDK_GCL_PROJECT_ID | N/A | string | N/A | Project ID for GoogleCloudLogger . |
N/A | DLSDK_GCL_LOG_NAME | N/A | string | (empty string) | Log name for GoogleCloudLogger . |
N/A | DLSDK_GOOGLE_APPLICATION_CREDENTIALS | N/A | string | N/A | Filename of credentials for GoogleCloudLogger . |
batch | N/A | --batch (-b ) | number | (off) | Attempt to fetch and process the next X blocks, then exit. |
current_block | N/A | N/A | number | (off) | Specify the expected current block for consistency checks when running as a worker process for the parallel SDK. |
flush_cache | DLSDK_FLUSH_CACHE | --flush-cache | number | (turned off) | Causes the runner to flush the memory cache after every X processed blocks. |
force_downgrade | N/A | --force-downgrade | boolean | false | Allows the runner to proceed if the package version is lower than the DB version. |
force_gc | DLSDK_FORCE_GC | --force-gc (-f ) | number | (turned off) | Force GC every X block frames. |
frame_level_isolation | DLSDK_FRAME_LEVEL_ISOLATION | --frame-level-isolation (-f ) | boolean | false | Use block frame-level transactions instead of block-level transactions. |
insert_only | DLSDK_INSERT_ONLY | --insert-only | boolean | false | Indicates that the Data Lake never overwrites existing keys in the storage, allowing simpler and more performant implementation of set() /dumpCache() . |
keep_last_blocks | DLSDK_KEEP_LAST_BLOCKS | --keep-last-blocks | number | 1000 | Specifies the number of last blocks to keep when purging data. |
log_memory_usage | DLSDK_LOG_MEMORY_USAGE | --log-memory_usage (-o ) | number | (turned off) | Log memory usage every X milliseconds. |
max_block | DLSDK_MAX_BLOCK | --max-block | number | (off) | Specify the maximum block number to process. |
memory_cache | DLSDK_MEMORY_CACHE | --memory-cache (-m ) | boolean | false | Use memory caching for state updates. |
metrics_server | DLSDK_METRICS_SERVER | --metrics-server | boolean | false | Indicates that the metrics server should be started. |
metrics_server_port | DLSDK_METRICS_SERVER_PORT | --metrics-server-port | number | 8089 | Port number to use for the metrics server. |
metrics_server_interval_sql | DLSDK_METRICS_SERVER_INTERVAL_SQL | --metrics-server-interval-sql | string | 1 minute | Timeframe for the metrics server as an SQL interval. |
mode_mutex_delay_ms | DLSDK_MODE_MUTEX_DELAY_MS | N/A | number | 10 | Delay before attempting to grab the mode mutex again after failure. |
mutations_only | DLSDK_MUTATIONS_ONLY | --mutations-only | boolean | false | Turns on the mutations_only mode for the K-V storages. |
no_bdm | DLSDK_NO_BDM | --no-bdm | boolean | false | Allow the runner to work without BlockDataManager , thus ignoring blockhash ancestry checks. |
no_consistency_check | DLSDK_NO_CONSISTENCY_CHECK | --no-consistency-check | boolean | false | Disables database consistency checks on start-up. |
no_debug | DLSDK_NO_DEBUG | --no-debug (-d ) | boolean | false | Discard all debug-level log records. |
no_event_batching | DLSDK_NO_EVENT_BATCHING | --no-event-batching (-x ) | boolean | false | Do not attempt to pre-fetch Tsunami events for multiple blocks. |
no_recursive_recovery | DLSDK_NO_RECURSIVE_RECOVERY | --no-recursive-recovery (-y ) | boolean | false | Do not attempt to recover from cascade reorgs. |
no_rmq | DLSDK_NO_RMQ | --no-rmq (-n ) | boolean | false | Do not listen to RabbitMQ for fresh blocks. |
no_shutdown_handlers | N/A | N/A | boolean | false | Prevents the runner from installing its own signal handlers, e.g., when being managed by another process. Used when running as a worker process for the parallel SDK. |
parallel | N/A | N/A | boolean | false | Engages specialized runner mode for use with the parallel SDK. The semantics of this option are, unfortunately, very different from all other options. It should never be set manually as doing so will either have no effect at all or cause strange and difficult to predict behavior of the runner. Strictly for internal use by the parallel SDK. |
poll_tsunami | DLSDK_POLL_TSUNAMI | --poll-tsunami (-p ) | boolean | false | Poll Tsunami in background for fresh blocks. |
pull_awaken_ms | DLSDK_PULL_AWAKEN_MS | N/A | number | 500 | --poll-tsunami polling interval. |
pull_sleep_ms | DLSDK_PULL_SLEEP_MS | N/A | number | 10 | --poll-tsunami sleep interval. |
purge_block_data | DLSDK_PURGE_BLOCK_DATA | --purge-block-data | boolean | false | Indicates that the block data should be purged in addition to storage mutation records when purging old data. |
purge_every_blocks | DLSDK_PURGE_EVERY_BLOCKS | --purge-every-blocks | number | (turned off) | Indicates that the old mutation records should be purged from the db every X blocks. |
queue_name | DLSDK_RMQ_QUEUE | --queue-name (-q ) | string | (empty string) | RabbitMQ queue name to use. |
recovery_block_frame | DLSDK_RECOVERY_BLOCK_FRAME | N/A | number | 3 | Block frame size when fast-forwarding. |
recrec_attempts | DLSDK_RECREC_ATTEMPTS | N/A | number | 5 | Maximum number of cascade reorg recovery attempts. |
recrec_sleep_ms | DLSDK_RECREC_SLEEP_MS | N/A | number | 100 | Cascade reorg recovery delay. |
reset | N/A | --reset (-r ) | boolean | false | Nuke the Data Lake and reset to the initial state. |
rmq_quorum_queue | DLSDK_RMQ_QUORUM_QUEUE | --rmq-quorum-queue (-k ) | boolean | false | Use the quorum RabbitMQ queue type instead of classic. |
rmq_sleep_ms | DLSDK_RMQ_SLEEP_MS | N/A | number | 10 | Delay before attempting to fetch the next message from RabbitMQ. |
rmq_url | DLSDK_RMQ_URL | N/A | string | (empty string) | RabbitMQ URL. |
set_last | N/A | --set-last (-l ) | number | (off) | Reset the Data Lake to the specified block. |
show_config | DLSDK_SHOW_CONFIG | --show-config | boolean | false | Causes the SDK runner to log the complete effective configuration on start-up. |
sparse_blocks | DLSDK_SPARSE_BLOCKS | --sparse-blocks | boolean | false | Engages a different event retrieval mode for Tsunami API client. Blocks with no relevant events are ignored, potentially speeding up the processing in some applications. |
stats | N/A | --stats (-s ) | boolean | false | Display block processing time stats and exit. |
stats_from | N/A | --stats-from | number | first known block | Starting block for --stats . |
stats_to | N/A | --stats-to | number | last known block | Ending block for --stats . |
test_mode | N/A | N/A | boolean | false | For running internal test suites only. |
tsunami_recovery_attempts | DLSDK_TSUNAMI_RECOVERY_ATTEMPTS | N/A | number | 20 | Maximum number of retries on Tsunami API failures. |
tsunami_recovery_sleep_ms | DLSDK_TSUNAMI_RECOVERY_SLEEP_MS | N/A | number | 3000 | Delay between repeated attempts to query Tsunami on failures. |
unsafe | N/A | --unsafe (-u ) | boolean | false | Engages unsafe mode which omits some important sanity checks in the runner in exchange for a minor performance boost. |
use_citus | DLSDK_USE_CITUS | --use-citus | boolean | false | Makes StateStorage run in Citus-compatible mode. |
verbose | DLSDK_VERBOSE | --verbose (-z ) | boolean | false | Produce more detailed logs while running. |
Database configuration
The Data Lake runner uses PostgreSQL as its default storage backend to persist both its own state and the state of the Data Lake, and it cannot function without it. Configuring PostgreSQL is simple enough.
The runner will make use of all the standard PG* environment variables supported by the pg package, since it invokes new pg.Pool() without any arguments. Unless you are using a particularly unusual set-up, you will need to configure PGHOST, PGPORT, PGUSER, PGPASSWORD and PGDATABASE.
As an alternative, you may skip configuration through the environment variables entirely and pass a database connection pool constructed in your own code as an argument to the runner:
const runner = new sdk.DataLakeRunner({
dbPool: new pg.Pool(),
datalake: datalake,
tsunami: tsunami,
log: new sdk.ConsoleLogger()
})
Note that you don't need to initialize the database schema by hand; the runner will do that automatically.
When using Citus as a storage backend instead of plain old PostgreSQL, you need to make sure to turn on Citus-compatible mode, either by passing the --use-citus command-line option to the runner or by setting the DLSDK_USE_CITUS environment variable to true.
RabbitMQ configuration and alternatives for retrieving new blocks
RabbitMQ serves as the default transport for incoming new block notifications as well as notifications about blockchain reorganizations. If you are using RMQ, you will need to configure it using the environment variables:
- DLSDK_RMQ_URL is used to specify the connection parameters.
- DLSDK_RMQ_QUEUE should specify the name of the queue that the runner will listen to.
- DLSDK_RMQ_SLEEP_MS specifies the queue polling interval for the runner. As the runner is stateful and processes incoming events synchronously, it polls RMQ using .get(), and sleeps for the specified number of milliseconds before attempting to fetch the next message.
The RMQ queue name can also be specified through the --queue-name command-line option.
The SDK will default to the classic RMQ queue type, but using the --rmq-quorum-queue command-line option or the equivalent DLSDK_RMQ_QUORUM_QUEUE environment variable allows switching to the quorum queue type instead.
Alternatively, as with the database connection pool, you can construct your own amqplib connection and pass it directly to the runner (DLSDK_RMQ_URL will be ignored if you go with this option):
const runner = new sdk.DataLakeRunner({
rmq: await amqplib.connect('...'),
datalake: datalake,
tsunami: tsunami,
log: new sdk.ConsoleLogger()
})
Note that the runner can function without RabbitMQ by actively polling Tsunami for new blocks. To employ this mode, you need to run your Data Lake using the --no-rmq and --poll-tsunami command-line options. You can also use --poll-tsunami without --no-rmq; in that case, the runner will attempt to use both the RabbitMQ transport and active Tsunami polling simultaneously.
When using --poll-tsunami, you can use two environment variables to fine-tune its behavior:
- DLSDK_PULL_SLEEP_MS controls how often the Tsunami polling thread checks whether it should try pulling new blocks.
- DLSDK_PULL_AWAKEN_MS specifies the RabbitMQ timeout: if the runner hasn't seen any new blocks from RMQ for longer than the value of this variable, it will attempt to find new blocks in Tsunami.
When using `--poll-tsunami` without `--no-rmq`, another environment variable comes into play: a mutex is used to prevent the RMQ and polling threads from trying to process new blocks at the same time, so `DLSDK_MODE_MUTEX_DELAY_MS` is used to specify the delay between attempts to obtain this mutex.
Performance tuning
Your Data Lake shouldn't require much performance tuning when running in real-time mode, as it should easily keep up with any blockchain. When processing historical data, performance considerations become much more important, since you normally want to churn through the preexisting blocks as quickly as possible.
Recovery block frame and related options
The recovery procedure of the runner is used in several contexts:
- During the initial processing of historical data or while catching up after the Data Lake runner's been offline for a while.
- During the `--batch` processing of blocks.
- During the processing of blockchain reorganizations, especially important in case of major reorgs.
It consists of two separate phases:
- The rollback phase attempts to deal with the consequences of a possible blockchain reorganization by reverting the runner state to the last known good block matching the current Tsunami blocks.
- After that, the fast-forward phase is designed to quickly process large numbers of blocks to catch up to the top of the blockchain.
Fast-forward recovery processes the blocks in 'frames', and the size of the frame is an important performance tuning parameter. It is controlled through the `DLSDK_RECOVERY_BLOCK_FRAME` environment variable. The effect of this value on performance is non-linear and non-monotonic, and depends on the following factors:
- The specifics of the Data Lake you are running, in particular the volume and distribution of events you are monitoring as well as the utilization of the state storage.
- The hardware you have available for your runner and database backend as well as the properties of the network connection between the two.
- The specific performance-related options you are using for your runner.
As a result, tuning the block frame size is something of a dark art. We recommend trying values in the 5 to 200 block range and measuring the results.
Among other things, the block frame size affects the number of blocks that the runner tries to prefetch events for from Tsunami. This behavior is turned on by default, but it can be counter-productive, especially in cases where the Tsunami filter that your Data Lake is using changes very often. To turn off this behavior, use the `--no-event-batching` command-line option.
By default, the runner wraps the state updates for every block into a single transaction. This behavior can be changed so that the runner wraps the updates for the entire block frame into a single transaction. To switch to that mode, use the runner's `--frame-level-isolation` command-line option. Note that the effect on performance is likely to be fairly minor by itself, but this command-line option is a prerequisite for employing the in-memory cache, which is covered in the next section of this document.
Configuring in-memory cache for state updates
The roundtrip time to the database backend can become an issue when processing historical data, especially if your Data Lake is making a lot of state updates. You can alleviate this issue by using the `--memory-cache` option of the runner. This is a space/time trade-off, as the runner will accumulate the state updates during the fast-forward recovery, dumping those to the database as a single query at the block frame boundary. `DLSDK_RECOVERY_BLOCK_FRAME` becomes an even more important tuning parameter when using `--memory-cache`.
Note that using `--memory-cache` may lead to increased memory consumption, both on average and in terms of max RAM usage. You may want to use the `--force-gc FORCE_GC` command-line option, forcing a garbage collection cycle after every `FORCE_GC` block frames, and configure Node.JS to reduce space requirements, which is covered in the next section.
The `--log-memory-usage X` command-line option can be used to dump basic RAM usage stats to logs every X milliseconds. This may help in profiling the memory usage of your Data Lake.
The `--flush-cache` option, or the equivalent `DLSDK_FLUSH_CACHE` environment variable, may be used to set a maximum number of blocks to be processed before the cache is dumped into the database. Note that this only makes sense when using the parallel runner in `sparse_blocks` mode; otherwise, tuning the `DLSDK_RECOVERY_BLOCK_FRAME` variable is a saner way to achieve the same results.
Configuring Node.JS when using in-memory cache
If you are using the `--force-gc` option, you will need to run Node with the `--expose-gc` command-line option; otherwise `--force-gc` will have no effect.

Using `--optimize-for-size` is recommended when running with `--memory-cache`.

Lastly, make sure that `--max-old-space-size`, which limits the heap size for old-generation (long-lived) objects, is set to around 80-90% of the actual RAM available.
Miscellaneous performance-related options
The `--unsafe` command-line option turns off some important sanity checks and may result in a slight performance improvement. Using it is generally not recommended: the gain is going to be minor, while your state becomes far less resistant to corruption.
Recursive recovery may happen in case of a cascade of blockchain reorganizations or when the Tsunami database is corrupted. By default, the runner attempts to recover by rolling back the current transaction, sleeping for a while, and restarting the recovery procedure; under normal circumstances this is usually successful. Two environment variables control this procedure:
- `DLSDK_RECREC_SLEEP_MS` controls the interval in milliseconds between repeated recovery attempts.
- `DLSDK_RECREC_ATTEMPTS` specifies the maximum number of attempts at recursive recovery before terminating the runner.
In case you prefer to rely on your orchestration to recover from unexpected situations, you may turn this behavior off entirely by using the `--no-recursive-recovery` command-line option.
Similarly to the options above, `DLSDK_TSUNAMI_RECOVERY_SLEEP_MS` and `DLSDK_TSUNAMI_RECOVERY_ATTEMPTS` control the behavior of the runner in case of generic problems with the Tsunami API client, such as those caused by network connectivity issues or Tsunami being temporarily down. The default values are pretty generous and you should have little reason to change them.
The `--stats` command-line option, along with `--stats-from` and `--stats-to` to specify the block range, can be used to display some simple stats on block processing times. 'Raw' is the time interval between the moment a `new_block` event has been received and the moment the block has been processed; thus, it may ignore the processing overhead outside of `processNewBlockEvent()`, such as dumping the accumulated state cache to the database. 'Real' computes the time interval between `new_block` events and may be overly pessimistic, especially when processing real-time data, as opposed to historical data.
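To make the distinction concrete, the two metrics relate to block timestamps roughly as follows. This is an illustrative sketch only, not the SDK's internal code; the timing fields are hypothetical names:

```
// Illustrative only: how 'Raw' and 'Real' relate to block event timestamps.
type BlockTiming = { receivedAt: number; processedAt: number } // epoch milliseconds

// 'Raw': from receiving the new_block event to finishing processing of that block.
const rawTime = (block: BlockTiming) => block.processedAt - block.receivedAt

// 'Real': time between consecutive new_block events.
const realTime = (prev: BlockTiming, next: BlockTiming) => next.receivedAt - prev.receivedAt
```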
Google Cloud logger configuration
If you are using the simple Google Cloud logger class provided with the SDK, its configuration is specified by three environment variables:
- `DLSDK_GOOGLE_APPLICATION_CREDENTIALS` is the filename of your app key.
- `DLSDK_GCL_PROJECT_ID` specifies the project ID to be used.
- `DLSDK_GCL_LOG_NAME` specifies your log name.
Options reserved for use by the parallel SDK runner
The following options are intended to be used by the parallel SDK runner and should not be used elsewhere:
- `parallel`
- `current_block`
Miscellaneous configuration options
Unless you explicitly override the `getProperties()` method in your Data Lake, which specifies the Data Lake ID, the runner will check the `DLSDK_DATALAKE_ID` environment variable to see if an ID is defined there.
There are also a few more command-line options the runner supports.
`--batch` can be used to process a fixed number of blocks instead of listening to events in the background. This should be mostly useful during development and/or testing of your Data Lake.
`--reset` is also mostly intended for development and testing, as it completely nukes your Data Lake by reinitializing the database schema.
`--set-last` should be used with care as it may corrupt the state of the Data Lake. It is intended as a milder, but still dangerous, alternative to `--reset`, allowing you to roll back your Data Lake without resetting it completely.
`--force-downgrade` allows the runner to run after a package version downgrade.
`--no-debug` forces the runner to discard all `DEBUG`-level log records, both from itself and from the Data Lake.
The `--show-config` option causes the runner to dump its complete effective configuration to logs on start-up.
`--verbose` mode causes the runner to dump extra debugging information to logs.
`--test-mode` enables extra features needed for running integration tests and shouldn't ever be explicitly used in other contexts.
The `no_shutdown_handlers` option, only available as a `run()` argument, should be used when you are invoking the runner as part of a larger application and intend to install your own signal handlers that call the runner's graceful `shutdown()` procedure as needed.
`--version` displays the current SDK version, and `--help` displays a brief help page similar to the following:
usage: simple.ts [-h] [-v] [-b BATCH] [--flush-cache FLUSH_CACHE] [--force-downgrade] [-g FORCE_GC] [-f] [--insert-only] [-o LOG_MEMORY_USAGE] [-m] [-d] [-x] [-y] [-n] [-p] [-q QUEUE_NAME] [-r] [-k]
[-l SET_LAST] [--show-config] [-s] [--stats-from STATS_FROM] [--stats-to STATS_TO] [-t] [-u] [--use-citus] [-z]
PARSIQ DataLake SDK (@parsiq/datalake-sdk, version 0.17.33) running [UNNAMED-DATALAKE]
optional arguments:
-h, --help show this help message and exit
-v, --version display version
-b BATCH, --batch BATCH
fetch (at most) the next BATCH blocks from Tsunami, process and exit
--flush-cache FLUSH_CACHE
flush the memory cache at specified number of processed blocks
--force-downgrade forces database version downgrade to current package version
-g FORCE_GC, --force-gc FORCE_GC
force garbage collection every FORCE_GC block frames (if available)
-f, --frame-level_isolation
stuff the entire block frame into a single transaction, as opposed to a single block normally
--insert-only disallow storage updates by dropping the ON CONFLICT clause for storage INSERTs
-o LOG_MEMORY_USAGE, --log-memory_usage LOG_MEMORY_USAGE
log memory usage to debug log every LOG_MEMORY_USAGE milliseconds
-m, --memory-cache store the state updates in memory and dump to db at block frame boundaries
-d, --no-debug forcibly discard all runner and datalake DEBUG-level log records
-x, --no-event_batching
do not attempt to prefetch the events for multiple blocks while recovering
-y, --no-recursive_recovery
do not attempt to fix recursive recovery
-n, --no-rmq do not listen to RMQ for block events
-p, --poll-tsunami poll Tsunami for new blocks if none were received through RMQ for a while
-q QUEUE_NAME, --queue-name QUEUE_NAME
RMQ queue name to use
-r, --reset reset the datalake to a clean slate before running
-k, --rmq-quorum_queue
use quorum RabbitMQ queue type instead of classic
-l SET_LAST, --set-last SET_LAST
set the last processed block number to the specified value (nuking the states above that block)
--show-config log complete configuration on startup
-s, --stats display block processing stats and exit
--stats-from STATS_FROM
starting block for --stats
--stats-to STATS_TO ending block for --stats
-t, --test-mode enable additional features intended strictly for testing
-u, --unsafe unsafe mode omits some important sanity checks and should only be used for performance critical tasks
--use-citus adapt SQL queries to Citus storage
-z, --verbose produce more detailed log records
Configuration of parallel Data Lakes
Running a parallel Data Lake is similar to running a sequential one; you just need to use the `ParallelMultiStorageDataLakeRunner`. Note that you can and should run multiple instances of the runner; the SDK will take care of safe schema initialization and of distributing tasks across your workers.
Configuration options for parallel Data Lakes are covered in the next section.
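Before diving into the reference, here is a minimal construction sketch. It assumes the same imports and Tsunami client set-up as the sequential examples above and omits `storageConfig`; see the constructor table below for the full argument list:

```
import * as pg from 'pg'
import * as sdk from '@parsiq/datalake-sdk'

// Sketch only: `datalake` stands for your IParallelMultiStorageDataLake implementation
// and `tsunami` for a Tsunami API client, both constructed as in the sequential example.
// Note the absence of the `rmq` argument: parallel Data Lakes run in historical mode only.
const runner = new sdk.ParallelMultiStorageDataLakeRunner({
  datalake: datalake,
  tsunami: tsunami,
  dbPool: new pg.Pool(),
  log: new sdk.ConsoleLogger()
})
```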
Configuration reference
This section contains reference tables for all the configuration options. Refer to the subsequent sections for details.
The constructor arguments for the `ParallelMultiStorageDataLakeRunner` are the same as for the `MultiStorageDataLakeRunner`. The only differences are a different type for the `datalake` argument and the absence of the `rmq` argument, as parallel Data Lakes run only in historical mode.
Constructor argument | Type | Description |
---|---|---|
datalake | IParallelMultiStorageDataLake<ValueTypes, TsunamiFilter, TsunamiEvent> | Data Lake instance to run. |
dbPool | pg.Pool | DB connection pool to use. Optional. If omitted, runner will instantiate its own DB connection pool using the standard PG* environment variables. |
log | ILogger | Logger instance to use. Optional. NullLogger will be used by default. |
storageConfig | StorageOptionMapping<ValueTypes> | Only for MultiStorageDataLakeRunner . Describes state storage configuration. |
tsunami | ITsunamiApiClient<TsunamiFilter, TsunamiEvent> | Tsunami API client to use. |
Note that for boolean environment variables, the values `1`, `yes`, and `true` (in any case) are interpreted as `true`. All other values are interpreted as `false`.
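To make the rule concrete, the interpretation amounts to a check like this (an illustrative sketch, not the SDK's actual parsing code):

```
// Sketch of the boolean interpretation rule described above (not SDK code).
function parseBooleanEnv(value: string | undefined): boolean {
  if (value === undefined) {
    return false
  }
  return ['1', 'yes', 'true'].includes(value.toLowerCase())
}

// e.g. parseBooleanEnv('TRUE') === true, parseBooleanEnv('0') === false
```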
run() argument | Environment variable | Command-line option (abbrev.) | Type | Default value | Description |
---|---|---|---|---|---|
forced_unlock_chance | PDLSDK_FORCED_UNLOCK_CHANCE | N/A | number | 0 | A floating point number between 0 and 1 , specifying the chance for the forced task unlocking to occur upon finishing a task even though not all tasks have been exhausted yet. |
group_lock_attempts | PDLSDK_GROUP_LOCK_ATTEMPTS | N/A | number | 25 | Maximum number of repeated task group lock attempts on lock failure. |
group_lock_sleep_ms | PDLSDK_GROUP_LOCK_SLEEP_MS | N/A | number | 200 | Delay in milliseconds before repeated attempt to obtain a task group lock. |
init_sleep_ms | PDLSDK_INIT_SLEEP_MS | N/A | number | 3000 | Delay in milliseconds before repeated attempt to obtain schema initialization lock. |
init_timeout_sql | PDLSDK_INIT_TIMEOUT_SQL | N/A | string | "1 minute" | An SQL interval value for schema initialization timeout. |
metrics_server | PDLSDK_METRICS_SERVER | --metrics-server | boolean | false | Indicates that the metrics server should be started. |
metrics_server_port | PDLSDK_METRICS_SERVER_PORT | --metrics-server-port | number | 8091 | Port number to use for the metrics server. |
mutations_only | PDLSDK_MUTATIONS_ONLY | --mutations-only | boolean | false | Turns on the mutations_only mode for the K-V storages. |
no_debug | PDLSDK_NO_DEBUG | --no-debug (-d ) | boolean | false | Suppresses all debug-level log messages. |
no_shutdown_handlers | N/A | N/A | boolean | false | Suppresses the installation of default signal handlers. May be used when running in the context of a larger application that provides its own signal-handling logic. |
randomize_tasks | PDLSDK_RANDOMIZE_TASKS | --randomize-tasks | boolean | false | Randomizes task selection from a locked task group. Lowers the performance of task selection but essential when running in single_storage mode to reduce task contention across workers. |
reset | N/A | --reset (-r ) | boolean | false | Forces the schema reinitialization. In the context of the parallel SDK, it is recommended to trigger forced schema init by other means, specifically by manually dropping the schema table. |
show_config | PDLSDK_SHOW_CONFIG | --show-config | boolean | false | Dumps the complete effective runner configuration to log on start-up. |
single_storage | PDLSDK_SINGLE_STORAGE | --single-storage | boolean | false | Prevents the use of custom sharding so that all workers use a single set of tables. Intended to be used when running with Citus as a storage backend. |
sparse_blocks | PDLSDK_SPARSE_BLOCKS | --sparse-blocks (-s ) | boolean | false | Allows sparse blocks in Tsunami responses and in event processing. Dispenses with the overhead of processing blocks with no relevant events at the cost of slightly weakening the consistency guarantees. |
unlock_stale_tasks_sql | PDLSDK_UNLOCK_STALE_TASKS_SQL | N/A | string | (turned off) | Specifies the SQL interval value for timeout on stale tasks. After the timeout expires, the tasks may be forcibly unlocked even if still marked as LOCKED . If omitted, stale tasks may remain stuck forever. |
update_lock_time_per_block | PDLSDK_UPDATE_LOCK_TIME_PER_BLOCK | N/A | number | (turned off) | Specifies the number of blocks to process before updating the lock time on the current task. If unspecified, the lock time will never be updated, potentially leading to premature forced unlocking. |
use_citus | PDLSDK_USE_CITUS | --use-citus | boolean | false | Switches on the Citus-compatible runner mode. Runner will generate queries, including schema definitions, compatible with Citus storage backend. |
verbose | PDLSDK_VERBOSE | --verbose (-z ) | boolean | false | When switched on, causes the runner to produce additional log records for debugging purposes. |
wait_timeout_sql | PDLSDK_WAIT_TIMEOUT_SQL | N/A | string | 5 minutes | Specifies the waiting state timeout for schema initialization purposes as an SQL interval value. |
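As an illustration of how these options can be passed programmatically, here is a hedged sketch that assumes `run()` accepts the arguments from the first column as a single options object:

```
// Sketch only: `runner` is a ParallelMultiStorageDataLakeRunner instance.
// The argument names follow the "run() argument" column of the table above.
await runner.run({
  metrics_server: true,       // start the metrics server...
  metrics_server_port: 8091,  // ...on the default port
  sparse_blocks: true,        // skip blocks with no relevant events
  randomize_tasks: true       // reduce task contention across workers
})
```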
Importantly, as the parallel runner uses sequential runners as worker processes, it is affected by most of the `DLSDK_*` environment variables listed above. The parallel runner overrides the following configuration settings of the worker:
- `batch` is set to the current job size.
- `current_block` and `max_block` are set to specify the job boundaries.
- `parallel`, `no_bdm`, `no_shutdown_handlers`, `no_recursive_recovery`, `frame_level_isolation`, and `memory_cache` are always set to `true`.
- `sparse_blocks`, `use_citus`, `no_debug`, and `verbose` are always set to match the parallel runner's own configuration.