In the previous blog post (Part I), we prepared the on-premise Kafka environment. Today we will build a Node.js Kafka REST API proxy and use CPI to produce and consume Kafka messages through the deployed REST API.

We will follow these steps:

Step 1: Deploy a CPI iFlow to consume the Kafka topic's messages.

Step 2: Deploy a Node.js REST API that produces messages to Kafka, consumes Kafka messages, and forwards them to CPI.

Code snippet:

Server.js

const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
// CPI HTTP endpoint that the Kafka consumer below posts each message to.
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender';

// Separate axios instance used only for the OAuth token requests.
const tokenHttpClient = axios.create();

// Client-credentials token request settings.
// NOTE: client_id_from_cpi_runtime_service_key and
// client_secret_from_cpi_runtime_service_key are placeholders that are not
// defined in this snippet; replace them with the values from the CPI runtime
// service key before running.
const tokenRequestOptions = {
    url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token',
    grant_type: 'client_credentials',
    client_id: client_id_from_cpi_runtime_service_key,
    client_secret: client_secret_from_cpi_runtime_service_key,
    scope: ''
};

// Calling getClientCredentials() yields a promise for the token response;
// the consumer reads its `access_token` field.
const getClientCredentials = oauth.client(tokenHttpClient, tokenRequestOptions);

// Express application hosting the REST proxy.
const app = express();

// Settings for the shared Kafka client: a single local broker, INFO logging,
// and no retries (retries: 0).
const kafkaClientConfig = {
    clientId: 'my-app',
    brokers: ['localhost:9092'],
    logLevel: logLevel.INFO,
    requestTimeout: 25000,
    connectionTimeout: 30000,
    authenticationTimeout: 30000,
    retry: {
        initialRetryTime: 3000,
        retries: 0
    }
};

const kafka = new Kafka(kafkaClientConfig);

// Register the plain-text body parser first, then the JSON body parser.
app.use(express.text());
app.use(bodyParser.json());

// Log once the HTTP server is listening on port 4004.
const onServerStarted = () => {
    console.log('===> Server started');
};

app.listen(4004, onServerStarted);


// Deploy an API to produce a message to Kafka.
//
// POST /kafka/:topic
//   :topic  - name of the Kafka topic to write to
//   body    - JSON object of the form { "msg": <value> }
//
// Responses:
//   200 'message send' - the message was sent and the producer disconnected
//   400 'missing msg'  - the request body has no `msg` field
//   500 'error'        - connecting, sending or disconnecting failed
app.post('/kafka/:topic', async (req, res) => {
    const topicv = req.params.topic;
    console.log(topicv);

    // A non-JSON body (e.g. text/plain) is parsed to a string, so `msg` is
    // undefined; reject it up front instead of failing on `.toString()`
    // after a producer connection has already been opened.
    const msg = req.body == null ? undefined : req.body.msg;
    if (msg == null) {
        res.status(400).send('missing msg');
        return;
    }

    // Use the module-level `kafka` client instead of building an identical
    // client on every request.
    const producer = kafka.producer();

    try {
        try {
            await producer.connect();
            await producer.send({
                topic: topicv,
                messages: [
                    { value: msg.toString() },
                ],
            });
        } finally {
            // Always release the broker connection, even when send() fails.
            await producer.disconnect();
        }

        res.status(200).send('message send');
    } catch (e) {
        console.log(e);
        res.status(500).send('error');
    }
});


// Consume Kafka messages and forward each one to CPI.
const consumer = kafka.consumer({ groupId: 'test-group' });

// Connects the consumer, subscribes to topic 'dblab01' from the beginning,
// and posts every received message to `cpiurl` as { msg: <value as string> }.
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'dblab01', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      // A record may carry a null value (tombstone); guard before toString().
      const value = message.value == null ? null : message.value.toString();

      console.log({
        partition,
        offset: message.offset,
        value,
      });

      if (value === null) {
        // Nothing to forward for a record without a value.
        return;
      }

      // Await the whole forwarding chain so the handler only resolves after
      // the CPI call has finished, rather than leaving a floating promise.
      // Failures are still logged and skipped, as before.
      try {
        const token1 = await getClientCredentials();
        const config = {
          headers: {
            "Authorization": `Bearer ${token1.access_token}`,
            "Content-Type": "application/json"
          }
        };
        const data = {
          msg: value
        };

        const res = await axios.post(cpiurl, data, config);
        // Log only the status: the full axios response object also contains
        // the request config, including the Authorization header.
        console.log(res.status, res.statusText);
      } catch (e) {
        console.log(e);
      }
    },
  });
};

run().catch((e) => {
    console.log(e);
});





package.json

{
  "name": "capkafka",
  "version": "1.0.0",
  "description": "capkafka",
  "main": "server.js",
  "dependencies": {
    "axios": "^0.27.2",
    "axios-oauth-client": "^1.4.4",
    "body-parser": "latest",
    "express": "^4.17.1",
    "kafkajs": "latest",
    "passport": "^0.4.0"
  },
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "jacky liu",
  "license": "ISC"
}

Start the nodejs application :

 

 

Step 3: Test the deployed REST API locally with Postman.

 

Step 4: Check the message in CPI.

 

 

 

Step 5: Deploy an iFlow that calls the Node.js REST API to produce messages in Kafka. I will skip this step. The CPI HTTP or HTTPS adapter supports calling on-premise REST APIs with the help of SAP Cloud Connector.

Sara Sampaio

Sara Sampaio

Author Since: March 10, 2022

0 0 votes
Article Rating
Subscribe
Notify of
0 Comments
Inline Feedbacks
View all comments
0
Would love your thoughts, please comment.x
()
x