
Data Pipeline - basics

Where to define

You can configure Data Pipelines in a file called pipeline-name-data-pipeline.kts. This must be located in your application's configuration directory.

A pipeline configuration contains a collection of sources, one or more map functions and one or more sink functions.
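
For orientation, here is a minimal, hedged sketch of the overall shape of a pipeline script: one source, one map function and (implicitly) the default sink that writes to the Genesis database. The source type, file location, mapper name and TRADE table are illustrative only; each part is described in detail below.

pipelines {
    csvSource("example-trades") {
        location = "file://runtime/inbound?fileName=trades.csv"

        map("example-trades-mapper", TRADE) {
            TRADE {
                // field mappings go here
            }
        }
    }
}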

Script properties

The following properties are available for use within the script:

val systemDefinition: SystemDefinitionService
val serviceDiscovery: ServiceDiscovery

How to define a source

Each Data Pipeline source contains the configuration specifying how to access the data and the associated mapping and sink functionality.

note

Remote databases do not work with Data Pipelines by default; they require some set-up/configuration to enable Change Data Capture functionality. More details can be found in the pages on Pipeline set-up.

The currently supported sources are:

Ingress:

  • PostgreSQL
  • MS SQL Server
  • Oracle Enterprise
  • Files
    • CSV
    • XML
    • JSON

Egress:

  • Genesis application database

External database

All databases share a common configuration.

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| sourceName | N/A | postgres("cdc-test") | String | name for the source |
| hostname | N/A | hostname = "localhost" | String | set the hostname of the remote database |
| port | 5432 | port = 5432 | Integer | set the port on which the database is running |
| username | N/A | username = "postgres" | String | set the database user |
| password | N/A | password = "db-password" | String | set the database user password |
| databaseName | N/A | databaseName = "postgres" | String | set the name of the database |
| errorStrategy | BestEffort | errorStrategy = StopOnFirstFailure() | ErrorStrategy | set the source's error strategy |
pipelines {
    postgresSource("cdc-test-psql") {
        hostname = "localhost"
        port = 5432
        username = "postgres"
        password = "db-password"
        databaseName = "postgres"
    }

    msSqlSource("cdc-test-mssql") {
        ...
    }

    oracleSource("cdc-test-oracle") {
        ...
    }
}

File

Genesis currently supports CSV, JSON and XML file sources.

In addition to the configuration properties explained below, each file type also has an optional onCompletion block, which can be used to specify what to do after the file has been processed. The following are in scope:

  • entityDb: a read/write handle on the database
  • result: contains information about the lines that were processed successfully and the lines that failed
  • context: contains pipeline contextual information such as fileName and movePath (if a move path was specified in the location)

Below, you can see what options are available for each:

CSV

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| name | N/A | csvSource("csv-cdc-test") | String | name for the source |
| location | N/A | location = "file://runtime/testFiles?fileName=IMPORT_TRADES.csv" | String | set the location of the CSV file. See details below |
| delimiter | , | delimiter = ',' | Char | set the value delimiter |
| hasHeader | true | hasHeader = true | Boolean | set whether the file has headers |
| headerOverrides | null | headerOverrides = arrayListOf("id", "name") | List | set the column names to be used. If the file has a header, it is ignored and the specified names are used |
| readLazily | false | readLazily = true | Boolean | set lazy reading |
| errorStrategy | BestEffort | errorStrategy = StopOnFirstFailure() | ErrorStrategy | set the source's error strategy |
pipelines {
    csvSource("csv-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }

        onCompletion {
            val successfulRows = result.successfulRows
            val failedRows = result.failedRows
            val existingRecords = entityDb.getBulk(TABLE).toList()
            val fileName = context.fileName
            // ...
        }
    }
}
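
The optional CSV properties above can be combined on a single source. Below is a hedged sketch that reads a pipe-delimited file with no header row; the file name, column names, mapper name and TABLE are illustrative:

pipelines {
    csvSource("csv-options-example") {
        location = "file://runtime/testFiles?fileName=LEGACY_TRADES.psv"
        delimiter = '|'
        hasHeader = false
        headerOverrides = arrayListOf("trd_id", "instrument", "price")
        readLazily = true
        errorStrategy = StopOnFirstFailure()

        map("legacy-trades-mapper", TABLE) {
            // field mappings as usual
        }
    }
}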

XML and JSON

| Parameter | Default value | Sample usage | Value type | Description |
| --- | --- | --- | --- | --- |
| name | N/A | xmlSource("xml-cdc-test") | String | name for the source |
| location | N/A | location = "file://runtime/testFiles?fileName=trades_array.json" | String | set the location of the XML or JSON file. See details below |
| tagName | N/A | tagName = "Trade" | String | set the root tag of the XML (does not apply to JSON) |
| rootAt | "$.[*]" | rootAt = "$.[*]" | String | set the root of the JSON/XML tree |
| errorStrategy | BestEffort | errorStrategy = StopOnFirstFailure() | ErrorStrategy | set the source's error strategy |
pipelines {
    xmlSource("xml-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }

        onCompletion {
            // ...
        }
    }

    json("json-cdc-test") {
        location = ""

        map("mapper-name", TABLE) {

        }
    }
}
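
To illustrate tagName and rootAt, here is a hedged sketch of an XML source that reads repeating Trade elements from inside a Trades wrapper; the file name, tag name, root expression, mapper name and TABLE are illustrative:

pipelines {
    xmlSource("xml-trades-example") {
        location = "file://runtime/testFiles?fileName=trades.xml"
        tagName = "Trade"            // each <Trade> element becomes one record
        rootAt = "$.Trades.Trade[*]" // where the repeating elements sit in the tree

        map("xml-trades-mapper", TABLE) {
            // field mappings as usual
        }
    }
}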

Defining file location

File location denotes where to watch for files. This can be one of the following:

  • Local file system
  • SFTP
  • S3

Local file system

Listening for files in a directory on the local filesystem is as simple as pointing the location argument at the directory, prefixed with "file://".

You can define a path as absolute or relative, but we recommend that you specify the absolute filepath; this ensures that you get the right files in the right place.

If you use a relative file path and there is a mistake in the path, no error message is generated; for example, if the specified folder does not exist, a new one is created. The Data Pipeline is based on Apache Camel; you can find further details in the Camel documentation.

You can also pass arguments to the URI to change the behaviour of the source.

file:directoryName[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| delete | true | delete the file after processing |
| fileName | | only listen for files with the exact name |
| recursive | false | check sub-directories |
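
For example, assuming the standard Camel query-parameter syntax for combining options, a location that watches a directory recursively for one specific file and keeps the file after processing might look like this (the path and file name are illustrative):

location = "file://runtime/inbound?fileName=IMPORT_TRADES.csv&delete=false&recursive=true"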

S3

To use S3 as a file source, you need access to an S3-like service such as AWS S3 or Dell ECS.

aws2-s3://bucketNameOrArn[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| region | | the region in which the S3 client needs to work |
| deleteAfterRead | true | delete objects from S3 after they have been retrieved |
| destinationBucket | | the destination bucket where an object must be moved when moveAfterRead is set to true |
| moveAfterRead | false | move objects from the S3 bucket to a different bucket after they have been retrieved |
| fileName | | the file name of the object to get from the bucket |
| accessKey | | Amazon AWS Access Key |
| secretKey | | Amazon AWS Secret Key |
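
As a hedged illustration (the bucket name, region, file name and credentials are placeholders), an S3 location might look like this; other options from the table above can be appended in the same way:

location = "aws2-s3://my-trades-bucket?region=eu-west-2&fileName=IMPORT_TRADES.csv&accessKey=ACCESS_KEY&secretKey=SECRET_KEY"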

SFTP

sftp:host:port/directoryName[?options]

| Argument | Default value | Description |
| --- | --- | --- |
| username | | username to use for login |
| password | | password to use for login |
| knownHostsUri | | set the known_hosts file (loaded from classpath by default), so that the SFTP endpoint can do host key verification |
| privateKeyUri | | set the private key file (loaded from classpath by default), so that the SFTP endpoint can do private key verification |
| delete | true | delete the file after processing |
| fileName | | only listen for files with the exact name |
| recursive | false | check sub-directories |
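
As a hedged illustration (the host, port, directory, file name and credentials are placeholders), an SFTP location might look like this:

location = "sftp:sftp.example.com:22/inbound?username=pipeline-user&password=pipeline-password&fileName=IMPORT_TRADES.csv"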

Genesis Table

The Genesis Table source attaches a listener to a chosen table in your application.

All inserts, modifications and deletions to the table will be processed. Records from before pipeline start-up will not be replayed.

| Parameter | Sample usage | Value type | Description |
| --- | --- | --- | --- |
| table | genesisTableSource(TRADE) | GPalTable | table to be listened to |
| key | key = TRADE.BY_ID | GPalIndex | used for table lookups |
pipelines {
    genesisTableSource(TABLE) {
        key = TABLE.KEY_FIELD

        map("mapper-name", TABLE) {

        }
    }
}

Error strategies

You can choose how to handle errors that occur during processing on a per-source basis by setting the errorStrategy property on a source. The default error strategy is Best Effort.

When an error occurs, by default, the error strategy will set the state of the process to ERROR. This can be changed by setting the setProcessOnError property for the strategy.

errorStrategy = BestEffort(setProcessOnError = false)

The error strategies available are described below.

Best Effort

The Best Effort strategy processes all rows and logs any errors that occur.

errorStrategy = BestEffort()

Stop On First Failure

The Stop On First Failure strategy logs an error if it occurs and stops processing.

errorStrategy = StopOnFirstFailure()

All Or Nothing

The All Or Nothing strategy processes all rows in a single transaction using the Genesis Entity DB API. If one row fails, then none of the rows are committed.

This is only relevant to databases that support Entity DB transactions.

errorStrategy = AllOrNothing()

Map functions

A map function is the step between reading a source event and sinking the result. Data is read one record at a time and is mapped to or from a Genesis Table Entity.

  • For ingress, all data is mapped to a Table Entity before being sent to the sink operation.
  • For egress, data is read from the database and mapped to an intermediary state that is used by the sink operation.

Mapping by column name

If the column name of the source row is the same as the field name, then there is no need for explicit mapping.

If the column name of the source row is not the same as the field name, then it can be specified using the property parameter:

TRADE_SIDE {
    property = "side"
}

If the type of the source row is different from the field type, then it is converted using Best Effort.
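
Putting this together, here is a hedged sketch of a map block in which most source columns match the field names and only one column needs an explicit property override; the mapper name, table and field names are illustrative:

map("trades-mapper", TRADE) {
    TRADE {
        TRADE_ID {}   // source column matches the field name; no explicit mapping needed
        PRICE {}      // source column matches the field name
        TRADE_SIDE {
            property = "side"   // source column is named differently from the field
        }
    }
}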

Transform function

There are cases when the source value is not directly mapped to the destination value. For example:

  • type conversion is complex
  • data enrichment
  • data obfuscation
  • calculated value based on input

For such cases, each mapper can declare a transform function. Here are a few examples:

TRADE_DATE {
    transform {
        DateTime(input.get(tradedAt))
    }
}

INSTRUMENT_ID {
    transform {
        "${input.get(instrument)}-RIC"
    }
}

CURRENCY_ID {
    transform {
        "GBP"
    }
}

The transform function has two parameters:

  • entityDb - object to access the underlying Genesis database
  • input - object to access the current source row

For egress pipelines, the input takes the type of the database table entity you are mapping from.

For ingress, the input is strongly typed and null-safe. Because the source data is external, you must declare an accessor that reads each value as a specific type before you can use it. Genesis provides the following accessor functions:

  • stringValue(name: String)
  • nullableStringValue(name: String)
  • intValue(name: String)
  • nullableIntValue(name: String)
  • longValue(name: String)
  • nullableLongValue(name: String)
  • doubleValue(name: String)
  • nullableDoubleValue(name: String)
  • booleanValue(name: String)
  • nullableBooleanValue(name: String)

Here is some sample usage:

table {
    "public.source_trades" to map("incoming_trades", TRADE) {
        val tradeId = stringValue("trd_id")
        TRADE {
            TRADE_ID {
                transform {
                    "${input.get(tradeId)}-TradeStore"
                }
            }
        }
    }
}

Declaring mappers for database sources

For PostgreSQL sources, mappers must be declared per table using the following syntax:

"table-name" to map("mapper-name", TABLE_OBJECT) {
TABLE_OBJECT {
FIELD {}
...
}
}

Multiple tables can be mapped and all mappers are part of the table configuration:

table {
    "public.source_trades" to map("incoming_trades", TRADE) {}
    "public.source_audit" to map("incoming_audits", AUDIT) {}
}

Mapper for a file source and Genesis tables

For a file source or a Genesis Table source, a single mapper can be declared using the following syntax:

map("mapper-name", TABLE_OBJECT) {
TABLE_OBJECT {
FIELD {}
...
}
}

Conditional map functions

There may be a time when you wish to map based on some business logic. Within your map function, you can define a where block that is evaluated before any mapping operations are started.

map("mapper-name", TABLE_OBJECT) {
where { input.get(stringValue("BuySell")) == "s" }

TABLE_OBJECT {
FIELD {}
...
}
}

You are provided with the following variables within a where block:

| Parameter | Description |
| --- | --- |
| entityDb | object to access the underlying Genesis database |
| input | the data object of the Data Pipeline trigger event |
| operation | the operation of the Data Pipeline trigger event |

For ingress, the input is strongly typed and null-safe; as with transform functions, an accessor is required. For egress, the input takes the type of the database table entity you are mapping from.

Declaring reusable map functions

You can declare map and sink functions outside the pipeline block and reuse these in your pipeline definition. These map functions look mostly the same but use a slightly different function name:

val inboundMapper = buildInboundMap("mapper-name", TABLE_OBJECT) {
    TABLE_OBJECT {
        FIELD {}
        ...
    }
}

val outboundMapper = buildOutboundMap("mapper-name", TABLE_OBJECT) {
    TABLE_OBJECT {
        FIELD {}
        ...
    }
}

pipelines {
    xmlSource("xml-cdc-test") {
        location = ""

        map(inboundMapper).sink {
            ...
        }
    }

    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(outboundMapper).sink(preDefinedSink)
    }
}

Sink functions

A sink function is where you define the logic to do something with the data that has been picked up by your Data Pipelines and successfully mapped. This usually involves storing the data in another data store (database, log, etc.) either directly or after applying some additional logic of your choosing. transform functions are also available for applying business logic inside your map functions as shown above.

Ingress

The default behaviour of an ingress Data Pipeline is to store the mapped Table object in the Genesis database. However, there are times when you might want to delete or modify that entity, or perform other conditional operations that do not interact with the database at all. For those cases, the sink function can be used. The function has two parameters:

  • entityDb - object to access the underlying Genesis database
  • mappedEntity - the mapped Table object

Recognising that inserting, modifying or deleting mapped entities will be the most commonly used operations, those are already defined under SinkOperations:

  • SinkOperations.INSERT
  • SinkOperations.MODIFY
  • SinkOperations.DELETE

They can be used like this:

pipelines {
    postgresSource("cdc-postgres") {
        ...

        map(someMap).sink(SinkOperations.DELETE)
    }
}

Conditional sink operations

You can use the where function within the map to delete or modify records conditionally, so you don't have to map all rows:

pipelines {
    postgresSource("cdc-postgres") {
        ...

        map(someMap).apply {
            where { input.get(stringValue("side")) == "sell" }
        }.sink(SinkOperations.DELETE)
    }
}

In other cases when you want to act based on the state of the mapped entity, you can declare a custom sink method:

pipelines {
    postgresSource("cdc-postgres") {
        ...

        map(someMap).sink {
            if (mappedEntity.tradeType == "sell") {
                entityDb.delete(mappedEntity)
            } else {
                entityDb.insert(mappedEntity)
            }
        }
    }
}

Egress

An egress Data Pipeline provides a sink operation for SQL-based JDBC databases. For convenience, the platform provides helper classes for Postgres, MS SQL Server and Oracle databases.

In the example below, we define a Postgres configuration object and pass it into our sink declaration. The sink takes the database configuration and provides methods that describe the behaviour for each operation that the pipeline can pick up.

val postgresConfig = postgresConfiguration(
    databaseName = "",
    hostname = "",
    port = 5432,
    username = "",
    password = ""
)

val postgresSink = sink(postgresConfig) {
    onInsert = insertInto("tableName")
    onModify = updateTable("tableName")
    onDelete = deleteFrom("tableName")
}

pipelines {
    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(someMapper).sink(postgresSink)
    }
}

The sink function has three optional settings:

| Argument | Default value | Description |
| --- | --- | --- |
| onInsert | null | operation to run on a Genesis DB insert operation |
| onModify | null | operation to run on a Genesis DB modify operation |
| onDelete | null | operation to run on a Genesis DB delete operation |

All these operations are executed via SQL, and they use the mapped fields in the order in which they appear in your map configuration.

onInsert

Takes either insertInto("table name") or callProcedure("stored proc name")

onModify

Takes updateTable("table name")

onDelete

Takes deleteFrom("table name")
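
If your target database exposes stored procedures, onInsert can call one instead of performing a plain insert. Here is a hedged sketch that reuses the postgresConfig defined above; the procedure and table names are illustrative:

val procedureSink = sink(postgresConfig) {
    onInsert = callProcedure("insert_trade")   // illustrative stored procedure name
    onModify = updateTable("trades")
    onDelete = deleteFrom("trades")
}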

Conditional sink operations

Egress sinks can use the where function within the map, giving you the ability to map rows conditionally before the sink operation:

pipelines {
    genesisTableSource(TABLE_OBJECT) {
        key = TABLE_OBJECT.KEY_FIELD

        map(someMapper).apply {
            where { input.side == SIDE.SELL }
        }.sink(postgresSink)
    }
}