Data Pipelines - Implementations

Sources

Camel File Source (Real-time)

This is a real-time acknowledgement source that uses Camel to poll a location periodically. It returns a flow of type PipelineFile, which includes the file name, file path and input stream of its contents.

The next element is only emitted once acknowledgement is given that the previous one has been successfully processed.

There are two ways to create a Camel source: one is by passing the location directly; the other is by passing in a route builder.

Passing the location string

Passing in the location directly will automatically create a route builder and add it to the internal pipeline processors:

pipeline("TEST_CAMEL_ROUTE_PIPELINE") {
    source(
        camelSource {
            location = "file://src/test/resources/camel-source-test?noop=true"
        }
    ).sink(filesSink)
}

Passing the route builder

First of all, let's explain what a route builder is. In this context, it is Apache Camel's API for defining where and how to read messages. If you are already familiar with Camel APIs, you should feel right at home here. If not, you can read more about them here.

Passing in a route builder gives you the most flexibility and control over your route's logic, allowing for more advanced configurations, such as specifying processors and idempotency logic. Within the routeHandler block, you can always access the dataPipelineProcessor variable; this is a custom processor we create that emits the messages from the data pipeline source into your Genesis pipeline.

pipeline("TEST_CAMEL_ROUTE_PIPELINE") {
    source(
        camelFileSource {
            routeHandler {
                from("file://src/test/resources/camel-source-test?noop=true&idempotent=true")
                    .idempotentConsumer(
                        header("CamelFileName"),
                        FileIdempotentRepository.fileIdempotentRepository(File("factInPathStrConsumerRepo"), 300000, 15000000)
                    ).process(dataPipelineProcessor)
            }
        }
    ).sink(filesSink)
}

Database Batch Poll Source (Real-time)

This is a real-time acknowledgement source that returns a stream of updates via a batch poll on a particular database, dependent on an acknowledgement having been received of the previous element.

If no data is received, the source waits for a delay before attempting to poll again. This source implementation uses the PersistenceManager to persist the latest index using the provided lambda; by default the DbPersistenceManager is used to manage the state of this source. See Persistence Manager for more details.

The type of object that you would like this source to return must be given as part of the source definition, as shown in the example below.

When configuring the db batch poller to use as part of your pipeline you must provide the following as part of the configuration:

  • source (required): this is what the PersistenceManager will use to find the last persisted record and should be unique across all pipelines
  • index (required): this is the table index used to order the records and get the value of the last persisted record for recovery - if a single-field index is provided, ensure this value is either the generated TIMESTAMP field in the table or of type Long; otherwise, see buildIndex below
  • buildIndex (required for multi-field indexes only): if you would like to use an index that contains multiple fields, provide a lambda that converts a Long value (the last persisted value) into your EntityIndex object - the Long value that you want to use as the persistence index must be the final value inside your index object
  • dbLookupDelayMs (optional): the delay, in milliseconds, that the poller waits after receiving no new data from the database before polling again - the default is 200ms

Here is a GPAL example of a Batch Poll source definition:

// Single Field Index Example
val batchPollSource = dbBatchQuery<TestIndexTable> {
    source = "SINGLE_INDEX_SOURCE"
    index = TestIndexTable.ByIndex
}

// Multiple Field Index Example
val batchPollMultiIndexSource = dbBatchQuery<TestMultiIndexTable> {
    source = "MULTI_INDEX_SOURCE"
    index = TestMultiIndexTable.ByNameIndex
    buildIndex { value -> TestMultiIndexTable.ByNameIndex("NAME", value) }
}

pipelines {
    pipeline("TEST_BATCH_POLL_PIPELINE") {
        source(batchPollSource)
            .sink(testSink)
    }
}

Abstract Programmatic Source (Real-time)

This is an abstract implementation of a real-time acknowledgement source that additionally has a send method which can be used by any custom code to provide data to the pipeline.

This source depends on the calling code being part of the same process as the pipeline that uses this source - ensure this is the case by updating your process definition. See Processes for more details.

Here is an example of using this source to provide Trade objects:

object ProgrammaticTradeSource : AbstractProgrammaticSource<Trade>()

Here is an eventHandler that sends to this ProgrammaticTradeSource:

eventHandler<Trade>("TRADE_INSERT", transactional = true) {
    onCommit { event ->
        val trade = event.details
        ProgrammaticTradeSource.send(trade)
        ack()
    }
}

The resulting pipeline:

pipelines {
    pipeline("TEST_PIPELINE") {
        source(ProgrammaticTradeSource)
            .sink(testSink)
    }
}

HTTP Source (Batch)

There are two sources that you can use to make HTTP requests. These use the Genesis HTTP Client.

  • httpSource - only requires the type of the response
  • typedHttpSource - requires the type of the response as well as the request object being used (which should be provided in the execute method when triggering this pipeline)

You need to provide a request builder, which contains information about the HTTP request you would like to send. The default HttpMethod is GET.

When supplying this builder, you have access to:

  • what is being provided to the execute method
  • the context parameters map
  • (typedHttpSource only) the request object being used

Example of httpSource:

val source = httpSource<Trade> {
    requestBuilder {
        url = "https://api.example.com/get-trade?tradeId=${contextParameters["tradeId"]}"
    }
}

pipelines {
    pipeline("TEST_PIPELINE") {
        source(source)
            .sink(testSink)
    }
}

Triggering the above pipeline in an eventHandler:

val pipelineManager = inject<PipelineManager>()

eventHandler<Trade>(name = "TEST_PIPELINE_START") {
    onCommit { event ->
        val details = event.details

        val pipeline = pipelineManager.getBatchPipeline("TEST_PIPELINE")
        // provide values in the context map that you have used in the request builder when setting up the pipeline
        pipeline?.execute(mapOf("tradeId" to details.tradeId))
        ack()
    }
}

Example of typedHttpSource:

val source = typedHttpSource<Trade, List<Trade>> {
    requestBuilder {
        // using the request which will be provided in the execute call
        url = "https://api.example.com/all-trades?tradeId=${request.tradeId}"
    }
}

pipelines {
    pipeline("TEST_PIPELINE") {
        source(source)
            .sink(testSink)
    }
}

Triggering the above pipeline in an eventHandler:

val pipelineManager = inject<PipelineManager>()

eventHandler<Trade>(name = "TEST_PIPELINE_START") {
    onCommit { event ->
        val trade = event.details

        val pipeline = pipelineManager.getBatchPipeline("TEST_PIPELINE")
        // provide the request object (used in the request builder when setting up the pipeline) along with the context map
        pipeline?.execute(trade, emptyMap())
        ack()
    }
}

Kafka Source (Real-time)

This source is available through the kafka-genesis module. To use this, you must add the following dependency to your application:

implementation("global.genesis:kafka-genesis:${properties["genesisIntegrationVersion"]}")

This is a real-time acknowledgement source that creates a consumer and polls a Kafka topic. It returns a flow of ConsumerRecords; the consumer only commits to Kafka once acknowledgement has been received for the previous batch.

The types specified when instantiating the source refer to the types of the key and value of the records being read from the Kafka topic.

When you configure the kafka source to use as part of your pipeline, you must provide the following as part of the configuration:

  • bootstrapServers (required): these are the bootstrap servers for your Kafka instance
  • groupId (required): this is the group id you would like to specify for the consumer
  • keyDeserializer (required): this is the Deserializer for the key of your consumer record - this Deserializer must be for an object that matches the key type defined in the source instantiation
  • valueDeserializer (required): this is the Deserializer for the value of your consumer record - this Deserializer must be for an object that matches the value type defined in the source instantiation
  • topic (required): this is the Kafka topic you would like to consume from
  • securityProtocol (optional): this is the security protocol you would like to use; the default is "SSL"
  • maxPollRecords (optional): this is the maximum number of records returned when the consumer polls; the default is 500
  • pollTimeoutMs (optional): this is the amount of time, in milliseconds, that the consumer waits when it receives no records before polling again; the default is 200
  • additionalConfig (optional): if you wish to set additional config for your source that has not been specified above, you can do so using this map (see example below)

Here is a GPAL example of a Kafka source definition:

val source = kafkaSource<String, Int> {
    bootstrapServers = "localhost"
    groupId = "my-group"
    keyDeserializer = StringDeserializer()
    valueDeserializer = IntegerDeserializer()
    topic = "my-topic"
    securityProtocol = "SSL"
    additionalConfig = mapOf("auto.offset.reset" to "earliest")
}

pipelines {
    pipeline("TEST_KAFKA_PIPELINE") {
        source(source)
            .sink(testSink)
    }
}

JMS Source (Real-time)

This source is available through the jms-genesis module. To use it, add the following dependency to your application:

implementation("global.genesis:jms-genesis:${properties["genesisIntegrationVersion"]}")

This is a real-time acknowledgement source that creates a JMS consumer and polls a message queue. It returns a flow of Message objects. The consumer only acknowledges the message has been received once the pipeline confirms it has been processed by the sink successfully.

If the source is unable to connect to the message queue, the process running the pipeline will enter a WARNING state.

The message displayed next to this state includes a monitor name, which you can configure (as shown below); the default name generated is based on the queue name. If you provide your own monitor name, ensure this is unique across all JMS Sources and Sinks for that process.

When you configure the JMS source to use with your pipeline, you must provide the following as part of the configuration:

  • factory (required): this is any instance of QueueConnectionFactory, e.g. MQQueueConnectionFactory (IBM MQ), ActiveMQConnectionFactory (ActiveMQ), etc.
  • queueName (required): the name of the message queue you would like to consume from
  • username (required): the username used to create the queue connection
  • password (required): the password used to create the queue connection
  • retryDelayMs (optional): how long the source should wait, in milliseconds, before attempting to connect to the queue again after a connection failure
  • pollTimeoutMs (optional): how long the consumer should wait in milliseconds before attempting to poll the message queue again after no data has been received; the default is 200
  • monitorName (optional): name to be displayed when the process state is set to WARNING; the default name has the format: JMSSourceMonitor-0-QUEUE_NAME

Here is a GPAL example of a JMS source definition:

val source = jmsSource {
    queueName = systemDefinition.getItem("QUEUE_NAME").toString()
    factory = MQQueueConnectionFactory().apply {
        hostName = systemDefinition.getItem("HOST_NAME").toString()
        port = systemDefinition.getItem("PORT").toString().toInt()
        queueManager = systemDefinition.getItem("QUEUE_MANAGER").toString()
        transportType = systemDefinition.getItem("TRANSPORT_TYPE").toString().toInt()
        channel = systemDefinition.getItem("CHANNEL").toString()
    }
    username = systemDefinition.getItem("USER_NAME").toString()
    password = systemDefinition.getItem("PASSWORD").toString()
}

pipelines {
    pipeline("TEST_JMS_PIPELINE") {
        source(source)
            .sink(testSink)
    }
}
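
The optional settings listed above go in the same configuration block. Here is a minimal sketch; the values, and the choice of ActiveMQConnectionFactory, are purely illustrative:

val tunedJmsSource = jmsSource {
    queueName = systemDefinition.getItem("QUEUE_NAME").toString()
    factory = ActiveMQConnectionFactory("tcp://localhost:61616") // any QueueConnectionFactory implementation
    username = systemDefinition.getItem("USER_NAME").toString()
    password = systemDefinition.getItem("PASSWORD").toString()
    retryDelayMs = 5000                        // wait 5s before retrying the queue connection
    pollTimeoutMs = 500                        // wait 500ms before polling again when no data arrives
    monitorName = "TRADE_QUEUE_SOURCE_MONITOR" // must be unique across JMS Sources and Sinks in this process
}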

Operators

CSV Decoder

This is a split Operator that decodes from an InputStream (from a PipelineFile object) to a CsvRow object that contains:

  • the line number for each CSV row
  • an object of a user-defined type representing the data of each CSV line; by default, this is of type Map<String, String>

GPAL functions available are:

  • csvRawDecoder() to deserialise to Map<String, String>
  • csvDecoder<T>() to deserialise to the specified class, represented by T here.

Here are some GPAL examples of a CSV Decoder:

// Decoding to Map<String, String>
pipeline("TEST_PIPELINE_RAW_DECODER") {
    source(camelSource {
        location = "file://src/test/resources/camel-source-test?noop=true&idempotent=true"
    }).split(csvRawDecoder()) // Defaults to deserialising to Map<String, String>
        .map { input: CsvRow<Map<String, String>> ->
            DbOperation.Insert(
                System {
                    this.systemKey = input.data["SYSTEM_KEY"]!!
                    this.systemValue = input.data["SYSTEM_VALUE"]!!
                }
            )
        }
        .sink(txDbSink())
}

// Decoding to another class
pipeline("TEST_PIPELINE_DECODER") {
    source(camelSource {
        location = "file://src/test/resources/camel-source-test?noop=true&idempotent=true"
    }).split(csvDecoder<System>()) // Deserialise to System class
        .map { input: CsvRow<System> ->
            DbOperation.Insert(input.data)
        }
        .sink(txDbSink())
}

Sinks

Database sink

This is a sink into the Genesis database. It takes as input a DbOperation object wrapping an object that extends TableEntity, i.e. INPUT : DbOperation<out TableEntity>.

DbOperation has 4 subtypes: Insert, Modify, Upsert and Delete.

GPAL functions available are:

  • dbSink() for non-transactional
  • txDbSink() for transactional

The GPAL example below contains both a transactional sink and a non-transactional sink:

// Transactional db sink
pipeline("TEST_DB_SINK") {
    source(dbBulkSubscribe<Trade>())
        .map {
            when (it) {
                Bulk.Prime.Completed -> null
                is Bulk.Prime.Record -> null
                is Bulk.Update.Delete -> DbOperation.Delete(it.record)
                is Bulk.Update.Insert -> DbOperation.Insert(it.record)
                is Bulk.Update.Modify -> DbOperation.Modify(it.record)
            }
        }
        .sink(txDbSink())
}

// Non-transactional db sink
pipeline("TEST_DB_SINK") {
    source(dbBulkSubscribe<Trade>())
        .map {
            when (it) {
                Bulk.Prime.Completed -> null
                is Bulk.Prime.Record -> null
                is Bulk.Update.Delete -> DbOperation.Delete(it.record)
                is Bulk.Update.Insert -> DbOperation.Insert(it.record)
                is Bulk.Update.Modify -> DbOperation.Modify(it.record)
            }
        }
        .sink(dbSink())
}
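
If you do not need to distinguish between inserts and modifies from the source, the Upsert subtype can handle both. Here is a minimal sketch of a variant of the map step above, assuming Upsert accepts a record in the same way as the other operations:

// Upsert variant of the map step
pipeline("TEST_DB_UPSERT_SINK") {
    source(dbBulkSubscribe<Trade>())
        .map {
            when (it) {
                Bulk.Prime.Completed -> null
                is Bulk.Prime.Record -> null
                is Bulk.Update.Delete -> DbOperation.Delete(it.record)
                // insert or update the record, whichever applies
                is Bulk.Update.Insert -> DbOperation.Upsert(it.record)
                is Bulk.Update.Modify -> DbOperation.Upsert(it.record)
            }
        }
        .sink(txDbSink())
}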

Kafka Sink

This sink is available through the kafka-genesis module. To use this, you must add the following dependency to your application:

implementation("global.genesis:kafka-genesis:${properties["genesisIntegrationVersion"]}")

This is a sink that creates a producer and sends ProducerRecords to a Kafka topic. You can send either a single ProducerRecord or a flow of ProducerRecords.

The types specified when instantiating the sink refer to the types of the key and value of the records being sent to the Kafka topic.

When you configure the kafka sink to use as part of your pipeline, you must provide the following as part of the configuration:

  • bootstrapServers (required): these are the bootstrap servers for your Kafka instance
  • clientId (required): this is the client id you would like to specify for the producer
  • keySerializer (required): this is the Serializer for the key of your producer record - this Serializer must be for an object that matches the key type defined in the sink instantiation
  • valueSerializer (required): this is the Serializer for the value of your producer record - this Serializer must be for an object that matches the value type defined in the sink instantiation
  • securityProtocol (optional): this is the security protocol you would like to use; the default is "SSL"
  • additionalConfig (optional): if you wish to set additional config for your sink that has not been specified above, you can do so using this map (see example below)

Here is a GPAL example of a Kafka sink definition:

val sink = kafkaSink<String, Int> {
    bootstrapServers = "localhost"
    clientId = "my-client"
    keySerializer = StringSerializer()
    valueSerializer = IntegerSerializer()
    securityProtocol = "SSL"
    additionalConfig = mapOf("request.timeout.ms" to "500")
}

pipelines {
    pipeline("TEST_KAFKA_PIPELINE") {
        source(testSource)
            .sink(sink)
    }
}
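
Note that the topic is not part of the sink configuration; it is set on each ProducerRecord that reaches the sink. Below is a minimal sketch of building the records in a map step - the tradeSource source and the Trade fields used for the key and value are hypothetical:

pipelines {
    pipeline("TRADE_TO_KAFKA_PIPELINE") {
        source(tradeSource) // hypothetical source emitting Trade elements
            .map { trade ->
                // key the record by trade id and send the quantity as the value;
                // the target topic is specified per record
                ProducerRecord("trades-topic", trade.tradeId, trade.quantity)
            }
            .sink(sink)
    }
}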

JMS Sink

This sink is available through the jms-genesis module. To use it, add the following dependency to your application:

implementation("global.genesis:jms-genesis:${properties["genesisIntegrationVersion"]}")

This is a sink that creates a JMS producer and sends Message objects to a message queue. You can send either a single Message or a flow of Message objects.

The type specified when instantiating the sink refers to the type of element that the sink can expect to receive. As part of the configuration, you must provide a valid lambda to convert this element type to a valid Message object to send to the message queue.

If the sink is unable to connect to the message queue, the process running the pipeline will enter a WARNING state.

The message displayed next to this state includes a monitor name, which you can configure (as shown below). The default name generated is based on the queue name. If you provide your own monitor name, ensure it is unique across all JMS Sources and Sinks for that process.

When you configure the JMS sink for your pipeline, you must provide the following as part of the configuration:

  • factory (required): this is any instance of QueueConnectionFactory, e.g. MQQueueConnectionFactory (IBM MQ), ActiveMQConnectionFactory (ActiveMQ), etc.
  • queueName (required): the name of the message queue you would like to publish to
  • username (required): the username used to create the queue connection
  • password (required): the password used to create the queue connection
  • buildMessage (required): the lambda to convert from the element sent to your sink to a valid Message object using the JMS session provided by the sink
  • retryDelayMs (optional): how long the sink should wait, in milliseconds, before attempting to connect to the queue again after a connection failure
  • maxRetries (optional): how many times the producer should attempt to resend the message upon failure; the default is 100
  • monitorName (optional): name to be displayed when process state is set to WARNING; the default name has the format: JMSSinkMonitor-0-QUEUE_NAME

Here is a GPAL example of a JMS sink definition:

val sink = jmsSink<MessageSent> {
    queueName = systemDefinition.getItem("QUEUE_NAME").toString()
    factory = MQQueueConnectionFactory().apply {
        hostName = systemDefinition.getItem("HOST_NAME").toString()
        port = systemDefinition.getItem("PORT").toString().toInt()
        queueManager = systemDefinition.getItem("QUEUE_MANAGER").toString()
        transportType = systemDefinition.getItem("TRANSPORT_TYPE").toString().toInt()
        channel = systemDefinition.getItem("CHANNEL").toString()
    }
    username = "admin"
    password = "AdminPassword123"
    buildMessage { element, session -> session.createTextMessage(element.text) }
}

pipelines {
    pipeline("TEST_JMS_PIPELINE") {
        source(testSource)
            .map { it }
            .sink(sink)
    }
}

Handlers

On Completion Handler

This is a handler that executes after a successful pipeline operation. Multiple handlers can be added to a pipeline.

  • For batch pipelines, this is at the end of the whole pipeline.

  • For real-time pipelines, this is at the end of each processed element.

GPAL Example:

pipeline("TEST_ON_COMPLETION_PIPELINE") {
    source(dbBulkRealtimeSource)
        .map(mapper)
        .sink(logSink)
        .onCompletion { context: PipelineContext<Bulk<Trade>> ->
            // do something
        }
}

Send Event On Completion Handler

This is a type of on completion handler that triggers an event.

It takes in the name of the event as well as the input object that the event will need to run.

For example, for an event that does some logic based on the Trade object:

eventHandler<TradeDetails>("TRADE_LOGIC", transactional = true) {
    onCommit { event ->
        val trade = event.details
        // further logic here using trade object
        ack()
    }
}

The following handler can be used:


val insertTradeOnCompletionHandler = sendEventOnCompletion<Bulk<Bond>, TradeDetails> {
    eventName = "EVENT_TRADE_LOGIC"
    buildEvent { context: PipelineContext<Bulk<Bond>> ->
        TradeDetails(
            // construct TradeDetails object based on provided context object
        )
    }
}

pipeline("TEST_TRADE_PIPELINE") {
    source(dbBulkRealtimeSource)
        .map(mapper)
        .sink(logSink)
        .onCompletion(insertTradeOnCompletionHandler)
}

On Uncaught Error Handler

This is a handler that allows the user to set up custom exception handling for errors that have not been caught during the course of the pipeline.

Multiple handlers can be added to the pipeline. Each handler has access to the throwable.

For example:

pipeline("TEST_PIPELINE") {
    source(dbBulkRealtimeSource)
        .map(mapper)
        .sink(logSink)
        .onUncaughtError { t ->
            LOG.error("An exception was thrown in TEST_PIPELINE", t)
        }
}

On Operation Error Handler

This is a handler that allows the user to set up custom exception handling for errors thrown at the individual operation level.

You can add multiple handlers to each operation stage. Each handler has access to:

  • the element that caused the exception
  • the PipelineContext
  • the throwable

To specify what the pipeline should do when it catches an exception, you must provide an action for each handler:

  • SKIP_ELEMENT: skip the element that caused this exception (e.g. in an operation processing lines from a file, skip that individual line)
  • SKIP_STREAM: skip the stream that caused this exception (e.g. for the file example mentioned above, skip the entire file - any lines already processed will continue to the sink); this is only for real-time pipelines
  • STOP_PIPELINE: stop the pipeline from processing any more data
Tip: SKIP_STREAM is only for real-time pipelines. If you use SKIP_STREAM in a batch pipeline, it effectively acts as STOP_PIPELINE.

For example:

pipeline("TEST_PIPELINE") {
    source(dbBulkRealtimeSource)
        .map(mapper)
        .onOperationError { element, context, throwable ->
            LOG.error("An exception was thrown in TEST_PIPELINE while processing $element", throwable)
            OperationErrorAction.SKIP_ELEMENT
        }
        .sink(logSink)
}
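
Because each handler applies to the operation stage it is attached to, you can register different actions for different stages - for example, skipping a bad element at the map stage but stopping the pipeline if the sink fails. Here is a sketch, assuming a handler can be attached after the sink stage in the same way as after map:

pipeline("TEST_PIPELINE") {
    source(dbBulkRealtimeSource)
        .map(mapper)
        .onOperationError { element, context, throwable ->
            // a single bad element should not stop the flow
            LOG.error("Mapping failed in TEST_PIPELINE for $element, skipping it", throwable)
            OperationErrorAction.SKIP_ELEMENT
        }
        .sink(logSink)
        .onOperationError { element, context, throwable ->
            // a sink failure is treated as fatal
            LOG.error("Sink failed in TEST_PIPELINE for $element, stopping the pipeline", throwable)
            OperationErrorAction.STOP_PIPELINE
        }
}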

See the relevant Component pages for more implementations.