
How to consume and publish Kafka

Kafka is a standard messaging infrastructure used throughout financial markets. Genesis can consume messages from Kafka topics and publish messages to Kafka topics.

This is handled transactionally and can be done in a few different ways, depending on the desired functionality.

Consuming from Kafka

This is done using a Genesis pipeline. A pipeline allows Genesis to process incoming and outgoing messages, and can be used with a variety of connection methods, including CSV files as well as Kafka topics.

To implement a pipeline, you must add the following to your application's -processes.xml file:

<processes>
    <process name="MYAPP_MANAGER">
        ...
        <module>genesis-pal-datapipeline</module>
        <package>global.genesis.pipeline</package>
        <script>myapp-data-pipelines.kts</script>
        ...
    </process>
</processes>

You can then configure Kafka in the myapp-data-pipelines.kts file like this:

val source = kafkaSource<String, Int> {
    bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
    groupId = systemDefinition.getItem("CONSUMER_GROUP_ID").toString()
    keyDeserializer = StringDeserializer()
    valueDeserializer = IntegerDeserializer()
    topic = systemDefinition.getItem("KAFKA_TOPIC").toString()
    securityProtocol = systemDefinition.getItem("KAFKA_SECURITY_CONFIG").toString()
}
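
The systemDefinition.getItem calls read their values from your application's system definition. As a minimal sketch (the values shown are placeholders; substitute your own broker address, consumer group, topic and security protocol), the corresponding entries in genesis-system-definition.kts could look like this:

systemDefinition {
    global {
        item(name = "BOOTSTRAP_SERVER", value = "localhost:9092")
        item(name = "CONSUMER_GROUP_ID", value = "myapp-price-consumer")
        item(name = "KAFKA_TOPIC", value = "prices")
        item(name = "KAFKA_SECURITY_CONFIG", value = "PLAINTEXT")
    }
}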

// Split each polled batch of ConsumerRecords into a flow of individual records
val operator: SplitOperator<ConsumerRecords<String, Int>, ConsumerRecord<String, Int>> = SplitOperator { consumerRecords ->
    flow {
        consumerRecords.forEach {
            emit(it)
        }
    }
}

pipelines {
    pipeline("KAFKA_TO_DB_PIPELINE") {
        source(source)
            .split(operator)
            .map {
                // build an entity from each record's key and value
                PriceReceived(it.key(), it.value())
            }
            .map {
                // wrap the entity in a database insert operation
                DbOperation.Insert(it)
            }
            .sink(txDbSink())
    }
}

The resulting message is processed and stored in the database transactionally by the sink; the stored records can then be used to trigger further events in your application.

Publishing to Kafka

This can be done in several different ways:

  • Triggered by an event. You can use AbstractProgrammaticSource to create a source implementation in one line, allowing you to send messages to the pipeline from an event:
package global.genesis.kafka

import global.genesis.gen.dao.PricePublished
import global.genesis.pipeline.event.AbstractProgrammaticSource

object ProgrammaticPriceSource : AbstractProgrammaticSource<PricePublished>()

Then, in the Event Handler, make a call like this:

eventHandler<PricePublished>(name = "PRICE_PUBLISH") {
    onCommit { event ->
        val price = event.details
        entityDb.insert(price)
        ProgrammaticPriceSource.send(price)
        ack()
    }
}
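
To complete the flow, the programmatic source can feed a pipeline that ends in a Kafka sink. The sketch below is illustrative only: it assumes a kafkaSink configured in the same style as the kafkaSource above, that the sink accepts Kafka ProducerRecord elements, and that PricePublished has instrument and price fields; the KAFKA_PUBLISH_TOPIC item and topic name are also assumptions.

import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringSerializer

val sink = kafkaSink<String, Int> {
    bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
    keySerializer = StringSerializer()
    valueSerializer = IntegerSerializer()
    topic = systemDefinition.getItem("KAFKA_PUBLISH_TOPIC").toString()
}

pipelines {
    pipeline("PRICE_TO_KAFKA_PIPELINE") {
        source(ProgrammaticPriceSource)
            .map {
                // build the outgoing record (topic, key and field names are hypothetical)
                ProducerRecord("prices-out", it.instrument, it.price)
            }
            .sink(sink)
    }
}
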
  • Using a database subscription source, which detects changes in the database and publishes them to Kafka (see the sketch below).

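A sketch of the database-subscription approach, reusing the Kafka sink from the previous bullet. The source name dbBulkSubscribe is hypothetical (check the data pipeline documentation for the real-time database source available in your version), as are the entity and field names:

pipelines {
    pipeline("DB_TO_KAFKA_PIPELINE") {
        // hypothetical source that emits a record for every change to the table
        source(dbBulkSubscribe<PricePublished>())
            .map {
                ProducerRecord("prices-out", it.instrument, it.price)
            }
            .sink(sink)
    }
}
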
  • Using scheduled cron rules to trigger the execution of a pipeline and publish the related data to Kafka (see the sketch below).

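A sketch of the cron-rule approach: a rule in the CRON_RULE table fires an event message on a schedule, and the Event Handler below pushes the relevant data into the programmatic source defined earlier. The PublishAllPrices event class is hypothetical:

eventHandler<PublishAllPrices>(name = "PUBLISH_ALL_PRICES") {
    onCommit { event ->
        // re-read every published price and push it into the pipeline
        entityDb.getBulk<PricePublished>().collect { price ->
            ProgrammaticPriceSource.send(price)
        }
        ack()
    }
}
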
There is an example application (link to follow) that shows these approaches in action.

Data types

All standard data/message types are supported by Genesis, including strings, JSON, XML and FIX. More details on these can be found here (link to follow).
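
For example, to consume JSON payloads you could pass a Jackson-based implementation of Kafka's Deserializer interface to the kafkaSource shown earlier. A minimal sketch; the Price payload class named in the usage comment is hypothetical:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.common.serialization.Deserializer

class JsonDeserializer<T>(private val type: Class<T>) : Deserializer<T> {
    private val mapper = ObjectMapper()

    // called once per record; a null payload maps to a null value
    override fun deserialize(topic: String, data: ByteArray?): T? =
        data?.let { mapper.readValue(it, type) }
}

// usage inside a kafkaSource definition:
// valueDeserializer = JsonDeserializer(Price::class.java)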

Testing

info

Go to our Testing page for details of our testing tools and the dependencies that you need to declare.

To test your Kafka set-up on your app:

  • Details to follow shortly. Thank you for your patience.