SAP Cloud Integration (CPI) has a Kafka adapter, but the adapter currently does not support calling an on-premise Kafka through SAP Cloud Connector. See SAP Note 316484 for details. There are currently two options for working around this:

1, Expose the on-premise Kafka to the public internet, so that the CPI Kafka adapter can connect to it directly.

2, Build a REST API proxy for Kafka, so that CPI can produce or consume messages on the on-premise Kafka by using the CPI HTTP adapter with the help of SAP Cloud Connector.

Today I want to investigate option 2. Open source Kafka REST proxies already exist, but a customer may have concerns about using them. In this blog series I will try to develop a Kafka REST API with Node.js and make it callable from CPI through SAP Cloud Connector. In this Part I, I will prepare the Kafka environment.
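To make option 2 a little more concrete, here is a minimal sketch of how the produce side of such a proxy could look. Everything in it is an assumption for illustration rather than the final design: the route POST /topics/<topic>, the JSON payload shape {"messages":[{"key":..., "value":...}]}, and the helper names toProducerRecord and createProduceHandler are all hypothetical. The only thing it expects from Kafka is an object with an async send() method, such as a connected KafkaJS producer.

```javascript
// Sketch only: route, payload shape and function names are assumptions,
// not an existing API.

// Convert a JSON request body into the argument that producer.send() expects.
function toProducerRecord(topic, body) {
  if (!topic) throw new Error('topic is required')
  const parsed = typeof body === 'string' ? JSON.parse(body) : body
  if (!parsed || !Array.isArray(parsed.messages) || parsed.messages.length === 0) {
    throw new Error('body must contain a non-empty "messages" array')
  }
  const messages = parsed.messages.map((m) => {
    if (m === null || typeof m !== 'object' || m.value === undefined) {
      throw new Error('each message needs a "value"')
    }
    // Kafka values are bytes or strings, so objects are serialized as JSON.
    const record = { value: typeof m.value === 'string' ? m.value : JSON.stringify(m.value) }
    if (m.key !== undefined) record.key = String(m.key)
    return record
  })
  return { topic, messages }
}

// Build a handler for http.createServer(). `producer` is any object with an
// async send() method, for example a connected KafkaJS producer.
function createProduceHandler(producer) {
  return (req, res) => {
    const match = /^\/topics\/([^/]+)$/.exec(req.url)
    if (req.method !== 'POST' || !match) {
      res.writeHead(404)
      res.end()
      return
    }
    let body = ''
    req.on('data', (chunk) => { body += chunk })
    req.on('end', async () => {
      let record
      try {
        record = toProducerRecord(decodeURIComponent(match[1]), body)
      } catch (err) {
        // The request itself was malformed.
        res.writeHead(400, { 'Content-Type': 'application/json' })
        res.end(JSON.stringify({ error: err.message }))
        return
      }
      try {
        await producer.send(record)
        res.writeHead(204)
        res.end()
      } catch (err) {
        // The request was fine, but the broker could not be reached.
        res.writeHead(502, { 'Content-Type': 'application/json' })
        res.end(JSON.stringify({ error: err.message }))
      }
    })
  }
}
```

With a connected producer, the handler could be wired up as http.createServer(createProduceHandler(producer)).listen(3000), where port 3000 is again just an example. The CPI HTTP adapter would then call this endpoint through SAP Cloud Connector.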

To build the scenario, I will first use Docker to run a Kafka service on my own computer. Docker is already installed on my laptop. The steps are as follows.

Step 1 : Pull the docker images for zookeeper and kafka

docker pull wurstmeister/zookeeper

docker pull wurstmeister/kafka

Step 2 : Create and start zookeeper container

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper

Step 3 : Create and start kafka container and connect kafka to zookeeper container in step 2

docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka
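Before moving on, it can help to confirm that the port published in Step 3 is reachable from the host. The following is a minimal sketch that uses only the net module built into Node.js. The helper name isPortOpen and the two-second default timeout are my own choices for illustration. Note that an open port only shows that something is listening on 9092, not that the broker is healthy.

```javascript
const net = require('net')

// Resolve to true if a TCP connection to host:port can be opened within
// timeoutMs, otherwise resolve to false. Hypothetical helper, not part of KafkaJS.
function isPortOpen(host, port, timeoutMs = 2000) {
  return new Promise((resolve) => {
    const socket = net.connect({ host, port })
    const finish = (open) => {
      socket.destroy() // always release the socket before reporting
      resolve(open)
    }
    socket.setTimeout(timeoutMs)
    socket.once('connect', () => finish(true))
    socket.once('timeout', () => finish(false))
    socket.once('error', () => finish(false))
  })
}
```

For the container above, the check would be isPortOpen('localhost', 9092).then(console.log).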

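Step 4 : Enter the kafka container (replace 664dcfafd35c with the container ID that docker ps shows on your machine, or use the container name kafka)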

docker ps

docker exec -it 664dcfafd35c /bin/bash

 

Step 5 : Create and list the topic in the kafka container

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic dblab01

kafka-topics.sh --list --zookeeper zookeeper:2181

Step 6 : Start a producer and produce data in the kafka container

kafka-console-producer.sh --broker-list localhost:9092 --topic dblab01

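Step 7 : Start a consumer and consume data in the kafka container (in a second terminal), for example with the standard console consumer

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab01 --from-beginning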

Step 8 : Run server.js to test kafka locally

Save the following code as server.js.

const { Kafka } = require('kafkajs')

// Client configuration for the single local broker started in Step 3
const kafka = new Kafka({
  clientId: 'my-app',
  requestTimeout: 25000,
  connectionTimeout: 30000,
  authenticationTimeout: 30000,
  retry: {
    initialRetryTime: 3000,
    retries: 0
  },
  brokers: ['localhost:9092']
})

const producer = kafka.producer()
const consumer = kafka.consumer({ groupId: 'test-group' })

const run = async () => {
  // Consuming: log every message on the topic, starting from the earliest offset
  await consumer.connect()
  await consumer.subscribe({ topic: 'dblab01', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      })
    },
  })

  // Producing: connect once, then send one message per second
  await producer.connect()
  setInterval(async () => {
    try {
      await producer.send({
        topic: 'dblab01',
        messages: [
          { value: 'Hello KafkaJS user!' },
        ],
      })
    } catch (e) {
      console.error('send failed:', e)
    }
  }, 1000)
}

run().catch((e) => {
  // Report startup errors instead of swallowing them
  console.error(e)
  process.exit(1)
})

Then install the KafkaJS client and run the script:

npm install kafkajs

node server.js
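server.js only prints the consumed messages to the console. For a REST proxy, where a client such as the CPI HTTP adapter would fetch messages by polling, the consumer callback needs somewhere to put them. One possible approach is a small in-memory buffer, sketched here under my own assumptions. The MessageBuffer class, its size cap and the drain method are hypothetical, and are not part of KafkaJS.

```javascript
// Hypothetical helper: keeps recent messages in memory so that an HTTP GET
// can pick them up later. Messages dropped by the cap are lost to the client.
class MessageBuffer {
  constructor(maxSize = 1000) {
    this.maxSize = maxSize
    this.items = []
  }

  // Meant to be called from the eachMessage callback of consumer.run().
  push({ topic, partition, message }) {
    this.items.push({
      topic,
      partition,
      offset: message.offset,
      // Kafka allows null values (tombstones), so guard before toString().
      value: message.value === null ? null : message.value.toString(),
    })
    // Drop the oldest entries once the cap is exceeded.
    if (this.items.length > this.maxSize) {
      this.items.splice(0, this.items.length - this.maxSize)
    }
  }

  // Return up to `limit` buffered messages and remove them from the buffer.
  drain(limit = 100) {
    return this.items.splice(0, limit)
  }
}
```

In server.js this could be used as eachMessage: async (payload) => buffer.push(payload), with buffer created once as new MessageBuffer(). A GET handler would then answer with JSON.stringify(buffer.drain()).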

 

To be continued in Part II.

Sara Sampaio
