Data Pipelines - basics

Set-up

You can configure your data pipelines in a file called app-name-pipelines.kts. This must be located in your application’s scripts directory.

Once created, you need to add this script to your application’s processes configuration.

If you are not using the Genesis Gradle Settings Plugin, add the following dependency to your app module. If you are using the plugin, you can skip this step.

implementation(genesis("pal-datapipeline"))

Pipelines

There are two kinds of pipeline:

  • Batch pipeline
  • Real-time pipeline

Whether a pipeline is batch or real-time depends simply on the source used to configure it.

Each pipeline must have a single source and a single sink. Between these two stages, you can define any number of operator steps.
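The source, operator and sink shape described above can be illustrated with a small, self-contained Kotlin sketch. It is a conceptual model only: `Source`, `Sink`, `Stage` and `runExample` are hypothetical names invented for this illustration and are not part of the Genesis API.

```kotlin
// Conceptual model only: these types are hypothetical and are NOT the Genesis API.
// A pipeline here is one source, zero or more operator steps, and one sink.

/** Produces the records that enter the pipeline. */
fun interface Source<T> {
    fun read(): Sequence<T>
}

/** Consumes the records that leave the pipeline. */
fun interface Sink<T> {
    fun write(records: Sequence<T>)
}

/** Holds the records flowing from the single source through the operator steps. */
class Stage<T>(private val records: Sequence<T>) {
    // Each operator step returns a new stage, so any number of steps can be chained.
    fun <R> map(step: (T) -> R): Stage<R> = Stage(records.map(step))

    fun filter(predicate: (T) -> Boolean): Stage<T> = Stage(records.filter(predicate))

    // Attaching the single sink ends the chain and drains the records.
    fun sink(target: Sink<T>) = target.write(records)
}

/** Starts a chain from exactly one source. */
fun <T> source(origin: Source<T>): Stage<T> = Stage(origin.read())

fun runExample(): List<String> {
    val collected = mutableListOf<String>()
    source(Source<Int> { sequenceOf(1, 2, 3, 4) })
        .filter { it % 2 == 0 }   // operator step 1
        .map { "value=$it" }      // operator step 2
        .sink(Sink<String> { collected.addAll(it) })
    return collected
}

fun main() {
    println(runExample()) // prints [value=2, value=4]
}
```

Because the chain can only start with `source` and only ends at `sink`, the one-source, one-sink rule is enforced by the shape of the builder, while the operator steps in between remain optional.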

Batch pipeline

A batch pipeline runs only once and must be triggered manually via its execute method. You can retrieve the pipeline from the Pipeline Manager class.

The execute method takes a map of input parameters, which provides the context used throughout the pipeline.

Here is an example of a batch pipeline:

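// GPAL script to define the pipeline
val fileStorageSource = fileStorageSource()

pipelines {
    pipeline("TEST_FILE_STORAGE") {
        source(fileStorageSource)
            // Read the content of each incoming file into a String
            .transform(StreamOperator<PipelineFile, String> { input ->
                flow {
                    input.collect { pipelineFile ->
                        // emit passes the text downstream; without it the flow produces nothing
                        emit(pipelineFile.content.bufferedReader().use { it.readText() })
                    }
                }
            })
            // fileSink is assumed to be defined elsewhere in the script
            .sink(fileSink)
    }
}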
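The transform step in the example above turns a file's content into text. The following sketch isolates that step so it can be run on its own. It assumes that the content is exposed as an `InputStream`; `FakePipelineFile` and `readContent` are hypothetical stand-ins, not Genesis types.

```kotlin
import java.io.ByteArrayInputStream
import java.io.InputStream

// Hypothetical stand-in for the file object seen by the transform step;
// the real PipelineFile type is not reproduced here.
class FakePipelineFile(val name: String, val content: InputStream)

/** Reads the whole stream as UTF-8 text and closes the stream afterwards. */
fun readContent(file: FakePipelineFile): String =
    file.content.bufferedReader().use { it.readText() }

fun main() {
    val file = FakePipelineFile(
        "trades.csv",
        ByteArrayInputStream("id,price\n1,10.5\n".toByteArray())
    )
    print(readContent(file))
}
```

Note that `readText()` loads the entire content into memory, so this approach suits small files. For large files, reading line by line is likely to be a better fit.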

Triggering execution

You can trigger the execution of the above example pipeline from anywhere in your codebase.

Here is an example of triggering inside an eventHandler:

val pipelineManager = inject<PipelineManager>()

eventHandler<FileStorageData>(name = "FILE_PIPELINE_START") {
    onCommit { event ->
        val details = event.details

        val pipeline = pipelineManager.getBatchPipeline("TEST_FILE_STORAGE")
        pipeline?.execute(mapOf("FILE_STORAGE_ID" to details.fileStorageId, "FILE_NAME" to details.fileName))
        ack()
    }
}
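The trigger above uses a safe call (`?.`), which suggests that the lookup can return null when no pipeline has that name. In that case the event is still acknowledged and nothing runs. The sketch below models this pattern with toy classes, to show one way of making the missing-pipeline case explicit. `ToyBatchPipeline`, `ToyPipelineManager` and `trigger` are hypothetical and do not reproduce the Genesis `PipelineManager` API.

```kotlin
// Hypothetical model of the trigger pattern; NOT the Genesis PipelineManager API.

/** A batch job that runs once per call, using the supplied context map. */
class ToyBatchPipeline(private val body: (Map<String, Any>) -> String) {
    fun execute(context: Map<String, Any>): String = body(context)
}

/** Name-based registry; the lookup returns null for unknown names. */
class ToyPipelineManager(private val pipelines: Map<String, ToyBatchPipeline>) {
    fun getBatchPipeline(name: String): ToyBatchPipeline? = pipelines[name]
}

/** Runs the named pipeline, or reports that it does not exist. */
fun trigger(manager: ToyPipelineManager, name: String, context: Map<String, Any>): String =
    manager.getBatchPipeline(name)?.execute(context) ?: "no pipeline named $name"

fun main() {
    val manager = ToyPipelineManager(
        mapOf("TEST_FILE_STORAGE" to ToyBatchPipeline { ctx -> "processing ${ctx["FILE_NAME"]}" })
    )
    // The context map carries the per-run parameters into the pipeline.
    println(trigger(manager, "TEST_FILE_STORAGE", mapOf("FILE_NAME" to "trades.csv")))
    // An unknown name falls through to the explicit fallback instead of being ignored.
    println(trigger(manager, "MISSING", emptyMap()))
}
```

In a real event handler, the equivalent of the fallback branch might return a negative acknowledgement or log a warning, depending on how the application should react to a misconfigured pipeline name.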

Real-time pipeline

A real-time pipeline is started as soon as the process running the script is healthy. It is only stopped once the process has shut down.

If you need to stop or start the pipeline manually, you can do this using the Pipeline Manager; however, this is not recommended.

Here is an example of a real-time pipeline:

// GPAL script to define the pipeline
val batchPollSource = dbBatchQuery<Trade> {
    source = "MY_SOURCE"
    index = Trade.ByTimestamp
}

pipelines {
    pipeline("TEST_BATCH_POLL_PIPELINE") {
        source(batchPollSource)
            // testSink is assumed to be defined elsewhere in the script
            .sink(testSink)
    }
}