Skip to main content
Version: Current

Document Management - `Data Pipelines

There are some pipeline implementations provided with the Document Manager that can be used within any data pipeline you have set up. See Data Pipelines for more information.

Set Up

In order to have access to the following out of box GPAL extension functions you will need to add the following dependency to the app module of your project:

implementation("global.genesis:file-server-pipelines:$fileServerVersion")

File Storage Source (Batch)

Batch source which retrieves a single file that has already been uploaded to the file storage and returns the respective details as a PipelineFile object. When running execute to trigger this source, at least one of the following should be provided as part of the inputs map with the following keys:

  • FILE_STORAGE_ID
  • FILE_NAME

If at least one of these values is not provided then the source will throw an exception.

GPAL Example:

// GPAL Script to define pipeline
val fileStorageSource = fileStorageSource()

pipelines {
pipeline("TEST_FILE_STORAGE") {
source(fileStorageSource)
.sink(fileSink)
}
}

Example of how you would trigger the execution of this pipeline in an event handler:

val pipelineManager = inject<PipelineManager>()

eventHandler<FileStorageData>(name = "FILE_PIPELINE_START") {
onCommit { event ->
val details = event.details

val pipeline = pipelineManager.getBatchPipeline("TEST_FILE_STORAGE")
pipeline?.execute(mapOf("FILE_STORAGE_ID" to details.fileStorageId, "FILE_NAME" to details.fileName))
// note: both id and file name are not necessary but can be provided
ack()
}
}

File Storage Sink

A sink into a file which takes a stream of strings as input. It requires a FileStorageClient and some config of type FileStorageSinkConfig. The config is used to set the userName and the fileName. There is also a buildFileName lambda function which allows the user to do add some logic to make the fileName dynamic.

GPAL Example:

// Create a counter which we use in the buildFileName lambda.
val counter = AtomicInteger(0)

// db source which emits a stream of table entities
val dbSource = dbBulkQuery<TableEntity>()

// file sink, we use the fileStorageSink {} lambda to initialise it.
val fileSink = fileStorageSink {
userName = "JohnDoe"
buildFileName {
"processedFile" + counter.getAndIncrement() + ".csv"
}
}

pipelines {
pipeline("TEST_FILE_STORAGE") {

// since the source outputs a different type to what the sink takes we convert it to the respective type, i.e. a string.
source(dbSource)
.map{
"${it.recordId}, ${it.timestamp}"
}
.sink(fileSink)
}
}