
Data Pipeline - advanced

Enriching data from the Genesis database

The transform function of a mapper takes an entityDb parameter, which can be used to interact with the Genesis database. It provides a CRUD interface and enables you to implement complex use cases, such as enriching data and inserting or updating missing data.
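For orientation, here is a compact sketch of those CRUD calls inside a transform block. The calls are shown side by side rather than as a meaningful flow, and the Trade entity, byId index and field names are assumptions standing in for your own generated entities:

transform {
    // Read: look up a record by index (Trade and byId are assumed stand-ins for your schema)
    val existing = entityDb.get(Trade.byId("TR_1"))

    // Create: insert returns a result exposing the written record
    val inserted = entityDb.insert(Trade { tradeId = "TR_2" }).record

    // Update and delete take the entity itself
    existing?.let { entityDb.modify(it) }
    existing?.let { entityDb.delete(it) }

    // The last expression becomes the mapped value for the enclosing field
    existing?.tradeId ?: inserted.tradeId
}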

The example below shows how to map a value from the source data by looking it up in the Genesis database. If the value is missing, it is created on the fly.

pipelines {

    postgresSource("cdc-test") {
        hostname = "localhost"
        port = 5432
        username = "postgres"
        password = "docker"
        databaseName = "postgres"

        table {
            "public.trades" to map("e2e-test", TRADE) {

                val instrument = stringValue("inst")

                TRADE {
                    INSTRUMENT_ID {
                        transform {
                            val code: String = input.get(instrument) // the instrument code from the source row
                            val instrumentType = "RIC"
                            val altInstrumentId: AltInstrumentId? =
                                entityDb.get(AltInstrumentId.byCode(code, instrumentType)) // look up the instrument id in the database
                            if (altInstrumentId != null) { // if the instrument id exists, return it
                                altInstrumentId.instrumentId
                            } else { // otherwise, create a new one and return its id
                                val newInstrumentId = entityDb.insert(Instrument {
                                    instrumentName = ""
                                }).record.instrumentId

                                entityDb.insert(AltInstrumentId {
                                    alternateType = instrumentType
                                    instrumentCode = code
                                    instrumentId = newInstrumentId
                                })

                                newInstrumentId
                            }
                        }
                    }
                }
            }
        }
    }
}

System definition properties

System definition variables can be used as part of the source configuration.

pipelines {

    postgresSource("cdc-test") {
        hostname = POSTGRES_HOST
        port = POSTGRES_PORT
        username = DB_USERNAME
        password = DB_PASSWORD
        databaseName = DB_DATABASE_NAME
    }
}

Alternatively, you can access system definitions programmatically:

pipelines {

    postgresSource("cdc-test") {
        hostname = systemDefinition["db_host"].orElse("localhost")
    }
}

Make sure that any system definition variables used by the pipeline configuration are defined in your application-system-definition.kts file.
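For example, the variables referenced above could be declared as follows. This is a minimal sketch; the item names here are assumptions and must match whatever names your pipeline configuration reads:

systemDefinition {
    global {
        // Names must match the variables referenced in the pipeline configuration
        item(name = "POSTGRES_HOST", value = "localhost")
        item(name = "POSTGRES_PORT", value = "5432")
        item(name = "db_host", value = "localhost")
    }
}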

Declaring multiple pipelines

You may declare multiple pipelines in the same .kts file. All sources should be placed within a single pipelines block.

pipelines {
    postgresSource("cdc-postgres") {
        hostname = "localhost"
        port = 5432
        username = "postgres"
        password = "docker"
        databaseName = "postgres"

        table {
            // table to mapper definition pairs
        }
    }

    csvSource("cdc-csv") {
        location = "file://some/directory?fileName=example.csv"

        // mapper definition
    }
}

Declaring multiple mapping functions

If you would like to perform different mapping operations over the same data source, you can declare multiple mappers. Optionally, you can use a where clause to map rows conditionally: if the where clause evaluates to false for a row, that row is not mapped. These conditional mappers allow you to build more complex and powerful data ingestion pipelines.

For example, if you are mapping over a trades source, you could map and transform your data differently, depending on the region in which the trade was made:

pipelines {
    csvSource("cdc-csv") {

        location = "file://some/directory?fileName=example.csv"

        map("EMEA-order", TABLE_OBJECT) {
            where { input.get(stringValue("region")) == "emea" }

            FIELD {}
            ...
        }

        map("NAM-order", TABLE_OBJECT) {
            where { input.get(stringValue("region")) == "nam" }

            FIELD {}
            ...
        }
    }
}

Auditable sink operations

All database operations are audited if the table is declared as auditable. Each sink operation is then stored in the audit table with the default event type custom-sink-operation. You can change this by passing a different type as an argument to the sink function:

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).sink("delete-sell-trades") {
            if (mappedEntity.tradeType == "sell") {
                entityDb.delete(mappedEntity)
            } else {
                entityDb.insert(mappedEntity)
            }
        }
    }
}

You can also define multiple sink functions; each is executed separately. This can be useful when you define sinks as variables for reuse across multiple pipelines.

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).sink(someSink).sink(someOtherSink)
    }
}
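To illustrate the reuse idea, the sketch below extracts sink bodies into variables and attaches both to the same mapper. Treat this as an assumption about the API shape: sink used as a standalone builder, and the event type names, are hypothetical here, so check the pipelines API for the exact type of a reusable sink before adopting this pattern.

// Hypothetical sketch: assumes sink(...) can be declared once and reused;
// verify the exact standalone sink type against the pipelines API.
val insertSink = sink("insert-entity") {
    entityDb.insert(mappedEntity)
}

val deleteSellSink = sink("delete-sell-trades") {
    if (mappedEntity.tradeType == "sell") {
        entityDb.delete(mappedEntity)
    }
}

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).sink(insertSink).sink(deleteSellSink)
    }
}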