How to consume and publish Kafka
Kafka is a standard messaging infrastructure used throughout financial markets. Genesis can consume messages from Kafka topicsand publish messages to Kafka topics.
This is handled transactionally and can be done in a few different ways, depending on the desired functionality.
Consuming from Kafka
This is done using a Genesis pipeline. A pipeline allows Genesis to process incoming, and outgoing, messages and can be used for a variety of connection methods including csv files as well as Kafka topics.
To implement a pipeline,you must add the following to your application's -processes.xml file:
<processes>
<process name="MYAPP_MANAGER">
...
<module>genesis-pal-datapipeline</module>
<package>global.genesis.pipeline</package>
<script>myapp-data-pipelines.kts</script>
...
</process>
</processes>
You can then configre Kafka in the pipeline.kts file like this:
val source = kafkaSource<String, Int> {
bootstrapServers = systemDefinition.getItem("BOOTSTRAP_SERVER").toString()
groupId = systemDefinition.getItem("CONSUMER_GROUP_ID").toString()
keyDeserializer = StringDeserializer()
valueDeserializer = IntegerDeserializer()
topic = systemDefinition.getItem("KAFKA_TOPIC").toString()
securityProtocol = systemDefinition.getItem("KAFKA_SECURITY_CONFIG").toString()
}
val operator: SplitOperator<ConsumerRecords<String, Int>, ConsumerRecord<String, Int>> = SplitOperator { consumerRecords ->
flow {
consumerRecords.forEach {
emit(it)
}
}
}
pipelines {
pipeline("KAFKA_TO_DB_PIPELINE") {
source(source)
.split(operator)
.map {
PriceReceived(it.key(), it.value())
}
.map {
DbOperation.Insert(it)
}
.sink(txDbSink())
}
}
The resulting message can be processed and stored in the database and can also trigger events via:
codey codey codey
Publishing to Kafka
This can be done in several different ways:
- Triggered by an event. You can use the AbstractProgrammaticSource to create a source implementation in one line that allows you to send to the pipeline from an event
package global.genesis.kafka
import global.genesis.gen.dao.PricePublished
import global.genesis.pipeline.event.AbstractProgrammaticSource
object ProgrammaticPriceSource : AbstractProgrammaticSource<PricePublished>()
Then in the event handler a call of:
eventHandler<PricePublished>(name = "PRICE_PUBLISH") {
onCommit { event ->
val price = event.details
entityDb.insert(price)
ProgrammaticPriceSource.send(price)
ack()
}
}
-
Use a database subscription source which detects changes and publishes these to kafka code?
-
Using scheduled cron rules to trigger the execution of a pipeline and publish the related data to kafka code?
There is an example application here need link that shows these in action.
Data types
All standard data/message types can be supported by Genesis, including strings, json, XML, FIX format More detials on these can be found here need link.
Testing
Go to our Testing page for details of our testing tools and the dependencies that you need to declare.
To test your auth set-up on your app:
- Details to follow shortly. Thank you for your patience.