Document Management - `Data Pipelines
There are some pipeline implementations provided with the Document Manager that can be used within any data pipeline you have set up. See Data Pipelines for more information.
Set Up
In order to have access to the following out of box GPAL extension functions you will need to add the following dependency to the app module of your project:
- Gradle
- Maven
implementation("global.genesis:file-server-pipelines:$fileServerVersion")
<dependency>
<groupId>global.genesis</groupId>
<artifactId>file-server-pipelines</artifactId>
<version>$fileServerVersion</version>
</dependency>
File Storage Source (Batch)
Batch source which retrieves a single file that has already been uploaded to the file storage and returns the respective details as a PipelineFile
object.
When running execute to trigger this source, at least one of the following should be provided as part of the inputs map with the following keys:
- FILE_STORAGE_ID
- FILE_NAME
If at least one of these values is not provided then the source will throw an exception.
GPAL Example:
// GPAL Script to define pipeline
val fileStorageSource = fileStorageSource()
pipelines {
pipeline("TEST_FILE_STORAGE") {
source(fileStorageSource)
.sink(fileSink)
}
}
Example of how you would trigger the execution of this pipeline in an event handler:
val pipelineManager = inject<PipelineManager>()
eventHandler<FileStorageData>(name = "FILE_PIPELINE_START") {
onCommit { event ->
val details = event.details
val pipeline = pipelineManager.getBatchPipeline("TEST_FILE_STORAGE")
pipeline?.execute(mapOf("FILE_STORAGE_ID" to details.fileStorageId, "FILE_NAME" to details.fileName))
// note: both id and file name are not necessary but can be provided
ack()
}
}
File Storage Sink
A sink into a file which takes a stream of strings as input.
It requires a FileStorageClient
and some config of type FileStorageSinkConfig
.
The config is used to set the userName
and the fileName
.
There is also a buildFileName
lambda function which allows the user to do add some logic to make the fileName
dynamic.
GPAL Example:
// Create a counter which we use in the buildFileName lambda.
val counter = AtomicInteger(0)
// db source which emits a stream of table entities
val dbSource = dbBulkQuery<TableEntity>()
// file sink, we use the fileStorageSink {} lambda to initialise it.
val fileSink = fileStorageSink {
userName = "JohnDoe"
buildFileName {
"processedFile" + counter.getAndIncrement() + ".csv"
}
}
pipelines {
pipeline("TEST_FILE_STORAGE") {
// since the source outputs a different type to what the sink takes we convert it to the respective type, i.e. a string.
source(dbSource)
.map{
"${it.recordId}, ${it.timestamp}"
}
.sink(fileSink)
}
}