
How to ingest CSV files

Using Data Pipeline to ingest data

When you need to take in data from an external source, the first option you should consider is to use the platform's Data Pipeline component.

This enables you to define the data source, transform it to match fields in your tables, then insert or update the data in the specified table. This can be set up to handle streaming data or static files.

A good practical example is ingesting Copp Clark holiday data that arrives in CSV files.

info

You can also use the Data Pipeline component to send out data to external systems. This works the same way in reverse, taking data from tables in your Genesis application and transforming it into the format required by the target system.

Here, we are going to focus on incoming data in static files.

Transforming data

To create a Data Pipeline for ingesting data, you create a project-name-data-pipelines.kts file to define a pipeline that maps data from an external source (database, file) to tables in your application.

Each Data Pipeline must define three things, which are combined in the sketch after this list:

  • source specifies the location of the incoming data
  • operator parses the source and maps the data to fields in the application's database; typically, you are looking to transform column headings on the incoming files into the relevant fields in the target table in your database
  • sink is the destination of the transformed data within your application. This can be a file or a queue, for example. You can use this to further update the data and store it somewhere else - such as a different database or a log
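
Put together, a minimal pipeline definition looks broadly like the sketch below. The pipeline name, directory and column mapping are illustrative placeholders; the building blocks (camelSource, csvRawDecoder, DbOperation.Upsert, dbSink) are the ones described in the rest of this page.

pipelines {
    // Illustrative skeleton; names, locations and fields are placeholders
    pipeline("EXCHANGE_HOLIDAY_PIPELINE") {
        // source: watch a local directory for incoming CSV files
        source(
            camelSource {
                location = "file://run/runtime/coppClarkSimple"
            }
        )
            // operators: decode each file into rows, then map each row to a database operation
            .split(csvRawDecoder())
            .map { row ->
                DbOperation.Upsert(ExchangeHoliday {
                    isoMic = row.data["ISO MIC Code"]!!
                    // ...map the remaining columns as shown in the Operators section below
                })
            }
            // sink: apply the resulting operations to the application's database
            .sink(dbSink())
    }
}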

Source

Your pipeline needs a data source. The example below points to a location on the local filesystem; it pulls in any file that is dropped into the directory $USER_HOME/run/runtime/coppClarkSimple:

source(
    camelSource {
        location = "file://run/runtime/coppClarkSimple"
    }
)

Operators

Operators are used to transform input format to output format (although they can do more). You can chain them together, where each operator simply takes the output type of the previous one.

So, in this case, the output of .split goes into .map, and .map outputs DbOperations, which the sink (dbSink) can take as input.

//1
.split(csvRawDecoder())
//2
.map { row ->
    DbOperation.Upsert(ExchangeHoliday {
        // The exclamation marks tell the code "we expect this to always be present in the row data"
        holidayDate = parseCCDateFormat(row.data["EventDate"])!!
        holidayName = row.data["EventName"]
        holidayType = getHolidayType(row.data["FileType"])!!
        isoMic = row.data["ISO MIC Code"]!!
    })
}

The example above has two operators. Let's look at them more closely.

  • split uses the pre-built decoder, which converts the CSV file into rows; each row is output as a map of column name to value: CsvRow<Map<String, String>>
  • map takes the data we need from the row columns and formats it accordingly. We are building up ExchangeHoliday, a database entity in our app, wrapped in the DB operation we want to perform on it; in this case, we want to upsert. (The helper functions used here are sketched just after this list.)
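
The map step relies on two small helper functions, parseCCDateFormat and getHolidayType, which live alongside the pipeline in the example repo. The sketch below only illustrates the kind of thing they do; the date pattern, column values and enum are assumptions, so check the repo for the real implementations.

import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat

// Stand-in for the enum generated from the application's data model
enum class HolidayType { TRADING, SETTLEMENT }

// Parses the Copp Clark date column into a DateTime, or returns null if the value is missing.
// The "yyyyMMdd" pattern is an assumption; use whatever format the incoming files actually contain.
fun parseCCDateFormat(value: String?): DateTime? =
    value?.let { DateTime.parse(it, DateTimeFormat.forPattern("yyyyMMdd")) }

// Maps the "FileType" column to the holiday type; the "T"/"S" values are assumptions for illustration.
fun getHolidayType(value: String?): HolidayType? = when (value) {
    "T" -> HolidayType.TRADING
    "S" -> HolidayType.SETTLEMENT
    else -> null
}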

For more complex requirements, you can create your own transformer in a separate file. You can see this in two of our examples.

Also, you can build multiple database entities from a single file. You can see details of this in the third example.

Sink

Once the data is in the required format, we want to sink it. For file ingress, this means updating the database.

The example sink below takes the output from .map (a set of DB operations) and handles the updating of the database.

.sink(dbSink())

That’s all you need to ingest the data and put it in your database.

However, there are additional helpers for things such as onCompletion. This enables you to specify what happens after the file processing is complete.

In the example below, we use this to log that we’ve completed it, and to send a notification to users:

.onCompletion { context ->
    LOG.info("Completed processing of file ${context.data.name}")
}.onCompletion(
    notifyAllScreensOnCompletion {
        body = "Finished processing filename ${context.data.name}"
        header = "Finished Processing Copp Clark Exchange Holidays File"
        severity = Severity.Information
    }
)

Three different examples

To show how to use Data Pipeline to ingest data, we have provided an example application, which has three different scenarios. All of these are based on incoming data in Copp Clark holiday files:

  • The first example simply covers the task of parsing the incoming data (a CSV file that contains Exchange Settlement Holiday details) and then updating a table in the application's database (the EXCHANGE_HOLIDAY table).

  • The second example looks at two incoming files - Trading and Settlement - and applies more sophisticated analysis and logic. The database is only updated when changes are detected, including deleting dates that are no longer designated holidays. In this example, the logic is defined in a separate transformer file, which is called from within the pipeline itself.

  • The third example is an extension of the second. It adds maintenance of EXCHANGE data, which is contained in every row of the inbound files.

The application also includes guidance for:

  • Sourcing from another location
    • Remote SFTP server
    • AWS S3 Bucket

Throughout, the code in the application includes detailed comments explaining the steps.

Finding the examples

The examples are within one complete example application, which includes a front end so that you can run and see the data. You can clone the repo to see the code - which includes comments at the key points to highlight what is being specified - and run the application.

We shall look at each example in more detail.

Simple CSV ingestion

The example below parses a simple CSV (the Exchange Settlement Copp Clark file) and upserts its contents into the application's EXCHANGE_HOLIDAY table.

This example is designed to highlight how simple it is to ingest the data. The example works, but, for initial clarity, it excludes things that you would include in a finished application:

  • It assumes that the file is in a valid format. It will not handle errors gracefully if a row has missing or badly formatted data.
  • It simply upserts records (inserting new records, or modifying them if already present). So, if you are using auditing, the audit trail can get very busy unless you typically expect delta files (where each new file contains only the additions and modifications from the previous version).
  • It only logs upon completion.
Take a look at the code

The .kts file for simple CSV ingress contains the main configuration. Comments are inserted in the code to explain each key point.

Take a look at the app
  1. Run the app.
  2. Copy the files in the folder sampleData/simple into localDevFileDrop/coppClarkSimple.

See the HOWTO_CSV_INGRESS_COMPACT_PROCESS logs for logged details.

To view the data, run the GUI or use the Genesis DbMon command.

Snapshot file CSV ingestion

This example shows something a bit more robust, and looks at the nuances of Copp Clark files:

  • The file sent each day is a full snapshot of holidays. If the latest file does not include a holiday that was present in the previous version, then it must be deleted.
  • There are two separate files, one for Trading and one for Settlement. Each file can contain multiple lines for the same holiday date on a given exchange. Our app only needs to know whether a given date is a holiday; for this reason, we key the EXCHANGE_HOLIDAY table on just Holiday Type (Settlement vs Trading) + ISO MIC (the exchange) + Holiday Date - so we don't need the redundant lines.

Two further things elevate this snapshot above the simple one:

  • It includes auditing: the EXCHANGE_HOLIDAY table is auditable.
  • It only writes updates if something has changed (whereas the simple example simply upserts with the same data each time).

Keeping the data transformation separate

A major difference between this example and the simple one is that the operators have been built separately in the CoppClarkHolidayTransformer.kt file. This file uses a Data Pipeline StreamOperator, which reads the decoded CSV rows and outputs a series of DbOperations (inserts, modifies, deletes and upserts) on ExchangeHoliday records.

The operators are called from within the pipeline. This keeps the pipeline definition itself concise and avoids filling it with complex logic.
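
To give a flavour of the kind of logic the transformer holds, the sketch below compares the holidays in the incoming snapshot with those already stored and works out the minimal set of changes. It is a plain Kotlin illustration of the approach, not the repo's StreamOperator code; the key type and fields are assumptions.

// Illustrative only - the real logic lives in CoppClarkHolidayTransformer.kt

// Simplified stand-in for the key of a holiday record: type + exchange + date
data class HolidayKey(val holidayType: String, val isoMic: String, val holidayDate: String)

// Given the keys present in the incoming snapshot and the keys already in the database,
// work out what to insert and what to delete. Unchanged records produce no operation,
// which is what keeps the audit trail quiet.
fun snapshotDelta(
    incoming: Set<HolidayKey>,
    existing: Set<HolidayKey>
): Pair<Set<HolidayKey>, Set<HolidayKey>> {
    val toInsert = incoming - existing   // holidays that are new in the latest file
    val toDelete = existing - incoming   // holidays dropped from the latest file
    return toInsert to toDelete
}

In the real transformer, these two sets become the DbOperations that are passed on to the sink, just as in the simple example.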

Take a look at the code

The .kts file for snapshot data contains the main configuration. Comments are inserted in the code to explain each key point.

The CoppClarkHolidayTransformer.kt file contains all the logic. Comments are inserted in the code to explain each key point.

Take a look at the app
  1. Run the app.
  2. Copy the files in the folder sampleData/snapshot into localDevFileDrop/coppClarkSnapshot.

See the HOWTO_CSV_INGRESS_COMPACT_PROCESS logs for logged details.

To view the data, run the GUI or use the Genesis DbMon command.

Updating multiple different entities from a single source

You can modify more than one type of entity record from your file upload.

This example is an extension of the previous Snapshot CSV file example. Additionally, it maintains EXCHANGE data, which is contained in every row of the inbound files. So, it ensures that there is an EXCHANGE entry for every calendar we're adding.

Take a look at the code

You can see the main configuration in the snapshot-exchange-data-pipelines.kts file. This file is very similar to the snapshot-data-pipelines.kts example. So, the comments in the file are exclusively focused on the logic that is related to exchange data.

In the CoppClarkHolidayAndExchangeTransformer.kt file, the main difference to note is that the StreamOperator output is of type DbOperation<out TableEntity>. This means it can be a DbOperation on any table entity. Here we have just two table entities, but it could be any mix of entities in our application.
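
As a rough illustration of what this allows (the field and column names below are placeholders, and the real transformer does considerably more), a single decoded row can produce operations on more than one entity:

// Illustrative sketch: one CSV row drives operations on two different table entities.
// Field and column names are placeholders - see CoppClarkHolidayAndExchangeTransformer.kt for the real code.
fun rowToOperations(row: Map<String, String>): List<DbOperation<out TableEntity>> = listOf(
    // Make sure the exchange referenced by the row exists...
    DbOperation.Upsert(Exchange {
        isoMic = row["ISO MIC Code"]!!
    }),
    // ...and record the holiday itself
    DbOperation.Upsert(ExchangeHoliday {
        isoMic = row["ISO MIC Code"]!!
        holidayName = row["EventName"]!!
    })
)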

Take a look at the app
  1. Run the app.
  2. Copy the files in the folder sampleData/snapshot into localDevFileDrop/coppClarkSnapshotExchange.

See the HOWTO_CSV_INGRESS_COMPACT_PROCESS logs for logged details.

To view the data, run the GUI or use the Genesis DbMon command.

Setting up a pipeline in your own app

When you want to create a Data Pipeline in your own application, there are other things you need to do in addition to creating the pipeline files themselves.

Processes.xml

Every process in your app must be configured in the application's -processes.xml file. See the howto-csv-ingress-processes.xml file for a simple example.

At minimum, you need to add the Data Pipeline module, its package and the name of your pipeline script to the relevant process definition.

For example:

<processes>
    <process name="MYAPP_MANAGER">
        ...
        <module>genesis-pal-datapipeline</module>
        <package>global.genesis.pipeline</package>
        <script>myapp-simple-data-pipelines.kts</script>
        ...
    </process>
</processes>

We would then add code into the data pipeline file like this:

pipelines {
    pipeline("MY_PIPELINE") {
        source(camelSource { location = getDefaultLocalFileCamelLocation(systemDefinition.get("FolderLocation").get(), "FileName") })
            .map { LOG.info("Triggered MY_PIPELINE"); it }
            .split(csvRawDecoder())
            .map { row ->
                ...
            }
            .sink(dbSink())
            .onCompletion { ... }
    }
}

System definition

Every application also has a main configuration file called genesis-system-definition.kts.

Check this file and make sure that the application knows where to source the files from.
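
For example, if your pipeline reads its folder location from the system definition (as the MY_PIPELINE sketch above does with "FolderLocation"), you might add an entry along these lines; the item name and path are illustrative:

systemDefinition {
    global {
        // Directory that the pipeline watches for incoming files (illustrative name and path)
        item(name = "FolderLocation", value = "runtime/fileIngress/")
    }
}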

Testing

info

Go to our Testing page for details of our testing tools and the dependencies that you need to declare.

To test the Data Pipeline set-up in your app:

  • Details to follow shortly. Thank you for your patience.

Full technical details

You can find full technical details of how to use the Data Pipeline in our reference documentation.