
Data Pipeline - basics

Where to define

You can configure Data Pipelines in a file called pipeline-name-data-pipeline.kts. This must be located in your application's configuration directory.

A pipeline configuration contains a collection of sources, one or more map functions and one or more sink functions.
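
For orientation, here is a minimal sketch of what such a file might look like. The source name, table and field names below (csvSource("example-trades"), TRADE, TRADE_ID, QUANTITY) are illustrative placeholders, not required values; any of the supported sources described on this page can be used in their place.

pipelines {
    // illustrative CSV source; the name, location and file name are placeholders
    csvSource("example-trades") {
        location = "file://runtime/inbound?fileName=trades.csv"

        // map incoming rows to a Genesis table entity; by default the mapped entity is stored in the Genesis database
        map("example-trades-mapper", TRADE) {
            TRADE {
                TRADE_ID {}
                QUANTITY {}
            }
        }
    }
}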

How to define a source

Each data pipeline source contains the configuration specifying how to access the data and the associated mapping and sink functionality.

Note: Remote databases do not work with Data Pipelines by default; they require some set-up/configuration to enable Change Data Capture functionality.

The currently supported sources are:

Ingress:

  • PostgreSQL
  • MS SQL Server
  • Oracle Enterprise
  • Files
    • CSV
    • XML
    • JSON

Egress:

  • Genesis application database

External database

All databases share common configuration.

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| sourceName | N/A | postgres("cdc-test") | String | name for the source |
| hostname | N/A | hostname = "localhost" | String | set the hostname of the remote database |
| port | 5432 | port = 5432 | Integer | set the port on which the database is running |
| username | N/A | username = "postgres" | String | set the database user |
| password | N/A | password = "db-password" | String | set the database user password |
| databaseName | N/A | databaseName = "postgres" | String | set the name of the database |
pipelines {
    postgresSource("cdc-test-psql") {
        hostname = "localhost"
        port = 5432
        username = "postgres"
        password = "db-password"
        databaseName = "postgres"
    }

    msSqlSource("cdc-test-mssql") {
        ...
    }

    oracleSource("cdc-test-oracle") {
        ...
    }
}

File

Genesis currently supports CSV, JSON and XML file sources. Below, you can see what options are available for each:

CSV

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| name | N/A | csvSource("csv-cdc-test") | String | name for the source |
| location | N/A | location = "file://runtime/testFiles?fileName=IMPORT_TRADES.csv" | String | set the location of the CSV file. See details below |
| delimiter | , | delimiter = ',' | Char | set the value delimiter |
| hasHeader | true | hasHeader = true | Boolean | set whether the file has headers |
| headerOverrides | null | headerOverrides = arrayListOf("id", "name") | List | set the column names to be used. If the file has a header, it is ignored and the specified names are used |
| readLazily | false | readLazily = true | Boolean | set lazy reading |
pipelines {
    csvSource("csv-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }
    }
}

XML and JSON

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| name | N/A | xmlSource("xml-cdc-test") | String | name for the source |
| location | N/A | location = "file://runtime/testFiles?fileName=trades_array.json" | String | set the location of the XML or JSON file. See details below |
| tagName | N/A | tagName = "Trade" | String | set the root tag of the XML (does not apply to JSON) |
| rootAt | "$.[*]" | rootAt = "$.[*]" | String | set the root of the JSON/XML tree |
pipelines {
    xmlSource("xml-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }
    }

    json("json-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }
    }
}

Defining file location

File location denotes where to watch for files. This can be one of the following:

  • Local file system
  • SFTP
  • S3

Local file system

To listen for files in a directory on the local filesystem, point the location argument at that directory, prefixed with "file://".

You can define the path as absolute or relative, but we recommend specifying the absolute path; this ensures that you pick up the right files from the right place.

If you use a relative path and there is a mistake in it, no error message is generated; for example, if the specified folder does not exist, a new folder is created. The Data Pipeline is based on Apache Camel; you can find further details in the Camel documentation.

You can also pass arguments to the URI to change the behaviour of the source.

file:directoryName[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| delete | true | delete the file after processing |
| fileName | | only listen for files with the exact name |
| recursive | false | check sub-directories |
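
As an illustrative sketch, a local file source location combining these options might look like the following; the directory and file name are placeholders:

pipelines {
    csvSource("local-file-example") {
        // watch the given directory, only pick up the named file, and keep it after processing
        location = "file://runtime/inbound?fileName=IMPORT_TRADES.csv&delete=false"

        map("mapper-name", TABLE) {
            ...
        }
    }
}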

S3

To use S3 as a file source, you need access to an S3-like service such as AWS S3 or Dell ECS.

aws2-s3://bucketNameOrArn[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| region | | the region in which the S3 client needs to work |
| deleteAfterRead | true | delete objects from S3 after they have been retrieved |
| destinationBucket | | define the destination bucket where an object must be moved when moveAfterRead is set to true |
| moveAfterRead | false | move objects from the S3 bucket to a different bucket after they have been retrieved |
| fileName | | file name of the bucket to get the object from |
| accessKey | | Amazon AWS Access Key |
| secretKey | | Amazon AWS Secret Key |
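
As a sketch, an S3-backed location using some of these arguments might look like this; the bucket name, region, credentials and file name are placeholders:

pipelines {
    csvSource("s3-file-example") {
        // placeholder bucket, region and credentials; deleteAfterRead defaults to true
        location = "aws2-s3://my-example-bucket?region=eu-west-2&accessKey=EXAMPLE_KEY&secretKey=EXAMPLE_SECRET&fileName=IMPORT_TRADES.csv"

        map("mapper-name", TABLE) {
            ...
        }
    }
}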

SFTP

sftp:host:port/directoryName[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| username | | username to use for login |
| password | | password to use for login |
| knownHostsUri | | set the known_hosts file (loaded from classpath by default), so that the SFTP endpoint can do host key verification |
| privateKeyUri | | set the private key file (loaded from classpath by default), so that the SFTP endpoint can do private key verification |
| delete | true | delete the file after processing |
| fileName | | only listen for files with the exact name |
| recursive | false | check sub-directories |
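
Similarly, a sketch of an SFTP-backed location; the host, port, directory, credentials and file name are placeholders:

pipelines {
    csvSource("sftp-file-example") {
        // placeholder host, directory and login credentials
        location = "sftp://sftp.example.com:22/inbound?username=ftpuser&password=ftppass&fileName=IMPORT_TRADES.csv"

        map("mapper-name", TABLE) {
            ...
        }
    }
}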

Genesis Table

The Genesis Table source attaches a listener to a chosen table in your application. All inserts, modifications and deletions to the table are processed. Records that existed before the pipeline started up are not replayed.

| Parameter | Sample usage | Value type | Description |
| --- | --- | --- | --- |
| table | genesisTableSource(TRADE) | GPalTable | table to be listened to |
| key | key = TRADE.BY_ID | GPalIndex | used for table lookups |
pipelines {
    genesisTableSource(TABLE) {
        key = TABLE.KEY_FIELD

        map("mapper-name", TABLE) {

        }
    }
}

Map functions

A map function is the step between reading a source event and sinking the result. Data is read one record at a time and mapped to or from a Genesis Table Entity.

  • For ingress, all data is mapped to a Table Entity before being sent to the sink operation.
  • For egress, data is read from the database and mapped to an intermediary state that is used by the sink operation.

Mapping by column name

If the column name of the source row is the same as the field name, then there is no need for explicit mapping.

If the column name of the source row is not the same as the field name, then it can be specified using the property parameter:

TRADE_SIDE {
    property = "side"
}

If the type of the source column differs from the field type, it is converted on a best-effort basis.
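
Putting these together, here is a sketch of a mapper that relies on implicit mapping for one field and an explicit property for another; the column, table and field names are illustrative:

map("incoming-trades", TRADE) {
    TRADE {
        // column name matches the field name, so no explicit mapping is needed
        TRADE_ID {}

        // source column is named "side", so map it explicitly
        TRADE_SIDE {
            property = "side"
        }
    }
}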

Transform function

There are cases when the source value is not directly mapped to the destination value. For example:

  • type conversion is complex
  • data enrichment
  • data obfuscation
  • calculated value based on input

For such cases, each mapper can declare a transform function. Here are a few examples:

TRADE_DATE {
    transform {
        DateTime(input.get(tradedAt))
    }
}

INSTRUMENT_ID {
    transform {
        "${input.get(instrument)}-RIC"
    }
}

CURRENCY_ID {
    transform {
        "GBP"
    }
}

The transform function has two parameters:

  • entityDb - object to access the underlying Genesis database
  • input - object to access the current source row

For egress pipelines, the input takes the type of the database table entity you are mapping from.

For ingress, the input is strongly typed and null safe. Because the source data is external, you must declare an accessor that reads a given column as a specific type before you can use its value. Genesis provides the following accessor functions:

  • stringValue(name: String)
  • nullableStringValue(name: String)
  • intValue(name: String)
  • nullableIntValue(name: String)
  • longValue(name: String)
  • nullableLongValue(name: String)
  • doubleValue(name: String)
  • nullableDoubleValue(name: String)
  • booleanValue(name: String)
  • nullableBooleanValue(name: String)

Here is some sample usage:

table {
    "public.source_trades" to map("incoming_trades", TRADE) {
        val tradeId = stringValue("trd_id")
        TRADE {
            TRADE_ID {
                transform {
                    "${input.get(tradeId)}-TradeStore"
                }
            }
        }
    }
}

Declaring mappers for database sources

For PostgreSQL sources, mappers must be declared per table using the following syntax:

"table-name" to map("mapper-name", TABLE_OBJECT) {
TABLE_OBJECT {
FIELD {}
...
}
}

Multiple tables can be mapped and all mappers are part of the table configuration:

table {
    "public.source_trades" to map("incoming_trades", TRADE) {}
    "public.source_audit" to map("incoming_audits", AUDIT) {}
}

Mapper for a file source and Genesis tables

For a file source, a single mapper can be declared using the following syntax:

map("mapper-name", TABLE_OBJECT) {
TABLE_OBJECT {
FIELD {}
...
}
}

Conditional map functions

There may be times when you only want to map records that meet certain business criteria. Within your map function, you can define a where block that is evaluated before any mapping operations start.

map("mapper-name", TABLE_OBJECT) {
where { input.get(stringValue("BuySell")) == "s" }

TABLE_OBJECT {
FIELD {}
...
}
}

You are provided with the following variables within a where block:

| Parameter | Description |
| --- | --- |
| entityDb | object to access the underlying Genesis database |
| input | the data object of the data pipeline trigger event |
| operation | the operation of the data pipeline trigger event |

For ingress, the input is strongly typed and null safe. As with transform functions, an accessor is required. For egress, the input takes the type of the database table entity you are mapping from.

Declaring reusable map functions

You can declare map and sink functions outside the pipeline block and reuse these in your pipeline definition. These map functions look mostly the same, but use a slightly different function name:

val inboundMapper = buildInboundMap("mapper-name", TABLE_OBJECT) {
    TABLE_OBJECT {
        FIELD {}
        ...
    }
}

val outboundMapper = buildOutboundMap("mapper-name", TABLE_OBJECT) {
    TABLE_OBJECT {
        FIELD {}
        ...
    }
}

pipelines {
    xmlSource("xml-cdc-test") {
        location = ""

        map(inboundMapper).sink {
            ...
        }
    }

    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(outboundMapper).sink(preDefinedSink)
    }
}

Sink functions

A sink function is where you define the logic to do something with the data that has been picked up by your data pipelines and successfully mapped. This usually involves storing the data in another data store (database, log, etc.) either directly or after applying some additional logic. transform functions are also available for applying business logic inside your map functions, as shown above.

Ingress

The default behaviour of an ingress data pipeline is to store the mapped Table object in the Genesis database. However, there are times when you might want to delete or modify that entity, or perform other conditional operations that do not interact with the database at all. To achieve this, use the sink function. This has two parameters:

  • entityDb - object to access the underlying Genesis database
  • mappedEntity - the mapped Table object

Recognising that inserting, modifying or deleting mapped entities will be the most commonly used operations, these are already defined under SinkOperations:

  • SinkOperations.INSERT
  • SinkOperations.MODIFY
  • SinkOperations.DELETE

They can be used like this:

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).sink(SinkOperations.DELETE)

    }
}

Conditional sink operations

You can use the where function within the map to delete or modify records conditionally, so you don't have to map all rows:

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).apply {
            where { input.get(stringValue("side")) == "sell" }
        }.sink(SinkOperations.DELETE)
    }
}

In other cases when you want to act based on the state of the mapped entity, you can declare a custom sink method:

pipelines {
    postgresSource("cdc-postgres") {

        ...

        map(someMap).sink {
            if (mappedEntity.tradeType == "sell") {
                entityDb.delete(mappedEntity)
            } else {
                entityDb.insert(mappedEntity)
            }
        }
    }
}

Egress

An egress Data Pipeline provides a sink operation for SQL-based JDBC databases. For convenience, the platform provides helper classes for Postgres, MS SQL Server and Oracle databases.

In the example below, we define a Postgres configuration object and pass this into our sink declaration. Our sink takes the database configuration and provides us with methods to describe the behaviour we would like for each operation that our pipeline might pick up.

val postgresConfig = postgresConfiguration(
    databaseName = "",
    hostname = "",
    port = 5432,
    username = "",
    password = ""
)

val postgresSink = sink(postgresConfig) {
    onInsert = insertInto("tableName")
    onModify = updateTable("tableName")
    onDelete = deleteFrom("tableName")
}

pipelines {
    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(someMapper).sink(postgresSink)
    }
}

The sink function has three optional settings:

| Argument | Default value | Description |
| --- | --- | --- |
| onInsert | null | operation to run on Genesis DB insert operation |
| onModify | null | operation to run on Genesis DB modify operation |
| onDelete | null | operation to run on Genesis DB delete operation |

All operations are executed via SQL and take their parameters in the order in which the mapped fields appear in your map configuration.

onInsert

Takes either insertInto("table name") or callProcedure("stored proc name")

onModify

Takes updateTable("table name")

onDelete

Takes deleteFrom("table name")
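
For example, here is a sketch of an egress sink that calls a stored procedure on insert; the procedure and table names are placeholders, and postgresConfig refers to the configuration object defined above:

val procedureSink = sink(postgresConfig) {
    // placeholder stored procedure and table names
    onInsert = callProcedure("insert_trade_proc")
    onModify = updateTable("trades")
    onDelete = deleteFrom("trades")
}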

Conditional sink operations

Egress sinks can use the where function within the map, giving you the ability to map rows conditionally before the sink operation:

pipelines {
    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(someMapper).apply {
            where { input.side == SIDE.SELL }
        }.sink(postgresSink)
    }
}