
Consolidator - basics

You define a Consolidator service in a consolidator.kts file. Within the file, you can define as many Consolidators as you like. Each one is specified in a Consolidator block of code.

Here is an example of the simplest Consolidator you could define:

consolidator(TRADE, ORDER) {
    select {
        ORDER {
            sum { price * quantity } into TOTAL_NOTIONAL
            count() into TRADE_COUNT
        }
    }
    groupBy { Order.ById(orderId) }
}

So, what was going on there?

  • The Consolidator is listening to the TRADE table.
  • It is publishing its aggregation to the ORDER table.
  • It is grouping its aggregation by the field orderId.
  • It is counting the number of trades into TRADE_COUNT and calculating price x quantity into TOTAL_NOTIONAL.
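
The first example omits the name argument; the fuller structure later on this page shows the signature with an explicit name as the first argument. Here is a sketch of the same consolidation with a name (ORDER_CONSOLIDATOR is just an illustration):

consolidator("ORDER_CONSOLIDATOR", TRADE, ORDER) {
    select {
        ORDER {
            sum { price * quantity } into TOTAL_NOTIONAL
            count() into TRADE_COUNT
        }
    }
    groupBy { Order.ById(orderId) }
}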

Elements of a Consolidator

In each Consolidator block, you must at least provide:

  • a name
  • an input table or view
  • an output table

In most cases, you will need a lot more than that. Let us look at the elements you can use to create a sophisticated, effective Consolidator.

The empty structure below shows the optional and mandatory code blocks in a single Consolidator block.
Comments are included to provide further information:

consolidators {
    config {
        // optional file-level configuration
    }
    // define a consolidation
    consolidator("NAME", INPUT_TABLE_OR_VIEW, OUTPUT_TABLE) {
        config {
            // optional consolidation configuration
        }
        select {
            // select block
        }
        onCommit {
            // optional onCommit block
        }
        groupBy {
            // groupBy block
        } into {
            // into block
        }
        where {
            // predicate
        }
        indexScanOn {
            // on demand index scan
        }
        reprocessSchedule {
            // optional reprocess schedule
        }
    }
}

Now we shall look at each of the possible code blocks in more detail.

config block (optional)

The config block is available at both the file and Consolidator level. File-level configuration overrides the default properties, and Consolidator-level configuration overrides both.

| Property | Description | Supported values | Default value |
|----------|-------------|------------------|---------------|
| defaultLogLevel | the default log level for the Consolidator | TRACE, DEBUG, INFO, WARN, ERROR | TRACE |
| onNotFound | what to do if an output record is not found | BUILD, WARN, IGNORE, FAIL | Must be specified |
| batchingPeriod | the time in ms before writing to the database | | Must be specified |
| ignoreIndexScan | disables index scans | TRUE, FALSE | FALSE |
| defaultErrorHandling | what to do if an exception is thrown | IGNORE, WARN, FAIL | Must be specified |
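
For illustration, a file-level config block might look like the sketch below. The values are examples only, and the exact form in which enum values such as BUILD or WARN are referenced may differ in your version:

config {
    defaultLogLevel = DEBUG     // log consolidation events at DEBUG
    onNotFound = BUILD          // build a new output record if none is found
    batchingPeriod = 500        // wait 500 ms before writing to the database
    defaultErrorHandling = WARN // log a warning if an exception is thrown
}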

select block

In the select block, you can specify functions and outputs, for example:

select {
    // add the output table here for a more concise syntax
    COMMISSION_AND_FEES_SUMMARY {
        sum { feeAmount } into FEE_AMOUNT
        sum { originalFeeAmount } into ORIGINAL_FEE_AMOUNT
        sum { splitFeeAmount } into SPLIT_FEE_AMOUNT
    }
}

select {
    sum { feeAmount } into CommissionAndFeesSummary::feeAmount
    sum { originalFeeAmount } into CommissionAndFeesSummary::originalFeeAmount
    sum { splitFeeAmount } into CommissionAndFeesSummary::splitFeeAmount
}

Logging

For debugging purposes, the select block also supports logging. By default, the Consolidator logs all events at level TRACE, but this can be overridden with custom messages. To do this, use the logJoin, logLeave and logNoop blocks:

select {
    ...
    logJoin { LOG.info("row joined: {}", input) }
    logLeave { LOG.info("row left: {}", input) }
    logNoop { LOG.info("new row: {}, old row: {}", newInput, oldInput) }
}

onCommit block (optional)

In the onCommit block, you can amend the output row after all the functions have been applied, but before it is written to the database. Within the block, you have access to both the input and the output objects. Note that the input property could be any one of the input rows picked up during the consolidation, so it should be handled with care.

This block can be useful to do further calculations based on the consolidated values, for example:

onCommit {
    val accruedInterest = if (input.isDirtyPrice) input.accruedInterest else 0.0
    val netAmount = input.notional + accruedInterest
    output.netAmount = when (input.side) {
        Side.BUY -> netAmount + output.totalTransactionCosts
        Side.SELL -> netAmount - output.totalTransactionCosts
    }
}

groupBy into syntax

The syntax of groupBy is significantly different for standard Consolidators and object Consolidators. For standard Consolidators, which write to a table, the syntax is more complex, as output records need to be loaded and created. The table syntax also supports index scans, which need to be configured.

For standard (table output) Consolidators, the groupBy-into syntax determines:

  • how records are grouped groupBy { ... }
  • how the Consolidator interacts with the database into { ... }
    • how output records are loaded from the database into { lookup { ... } }
    • how output records are built when no record is found in the database into { build { ... } }
    • how to look up records after an index scan into { indexScan { ... } }

Syntax:

groupBy { /* return group id */ } into {
    lookup { /* return unique index on output table */ }
    build { /* return new output record */ }
    indexScan { /* return index on input table */ }
}

For object Consolidators, the groupBy-into syntax determines:

  • how records are grouped groupBy { ... }
  • how output records are constructed into { ... }

Syntax:

groupBy { /* return group id*/ } into { /* return new output record */ }

groupBy

The code you put in your groupBy block determines the groupId. That is important, because the groupId determines the level at which records are aggregated. For example, you can set up the code to group by instrument; in this case, the calculation would then aggregate per instrument.

The result of the groupBy block can be any Kotlin type, as long as it can be used to uniquely identify a grouping; that is, as long as the result has a consistent equals method. This includes, but is not limited to:

  • single fields from the input table:
groupBy { allocationId }
  • a type safe tuple of input table fields:
groupBy { tuple(allocationId, feeGroup) }
  • a string concatenation of input table fields:
groupBy { group(allocationId, feeGroup) }
  • unique index entries on the output table (table only):
groupBy { CommissionAndFeesSummary.ByAllocationId(allocationId, feeGroup) }

Consolidations support single or multiple groupings. Multiple groupings are useful when aggregating data by different levels: for example, where you want to calculate trade totals per currency as well as by counterparty.
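
For example, one way to aggregate the same TRADE input at two different levels is to define two Consolidator blocks in the same file. This is only a sketch: the output tables CCY_SUMMARY and COUNTERPARTY_SUMMARY, their indices and their fields are hypothetical:

consolidator("TRADE_BY_CURRENCY", TRADE, CCY_SUMMARY) {
    select {
        CCY_SUMMARY {
            sum { price * quantity } into TOTAL_NOTIONAL
        }
    }
    groupBy { CcySummary.ByCurrency(currency) }
}

consolidator("TRADE_BY_COUNTERPARTY", TRADE, COUNTERPARTY_SUMMARY) {
    select {
        COUNTERPARTY_SUMMARY {
            sum { price * quantity } into TOTAL_NOTIONAL
        }
    }
    groupBy { CounterpartySummary.ByCounterpartyId(counterpartyId) }
}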

When you use tuple or group in groupBy, the resulting groupId can be accessed later in sub-blocks such as lookup:

groupBy { group(orderDate.year) } into {
    lookup { OrderSummary.byGroupId(groupId) }
}

groupBy { Tuple2(orderDate.year, orderDate.monthOfYear) } into {
    lookup { OrderSummary.byGroupId("${groupId.value1}-${groupId.value2}") }
}

This can also be shortened into the following format, which is more concise and can be easier to understand:

groupBy { OrderSummary.byGroupId("${orderDate.year}") }
groupBy { OrderSummary.byGroupId("${orderDate.year}-${orderDate.monthOfYear}") }

into

The into statement is different for standard and object Consolidators:

lookup

The lookup block is optional when grouping by a unique index on the output table. In all other cases, the lookup should be defined. In this block, you have access to the input and groupId properties.

Example:

groupBy { tradeId } into {
    lookup { Trade.ById(groupId) }
}

build

The build block is required if the output table has non-null fields without default values.

Example:

groupBy { Trade.ById(tradeId) } into {
    build {
        Trade {
            tradeId = groupId.tradeId
            feeAmount = input.feeAmount ?: 0.0
        }
    }
}

indexScan

If any of the functions triggers an index scan, the Consolidator needs to know which records are affected. indexScan will tell the Consolidator how to do that. For example:

groupBy { Order.ById(orderId) } into {
    indexScan { Trade.ByOrderId(groupId.orderId) }
}

Object Consolidators need to be able to build output objects on demand. There is no need to interact with the database at this point; the into block simply returns a new output object:

groupBy { orderId } into {
    Order {
        orderId = groupId
    }
}

where block (optional)

The where block filters records before consolidation. There are two modes for this filter. In the default mode, the consolidation events are modified depending on the predicate; this means that a modify event might appear as an insert or a delete.

Example:

where {
    quantity > 1000
}

Optionally, the where block takes an ignore parameter; any records matching the predicate are then ignored completely:

where(ignore = true) {
    tradeDate < today()
}

reprocessSchedule block (optional)

Some consolidations might require periodic reprocessing of data. This will trigger a cold start on a selected range of data.

Functions

Functions are the building blocks of the select block.

With one exception, all functions require input.

The exception is count, which can either have an input or no input.

The syntax for an input to a GPAL function is sum { feeAmount }.

Within the curly brackets of the function, you can access all the fields on a row, and you can use any Kotlin operation on the row. The function will be applied over the result, unless the result is null, in which case it will be ignored.
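
For example (NET_QUANTITY is a hypothetical output field; side and quantity are assumed trade fields), the function input can be any Kotlin expression over the row:

sum { if (side == Side.BUY) quantity else -quantity } into NET_QUANTITY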

| Function | Description | Input | Output | Index scan |
|----------|-------------|-------|--------|------------|
| sum | sums values in the value field | any number | same as input | never |
| count | counts all records | - | INTEGER | never |
| | counts records that have a value | anything | INTEGER | never |
| countDistinct | counts distinct values | anything | INTEGER | always |
| countBig | counts all records | - | LONG | never |
| | counts records that have a value | any value | LONG | never |
| avg | average value | any number | same as input | always |
| min | minimum value | any number | same as input | sometimes * |
| max | maximum value | any number | same as input | sometimes * |
| stdev | standard deviation for value | any number | DOUBLE | always |
| stdevp | population standard deviation for value | any number | DOUBLE | always |
| variance | statistical variance for value | any number | DOUBLE | always |
| variancep | population statistical variance for value | any number | DOUBLE | always |
| stringAgg | string concatenation | any string | STRING | sometimes + |
| checksum | calculates a hash over the input | any value | LONG | always |

* if the previous min or max value is removed
+ if any previous value is changed

Function examples

Here are some simple examples of functions:

sum { feeAmount }                // sums the FEE_AMOUNT
sum { feeAmount + otherAmount }  // sums the total of FEE_AMOUNT plus OTHER_AMOUNT
sum { feeAmount ?: otherAmount } // sums FEE_AMOUNT, or OTHER_AMOUNT if FEE_AMOUNT is null
count()                          // counts the number of records
count { feeAmount }              // counts the records with a FEE_AMOUNT
// etc.

Note that you can also create custom functions.

Assigning functions to fields

After a function is defined, its output can be directed to an output field. To do this, use the into keyword. Note that a function's output can only be assigned to a field of matching type; for example, you can only assign a Double value to a DOUBLE field.

sum { feeAmount } into FEE_AMOUNT
sum { originalFeeAmount } into ORIGINAL_FEE_AMOUNT
sum { splitFeeAmount } into SPLIT_FEE_AMOUNT
sum { feeAmount } into Order::feeAmount
sum { originalFeeAmount } into Order::originalFeeAmount
sum { splitFeeAmount } into Order::splitFeeAmount

Transformations on functions

The Consolidator also supports higher-level functions; this is where you can apply a transformation on the function, before it is assigned.

onlyIf

Where this is present, the function only applies to rows that meet the condition specified, for example:

sum { feeAmount } onlyIf { feeGroup == FeeGroup.COMMISSION } into TOTAL_COMMISSION

withInitialValue

Some functions support an initial value. Within this context, you can access the first input row, as well as the output object, for example:

sum { -feeAmount } withInitialValue { output.expectedFees } into OUTSTANDING_FEES

pivotBy

This transformation directs a function's result across different columns, and changes the behaviour of the into block. Within the into block, the pivot property contains the value returned in the pivotBy { ... } block.

sum { feeAmount } pivotBy { feeGroup } into {
    when (pivot) {
        FeeGroup.STAMP -> TOTAL_STAMP
        FeeGroup.COMMISSION -> TOTAL_COMMISSION
        FeeGroup.FEE -> TOTAL_FEES
        FeeGroup.LEVY -> TOTAL_LEVY
        FeeGroup.TAX -> TOTAL_TAX
        FeeGroup.OTHER -> TOTAL_OTHER
        FeeGroup.LOCAL -> TOTAL_LOCAL
        FeeGroup.CHARGE -> TOTAL_CHARGE
        FeeGroup.RESEARCH -> TOTAL_RESEARCH
    }
}

The values within this when statement must be exhaustive. If this is not practical, we recommend using onlyIf instead, as in the sketch below.
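
For example, if only a couple of fee groups matter, a pair of onlyIf functions covers them without requiring an exhaustive when:

sum { feeAmount } onlyIf { feeGroup == FeeGroup.STAMP } into TOTAL_STAMP
sum { feeAmount } onlyIf { feeGroup == FeeGroup.COMMISSION } into TOTAL_COMMISSION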

Shared function definitions

Function definitions can also be assigned to variables and reused for multiple outputs, for example:

val feeSum = sum { feeAmount }
feeSum into TOTAL_FEES
feeSum onlyIf { feeGroup == FeeGroup.COMMISSION } into TOTAL_COMMISSION

Index scans

Functions can sometimes trigger an index scan. This is when a Consolidator needs to re-read previously consolidated rows in order to calculate the correct value.

  • For some functions, this is never required: for example, sum and count.
  • For some functions, it is required sometimes: for example, min and max.
  • For some functions, it is always required: for example, stdev.
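
For example, a Consolidator using min may need an indexScan so that the affected input rows can be re-read when the current minimum is removed. This is a sketch reusing the earlier Trade and Order shapes; MIN_PRICE is a hypothetical output field:

consolidator(TRADE, ORDER) {
    select {
        ORDER {
            min { price } into MIN_PRICE
        }
    }
    groupBy { Order.ById(orderId) } into {
        indexScan { Trade.ByOrderId(groupId.orderId) }
    }
}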

Starting and killing the process

All Genesis processes can be started or killed using the startServer and killServer commands. However, if the Consolidator process stops for any reason, you should almost certainly perform a cold start when you restart it. This ensures that any changes to data made while the process was not running are properly recalculated before any real-time calculations are triggered.

warning

If you simply restart the Consolidator process, then any changes to data that occurred while the process was not running will not be recalculated. Got that? The changed data will not be recalculated.

The startProcess command (cold start)

A cold start avoids the danger of losing your calculated data. To make a cold start, run the command:

startProcess GENESIS_CONSOLIDATOR --coldStart

In this case, the Consolidator process is called GENESIS_CONSOLIDATOR. If in doubt, you can find the exact name of your consolidator in the service-definitions file.

Alternatively, you can enable --coldStart persistently by adding it to the <arguments></arguments> option in the process definition. More information can be found in configuring runtime.

<process name="POSITION_CONSOLIDATOR">
    <groupId>POSITION</groupId>
    <description>Consolidates trades to calculate positions</description>
    <start>true</start>
    <options>-Xmx256m -DRedirectStreamsToLog=true -DXSD_VALIDATE=false -XX:MaxHeapFreeRatio=70 -XX:MinHeapFreeRatio=30 -XX:+UseG1GC -XX:+UseStringDeduplication -XX:OnOutOfMemoryError="handleOutOfMemoryError.sh %p"</options>
    <module>genesis-pal-consolidator</module>
    <package>global.genesis.pal.consolidator</package>
    <primaryOnly>false</primaryOnly>
    <script>position-consolidator.kts</script>
    <loggingLevel>DEBUG,DATADUMP_ON</loggingLevel>
    <language>pal</language>
    <arguments>--coldStart</arguments>
</process>

This command consolidates all records in the system before starting the real-time event-driven consolidations. At the beginning of a cold start, all fields in consolidationFields of the consolidation table are zeroed (or deleted, if transient) before initiating the re-consolidation of all the records in the database.

Troubleshooting

You can set the default logging level for all the Consolidators in your application-consolidator.kts file using a config statement at the beginning. Within any individual Consolidator, you can also set a logging level that overrides this setting. If a Consolidator is not functioning as expected, raise its logging level to INFO, or even higher. Here is a very simple example: the default logging level has been set to INFO, but Consolidator B has its own logLevel, DEBUG, which overrides the file-level setting:

consolidators {
    config {
        logLevel = INFO
    }
    consolidator("A", ...) {
        ...
    }
    consolidator("B", ...) {
        config {
            logLevel = DEBUG
        }
        ...
    }
}

Custom log messages

Genesis provides a class called LOG that can be used to insert custom log messages. This class provides you with five methods: info, warn, error, trace and debug. To use these methods, you need to provide, as an input, a string that will be inserted into the logs.

Here is an example where you can insert an info message into the log file:

LOG.info("This is an info message")

The LOG instance is a Logger from SLF4J.
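
Because LOG is a standard SLF4J logger, you can also use parameterised messages; for example, inside a block where the input property is in scope (such as onCommit):

LOG.debug("Consolidating row: {}", input)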

note

In order to see these messages, you must set the logging level accordingly. You can do this using the logLevel command.