Data Pipelines - basics
Set-up
You can configure your data pipelines in a Kotlin script file:
server/appname/src/main/genesis/scripts/appname-pipelines.kts.
Once created, you need to add this script to your application’s processes configuration.
If you are not using the Genesis Gradle Settings Plugin, you need to add the following dependency to your app module. If you are using the plugin, this step is not required.
Gradle:

implementation(genesis("pal-datapipeline"))

Maven:

<dependency>
    <groupId>global.genesis</groupId>
    <artifactId>genesis-pal-datapipeline</artifactId>
    <version>$genesisVersion</version>
</dependency>
Pipelines
There are two kinds of pipeline:
- Batch pipeline
- Real-time pipeline
Whether a pipeline is batch or real-time depends entirely on the source used to configure it; for example, the fileStorageSource used below produces a batch pipeline, while the dbBatchQuery source produces a real-time pipeline.
Each pipeline must have a single source and single sink. In between these stages, you can define any number of operator steps.
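As a rough sketch of this structure, a pipeline with two operator steps might look like the following. This assumes transform steps can be chained in the same way the single-step batch example below chains transform and sink; exampleSource, firstOperator, secondOperator and exampleSink are hypothetical placeholders defined elsewhere in the script, not platform APIs.

// Sketch only: one source, two chained operator steps, one sink.
// All names here are hypothetical placeholders.
pipelines {
    pipeline("EXAMPLE_MULTI_STEP_PIPELINE") {
        source(exampleSource)
            .transform(firstOperator)
            .transform(secondOperator)
            .sink(exampleSink)
    }
}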
Batch pipeline
A batch pipeline runs only once and must be triggered manually via its execute method; the pipeline can be retrieved from the PipelineManager class.
The execute method takes a map of input parameters, which provides the context used throughout the pipeline.
Here is an example of a batch pipeline:
// GPAL script to define a batch pipeline
val fileStorageSource = fileStorageSource()

pipelines {
    pipeline("TEST_FILE_STORAGE") {
        source(fileStorageSource)
            // Read each incoming PipelineFile and emit its text content downstream
            .transform(StreamOperator<PipelineFile, String> { input ->
                flow {
                    input.collect { pipelineFile ->
                        emit(pipelineFile.content.bufferedReader().use { it.readText() })
                    }
                }
            })
            .sink(fileSink)
    }
}
Triggering execution
You can trigger the execution of the above example pipeline from anywhere in your codebase.
Here is an example of triggering the pipeline inside an eventHandler:
val pipelineManager = inject<PipelineManager>()

eventHandler<FileStorageData>(name = "FILE_PIPELINE_START") {
    onCommit { event ->
        val details = event.details
        // Retrieve the batch pipeline defined in the pipelines script and execute it,
        // passing the context parameters it needs
        val pipeline = pipelineManager.getBatchPipeline("TEST_FILE_STORAGE")
        pipeline?.execute(mapOf("FILE_STORAGE_ID" to details.fileStorageId, "FILE_NAME" to details.fileName))
        ack()
    }
}
Real-time pipeline
A real-time pipeline is started as soon as the process running the script is healthy. It is only stopped once the process has shut down.
If you need to stop or start a pipeline manually, you can do this using the Pipeline Manager, as sketched after the example below; however, this is not recommended.
Here is an example of a real-time pipeline:
// GPAL script to define a real-time pipeline
val batchPollSource = dbBatchQuery<Trade> {
    source = "MY_SOURCE"
    index = Trade.ByTimestamp
}

pipelines {
    pipeline("TEST_BATCH_POLL_PIPELINE") {
        source(batchPollSource)
            .sink(testSink)
    }
}
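For completeness, manually stopping and restarting the pipeline above might look like the sketch below. Treat the method names getRealtimePipeline, start and stop as assumptions; check them against the PipelineManager API in your version of the platform before relying on them.

// Minimal sketch, not verified against the PipelineManager API: assumes
// getRealtimePipeline returns the running pipeline and that it exposes
// stop() and start().
val pipelineManager = inject<PipelineManager>()

val realtimePipeline = pipelineManager.getRealtimePipeline("TEST_BATCH_POLL_PIPELINE")
realtimePipeline?.stop()   // pause processing (not recommended in normal operation)
realtimePipeline?.start()  // resume processing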