In previous blog Part I, we have prepared the on premise kafka envirement . Today we will try build nodejs Kafka rest api proxy and use cpi to produce and consume kafka message by deployed rest api .
We can follow the following steps :
Step 1 : deploy CPI iflow to consume kafka topic’s msessage
Step 2 : deploy nodejs rest api to produce message to Kafka and consume kafka message and forward the message to cpi .
code snippy :
Server.js
const express = require('express');
const bodyParser = require('body-parser');
const { Kafka, logLevel } = require('kafkajs');
const axios = require('axios');
const oauth = require('axios-oauth-client');
const cpiurl = 'https://1s4hcextension.it-cpi001-rt.cfapps.eu10.hana.ondemand.com/http/kafka/sender';
const getClientCredentials = oauth.client(axios.create(), {
url: 'https://1s4hcextension.authentication.eu10.hana.ondemand.com/oauth/token',
grant_type: 'client_credentials',
client_id: client_id_from_cpi_runtime_service_key,
client_secret: client_secret_from_cpi_runtime_service_key,
scope: ''
});
const app = express();
const kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout:30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
logLevel: logLevel.INFO,
brokers: ['localhost:9092']
});
app.use(express.text())
app.use(bodyParser.json());
app.listen(4004, () => { console.log('===> Server started') })
// depoly api to produce message to kafka
app.post('/kafka/:topic', (req, res) => {
const topicv = req.params.topic;
console.log(topicv);
const kafka = new Kafka({
clientId: 'my-app',
requestTimeout: 25000,
connectionTimeout: 30000,
authenticationTimeout:30000,
retry: {
initialRetryTime: 3000,
retries: 0
},
brokers: ['localhost:9092']
})
const producer = kafka.producer();
const run = async () => {
await producer.connect();
await producer.send({
topic: topicv,
messages: [
{ value: req.body.msg.toString() },
],
});
await producer.disconnect();
res.status(200).send('message send');
}
run().catch((e)=>{
console.log(e);
res.status(500).send('error');
})
}) ;
// consume kafka message and forward the message to CPI
const consumer = kafka.consumer({ groupId: 'test-group' });
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'dblab01', fromBeginning: true })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
});
getClientCredentials().then((token1)=>{
// console.log(token1.access_token);
const config = {
headers:{
"Authorization": `Bearer ${token1.access_token}`,
"Content-Type": "application/json"
}
};
const data ={
msg: message.value.toString()
};
axios.post(cpiurl,data,config).then(res=>{
console.log(res);
}).catch(e=>{
console.log(e);
});
}).catch(e=>{
console.log(e);
})
},
})
}
run().catch((e)=>{
console.log(e);
})
package.json
{
"name": "capkafka",
"version": "1.0.0",
"description": "capkafka",
"main": "server.js",
"dependencies": {
"axios": "^0.27.2",
"axios-oauth-client": "^1.4.4",
"body-parser": "latest",
"express": "^4.17.1",
"kafkajs": "latest",
"passport": "^0.4.0"
},
"scripts": {
"test": "echo "Error: no test specified" && exit 1"
},
"author": "jacky liu",
"license": "ISC"
}
Start the nodejs application :
Step 3 : test the deployed rest api with postman locally .
Step 4 : check the message in CPI .
Step 4 : Deploy a iflow to call the nodejs rest api to producet messge in Kafka . .I will skip this . Cpi http or https adapter support calling op rest api with the help of sap cloud connector .
Subscribe
Login
Please login to comment
0 Comments