Database interface - Entity Db
The entity db enables you to interact with the database layer; you can use any generated type-safe entities for tables and views. The interface supports the same operations as the generated repositories, but will accept any entity. It supports read operations for views and tables and write operations for tables only.
The entity db differs from the generated repositories in that it can handle any table and most view entities. It differs from RxDb in that all operations are type-safe.
The entity db is available in the Kotlin Event Handler.
It can be injected in Kotlin using AsyncEntityDb and in Java using RxEntityDb.
There are three versions of the entity db:
- AsyncEntityDb - this API supports Kotlin coroutines
- SyncEntityDb - this API is blocking
- RxEntityDb - this API supports RxJava
The SyncEntityDb API was introduced in version 7.0 of the Genesis platform.
Which version to use?
The version to use depends on:
- whether you are writing code in Kotlin or Java
- whether there is a need to write asynchronous (async) or synchronous (blocking) code
Language | Asynchronous | Blocking |
---|---|---|
Kotlin | AsyncEntityDb | SyncEntityDb |
Java | RxEntityDb | SyncEntityDb |
For more information on the differences between the APIs, see Types of API.
Feature | EntityDb |
---|---|
Supports tables | ✔️ |
Supports views | ✔️ |
Supports any data type | ✔️ |
Class to import | AsyncEntityDb, RxEntityDb |
Type-safe read and write | ✔️ |
Type-safe write result | ✔️ |
Returns data as | table or view entities |
Writes data as | table or view entities |
References indexes as | index entities |
Programming interface | Async or RxJava |
Write (input) | Modify Details |
Write (output) | Write Result |
Subscribe | Record Update of entity |
Bulk or Range Subscribe | Bulk of entity |
Available in Custom Event Handlers | ✔️ |
Available in Custom Request Servers | ✔️ |
When referring to indices in database operations, the database accepts either index classes, or entity classes in combination with index references. For comparison:
Type convention
Type | Meaning | Example |
---|---|---|
E | A table or view entity | Trade |
T | A table entity | Trade |
V | A view entity | TradeView |
EntityIndex<E> | An index of E | Trade.ById |
UniqueEntityIndex<E> | A unique index of E | Trade.ById |
NonUniqueEntityIndex<E> | A non unique index of E | Trade.ByDate |
EntityIndexReference<E> | An index reference of E | Trade.ById |
UniqueEntityIndexReference<E> | A unique index reference of E | Trade.ById |
NonUniqueEntityIndexReference<E> | A non unique index reference of E | Trade.ByDate |
F<E> | The full table /view name for E | TRADE |
Class<E> | The class reference for E | Trade.class |
KClass<E> | The Kotlin class reference for E | Trade::class |
Read operations
get
Get is a simple lookup on the database; it will return a single entity if a match is found, or no records if none is found.
The following overloads exist for get; fields is a Set<String>.
get(E, EntityIndexReference<E>, fields) : E?
get(E, fields) : E?
get(E, EntityIndexReference<E>) : E?
get(UniqueEntityIndex<E>, fields) : E?
get(UniqueEntityIndex<E>) : E?
Syntax
- Kotlin
- Java - RxJava
- Java - Blocking
// we can look up trades by passing in a unique index class:
val trade = db.get(Trade.byId("TRADE_1"))
// a trade object with the primary key set
val trade = db.get(trade)
// a trade object and a reference to unique index
val trade = db.get(trade, Trade.ByTypeId)
// or you can access the index class from the entity
val trade = db.get(trade.byTypeId())
// we can look up trades by passing in a unique index class:
final var trade = db.get(Trade.byId("TRADE_1"))
.blockingGet();
// a trade object with the primary key set
final var trade = db.get(trade)
.blockingGet();
// a trade object and a reference to unique index
final var trade = db.get(trade,Trade.ByTypeId.Companion)
.blockingGet();
// or you can access the index class from the entity
final var trade = db.get(trade.byTypeId())
.blockingGet();
// we can look up trades by passing in a unique index class:
final var trade = db.get(Trade.byId("TRADE_1"));
// a trade object with the primary key set
final var trade = db.get(trade);
// a trade object and a reference to unique index
final var trade = db.get(trade, Trade.ByTypeId.Companion);
// or you can access the index class from the entity
final var trade = db.get(trade.byTypeId());
getAll
This takes multiple unique index class instances and returns the entity for each record. It takes a List<Pair<String, UniqueEntityIndex<E>>>, where the String is a unique reference to each request.
Overloads
getAll(requestDetails: Flow<Pair<String, UI<E>>>): Map<String, E?>
getAll(requestDetails: List<Pair<String, UI<E>>>): Map<String, E?>
val map = db.getAll(listOf("A" to Trade.byId("TRADE_A"), "B" to Trade.byId("TRADE_B")))
val recordA = map["A"]
val recordB = map["B"]
getAllAsList
This operation is similar to the one above, but takes a List<UniqueEntityIndex<E>> and returns a List<E?>.
The results are returned in the order they were requested; an entry is null if no record was found. The result list is guaranteed to be the same size as the input.
Overloads
getAllAsList(Flow<UI<E>>): List<E?>
getAllAsList(List<UI<E>>): List<E?>
getAllAsList(vararg UI<E>): List<E?>
val list = db.getAllAsList(Trade.byId("TRADE_A"), Trade.byId("TRADE_B"))
val recordA = list[0]
val recordB = list[1]
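The ordering and null-handling guarantee can be illustrated with a minimal sketch. It uses a plain in-memory map as a hypothetical stand-in for a table; the class and method below are not part of the entity db API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in; this is not the entity db API.
final class GetAllAsListSketch {

    // Looks up each key in turn, keeping request order and using null for misses.
    static List<String> getAllAsList(Map<String, String> table, List<String> keys) {
        List<String> results = new ArrayList<>(keys.size());
        for (String key : keys) {
            results.add(table.get(key)); // null when no record matches
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, String> table = new HashMap<>();
        table.put("TRADE_A", "record A");
        table.put("TRADE_B", "record B");
        // The result has the same size and order as the request list.
        List<String> results = getAllAsList(table, List.of("TRADE_B", "TRADE_X", "TRADE_A"));
        System.out.println(results);
    }
}
```

Because positions line up with the request, a caller can pair each result with its request by index without needing a separate reference string.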
getBulk
This operation creates a Flowable of the whole table.
The records will be sorted in ascending order by the index provided, or by the primary key if none is provided.
There is also the getBulkFromEnd function, which returns records in descending order.
There are also a number of continuation operations, which return the whole table after the provided record. These methods are deprecated and should not be used going forward.
Overloads
getBulk<E>(): Flow<E> (Kotlin only)
getBulk([Class<E> / KClass<E>]): Flow<E>
getBulk(UR<E>): Flow<E>
getBulk(UR<E>, fields): Flow<E>
getBulk(UR<E>, E, fields): Flow<E> (continuation) (Deprecated)
getBulkFromEnd(UR<E>): Flow<E>
getBulkFromEnd(UR<E>, E): Flow<E> (continuation) (Deprecated)
getBulkFromEnd(UR<E>, E, fields): Flow<E> (continuation) (Deprecated)
getBulkFromEnd(UR<E>, fields): Flow<E>
Syntax
- Kotlin
- Java
// we can pass in Trade as a type parameter
val flow = db.getBulk<Trade>()
// we can pass in the TRADE object
val flow = db.getBulk(TRADE)
// or we can pass in an index reference
val flow = db.getBulk(Trade.ByTypeId)
// we can pass in the Trade class
final var flowable = db.getBulk(Trade.class);
// we can pass in the TRADE object
final var flowable = db.getBulk(TRADE.INSTANCE);
// or we can pass in an index reference
final var flowable = db.getBulk(Trade.ById.Companion);
getRange
getRange is an operation that can return multiple records from the database.
There are two types of getRange operation:
- Single-value range, e.g. trades with CURRENCY set to USD
- Interval range, e.g. all trades with TRADE_DATE between 01-Jul-2024 and 01-Aug-2024.
Unlike the get operation, the getRange operation can always return multiple records.
We pass index entities into getRange. When using an interval range, both index entities should be of the same type. In the example below, this is Trade.byDate.
- Kotlin
- Java
// single value
val range = db.getRange(Trade.byCurrency("USD"))
// interval
val range = db.getRange(
Trade.byDate(myStartDate),
Trade.byDate(myEndDate)
)
// single value
var range = db.getRange(Trade.byCurrencyId("USD"));
// interval
var range = db.getRange(
Trade.byDate(startDate),
Trade.byDate(endDate)
);
Table entities provide methods for building index entities:
- Kotlin
- Java
// single value
val range = db.getRange(myUsdTrade.byCurrency())
// interval
val range = db.getRange(myStartTrade.byDate(), myEndTrade.byDate())
// single value
var range = db.getRange(myUsdTrade.byCurrency());
// interval
var range = db.getRange(myStartTrade.byDate(), myEndTrade.byDate());
numKeyFields property
This allows users to query on part of the index. When using the getRange operation, you can tell the database how many fields in the index to use.
By default, all fields are used, but you can override this.
Order matters here; you can get a range on just field 1, field 1 and field 2, but not on field 2 and field 1.
For example, if on the ORDER table, we had an index on COUNTERPARTY and INSTRUMENT_ID, we could:
- perform a single-value getRange operation on COUNTERPARTY. For this, set numKeyFields to 1.
- perform a single-value getRange operation on both COUNTERPARTY and INSTRUMENT. For this, set numKeyFields to 2.
For example:
- Kotlin
- Java
// we could look up all rows for both COUNTERPARTY and INSTRUMENT:
val range = db.getRange(myOrder.byCounterpartyAndInstrument())
// which would be equivalent to
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 2)
// and also
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A", "VOD"))
// or we could look up by just COUNTERPARTY
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 1)
// which would be equivalent to
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A"))
// we could look up all rows for both COUNTERPARTY and INSTRUMENT:
var range = db.getRange(myOrder.byCounterpartyAndInstrument());
// which would be equivalent to
var range = db.getRange(myOrder.byCounterpartyAndInstrument(), 2);
// and also
var range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A", "VOD"));
// or we could look up by just COUNTERPARTY
var range = db.getRange(myOrder.byCounterpartyAndInstrument(), 1);
// which would be equivalent to
var range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A"));
numKeyFields is required in the following instances:
- when passing in a table entity and an index reference
- when passing in an index entity that refers to a unique index
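To make the effect of numKeyFields concrete, here is a minimal sketch of prefix matching on a two-field index. It is a hypothetical in-memory model, with index keys held as string arrays; it only illustrates the idea and is not how the database implements range lookups.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model; this is not the entity db API.
final class NumKeyFieldsSketch {

    // Returns the index keys that match `lookup` on the first numKeyFields fields.
    static List<String[]> getRange(List<String[]> indexKeys, String[] lookup, int numKeyFields) {
        List<String[]> matches = new ArrayList<>();
        for (String[] key : indexKeys) {
            boolean same = true;
            // Fields are compared in index order, so field 2 cannot be used without field 1.
            for (int i = 0; i < numKeyFields; i++) {
                if (!key[i].equals(lookup[i])) {
                    same = false;
                    break;
                }
            }
            if (same) {
                matches.add(key);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String[]> keys = List.of(
            new String[] {"CPTY_A", "VOD"},
            new String[] {"CPTY_A", "BARC"},
            new String[] {"CPTY_B", "VOD"});
        String[] lookup = {"CPTY_A", "VOD"};
        System.out.println(getRange(keys, lookup, 1).size()); // matches on COUNTERPARTY only
        System.out.println(getRange(keys, lookup, 2).size()); // matches on both fields
    }
}
```

With numKeyFields set to 1, the instrument part of the lookup key is ignored, which is why a full index entity can still be used for a COUNTERPARTY-only range.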
Fields
When using a getRange operation, you can provide a Set<String> with the field names you want to select.
This will only be used as a hint; non-nullable fields will always be returned from the database.
- Kotlin
- Java
val range = db.getRange(
myUsdTrade.byCurrency(),
fields = setOf("PRICE", "QUANTITY")
)
var range = db.getRange(
Trade.byCurrencyId("USD"),
1,
Set.of("PRICE", "QUANTITY")
);
Reversed range lookups
In addition to getRange, there is also the getRangeFromEnd method.
This works in the same way as getRange. However, rows are returned in the reverse order.
Parameters when using index entities
Name | Type | Required | Meaning | Default value |
---|---|---|---|---|
Index | EntityIndex<E> | ✔️, For single value ranges | The index entry to read | |
Start index | EntityIndex<E> | ✔️, For interval ranges | The index entry to read from | |
End index | EntityIndex<E> | ✔️, For interval ranges | The index entry to read to | |
numKeyFields | Int | ❌ | The number of key fields to take into account | All fields in the index |
fields | Set<String> | ❌ | The names of the fields to return | return all fields |
Write operations
All write operations have versions that take a single entity and versions that take multiple entities.
The return values for these operations are type-safe (see details below), provided all entries are of the same type. For example, when inserting multiple Trade entries, the return type will be List<InsertResult<Trade>>. Different entity types can be inserted in the same operation; however, the return type will be List<InsertResult<Entity>>. Also, modify operations only accept table entities.
Default and generated values
When writing a record to the database, typically all non-null properties should be set on the entity. An entity property becomes non-nullable if:
- it has a default value
- it is generated by the database, i.e. sequence or auto increment fields
- the column is included in an index or is specifically declared not null in the schema
Default values
Properties with a default value will have the value set by default, unless set explicitly in the builder.
Generated properties
Generated properties are left in an indeterminate state if not set in the builder. When writing to the database, the generated value is set on the record in the return value. Trying to read the property while it is in the indeterminate state results in an IllegalArgumentException.
Each generated property has two associated read-only properties that provide safe access to it:
- is[FieldName]Generated boolean property
- [fieldName]OrNull property
For example:
- Kotlin
- Java
// tradeId is generated
trade.tradeId // will cause an exception if not initialised
trade.tradeIdOrNull // will return the tradeId if set, or else null
trade.isTradeIdInitialised // will return true if set
// tradeId is generated
trade.getTradeId(); // will cause an exception if not initialised
trade.getTradeIdOrNull(); // will return the tradeId if set, or else null
trade.isTradeIdInitialised(); // will return true if set
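The behaviour of the three accessors can be sketched with a small hand-written class. This is a hypothetical stand-in for a generated entity, assuming that a null field models the indeterminate state; the real entity classes are generated by the platform and may differ internally.

```java
// Hypothetical stand-in for a generated entity; not the platform's generated code.
final class GeneratedFieldSketch {
    private final String tradeId; // null models the indeterminate state

    GeneratedFieldSketch(String tradeId) {
        this.tradeId = tradeId;
    }

    // Mirrors getTradeId(): reading an unset generated value fails.
    String getTradeId() {
        if (tradeId == null) {
            throw new IllegalArgumentException("tradeId has not been generated yet");
        }
        return tradeId;
    }

    // Mirrors getTradeIdOrNull(): a safe read that never throws.
    String getTradeIdOrNull() {
        return tradeId;
    }

    // Mirrors isTradeIdInitialised(): a safe check before reading.
    boolean isTradeIdInitialised() {
        return tradeId != null;
    }

    public static void main(String[] args) {
        GeneratedFieldSketch beforeInsert = new GeneratedFieldSketch(null);
        GeneratedFieldSketch afterInsert = new GeneratedFieldSketch("TRADE_1");
        System.out.println(beforeInsert.isTradeIdInitialised());
        System.out.println(beforeInsert.getTradeIdOrNull());
        System.out.println(afterInsert.getTradeId());
    }
}
```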
Columns in indices or explicitly declared not null
Columns that are in indices or declared not null should always be set in a builder, unless they have a default value or are generated columns. In all other instances, a NullPointerException will be thrown when building the object.
Insert
This inserts a new record into the database. The insert function takes a single table entity. The insertAll function takes multiple records, and has several overloads:
Overloads
insert(E): InsertResult<E>
insertAll(vararg E): List<InsertResult<E>>
insertAll(List<E>): List<InsertResult<E>>
insertAll(Flow<E>): List<InsertResult<E>>
Modify
This tries to modify a record in the database. If the record does not exist, an error is thrown.
Overloads
modify(EntityModifyDetails<E>): ModifyResult<E>
modify(E): ModifyResult<E>
modify(E, UniqueEntityIndexReference<E>): ModifyResult<E>
modifyAll(vararg E): List<ModifyResult<E>>
modifyAll(vararg EntityModifyDetails<E>): List<ModifyResult<E>>
modifyAll(List<EntityModifyDetails<E>>): List<ModifyResult<E>>
modifyAll(Flow<EntityModifyDetails<E>>): List<ModifyResult<E>>
Upsert
This tries to modify a record in the database. If the record does not exist, the record is inserted instead.
Overloads
upsert(EntityModifyDetails<E>): UpsertResult<E>
upsert(E): UpsertResult<E>
upsertAll(vararg E): List<UpsertResult<E>>
upsertAll(vararg EntityModifyDetails<E>): List<UpsertResult<E>>
upsertAll(List<EntityModifyDetails<E>>): List<UpsertResult<E>>
upsertAll(Flow<EntityModifyDetails<E>>): List<UpsertResult<E>>
Delete
This tries to delete a record.
Overloads
delete(E): DeleteResult<E>
delete(UniqueEntityIndex<E>): DeleteResult<E>
deleteAll(vararg E): List<DeleteResult<E>>
deleteAll(vararg UniqueEntityIndex<E>): List<DeleteResult<E>>
deleteAll(List<E>): List<DeleteResult<E>>
deleteAll(Flow<E>): List<DeleteResult<E>>
Update
The update operation is a condensed syntax for modifying data in the database. It works by providing a scope and a transformation. The scope could be one of:
- updateBy - a unique index
- updateRangeBy - an index range
- updateAll - a whole table
The transformation is a lambda, where the rows that are in scope are provided one by one. The rows are provided as in the database, and can be modified in place, with the changes applied to the database. All update calls use the safeWriteTransaction and are transactional if the database supports it.
- In the async entity db, the lambda will be of type E.() -> Unit. The entity will be the receiver and, in the lambda, this will refer to the row, which should be modified in place.
- In the rx entity db, the lambda will be of type Consumer<E>. The lambda will have a single parameter, the entity. Similar to the async version, the row should be modified in place.
In both cases, the full record will be provided, and values can be read as well as updated. The operations return List<ModifyResult<E>> for the updateAll and updateRangeBy methods and ModifyResult<E> for the updateBy operation.
For example:
- Kotlin
- Java
db.updateBy(Trade.byId("xxxxx")) {
price = 15.0
}
db.updateByRange(Trade.byOrderId("xxxx")) {
orderStatus = OrderStatus.CANCELLED
}
db.updateByRange(Trade.byOrderId("xxxx"), Trade.byOrderId("yyyy")) {
orderStatus = OrderStatus.CANCELLED
}
db.updateAll<Trade> {
orderStatus = OrderStatus.CANCELLED
}
db.updateBy(Trade.byId("xxx"), trade -> {
trade.setPrice(15.0);
}).blockingGet();
db.updateByRange(Trade.byOrderId("xxxx"), trade -> {
trade.setOrderStatus(OrderStatus.CANCELLED);
}).blockingGet();
db.updateByRange(Trade.byOrderId("xxxx"), Trade.byOrderId("yyyy"), trade -> {
trade.setOrderStatus(OrderStatus.CANCELLED);
}).blockingGet();
db.updateAll(Trade.class, trade -> {
trade.setOrderStatus(OrderStatus.CANCELLED);
}).blockingGet();
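The scope-plus-transformation pattern can be sketched without the database. The class below is a hypothetical in-memory stand-in: its method names echo the operations described in this section, but the signatures and return types are simplified assumptions, not the entity db API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in; not the entity db API.
final class UpdateSketch {

    static final class Trade {
        final String id;
        final String orderId;
        double price;

        Trade(String id, String orderId, double price) {
            this.id = id;
            this.orderId = orderId;
            this.price = price;
        }
    }

    private final Map<String, Trade> table = new LinkedHashMap<>();

    void insert(Trade trade) {
        table.put(trade.id, trade);
    }

    Trade get(String id) {
        return table.get(id);
    }

    // Scope: a single row found by its unique key. Returns false if no row matched.
    boolean updateBy(String id, Consumer<Trade> transformation) {
        Trade row = table.get(id);
        if (row == null) {
            return false;
        }
        transformation.accept(row); // the row is modified in place
        return true;
    }

    // Scope: every row sharing the given order id. Returns the number of rows modified.
    int updateRangeBy(String orderId, Consumer<Trade> transformation) {
        int modified = 0;
        for (Trade row : table.values()) {
            if (row.orderId.equals(orderId)) {
                transformation.accept(row);
                modified++;
            }
        }
        return modified;
    }
}
```

The same lambda can be reused across scopes, because the transformation only ever sees one row at a time and does not need to know how that row was selected.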
Recover
The recover operation enables you to insert a document into the database using the record's preset timestamp and ID.
The following recover operations are supported:
- recover
- recoverAll
This API must be used with caution. Integrity checks are skipped, so it can leave your Genesis application in a poor state if used incorrectly. Record IDs and timestamps are assumed to be unique.
Transactions
If the underlying database supports transactions, then the entity db provides type-safe access to these. A read transaction will support the same read operations as the entity db, and a write transaction will support the same read and write operations. If a write transaction fails, all operations will be reverted. Subscribe operations are not supported within transactions.
Transactions are supported on database layers: FoundationDb, MS SQL, Oracle and Postgresql.
When code is expected to run on multiple database types, transactions should be used when available. You can use safeReadTransaction and safeWriteTransaction. These run operations in the block in a single transaction, if supported.
There is a distinction between using Kotlin and Java here.
- When using Kotlin, the transaction is the receiver in the readTransaction call. This means that within the block, this refers to the transaction.
- When using Java, the transaction is the first parameter of the lambda.
Read transactions
Read transactions ensure that all read operations are consistent. Intervening writes will not affect reads within the transaction. The return value in the transaction will also be returned from the transaction. For the RxEntityDb, it will be a Single<*>.
- Kotlin
- Java
val orderTrade = db.readTransaction {
val trade = get(Trade.byId("TR_123"))
val order = get(Order.byId(trade.orderId))
buildOrderTrade(order, trade)
}
Single<OrderTrade> orderTrade = db.readTransaction(transaction -> {
final var trade = transaction.get(Trade.byId("TR_123")).blockingGet();
final var order = transaction.get(Order.byId(trade.getOrderId())).blockingGet();
return buildOrderTrade(order, trade);
});
Write transactions
Write transactions ensure all read and write operations are consistent. If any exception reaches the transaction level, all writes are rolled back. The writeTransaction will return a Pair<T, List<EntityWriteResult<*>>>, where T is the value returned in the writeTransaction lambda.
- Kotlin
- Java
val (orderId, writeResults) = db.writeTransaction {
insert(trade)
val orderInsert = insert(order)
orderInsert.record.orderId
}
final var pair = db.writeTransaction(transaction -> {
transaction.insert(trade).blockingGet();
final var orderInsert = transaction.insert(order).blockingGet();
return orderInsert.getRecord().getOrderId();
}).blockingGet();
final var orderId = pair.getFirst();
final var writeResults = pair.getSecond();
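The all-or-nothing behaviour of a write transaction can be illustrated with a minimal in-memory sketch. It assumes a copy-on-write approach, where writes go to a working copy that replaces the committed state only if the lambda completes. All names below are hypothetical, and this is not how the entity db or the underlying database implements transactions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in; not the entity db API.
final class WriteTransactionSketch {

    // Handle passed to the lambda; all writes go to a private working copy.
    static final class Transaction {
        private final Map<String, String> working;
        private final List<String> writeResults = new ArrayList<>();

        Transaction(Map<String, String> committed) {
            this.working = new HashMap<>(committed);
        }

        void insert(String key, String value) {
            if (working.containsKey(key)) {
                throw new IllegalStateException("Duplicate key: " + key);
            }
            working.put(key, value);
            writeResults.add("INSERT " + key);
        }
    }

    // The value returned by the lambda, paired with the writes it performed.
    static final class Result<T> {
        final T value;
        final List<String> writeResults;

        Result(T value, List<String> writeResults) {
            this.value = value;
            this.writeResults = writeResults;
        }
    }

    private Map<String, String> committed = new HashMap<>();

    <T> Result<T> writeTransaction(Function<Transaction, T> block) {
        Transaction transaction = new Transaction(committed);
        T value = block.apply(transaction); // an exception here skips the commit below
        committed = transaction.working;    // commit all writes together
        return new Result<>(value, transaction.writeResults);
    }

    int size() {
        return committed.size();
    }
}
```

If the lambda throws after its first insert, the committed map is never replaced, so the partial write is discarded along with the working copy.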
Subscribe operations
Subscribe
The subscribe operation starts a database listener that receives updates to tables or views.
Subscribe parameters
Subscribe supports the following parameters:
Name | Type | Required | Meaning | Default value |
---|---|---|---|---|
Table name | String | ✔️ | The table to subscribe to | n/a |
fields | Set<String> | ❌ | Only listen to changes on selected fields | listen to all fields |
delay | Int | ❌ | Group and publish updates every x ms | no grouping |
subscribeLocally | Boolean | ❌ | When in a cluster, only listen to local updates | false |
Overloads
The rx entity db takes a Class<E>, whereas the async entity db takes a KClass<E>.
Parameters marked with an asterisk (*) are optional.
subscribe([KClass<E> / Class<E>], delay*, fields*, subscribeLocally*): Flow<E>
These functions are available in Kotlin only:
subscribe<E>(delay*, fields*, subscribeLocally*): Flow<E>
- Kotlin
- Java
val subscription = launch {
db.subscribe<Trade>()
.collect { update ->
println("Received a trade update! $update")
}
}
final var subscription = db.subscribe(Trade.class)
.subscribe(update -> {
System.out.println("Received a trade update! " + update);
});
Bulk subscribe
The bulkSubscribe operation combines a getBulk and a subscribe call into a single operation.
This is useful when a class needs to read a full table and then receive updates of changes to the underlying table or view.
This operation supports backwards joins for views. This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the bulkSubscribe call.
Example
- Kotlin
- Java
val bulk = db.bulkSubscribe<Trade>()
var bulk = db.bulkSubscribe(Trade.class);
Parameters
bulkSubscribe supports the following parameters:
Name | Type | Required | Meaning | Default value |
---|---|---|---|---|
Table | Class<E> | ✔️ | The table to subscribe to | |
Index | UniqueEntityIndexReference<E> | ❌ | The index to sort the getBulk operation by | |
fields | Set<String> | ❌ | Only listen to changes on selected fields (filters ModifyResult on fields) | listen to all fields |
delay | Int | ❌ | Batch updates every x ms | no batching |
subscribeLocally | Boolean | ❌ | When in a cluster, only listen to local updates | false |
backwardJoins | Boolean | ❌ | subscribe to changes on sub tables (backwards joins) | false |
Java support
There are many optional parameters for bulk subscribe operations. In Kotlin, this works well in a single method due to named parameters and default values.
In Java, however, we provide a fluent API to set optional parameters.
The function bulkSubscribeBuilder returns a builder.
Optional parameters can be set with a "with-function", e.g. withDelay(...).
Once all values have been set, the toFlowable() function transforms the builder into a Flowable.
More complex example
- Kotlin
- Java
val bulk = db.bulkSubscribe<Trade>(
fields = setOf("TRADE_DATE"),
delay = 500,
subscribeLocally = false,
index = Trade.ByTypeId,
backwardJoins = true,
)
var bulk = db.bulkSubscribeBuilder(Trade.class)
.withFields(Set.of("TRADE_DATE"))
.withDelay(500)
.withSubscribeLocally(false)
.withIndex(Trade.ByTypeId.Companion)
.withBackwardJoins(true)
.toFlowable();
Range subscribe
rangeSubscribe is similar to bulkSubscribe. The rangeSubscribe operation combines a getRange and a subscribe call into a single operation.
This is useful when a class needs to read a range of values from a table and then receive updates of changes to the relevant records in the underlying table or view.
This operation supports [backwards joins for views](../../../database/fields-tables-views/views/views-examples/). This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the rangeSubscribe call.
Different range options
Like getRange
, rangeSubscribe
supports single-value and interval ranges.
In addition, range subscribe supports both static and dynamic ranges. See below for details.
Note on Java
Like bulkSubscribe, there are many optional parameters for range subscribe operations. In Kotlin, this works well in a single method due to named parameters and default values.
In Java, however, we provide a fluent API to set optional parameters.
The functions staticRangeSubscribeBuilder and dynamicRangeSubscribeBuilder return a builder.
Optional parameters can be set with a "with-function", e.g. withDelay(...).
Once all values have been set, the toFlowable() function transforms the builder into a Flowable.
Static ranges
By default, ranges are static; they are set at the start of the subscription and not updated afterwards.
- Kotlin
- Java
// single-value range
val range = db.rangeSubscribe(Trade.byCurrencyId("USD"))
// interval range
val range = db.rangeSubscribe(
Trade.byDate(startDate),
Trade.byDate(endDate)
)
// single-value range
var range = db.staticRangeSubscribeBuilder(Trade.byCurrencyId("USD"))
.toFlowable();
// interval range
var range = db.staticRangeSubscribeBuilder(
Trade.byDate(startDate),
Trade.byDate(endDate)
)
.toFlowable();
Dynamic ranges
rangeSubscribe also supports dynamic ranges. These are ranges that are refreshed either at a specified time or at a specified interval.
This can be useful for long-running subscriptions on dates. For example, you might want to keep an eye on trades booked in the last hour. Unless that range is refreshed, it will fall behind.
In the example below, we have two dynamic ranges.
- The first one filters on a specific date, and is updated at midnight.
- The second one has a 2-hour window, and is updated every 5 minutes.
- Kotlin
- Java
// single-value range
db.rangeSubscribe(
fromProvider = { Trade.byDate(DateTime.now().withTimeAtStartOfDay()) },
updateFrequency = PalDuration.At(LocalTime.MIDNIGHT)
)
// interval range
db.rangeSubscribe(
fromProvider = { Trade.byDate(DateTime.now().minusHours(1)) },
toProvider = { Trade.byDate(DateTime.now().plusHours(1)) },
updateFrequency = PalDuration.Every(Duration.ofMinutes(5))
)
// single-value range
var range = db.dynamicRangeSubscribeBuilder(
() -> Trade.byDate(DateTime.now().withTimeAtStartOfDay())
).withUpdateFrequency(new PalDuration.At(LocalTime.MIDNIGHT))
.toFlowable();
// interval range
var range = db.dynamicRangeSubscribeBuilder(
() -> Trade.byDate(DateTime.now().minusHours(1)),
() -> Trade.byDate(DateTime.now().plusHours(1))
).withUpdateFrequency(new PalDuration.Every(Duration.ofMinutes(5)))
.toFlowable();
Parameters
For static ranges, rangeSubscribe supports the parameters below.
In the Java API, the function returns a builder, where optional parameters can be set.
Name | Type | Required | Meaning | Default value |
---|---|---|---|---|
Start index | EntityIndex<E> | ✔️ | The index entry to read from | |
End index | EntityIndex<E> | For interval ranges | The index entry to read to | |
numKeyFields | Int | ❌️ | The number of key fields to take into account for the range | |
updateFrequency | PalDuration | For dynamic ranges | A schedule to update dynamic range boundaries | ? |
delay | Int | ❌ | Group and publish updates every x ms | 200ms |
fields | Set<String> | ❌ | Only listen to changes on selected fields (filters ModifyResult on fields) | listen to all fields |
subscribeLocally | Boolean | ❌ | When in a cluster, only listen to local updates | false |
backwardJoins | Boolean | ❌ | subscribe to changes on sub-tables (backwards joins) | false |
Example
- Kotlin
- Java
val range = asyncEntityDb.rangeSubscribe(
from = Trade.byDate(startDate),
to = Trade.byDate(endDate),
numKeyFields = 1,
delay = 500,
fields = emptySet(),
subscribeLocally = true,
backwardJoins = true
)
var range = db.staticRangeSubscribeBuilder(
Trade.byDate(startDate),
Trade.byDate(endDate)
)
.withNumKeyFields(1)
.withDelay(500)
.withFields(Collections.emptySet())
.withSubscribeLocally(true)
.withBackwardJoins(true)
.toFlowable();
Subscribing to views
There are some subtleties when it comes to subscriptions on views.
By default, when subscribing to updates on a view, only updates on the root table are sent. If you want updates on the joined table or tables, you must enable a backwardsJoin on each join.
Inner joins can produce different results. For example, a modification to a table could become a delete or insert on the view. There are details of inner joins in our page on views.
Overall, the better you understand Data Servers, the better your chances of implementing Entity Db without errors. It is well worth looking through the entire section in our documentation.
Update batching
For subscribe operations, by default, Genesis does not publish every update received. Updates are batched and compressed before publishing. This is designed to reduce the load on high-volume systems.
Batching is controlled by the delay parameter in the subscribe method. This is used to specify the batching interval for updates.
For example, if a trade is inserted and then modified in quick succession within the same batching period, there is no requirement for publishing any intermediate states. It is easier and more efficient for the system to publish a single update, so the insert and modify are compressed into a single insert update.
Batching phases
Batching happens in two phases:
- The updates are gathered together.
- At the end of the batching period, they are compressed and the resulting updates are published.
So what does this mean in practice? The batching period covers the whole subscription. It is always set globally, i.e. for an update queue subscription or on a data server query. It is never set at the individual row level.
The goal of this batching is to minimise work, and to publish updates that are accurate, rather than to publish every change.
In this regard it is better to think in terms of a batching interval, rather than a batching period, which looks something like this:
...
*
|---- gathering phase ----| (batching period)
*
*
*
*
*
|---- flushing phase -----| (commence compress and publish)
* compress updates
* publish updates
|---- gathering phase ----| (next batching period)
*
*
...
Batching is non-deterministic
Batching occurs within discrete batching periods, and only within each period.
- If a trade is inserted and then modified within a single batching period, the two will be compressed into a single update.
- If the trade is inserted one millisecond before the end of a batching period and modified right at the start of the next period, the insert is sent at the end of the first period and the update is sent at the end of the next period, so there are two updates.
The point here is that there is no way of determining in advance whether a series of inserts and modifies will be compressed or not. It is entirely determined by the timing of the individual inserts and modifies. All we can determine is that the smaller the gap between the insert and modify, the more likely they are to be compressed.
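The two cases above can be reproduced with a minimal sketch of period-based batching. It makes several assumptions that are not stated in this document: periods are modelled as fixed windows starting at time zero, updates arrive in list order, and compression keeps the first update type and the latest state for each record. All class and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of batching; not the platform's implementation.
final class BatchingSketch {

    static final class Update {
        final long timeMs;
        final String type;
        final String recordId;
        final String state;

        Update(long timeMs, String type, String recordId, String state) {
            this.timeMs = timeMs;
            this.type = type;
            this.recordId = recordId;
            this.state = state;
        }
    }

    // Returns the published updates as "TYPE recordId state" strings, in period order.
    static List<String> publish(List<Update> updates, long periodMs) {
        // Gathering phase: bucket updates by the batching period they arrived in.
        TreeMap<Long, List<Update>> periods = new TreeMap<>();
        for (Update update : updates) {
            periods.computeIfAbsent(update.timeMs / periodMs, k -> new ArrayList<>()).add(update);
        }
        List<String> published = new ArrayList<>();
        for (List<Update> batch : periods.values()) {
            // Flushing phase: per record, keep the first type and the latest state.
            Map<String, String[]> compressed = new LinkedHashMap<>();
            for (Update update : batch) {
                String[] entry = compressed.get(update.recordId);
                if (entry == null) {
                    compressed.put(update.recordId, new String[] {update.type, update.state});
                } else {
                    entry[1] = update.state;
                }
            }
            for (Map.Entry<String, String[]> e : compressed.entrySet()) {
                published.add(e.getValue()[0] + " " + e.getKey() + " " + e.getValue()[1]);
            }
        }
        return published;
    }
}
```

With a 200 ms period, an insert at 10 ms and a modify at 50 ms fall in the same window and are published as one insert carrying the modified state, whereas an insert at 199 ms and a modify at 200 ms fall in different windows and are published separately.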
Batching at different levels
Note that batching happens at both the data server and the update-queue level.
- Genesis batches on the queue level to minimise the amount of work the data server has to do. Because we pre-allocate LMDB when the data server starts, having too many needless updates would eat into that allocation.
- Batching is performed on the data server to minimise the load on the client and the connection.
Out-of-sequence updates on the queue
Updates are published on the update queue once a record has been written to the database, or when a transaction completes.
Within a high-throughput set-up, where updates on the same table are published from multiple sources, updates can be received out of sequence. This means that when listening to updates, in a high-throughput application, it is essential to check record timestamps to detect and handle out-of-sequence updates. For views, this is less of a problem, as Genesis refreshes the record from the database within a transaction to make sure that the published update is valid.
These multiple sources could be:
- Different event handlers on the same node modifying the same table
- The same event handler running on multiple nodes
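One way to act on the timestamp check is to remember the newest timestamp seen for each record and ignore anything older. The sketch below is a hypothetical helper, assuming each update carries a record ID and a numeric record timestamp; how those values are read from an update is not shown here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for listeners; not part of the entity db API.
final class SequenceGuardSketch {
    private final Map<String, Long> latestTimestampById = new HashMap<>();

    // Returns true if the update is newer than anything seen so far for this record.
    boolean accept(String recordId, long recordTimestamp) {
        Long latest = latestTimestampById.get(recordId);
        if (latest != null && recordTimestamp <= latest) {
            return false; // stale or duplicate update that arrived out of sequence
        }
        latestTimestampById.put(recordId, recordTimestamp);
        return true;
    }
}
```

A listener would call accept before applying each update and skip the update when it returns false. Dropping stale updates is only one possible policy; an application might instead re-read the record from the database.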