How to consume from and publish to Kafka

Kafka is a standard messaging infrastructure used throughout financial markets. Genesis can both consume messages from Kafka topics and publish messages to them.

Both directions are handled transactionally, and there are several ways to set them up, depending on the functionality you need.

Consuming from Kafka

Consuming is done using a Genesis pipeline. A pipeline lets Genesis process incoming and outgoing messages, and it supports a variety of connection methods, including CSV files as well as Kafka topics.

To implement a pipeline, you must add the following to your application's -processes.xml file:

<process name="MYAPP_MANAGER">

You can then configure Kafka in the pipeline.kts file like this:

val source = kafkaSource<String, Int> {
    bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
    groupId = systemDefinition.getItem("CONSUMER_GROUP_ID").toString()
    keyDeserializer = StringDeserializer()
    valueDeserializer = IntegerDeserializer()
    topic = systemDefinition.getItem("KAFKA_TOPIC").toString()
    securityProtocol = systemDefinition.getItem("KAFKA_SECURITY_CONFIG").toString()

val operator: SplitOperator<ConsumerRecords<String, Int>, ConsumerRecord<String, Int>> = SplitOperator { consumerRecords ->
    flow {
        consumerRecords.forEach {

pipelines {
    pipeline("KAFKA_TO_DB_PIPELINE") {
            .map {
                PriceReceived(it.key(), it.value())
            .map {
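
The split-then-map step in the excerpt above can be pictured with plain Kotlin. The following is a minimal sketch, not Genesis or Kafka client code: `Record`, `splitBatch`, `toEvents` and the field names of `PriceReceived` are hypothetical stand-ins, and a standard-library `Sequence` takes the place of the coroutine `flow` so that the example runs without any dependencies.

```kotlin
// Hypothetical stand-in for Kafka's ConsumerRecord<String, Int>.
data class Record(val key: String, val value: Int)

// Hypothetical stand-in for the PriceReceived event used in the pipeline.
data class PriceReceived(val instrument: String, val price: Int)

// Split step: turn one polled batch into a lazy stream of single records.
fun splitBatch(batch: List<Record>): Sequence<Record> = sequence {
    batch.forEach { yield(it) }
}

// Map step: convert each record into the event the rest of the pipeline handles.
fun toEvents(batch: List<Record>): List<PriceReceived> =
    splitBatch(batch)
        .map { PriceReceived(it.key, it.value) }
        .toList()

fun main() {
    val batch = listOf(Record("VOD.L", 101), Record("BARC.L", 202))
    toEvents(batch).forEach(::println)
}
```

Splitting first means every later step deals with one record at a time, so the mapping logic does not need to know how many records arrived in a single poll.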

The resulting message can be processed and stored in the database and can also trigger events via:

Publishing to Kafka

This can be done in several different ways:

  • Triggered by an event. You can use AbstractProgrammaticSource to create a source implementation in one line, which lets you send data to the pipeline from an event handler:
package global.genesis.kafka

import global.genesis.gen.dao.PricePublished
import global.genesis.pipeline.event.AbstractProgrammaticSource

object ProgrammaticPriceSource : AbstractProgrammaticSource<PricePublished>()

Then, in the event handler, call the source:

eventHandler<PricePublished>(name = "PRICE_PUBLISH") {
    onCommit { event ->
        val price = event.details
  • Use a database subscription source, which detects changes and publishes them to Kafka.

  • Use scheduled cron rules to trigger the execution of a pipeline and publish the related data to Kafka.
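
To make the event-triggered option more concrete, here is a minimal sketch of the idea behind a programmatic source, written in plain Kotlin. It is not the Genesis implementation: `SimpleProgrammaticSource`, its `subscribe` and `send` methods, `PriceSource` and the fields of `PricePublished` are all hypothetical, and a simple in-memory list stands in for the Kafka sink.

```kotlin
// Hypothetical, simplified stand-in for a programmatic pipeline source:
// it stores no data and hands every value it is sent to its subscribers.
open class SimpleProgrammaticSource<T> {
    private val subscribers = mutableListOf<(T) -> Unit>()

    // A pipeline would register here to receive values.
    fun subscribe(handler: (T) -> Unit) {
        subscribers += handler
    }

    // Called from application code, for example from an event handler.
    fun send(value: T) {
        subscribers.forEach { it(value) }
    }
}

// Hypothetical shape of the PricePublished event details.
data class PricePublished(val instrument: String, val price: Double)

// One-line source declaration, in the same spirit as the object declared earlier.
object PriceSource : SimpleProgrammaticSource<PricePublished>()

fun main() {
    // Stand-in for the pipeline's Kafka sink: collect what would be published.
    val published = mutableListOf<String>()
    PriceSource.subscribe { published += "${it.instrument}=${it.price}" }

    // Stand-in for the body of onCommit in the event handler.
    PriceSource.send(PricePublished("VOD.L", 101.5))

    println(published)
}
```

The point of the pattern is that the event handler only needs a reference to the source object; it does not need to know which sink the pipeline eventually writes to.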

There is an example application here that shows these in action.

Data types

Genesis supports all standard data and message types, including strings, JSON, XML and FIX.
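
As an illustration of handling one of these formats, the sketch below parses a FIX-style payload that has arrived as a string. It is plain Kotlin under stated assumptions, not a Genesis API: `parseFix` is a hypothetical helper, the sample message is shortened and illustrative, and repeating groups, malformed fields and checksum validation are deliberately left out.

```kotlin
// FIX fields are tag=value pairs separated by the SOH character (0x01).
const val SOH = '\u0001'

// Parse a raw FIX message into a tag -> value map.
// Sketch only: a repeated tag overwrites the earlier value,
// and a field without '=' would throw an exception.
fun parseFix(raw: String): Map<Int, String> =
    raw.split(SOH)
        .filter { it.isNotEmpty() }
        .associate { field ->
            val (tag, value) = field.split('=', limit = 2)
            tag.toInt() to value
        }

fun main() {
    // Shortened, illustrative message; a real one also carries length and checksum fields.
    val raw = listOf("8=FIX.4.4", "35=D", "55=VOD.L", "44=101.5")
        .joinToString(SOH.toString()) + SOH
    val fields = parseFix(raw)
    println(fields[55])
}
```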



