Consolidator - advanced
Multiple groupBy statements
Consolidators support multiple groupBy statements in a single consolidation. This is useful when you want to consolidate the same table at multiple levels, and it ensures that the consolidation is consistent across those levels.
For example, the consolidator below has two groupBy statements to ensure that it consolidates on order year, and on order year and month:
consolidator("CON_ORDER_SUMMARY_FROM_ORDER", ORDER, ORDER_SUMMARY) {
    select {
        ORDER_SUMMARY {
            sum { totalNotional } into TOTAL_NOTIONAL
            sum { totalQuantity } into TOTAL_QUANTITY
            sum { tradeCount } into TRADE_COUNT
        }
    }
    groupBy { OrderSummary.byGroupId("${orderDate.year}") }
    groupBy { OrderSummary.byGroupId("${orderDate.year}-${orderDate.monthOfYear}") }
}
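To make the effect of two groupBy statements concrete, here is a minimal plain-Kotlin sketch that does not use the consolidator DSL. The Order class, the groupKeys list and the consolidate function are hypothetical names invented for this illustration, and java.time's monthValue stands in for the monthOfYear property used above. It only shows that each order contributes once to the yearly group and once to the year-and-month group.

```kotlin
import java.time.LocalDate

// Hypothetical stand-in for a row of the ORDER table; not the generated entity.
data class Order(val orderDate: LocalDate, val totalNotional: Double)

// One key function per groupBy statement: by year, and by year and month.
val groupKeys: List<(Order) -> String> = listOf(
    { o: Order -> "${o.orderDate.year}" },
    { o: Order -> "${o.orderDate.year}-${o.orderDate.monthValue}" }
)

// Sums totalNotional per group id, adding each order once for every key function.
fun consolidate(orders: List<Order>): Map<String, Double> {
    val totals = mutableMapOf<String, Double>()
    for (order in orders) {
        for (key in groupKeys) {
            val id = key(order)
            totals[id] = (totals[id] ?: 0.0) + order.totalNotional
        }
    }
    return totals
}

fun main() {
    val orders = listOf(
        Order(LocalDate.of(2023, 1, 15), 100.0),
        Order(LocalDate.of(2023, 2, 3), 50.0)
    )
    // Yields one yearly total plus one total per month.
    println(consolidate(orders))
}
```

Because both levels are derived from the same pass over the same rows, the yearly total always equals the sum of its monthly totals in this sketch.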
Consolidator groups
Consolidator groups are essential if you are using a non-transaction database.
groupName identifies a group of Consolidators.
groupName = "_name_"
If you include this statement in the config section of your consolidator block, then the Consolidator will belong to the named group.
In a non-transaction database, a group is designed to offer consistent consolidation in the absence of ACID guarantees at the database level. Consolidators in the same group will not interfere with each other's calculations as they update - particularly where they output to the same table.
This is limited to Consolidator updates within a group in a single process. Updates in other groups, other processes or other nodes could still interfere. You must plan this carefully.
Below is an example where we have declared two consolidator blocks. Each has groupName = "ORDER", so they are in the same group. The two consolidator blocks handle different types of order, but they are aggregated into the same three output fields of ORDER_SUMMARY: TOTAL_NOTIONAL, TOTAL_QUANTITY and TRADE_COUNT.
consolidator(SWAP_ORDERS, ORDER_SUMMARY) {
    config {
        groupName = "ORDER"
    }
    select {
        ORDER_SUMMARY {
            sum { totalNotional } into TOTAL_NOTIONAL
            sum { totalQuantity } into TOTAL_QUANTITY
            sum { tradeCount } into TRADE_COUNT
        }
    }
    groupBy { OrderSummary.byGroupId("${orderDate.year}-${orderDate.monthOfYear}") }
}
consolidator(FX_ORDERS, ORDER_SUMMARY) {
    config {
        groupName = "ORDER"
    }
    select {
        ORDER_SUMMARY {
            sum { totalNotional } into TOTAL_NOTIONAL
            sum { totalQuantity } into TOTAL_QUANTITY
            sum { tradeCount } into TRADE_COUNT
        }
    }
    groupBy { OrderSummary.byGroupId("${orderDate.year}-${orderDate.monthOfYear}") }
}
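The kind of interference a group is meant to prevent can be illustrated with a small plain-Kotlin sketch. This is not how the platform implements groups; it is an analogy under stated assumptions. The names groupLocks, tradeCountByGroupId, updateInGroup and runDemo are hypothetical, and a HashMap stands in for an output table that offers no transactions. Two writers perform read-modify-write on the same row, and a lock shared by everything in the "ORDER" group keeps their updates from overwriting each other within one process.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical in-process registry: one lock per group name.
val groupLocks = ConcurrentHashMap<String, ReentrantLock>()

// Stand-in for an output table without transactions: plain read-modify-write.
val tradeCountByGroupId = HashMap<String, Int>()

// Applies one update while holding the lock of the named group.
fun updateInGroup(groupName: String, groupId: String, delta: Int) {
    val lock = groupLocks.computeIfAbsent(groupName) { ReentrantLock() }
    lock.withLock {
        val current = tradeCountByGroupId[groupId] ?: 0  // read
        tradeCountByGroupId[groupId] = current + delta   // write
    }
}

// Runs two concurrent writers in the same group against the same row.
fun runDemo(): Int {
    tradeCountByGroupId.clear()
    val swap = thread { repeat(10_000) { updateInGroup("ORDER", "2023-1", 1) } }
    val fx = thread { repeat(10_000) { updateInGroup("ORDER", "2023-1", 1) } }
    swap.join()
    fx.join()
    return tradeCountByGroupId["2023-1"] ?: 0
}

fun main() {
    println(runDemo()) // no increments are lost while both writers share the group lock
}
```

The lock in this sketch lives in one process only, which mirrors the limitation described above: writers in another group, process or node would not be covered by it.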
Custom functions
Consolidators also support custom functions that allow you to specify behaviour for join, leave and noop operations.
There are two parts to defining a custom function:
- Select an input.
- Define the consolidation.
After the custom function has been defined, it supports the same syntax as other functions, including into, pivotBy, etc.
In the example below, the sum function is defined as a custom function. It uses feeAmount as the input, and applies three operations:
using { feeAmount } withOperations {
    onJoin { previousValue + input }
    onLeave { previousValue - input }
    onNoop { previousValue + newInput - oldInput }
} into value
usingRow
There are two types of input for custom functions:
- using takes an input from a row, like any other function. The return type determines the type of the function.
- usingRow takes the whole row as input. The type of function must be specified.
The function in the example above can also be implemented with usingRow, as shown below:
usingRow(DOUBLE) withOperations {
    onJoin { previousValue + input.feeAmount.orZero() }
    onLeave { previousValue - input.feeAmount.orZero() }
    onNoop { previousValue + newInput.feeAmount.orZero() - oldInput.feeAmount.orZero() }
} into value
This shows the benefit of using: it handles null values for you. With usingRow, each field access needs the orZero() call, which takes any nullable number and returns the value, or 0 if it is null.
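The null-handling behaviour described for orZero() can be reproduced in plain Kotlin. The extension function below is a hypothetical re-implementation for Double only, written for illustration; the platform supplies its own orZero(), whose exact signature is not shown in this document.

```kotlin
// Hypothetical re-implementation for illustration; not the platform's own function.
fun Double?.orZero(): Double = this ?: 0.0

fun main() {
    val previousValue = 10.0
    val missingFee: Double? = null
    val knownFee: Double? = 2.5

    // A null fee contributes nothing to the running total.
    println(previousValue + missingFee.orZero())
    // A non-null fee is added as usual.
    println(previousValue + knownFee.orZero())
}
```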
withOperations
This block enables you to specify the behaviour of each consolidation operation, as in the previous examples.
In those examples:
- a join is when a row is added to the consolidation group
- a leave is when a record leaves the consolidation group
- a noop is when a record is modified while staying in the same consolidation group: for example, if a price or a fee is changed
Each operation has access to the operation context as follows:
- onJoin: previousValue, input
- onLeave: previousValue, input
- onNoop: previousValue, newInput, oldInput
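The three operations and their contexts can be traced with a short plain-Kotlin sketch. The Event types and the applyEvent function are hypothetical names made up for this walk-through and are not platform types; the arithmetic mirrors the custom sum defined earlier with withOperations.

```kotlin
// Hypothetical event model for one consolidation group; not platform types.
sealed interface Event
data class JoinEvent(val input: Double) : Event
data class LeaveEvent(val input: Double) : Event
data class NoopEvent(val oldInput: Double, val newInput: Double) : Event

// Mirrors the onJoin, onLeave and onNoop bodies of the custom sum.
fun applyEvent(previousValue: Double, event: Event): Double = when (event) {
    is JoinEvent -> previousValue + event.input
    is LeaveEvent -> previousValue - event.input
    is NoopEvent -> previousValue + event.newInput - event.oldInput
}

fun main() {
    var total = 0.0
    total = applyEvent(total, JoinEvent(5.0))       // a row joins the group
    total = applyEvent(total, JoinEvent(3.0))       // a second row joins
    total = applyEvent(total, NoopEvent(3.0, 4.0))  // its fee changes from 3.0 to 4.0
    total = applyEvent(total, LeaveEvent(5.0))      // the first row leaves
    println(total)                                  // only the modified second row remains
}
```

Each step needs only the previous total and the values carried by the event, which is why the running sum never has to be recomputed from all rows.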
withAggregation
You can use this function for a number of requirements:
- where the function needs to consider all values, rather than just one at a time
- where you require an outcome other than updating the value
- where no update at all is required
- where the group id should be reconsolidated
For this function:
- The input variable holds a List of aggregation events.
- The previousValue variable holds the previous value.
- In the example below, a sum function is implemented using withAggregation. Its input contains only Join, Leave and Noop values, and the code shows how these are accessed.
The example below uses the Kotlin function fold to calculate the value acc, which is the aggregated value for a group, such as total fees.
The function ends with an asUpdate() call. This effectively says: use the value you now have.
using { feeAmount } withAggregation {
    input.fold(previousValue) { acc, value ->
        when (value) {
            is Join -> acc + value.value
            is Leave -> acc - value.value
            is Noop -> acc + value.new - value.old
        }
    }.asUpdate()
}
withAggregation does not have to end with an asUpdate() call. Two other return values are also available:
- Noop causes the function to ignore the input for this particular field, and there is no change written to the database. For example, this is used during an iterative comparison to find a maximum value. The function compares the next value with the previous; if it is not higher, then return Noop.
- IndexScan causes the function to re-evaluate every database value for that key. For example, if the record with the maximum value has been deleted from the database, go to the database and find the new maximum value.