
Database interface - Entity Db

The entity db enables you to interact with the database layer; you can use any generated type-safe entities for tables and views. The interface supports the same operations as the generated repositories, but will accept any entity. It supports read operations for views and tables and write operations for tables only.

The entity db differs from the generated repositories in that it can handle any table and most view entities. It differs from RxDb in that all operations are type-safe.

The entity db is available in the Kotlin Event Handler.

It can be injected in Kotlin using AsyncEntityDb and in Java using RxEntityDb.

There are three versions of the entity db:

  • AsyncEntityDb - this API supports Kotlin coroutines
  • SyncEntityDb - this API is blocking
  • RxEntityDb - this API supports RxJava
important

The SyncEntityDb API was introduced in version 7.0 of the Genesis platform.

Which version to use?

The version to use depends on:

  • whether you are writing code in Kotlin or Java
  • whether there is a need to write asynchronous (async) or synchronous (blocking) code
| | Asynchronous | Blocking |
|---|---|---|
| Kotlin | AsyncEntityDb | SyncEntityDb |
| Java | RxEntityDb | SyncEntityDb |

For more information on the differences between the APIs, see Types of API.

| | EntityDb |
|---|---|
| Supports tables | ✔️ |
| Supports views | ✔️ |
| Supports any data type | ✔️ |
| Class to import | AsyncEntityDb, RxEntityDb |
| Type-safe read and write | ✔️ |
| Type-safe write result | ✔️ |
| Returns data as | table or view entities |
| Writes data as | table or view entities |
| References indexes as | index entities |
| Programming interface | Async or RxJava |
| Write (input) | Modify Details |
| Write (output) | Write Result |
| Subscribe | Record Update of entity |
| Bulk or Range Subscribe | Bulk of entity |
| Available in Custom Event Handlers | ✔️ |
| Available in Custom Request Servers | ✔️ |

When referring to indices in database operations, the database accepts either index classes, or an entity class in combination with an index reference. The type conventions used on this page are listed below for comparison:

Type convention

| Type | Meaning | Example |
|---|---|---|
| E | A table or view entity | Trade |
| T | A table entity | Trade |
| V | A view entity | TradeView |
| EntityIndex<E> | An index of E | Trade.ById |
| UniqueEntityIndex<E> | A unique index of E | Trade.ById |
| NonUniqueEntityIndex<E> | A non unique index of E | Trade.ByDate |
| EntityIndexReference<E> | An index reference of E | Trade.ById |
| UniqueEntityIndexReference<E> | A unique index reference of E | Trade.ById |
| NonUniqueEntityIndexReference<E> | A non unique index reference of E | Trade.ByDate |
| F<E> | The full table /view name for E | TRADE |
| Class<E> | The class reference for E | Trade.class |
| KClass<E> | The Kotlin class reference for E | Trade::class |

Read operations

get

Get is a simple lookup on the database; it returns a single entity if a match is found, or nothing (null for the E? return types listed below) if no match is found.

The following overloads exist for get; fields is a Set<String>.

  • get(E, EntityIndexReference<E>, fields) : E?
  • get(E, fields) : E?
  • get(E, EntityIndexReference<E>) : E?
  • get(UniqueEntityIndex<E>, fields) : E?
  • get(UniqueEntityIndex<E>) : E?

Syntax

// we can look up trades by passing in a unique index class:
val trade = db.get(Trade.byId("TRADE_1"))

// a trade object with the primary key set
val trade = db.get(trade)

// a trade object and a reference to unique index
val trade = db.get(trade, Trade.ByTypeId)

// or you can access the index class from the entity
val trade = db.get(trade.byTypeId())

getAll

This takes multiple unique index class instances and returns the matching entity for each one. It takes a List<Pair<String, UniqueEntityIndex<E>>>, where the String is a unique reference to each request, and returns a map keyed by that reference.

Overloads

  • getAll(requestDetails: Flow<Pair<String, UI<E>>>): Map<String, E?>
  • getAll(requestDetails: List<Pair<String, UI<E>>>): Map<String, E?>

val map = db.getAll(listOf("A" to Trade.byId("TRADE_A"), "B" to Trade.byId("TRADE_B")))

val recordA = map["A"]
val recordB = map["B"]

getAllAsList

This operation is similar to the one above, but takes a List<UniqueEntityIndex<E>>, and returns a List<E?>. The results are returned in the order they were requested; an entry is null if no record was found. The result list is guaranteed to be the same size as the input.

Overloads

  • getAllAsList(Flow<UI<E>>): List<E?>
  • getAllAsList(List<UI<E>>): List<E?>
  • getAllAsList(vararg UI<E>): List<E?>

val list = db.getAllAsList(Trade.byId("TRADE_A"), Trade.byId("TRADE_B"))

val recordA = list[0]
val recordB = list[1]
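The ordering and null-padding guarantee can be illustrated with a minimal in-memory sketch. This is not the Genesis API: `TradeRow`, `lookupAllAsList` and the map standing in for the table are hypothetical names used only to show the contract (same size as the input, same order, null where nothing matched).

```kotlin
// Hypothetical stand-ins: TradeRow and the in-memory map are NOT Genesis types.
data class TradeRow(val tradeId: String, val price: Double)

// Looks up each key in request order; a missing key yields null in the same position,
// so the result always has exactly as many entries as the request.
fun lookupAllAsList(table: Map<String, TradeRow>, keys: List<String>): List<TradeRow?> =
    keys.map { key -> table[key] }
```

Because positions line up with the request, a caller can zip the request list with the result list to see which lookups missed, or call filterNotNull() when only the hits matter.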

getBulk

This operation creates a Flow (a Flowable in the RxJava API) of the whole table. The records will be sorted in ascending order by the index provided, or by the primary key if none is provided.

There is also the getBulkFromEnd function, which returns records in descending order.

There are also a number of continuation operations, which return the rest of the table after the provided record. These methods are deprecated and should not be used going forward.

Overloads

  • getBulk<E>(): Flow<E> (Kotlin only)
  • getBulk([Class<E> / KClass<E>]): Flow<E>
  • getBulk(UR<E>): Flow<E>
  • getBulk(UR<E>, fields): Flow<E>
  • getBulk(UR<E>, E, fields): Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>): Flow<E>
  • getBulkFromEnd(UR<E>, E): Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>, E, fields): Flow<E> (continuation) (Deprecated)
  • getBulkFromEnd(UR<E>, fields): Flow<E>

Syntax

// we can pass in Trade as a type parameter
val flow = db.getBulk<Trade>()
// we can pass in the TRADE object
val flow = db.getBulk(TRADE)
// or we can pass in an index reference
val flow = db.getBulk(Trade.ByTypeId)

getRange

getRange is an operation that can return multiple records from the database.

There are two types of getRange operation:

  • Single-value range, e.g. trades with CURRENCY set to USD
  • Interval range, e.g. all trades with TRADE_DATE between 01-Jul-2024 and 01-Aug-2024.

Unlike the get operation, the getRange operation can always return multiple records.

We pass index entities into getRange. When using an interval range, both index entities should be of the same type. In the example below, this is Trade.byDate.

// single value
val range = db.getRange(Trade.byCurrency("USD"))

// interval
val range = db.getRange(
    Trade.byDate(myStartDate),
    Trade.byDate(myEndDate)
)

Table entities provide methods for building index entities:

// single value
val range = db.getRange(myUsdTrade.byCurrency())

// interval
val range = db.getRange(myStartTrade.byDate(), myEndTrade.byDate())

numKeyFields property

This allows users to query on part of the index. When using the getRange operation, you can tell the database how many fields in the index to use.

By default, all fields are used, but you can override this.

note

Order matters here; you can get a range on just field 1, field 1 and field 2, but not on field 2 and field 1.

For example, if on the ORDER table, we had an index on COUNTERPARTY and INSTRUMENT_ID, we could:

  • perform a single-value getRange operation on COUNTERPARTY. For this, set numKeyFields to 1.
  • perform a single-value getRange operation on both COUNTERPARTY and INSTRUMENT_ID. For this, set numKeyFields to 2.

For example:

// we could look up all rows for both COUNTERPARTY and INSTRUMENT:
val range = db.getRange(myOrder.byCounterpartyAndInstrument())
// which would be equivalent to
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 2)
// and also
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A", "VOD"))

// or we could look up by just COUNTERPARTY
val range = db.getRange(myOrder.byCounterpartyAndInstrument(), 1)
// which would be equivalent to
val range = db.getRange(Order.byCounterpartyAndInstrument("CPTY_A"))
info

numKeyFields is required in the following instances:

  • when passing in a table entity and an index reference
  • when passing in an index entity that refers to a unique index
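As a rough mental model of numKeyFields, the sketch below filters an in-memory list by the leading fields of a two-field index. It is an illustration only: `OrderRow`, `indexKey` and `rangeByPrefix` are hypothetical names, and the real lookup is performed by the database rather than by filtering a list.

```kotlin
// Hypothetical model of an ORDER row; not a Genesis-generated entity.
data class OrderRow(val counterparty: String, val instrumentId: String, val quantity: Int)

// The index key fields in index order: COUNTERPARTY first, then INSTRUMENT_ID.
fun indexKey(row: OrderRow): List<String> = listOf(row.counterparty, row.instrumentId)

// Returns the rows whose first numKeyFields key fields equal those of the probe.
// Only a leading prefix of the index can be used, which is why order matters.
fun rangeByPrefix(rows: List<OrderRow>, probe: OrderRow, numKeyFields: Int = 2): List<OrderRow> {
    require(numKeyFields in 1..2) { "numKeyFields must be between 1 and 2" }
    val wanted = indexKey(probe).take(numKeyFields)
    return rows.filter { indexKey(it).take(numKeyFields) == wanted }
}
```

With numKeyFields set to 1, every row for the probe's counterparty matches; with the default of 2, the instrument must match as well.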

Fields

When using a getRange operation, you can provide a Set<String> with the field names you want to select. This will only be used as a hint; non-nullable fields will always be returned from the database.

val range = db.getRange(
    myUsdTrade.byCurrency(),
    fields = setOf("PRICE", "QUANTITY")
)

Reversed range lookups

In addition to getRange there is also the getRangeFromEnd method. This works in the same way as getRange. However, rows are returned in the reverse order.

Parameters when using index entities

| Name | Type | Required | Meaning | Default value |
|---|---|---|---|---|
| Index | EntityIndex<E> | ✔️, for single value ranges | The index entry to read | |
| Start index | EntityIndex<E> | ✔️, for interval ranges | The index entry to read from | |
| End index | EntityIndex<E> | ✔️, for interval ranges | The index entry to read to | |
| numKeyFields | Int | | The number of key fields to take into account | All fields in the index |
| fields | Set<String> | | The names of the fields to return | return all fields |

Write operations

All write operations have versions that take a single entity and versions that take multiple entities.

The return values for these operations are type-safe (see details below), provided all entries are of the same type. For example, when inserting multiple Trade entries, the return type will be List<InsertResult<Trade>>. Different entity types can be inserted in the same operation; however, the return type will be List<InsertResult<Entity>>. Also, modify operations only accept table entities.

Default and generated values

When writing a record to the database, typically all non-null properties should be set on the entity. An entity property becomes non-nullable if:

  • it has a default value
  • it is generated by the database, i.e. sequence or auto increment fields
  • the column is included in an index or is specifically declared not null in the schema

Default values

Properties with a default value will have the value set by default, unless set explicitly in the builder.

Generated properties

Generated properties are left in an indeterminate state if not set in the builder. When the record is written to the database, the generated value is set on the record in the return value. Trying to read the property while it is still in the indeterminate state results in an IllegalArgumentException.

Each generated property has two associated read-only properties for accessing it safely:

  • is[FieldName]Generated boolean property
  • [fieldName]OrNull property

For example:

// tradeId is generated
trade.tradeId // will cause an exception if not initialised
trade.tradeIdOrNull // will return the tradeId if set, or else null
trade.isTradeIdInitialised // will return true if set
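The behaviour described above can be mimicked with a small hand-written class. This is a sketch only: `TradeModel` and `withGeneratedId` are hypothetical, and the code generated by the platform will differ in detail. It simply shows a property that throws IllegalArgumentException until a value exists, alongside the two safe accessors.

```kotlin
// Hypothetical hand-written model; generated Genesis entities will differ in detail.
class TradeModel(private val generatedTradeId: String? = null) {

    // Mirrors the documented behaviour: reading before the value is set throws
    // IllegalArgumentException (requireNotNull raises exactly that exception).
    val tradeId: String
        get() = requireNotNull(generatedTradeId) { "tradeId has not been generated yet" }

    // Safe accessor: the value if set, otherwise null.
    val tradeIdOrNull: String?
        get() = generatedTradeId

    // Safe accessor: true once a value is available.
    val isTradeIdInitialised: Boolean
        get() = generatedTradeId != null

    // Stands in for the database assigning the generated value on write.
    fun withGeneratedId(id: String): TradeModel = TradeModel(id)
}
```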

Columns in indices or explicitly declared not null

Columns that are in indices or declared not null should always be set in a builder, unless they have a default value or are generated columns. In all other instances, a NullPointerException will be thrown when building the object.

Insert

This inserts a new record into the database. The insert function takes a single table entity. The insertAll function takes multiple records, and has several overloads:

Overloads

  • insert(E): InsertResult<E>
  • insertAll(vararg E): List<InsertResult<E>>
  • insertAll(List<E>): List<InsertResult<E>>
  • insertAll(Flow<E>): List<InsertResult<E>>

Modify

This tries to modify a record in the database. If the record does not exist, an error is thrown.

Overloads

  • modify(EntityModifyDetails<E>): ModifyResult<E>
  • modify(E): ModifyResult<E>
  • modify(E, UniqueEntityIndexReference<E>): ModifyResult<E>
  • modifyAll(vararg E): List<ModifyResult<E>>
  • modifyAll(vararg EntityModifyDetails<E>): List<ModifyResult<E>>
  • modifyAll(List<EntityModifyDetails<E>>): List<ModifyResult<E>>
  • modifyAll(Flow<EntityModifyDetails<E>>): List<ModifyResult<E>>

Upsert

This tries to modify a record in the database. If the record does not exist, the record is inserted instead.

Overloads

  • upsert(EntityModifyDetails<E>): UpsertResult<E>
  • upsert(E): UpsertResult<E>
  • upsertAll(vararg E): List<UpsertResult<E>>
  • upsertAll(vararg EntityModifyDetails<E>): List<UpsertResult<E>>
  • upsertAll(List<EntityModifyDetails<E>>): List<UpsertResult<E>>
  • upsertAll(Flow<EntityModifyDetails<E>>): List<UpsertResult<E>>
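The difference between modify and upsert can be sketched against an in-memory map. This is an illustration under stated assumptions, not the Genesis implementation: `UpsertOutcome` and `upsertInto` are hypothetical names, whereas the real operations return UpsertResult<E>.

```kotlin
// Hypothetical outcome type; the real API returns UpsertResult<E>.
enum class UpsertOutcome { INSERTED, MODIFIED }

// Modifies the row stored under key if one exists, otherwise inserts it.
// A plain modify would instead fail when the key is absent.
fun <K, V> upsertInto(table: MutableMap<K, V>, key: K, value: V): UpsertOutcome {
    val existed = table.containsKey(key)
    table[key] = value
    return if (existed) UpsertOutcome.MODIFIED else UpsertOutcome.INSERTED
}
```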

Delete

This tries to delete a record.

Overloads

  • delete(E): DeleteResult<E>
  • delete(UniqueEntityIndex<E>): DeleteResult<E>
  • deleteAll(vararg E): List<DeleteResult<E>>
  • deleteAll(vararg UniqueEntityIndex<E>): List<DeleteResult<E>>
  • deleteAll(List<E>): List<DeleteResult<E>>
  • deleteAll(Flow<E>): List<DeleteResult<E>>

Update

The update operation is a condensed syntax for modifying data in the database. It works by providing a scope and a transformation. The scope could be one of:

  • updateBy - a unique index
  • updateRangeBy - an index range
  • updateAll - a whole table

The transformation is a lambda, where the rows that are in scope are provided one by one. The rows are provided as in the database, and can be modified in place, with the changes applied to the database. All update calls use the safeWriteTransaction and are transactional if the database supports it.

  • In the async entity db, the lambda will be of type E.() -> Unit. The entity will be the receiver and in the lambda, this will refer to the row, which should be modified in place.

  • In the rx entity db, the lambda will be of type Consumer<E>. The lambda will have a single parameter, the entity. Similar to the async version, the row should be modified in place.

In both cases, the full record will be provided, and values can be read as well as updated. The operations return List<ModifyResult<E>> for updateAll and updateRangeBy methods and ModifyResult<E> for the updateBy operation.

For example:

db.updateBy(Trade.byId("xxxxx")) {
    price = 15.0
}

db.updateByRange(Trade.byOrderId("xxxx")) {
    orderStatus = OrderStatus.CANCELLED
}

db.updateByRange(Trade.byOrderId("xxxx"), Trade.byOrderId("yyyy")) {
    orderStatus = OrderStatus.CANCELLED
}

db.updateAll<Trade> {
    orderStatus = OrderStatus.CANCELLED
}
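For readers less familiar with receiver lambdas, the E.() -> Unit shape used by the async entity db can be shown without a database. The sketch below is plain Kotlin: `MutableTradeRow` and `updateAllInMemory` are hypothetical names, and nothing here is persisted or transactional.

```kotlin
// Hypothetical mutable row; generated Genesis entities are not defined like this.
data class MutableTradeRow(var price: Double, var orderStatus: String)

// Applies the transformation to each in-scope row, with the row as the receiver,
// so inside the lambda the row's properties can be read and assigned directly.
fun updateAllInMemory(
    rows: List<MutableTradeRow>,
    transform: MutableTradeRow.() -> Unit
): List<MutableTradeRow> {
    rows.forEach { row -> row.transform() }
    return rows
}
```

Calling updateAllInMemory(rows) { orderStatus = "CANCELLED" } changes every row in place while leaving untouched fields such as price as they were.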

Recover

The recover operation enables you to insert a document into the database using the record's preset timestamp and ID.

The following recover operations are supported:

  • recover
  • recoverAll
danger

This API must be used with caution. Integrity checks are skipped, so it can leave your Genesis application in a poor state if used incorrectly. Record IDs and timestamps are assumed to be unique.

Transactions

If the underlying database supports transactions, then the entity db provides type-safe access to these. A read transaction will support the same read operations as the entity db, and a write transaction will support the same read and write operations. If a write transaction fails, all operations will be reverted. Subscribe operations are not supported within transactions.

Transactions are supported on the following database layers: FoundationDb, MS SQL, Oracle and PostgreSQL.

When code is expected to run on multiple database types, transactions should be used when available. You can use safeReadTransaction and safeWriteTransaction. These run operations in the block in a single transaction, if supported.

There is a distinction between using Kotlin and Java here.

  • When using Kotlin, the transaction is the receiver in the readTransaction call. This means that within the block, this refers to the transaction.
  • When using Java, the transaction is the first parameter of the lambda.

Read transactions

Read transactions ensure that all read operations are consistent. Intervening writes will not affect reads within the transaction. The value returned by the transaction block is also returned from the readTransaction call. For the RxEntityDb, it will be a Single<*>.

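val orderTrade = db.readTransaction {
    // get returns null when no record matches, so fail fast before dereferencing
    val trade = get(Trade.byId("TR_123")) ?: error("Trade TR_123 not found")
    val order = get(Order.byId(trade.orderId)) ?: error("Order ${trade.orderId} not found")
    buildOrderTrade(order, trade)
}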

Write transactions

Write transactions ensure all read and write operations are consistent. If any exception reaches the transaction level, all writes are rolled back. The writeTransaction will return a Pair<T, List<EntityWriteResult<*>>>, where T is the value returned in the writeTransaction lambda.

val (orderId, writeResults) = db.writeTransaction {
    insert(trade)
    val orderInsert = insert(order)
    orderInsert.record.orderId
}

Subscribe operations

Subscribe

The subscribe operation starts a database listener that receives updates to tables or views.

Subscribe parameters

Subscribe supports the following parameters:

| Name | Type | Required | Meaning | Default value |
|---|---|---|---|---|
| Table name | String | ✔️ | The table to subscribe to | n/a |
| fields | Set<String> | | Only listen to changes on selected fields | listen to all fields |
| delay | Int | | Group and publish updates every x ms | no grouping |
| subscribeLocally | Boolean | | When in a cluster, only listen to local updates | false |

Overloads

The rx entity db takes a Class<E>, whereas the async entity db takes a KClass<E>. Parameters marked with an asterisk(*) are optional.

  1. subscribe([KClass<E> / Class<E>], delay*, fields*, subscribeLocally*): Flow<E>

These functions are available in Kotlin only:

  1. subscribe<E>(delay*, fields*, subscribeLocally*): Flow<E>

val subscription = launch {
    db.subscribe<Trade>()
        .collect { update ->
            println("Received a trade update! $update")
        }
}

Bulk subscribe

The bulkSubscribe operation combines a getBulk and a subscribe call into a single operation. This is useful when a class needs to read a full table and then receive updates of changes to the underlying table or view.

This operation supports backwards joins for views. This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the bulkSubscribe call.

Example

val bulk = db.bulkSubscribe<Trade>()

Parameters

bulkSubscribe supports the following parameters:

| Name | Type | Required | Meaning | Default value |
|---|---|---|---|---|
| Table | Class<E> | ✔️ | The table to subscribe to | |
| Index | UniqueEntityIndexReference<E> | | The index to sort the getBulk operation by | |
| fields | Set<String> | | Only listen to changes on selected fields (filters ModifyResult on fields) | listen to all fields |
| delay | Int | | Batch updates every x ms | no batching |
| subscribeLocally | Boolean | | When in a cluster, only listen to local updates | false |
| backwardJoins | Boolean | | Subscribe to changes on sub-tables (backwards joins) | false |

Java support

There are many optional parameters for bulk subscribe operations. In Kotlin, this works well in a single method due to named parameters and default values.

In Java, however, we provide a fluent API to set optional parameters.

The function bulkSubscribeBuilder returns a builder.

Optional parameters can be set with a "with-function", e.g. withDelay(...).

Once all values have been set, the toFlowable() function transforms the builder into a Flowable.

More complex example

val bulk = db.bulkSubscribe<Trade>(
    fields = setOf("TRADE_DATE"),
    delay = 500,
    subscribeLocally = false,
    index = Trade.ByTypeId,
    backwardJoins = true,
)

Range subscribe

rangeSubscribe is similar to bulkSubscribe. The rangeSubscribe operation combines a getRange and a subscribe call into a single operation. This is useful when a class needs to read a range of values from a table and then receive updates of changes to the relevant records in the underlying table or view.

This operation supports [backwards joins for views](../../../database/fields-tables-views/views/views-examples/). This means that it can receive updates on both the root table and the joined tables. The view needs to support this in the definition, and it must be enabled on the rangeSubscribe call.

Different range options

Like getRange, rangeSubscribe supports single-value and interval ranges. In addition, range subscribe supports both static and dynamic ranges. See below for details.

Note on Java

Like bulkSubscribe, rangeSubscribe has many optional parameters. In Kotlin, this works well in a single method due to named parameters and default values.

In Java, however, we provide a fluent API to set optional parameters.

The functions staticRangeSubscribeBuilder and dynamicRangeSubscribeBuilder return a builder.

Optional parameters can be set with a "with-function", e.g. withDelay(...).

Once all values have been set, the toFlowable() function transforms the builder into a Flowable.

Static ranges

By default, ranges are static; they are set at the start of the subscription and not updated afterwards.

// single-value range
val range = db.rangeSubscribe(Trade.byCurrencyId("USD"))

// interval range
val range = db.rangeSubscribe(
    Trade.byDate(startDate),
    Trade.byDate(endDate)
)

Dynamic ranges

rangeSubscribe also supports dynamic ranges. These are ranges that are refreshed either at a specified time or at a regular interval.

This can be useful for long-running subscriptions on dates. For example, you might want to keep an eye on trades booked in the last hour. Unless that range is refreshed, it will fall behind.

In the example below, we have two dynamic ranges.

  • The first one filters on a specific date, and is updated at midnight.
  • The second one has a 2-hour window, and is updated every 5 minutes.

// single-value range
db.rangeSubscribe(
    fromProvider = { Trade.byDate(DateTime.now().withTimeAtStartOfDay()) },
    updateFrequency = PalDuration.At(LocalTime.MIDNIGHT)
)
// interval range
db.rangeSubscribe(
    fromProvider = { Trade.byDate(DateTime.now().minusHours(1)) },
    toProvider = { Trade.byDate(DateTime.now().plusHours(1)) },
    updateFrequency = PalDuration.Every(Duration.ofMinutes(5))
)
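Why a time-based range needs refreshing can be seen with a small standalone sketch. It uses java.time rather than the DateTime type in the snippet above, and `windowAround` is a hypothetical helper, not part of the entity db; it only shows that a window computed once stops covering the current time, while a recomputed one does not.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper: computes a window centred on the supplied clock reading.
// A static range evaluates this once; a dynamic range re-evaluates it on a schedule.
fun windowAround(now: Instant, halfWidth: Duration): ClosedRange<Instant> =
    now.minus(halfWidth)..now.plus(halfWidth)
```

A window built at 09:00 with a one-hour half-width covers 08:00 to 10:00, so a trade booked at 12:00 falls outside it unless the bounds are recomputed.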

Parameters

For static ranges rangeSubscribe supports the parameters below.

In the Java API, the function returns a builder, where optional parameters can be set.

| Name | Type | Required | Meaning | Default value |
|---|---|---|---|---|
| Start index | EntityIndex<E> | ✔️ | The index entry to read from | |
| End index | EntityIndex<E> | For interval ranges | The index entry to read to | |
| numKeyFields | Int | ❌️ | The number of key fields to take into account for the range | |
| updateFrequency | PalDuration | For dynamic ranges | A schedule to update dynamic range boundaries | |
| delay | Int | | Group and publish updates every x ms | 200ms |
| fields | Set<String> | | Only listen to changes on selected fields (filters ModifyResult on fields) | listen to all fields |
| subscribeLocally | Boolean | | When in a cluster, only listen to local updates | false |
| backwardJoins | Boolean | | Subscribe to changes on sub-tables (backwards joins) | false |

Example

val range = asyncEntityDb.rangeSubscribe(
    from = Trade.byDate(startDate),
    to = Trade.byDate(endDate),
    numKeyFields = 1,
    delay = 500,
    fields = emptySet(),
    subscribeLocally = true,
    backwardJoins = true
)

Subscribing to views

There are some subtleties when it comes to subscriptions on views.

By default, when subscribing to updates on a view, only updates on the root table are sent. If you want updates on the joined table or tables, you must enable a backwardsJoin on each join.

Inner joins can produce different results. For example, a modification to a table could become a delete or insert on the view. There are details of inner joins in our page on views.

Overall, the better you understand Data Servers, the better your chances of implementing Entity Db without errors. It is well worth looking through the entire section in our documentation.

Update batching

For subscribe operations, by default, Genesis does not publish every update received. Updates are batched and compressed before publishing. This is designed to reduce the load on high-volume systems.

Batching is controlled by the delay parameter in the subscribe method. This is used to specify the batching interval for updates.

For example, if a trade is inserted and then modified in quick succession within the same batching period, there is no requirement to publish any intermediate states. It is easier and more efficient for the system to publish a single update, so the insert and modify are compressed into a single insert update.
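The compression described above can be sketched as follows. This is a simplified model, not the platform's implementation: `Kind`, `Update` and `compress` are hypothetical names. Only the rule that an insert followed by a modify becomes a single insert comes from the text; treating any other sequence as "latest update wins" is an assumption made for the sketch.

```kotlin
// Hypothetical update model; these are NOT Genesis classes.
enum class Kind { INSERT, MODIFY }
data class Update(val recordId: String, val kind: Kind, val value: String)

// Compresses the updates gathered in one batching period to one update per record.
fun compress(batch: List<Update>): List<Update> {
    // LinkedHashMap keeps records in the order they first appeared in the batch.
    val pending = LinkedHashMap<String, Update>()
    for (next in batch) {
        val previous = pending[next.recordId]
        val merged = if (previous?.kind == Kind.INSERT && next.kind == Kind.MODIFY) {
            // insert then modify: publish one insert carrying the latest value
            next.copy(kind = Kind.INSERT)
        } else {
            // assumption: in every other case the latest update simply wins
            next
        }
        pending[next.recordId] = merged
    }
    return pending.values.toList()
}
```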

Batching phases

Batching happens in two phases:

  1. The updates are gathered together.
  2. At the end of the batching period, they are compressed and the resulting updates are published.

So what does this mean in practice? The batching period covers the whole subscription. It is always set globally, i.e. for an update queue subscription or on a data server query. It is never set at the individual row level.

The goal of this batching is to minimise work, and to publish updates that are accurate, rather than to publish every change.

In this regard it is better to think in terms of a batching interval, rather than a batching period, which looks something like this:

...
*
|---- gathering phase ----| (batching period)
*
*
*
*
*
|---- flushing phase -----| (commence compress and publish)
* compress updates
* publish updates
|---- gathering phase ----| (next batching period)
*
*
...

Batching is non-deterministic

Batching occurs within discrete batching periods, and only within each period.

  • If a trade is inserted and then modified within a single batching period, the two will be compressed into a single update.
  • If the trade is inserted one millisecond before the end of a batching period and is modified right at the start of the next period, the insert is sent at the end of the first period and the modify is sent at the end of the next period, so there are two updates.

The point here is that there is no way of predicting whether a series of inserts and modifies will be compressed or not. It is entirely determined by the timing of the individual inserts and modifies. All we can say is that the smaller the gap between the insert and the modify, the more likely they are to be compressed.
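The two cases above can be reproduced with a toy model that buckets event times into fixed-length periods. The assumptions are loud ones: `publishedUpdateCount` is a hypothetical function, periods are taken to start at time 0, times are non-negative, and all events belong to a single record.

```kotlin
// Counts how many updates are published for one record, assuming fixed-length
// batching periods that start at time 0 (a simplification for illustration).
fun publishedUpdateCount(eventTimesMs: List<Long>, periodMs: Long): Int {
    require(periodMs > 0) { "periodMs must be positive" }
    // Events in the same period share a bucket and are compressed into one update.
    return eventTimesMs.map { it / periodMs }.distinct().size
}
```

With a 200 ms period, events at 10 ms and 20 ms share a bucket and yield one update, whereas events at 199 ms and 201 ms are only 2 ms apart yet straddle a boundary and yield two.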

Batching at different levels

Note that batching happens at both the data server and the update-queue level.

  • Genesis batches at the queue level to minimise the amount of work the data server has to do. Because we pre-allocate LMDB when the data server starts, having too many needless updates would eat into that allocation.
  • Batching is performed on the data server to minimise the load on the client and the connection.

Out-of-sequence updates on the queue

Updates are published on the update queue once a record has been written to the database, or when a transaction completes.

Within a high-throughput set-up, where updates on the same table are published from multiple sources, updates can be received out of sequence. This means that when listening to updates, in a high-throughput application, it is essential to check record timestamps to detect and handle out-of-sequence updates. For views, this is less of a problem, as Genesis refreshes the record from the database within a transaction to make sure that the published update is valid.

These multiple sources could be:

  • Different event handlers on the same node modifying the same table
  • The same event handler running on multiple nodes
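One way to act on the timestamp check recommended above is to remember the newest timestamp seen per record and ignore anything older. The sketch below is a minimal illustration under assumptions: `TimedUpdate` and `SequenceGuard` are hypothetical, the timestamp is modelled as a plain Long, and the class is not thread-safe.

```kotlin
// Hypothetical update shape; the real record timestamp field is not modelled here.
data class TimedUpdate(val recordId: String, val timestamp: Long, val payload: String)

// Remembers the newest timestamp seen per record and rejects anything not newer.
class SequenceGuard {
    private val latestSeen = HashMap<String, Long>()

    // Returns true if the update is newer than anything seen so far for its record.
    fun accept(update: TimedUpdate): Boolean {
        val seen = latestSeen[update.recordId]
        if (seen != null && update.timestamp <= seen) return false
        latestSeen[update.recordId] = update.timestamp
        return true
    }
}
```

Treating an equal timestamp as stale is a design choice made for this sketch; a listener that must process every distinct update would need a different tie-break.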