Postgres data pipeline - declare source and mappers

In this part of the tutorial we will declare the PostgreSQL server as a data source and map the incoming rows to a Genesis Table object.

Declare Data Source

To define the data pipeline, start with the data source. Create a new Kotlin script, trades-data-pipeline.kts, under datapipeline-sandbox/server/jvm/datapipeline-trades-script-config/src/main/resources/cfg and add the content shown below. If you chose a different project name, replace datapipeline-trades with it.

In this example, set hostname to the address of your PostgreSQL server.

pipelines {

    postgresSource("trade-pipeline") {
        hostname = "alpha.west.master"
        port = 5432
        username = "postgres"
        password = systemDefinition.getItem("CdcPostgresPassword").toString()
        databaseName = "postgres"

    }
}
Tip

If IntelliSense is not available when you edit the data pipeline file, check the contents of datapipeline-trades-script-config/build.gradle.kts. Under dependencies, it should contain api("global.genesis:genesis-pal-datapipeline"). If that entry is missing, add it to the list of dependencies. The file should then look like this:

dependencies {
    ...
    api("global.genesis:genesis-pal-datapipeline")
    ...
}

The next step is to declare a mapper for each table. In our case, we are only interested in one table: trades.

Declare the Mapper

Enter the code below. Don't worry if it looks confusing now; we'll go through it in a moment.

pipelines {

    postgresSource("trade-pipeline") {
        hostname = "alpha.west.master"
        port = 5432
        username = "postgres"
        password = systemDefinition.getItem("CdcPostgresPassword").toString()
        databaseName = "postgres"

        table {
            "public.trades" to map("e2e-test", TRADE) {

                val tradeId = stringValue("trd_id")
                val instrument = stringValue("inst")
                val tradedAt = longValue("traded_at")
                val side = stringValue("side")
                val tradeState = stringValue("trade_state")

                TRADE {
                    TRADE_ID {
                        //lookup from ALT_TRADE_ID based on trd_id column
                        transform {
                            val tradeCode: String = input.get(tradeId)
                            entityDb.get(AltTradeId.byCode(tradeCode, "TradeStore"))?.tradeId
                        }
                    }

                    INSTRUMENT_ID {
                        //lookup from ALT_INSTRUMENT_ID based on INSTRUMENT_CODE
                        transform {
                            val code: String = input.get(instrument)
                            val instrumentType = "RIC"
                            val altInstrumentId: AltInstrumentId? =
                                entityDb.get(AltInstrumentId.byCode(code, instrumentType))
                            if (altInstrumentId != null) {
                                altInstrumentId.instrumentId
                            } else {
                                val newInstrumentId = entityDb.insert(Instrument {
                                    //We don't have a name supplied by postgres, so use the code as the name
                                    instrumentName = code
                                }).record.instrumentId

                                entityDb.insert(AltInstrumentId {
                                    alternateType = instrumentType
                                    instrumentCode = code
                                    instrumentId = newInstrumentId
                                })

                                newInstrumentId
                            }
                        }
                    }

                    PRICE {
                        property = "price"
                    }

                    QUANTITY {
                        property = "quantity"
                    }

                    SIDE {
                        transform {
                            Side.valueOf(input.get(side).toUpperCase())
                        }
                    }

                    TRADE_DATETIME {
                        transform {
                            val tradedAtLong = input.get(tradedAt) / 1000L
                            DateTime(tradedAtLong)
                        }
                    }

                    TRADE_DATE {
                        transform {
                            val tradedAtLong = input.get(tradedAt) / 1000L
                            DateTime(tradedAtLong).withTimeAtStartOfDay()
                        }
                    }

                    ENTERED_BY {
                        property = "trader"
                    }

                    TRADE_STATUS {
                        transform {
                            val readState = input.get(tradeState)
                            when(readState) {
                                "new" -> TradeStatus.NEW
                                "mod" -> TradeStatus.MODIFIED
                                "canc" -> TradeStatus.CANCELLED
                                else -> throw IllegalStateException("Unknown trade state: $readState")
                            }
                        }
                    }

                    UNSOLICITED {
                        property = "unsolicited"
                    }

                    PREV_TRADE_ID {
                        property = "orig_trd_id"
                    }
                }
            }
        }
    }
}

When declaring a mapper, first give it a name (here, "e2e-test"). The name only identifies the mapper; it has no functionality associated with it. The second argument is the Table to map to. In our case, this is the TRADE table.

Inside the mapper are the mappings for each Field of the Table. There are three ways to define a Field mapping:

  • when the source property name is the same as the Field name, there is no need to specify anything
  • when the source property name differs from the Field name, but its type is the same as the Field type or can be converted out of the box, only the name has to be mapped; this is done by setting property
  • when both the source property name and type differ from the Field name and type, a transform function can be used to calculate the mapped value
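The three cases can be modelled in plain Kotlin to make the distinction concrete. This is a simplified sketch that does not use the Genesis API: the FieldMapping types, the resolve function and the sample row are hypothetical names introduced only for illustration, and the exact-name matching in the first case is an assumption.

```kotlin
// Hypothetical model of the three mapping styles; not the Genesis implementation.
sealed class FieldMapping

// Case 1: the source property has the same name as the Field (exact match assumed).
object SameName : FieldMapping()

// Case 2: the source property has a different name but a compatible type.
data class Property(val name: String) : FieldMapping()

// Case 3: the value is calculated from the source row.
class Transform(val fn: (Map<String, Any?>) -> Any?) : FieldMapping()

// Resolve the value of one Field from a source row, according to its mapping.
fun resolve(field: String, mapping: FieldMapping, row: Map<String, Any?>): Any? =
    when (mapping) {
        is SameName -> row[field]
        is Property -> row[mapping.name]
        is Transform -> mapping.fn(row)
    }

fun main() {
    val row = mapOf<String, Any?>("PRICE" to 101.5, "trader" to "jdoe", "side" to "buy")

    println(resolve("PRICE", SameName, row))                 // 101.5
    println(resolve("ENTERED_BY", Property("trader"), row))  // jdoe
    println(resolve("SIDE", Transform { it["side"].toString().uppercase() }, row)) // BUY
}
```

In the real script, the second case corresponds to blocks such as ENTERED_BY { property = "trader" } and the third to blocks that contain transform { ... }.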

Looking at the code above, you can see that TRADE_ID, INSTRUMENT_ID, SIDE, TRADE_DATETIME, TRADE_DATE and TRADE_STATUS are calculated Fields; all the rest are mapped by name.
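To see what the simpler calculated Fields do, the logic of the SIDE, TRADE_STATUS, TRADE_DATETIME and TRADE_DATE transforms can be written as standalone functions. This is a sketch under several assumptions: the enum values for Side are guessed as BUY and SELL, traded_at is assumed to hold microseconds since the epoch (which would explain the division by 1000), java.time in UTC stands in for the DateTime type used in the script, and all function names are hypothetical.

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.ZonedDateTime

// Stand-ins for the generated Genesis enums; the Side values are an assumption.
enum class Side { BUY, SELL }
enum class TradeStatus { NEW, MODIFIED, CANCELLED }

// SIDE: upper-case the source text and look up the enum constant by name.
fun toSide(raw: String): Side = Side.valueOf(raw.uppercase())

// TRADE_STATUS: translate the source codes; reject anything unexpected.
fun toTradeStatus(raw: String): TradeStatus = when (raw) {
    "new" -> TradeStatus.NEW
    "mod" -> TradeStatus.MODIFIED
    "canc" -> TradeStatus.CANCELLED
    else -> throw IllegalStateException("Unknown trade state: $raw")
}

// TRADE_DATETIME: assumes microseconds in, so dividing by 1000 yields milliseconds.
fun toTradeDateTime(tradedAtMicros: Long): ZonedDateTime =
    Instant.ofEpochMilli(tradedAtMicros / 1000L).atZone(ZoneOffset.UTC)

// TRADE_DATE: the same instant, truncated to the start of its day (UTC here).
fun toTradeDate(tradedAtMicros: Long): ZonedDateTime =
    toTradeDateTime(tradedAtMicros).toLocalDate().atStartOfDay(ZoneOffset.UTC)

fun main() {
    println(toSide("buy"))          // BUY
    println(toTradeStatus("mod"))   // MODIFIED
    println(toTradeDateTime(1_700_000_000_000_000L))
    println(toTradeDate(1_700_000_000_000_000L))
}
```

Because toTradeStatus throws on an unknown code, a row with an unexpected trade_state fails loudly instead of being stored with a wrong status.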

The transform function for INSTRUMENT_ID is the most complex, because it performs additional operations based on the current state of the Genesis database. The first lines of the function try to map the incoming instrument code to an instrument that is already in the Genesis database. If such an instrument exists, its ID is returned. Otherwise, a new instrument is created with the code as its name, an alternate ID record is inserted for it, and the new instrument ID is returned.
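The lookup-or-create pattern used by INSTRUMENT_ID can be sketched without a database. In this minimal example an in-memory map stands in for the Genesis tables; InstrumentStore, AltKey, resolveInstrumentId and the "INSTR-n" ID format are hypothetical and are not part of the entityDb API.

```kotlin
// Hypothetical in-memory stand-in for the instrument tables; not the entityDb API.
data class AltKey(val code: String, val type: String)

class InstrumentStore {
    private val altIds = mutableMapOf<AltKey, String>()  // (code, type) -> instrument ID
    private val names = mutableMapOf<String, String>()   // instrument ID -> instrument name
    private var nextId = 1

    // Return the ID of a known instrument, or create the instrument and its alternate ID.
    fun resolveInstrumentId(code: String, type: String = "RIC"): String {
        val key = AltKey(code, type)
        altIds[key]?.let { return it }   // already known: reuse the existing ID

        val newId = "INSTR-${nextId++}"
        names[newId] = code              // no name is supplied, so the code is used as the name
        altIds[key] = newId              // record the alternate ID for later lookups
        return newId
    }

    fun instrumentCount(): Int = names.size
}

fun main() {
    val store = InstrumentStore()
    val first = store.resolveInstrumentId("VOD.L")
    val second = store.resolveInstrumentId("VOD.L")

    println(first == second)          // true: the second call finds the existing instrument
    println(store.instrumentCount())  // 1
}
```

The second call returns the same ID without creating another instrument, which is the behaviour the transform relies on when many trades reference the same instrument code.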

Follow the links for more information on mapping and the transform function.

Finally, to ensure your Genesis application picks up your new script, we must declare the runtime.