Consuming Kafka messages – Asset Intelligence Network
Asset Intelligence Network (AIN) is part of the Intelligent Asset Management portfolio, alongside Asset Strategy and Performance Management (ASPM) and Predictive Asset Insights (PAI), all of which share a common core component, Asset Central Foundation (ACF).
Asset Intelligence Network triggers Kafka messages for various objects and a growing list of scenarios. These messages give customers a way to be notified of changes happening to objects in AIN and to use those notifications in various integration scenarios.
Apache Kafka
Apache Kafka is an open-source distributed event streaming platform used for data pipelines, streaming analytics, data integration and various other use cases. More details can be found in the Apache Kafka documentation.
AIN and Apache Kafka
AIN utilises Apache Kafka as a service on Business Technology Platform (BTP) and provides a mechanism through which a Kafka cluster and relevant topics can be shared with its customers.
Customers can develop their own Kafka consumers to consume messages from the topics that are shared. In order to utilise the feature, customers need to place a request for the cluster / topics to be shared as described here.
How to Consume?
Once the cluster and topics are advertised, their details can be fetched using the marketplace REST API:
API: https://kafka-marketplace.cf.<landscape>.hana.ondemand.com/v1/marketplace?org=<cloud foundry org name>
Method: GET
Response: the advertisement details for the shared cluster, including the advertisement ID and the topic names.
Note: Pass the bearer token to the API. To generate one, use the cf oauth-token command.
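For reference, a call to the marketplace API could look like the following sketch; the landscape and org name are placeholders, and cf oauth-token already prefixes the token with “bearer”:
curl -H "Authorization: $(cf oauth-token)" "https://kafka-marketplace.cf.<landscape>.hana.ondemand.com/v1/marketplace?org=<cloud foundry org name>"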
- Add or assign the necessary entitlements to your subaccount with the “reference” plan.
- In your Cloud Foundry space, create a service instance for the advertised cluster using the information from the above API response, for example with the Cloud Foundry CLI:
cf create-service kafka reference <instance name> -c '{"advertisement":"<advertisement id from the API response>"}'
How to write a consumer?
Java:
The following example is written as a reactive Vert.x application that consumes Kafka messages; however, there are many other ways a consumer can be written.
import java.util.Base64;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.kafka.client.consumer.KafkaConsumer;

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    String consumerGroup = "DemoGroup";
    // Refer to the latest documentation or the marketplace API response for the current topic name format.
    String topicName = "<subaccountid>.com.sap.dsc.ac.equipment";

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
    props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
    props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
    props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "20000");

    // Create the consumer used for interacting with Apache Kafka.
    KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx,
        KafkaRulesConsumerFactory.kafkaConsumerCreate(consumerGroup, props));

    // Handle each record: the payload is a base64-encoded Avro message.
    consumer.handler(record -> {
      String rec = record.value();
      byte[] decoded = Base64.getMimeDecoder().decode(rec.getBytes());
      System.out.println("Processing key=" + record.key() + ",value=" + rec +
          ",partition=" + record.partition() + ",offset=" + record.offset());
      try {
        String result = KafkaUtils.readAvroMessageWithSchema(decoded);
        System.out.println(result);
      } catch (Exception e) {
        System.out.println(e);
      }
    });

    // Subscribe to the topic and complete the start promise once the subscription succeeds.
    consumer
        .subscribe(topicName)
        .onSuccess(v -> {
          System.out.println("subscribed");
          startPromise.complete();
        })
        .onFailure(cause -> {
          System.out.println("Could not subscribe " + cause.getMessage());
          startPromise.fail(cause);
        });
  }
}
In order to connect to the Kafka cluster, the SASL_SSL (PLAIN) protocol is used. The root certificate is downloaded via the APIs listed in the service key of the instance, a keystore is created from it, and the keystore is passed in the properties used to connect to the Kafka instance.
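The KafkaRulesConsumerFactory.kafkaConsumerCreate helper referenced above is not shown in the snippet. The following is only a minimal sketch of the configuration such a factory might build, assuming the broker list, username and OAuth token come from the service key and that a truststore has been created from the downloaded root certificate (for example with keytool); the placeholder values are illustrative, not actual credentials.
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaRulesConsumerFactory {

  // Builds the configuration map passed to KafkaConsumer.create(vertx, ...).
  // Broker list, username, token and truststore details are placeholders taken
  // from the service key and the downloaded root certificate.
  public static Map<String, String> kafkaConsumerCreate(String consumerGroup, Properties props) {
    Map<String, String> config = new HashMap<>();
    props.forEach((k, v) -> config.put(k.toString(), v.toString()));

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<brokers from the service key>");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // SASL_SSL with the PLAIN mechanism: username from the service key, OAuth token as password.
    config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    config.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"<username>\" password=\"<token>\";");

    // Truststore created from the downloaded root certificate (for example with keytool).
    config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<path to truststore.jks>");
    config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<truststore password>");

    return config;
  }
}
As in the Node.js and Python examples below, the OAuth token is used as the SASL PLAIN password.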
How to decode the message?
The messages from ACF use an Avro-based schema and are base64 encoded.
import java.io.IOException;

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public static String readAvroMessageWithSchema(byte[] message) throws IOException {
    SeekableByteArrayInput input = new SeekableByteArrayInput(message);
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>();
    try (DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(input, reader)) {
        GenericRecord data = dataFileReader.next();
        return data.toString();
    }
}
Some key dependencies used:
"io.vertx:vertx-kafka-client:4.0.3"
"io.projectreactor.kafka:reactor-kafka:1.3.3"
"org.apache.avro:avro:1.10.2"
Node.js:
async function kConnect() {
  // The OAuth token is used as the SASL password; `credentials` come from the bound Kafka service (service key).
  const tokenVal = await config().token();

  const kafka = new Kafka({
    clientId: 'node-kconsumer',
    brokers: credentials.sslBroker.split(','),
    ssl: {
      rejectUnauthorized: false,
      ca: [fs.readFileSync('./kConsumer/ssl/kafka.crt', 'utf-8')],
    },
    sasl: {
      mechanism: 'plain', // scram-sha-256 or scram-sha-512
      username: credentials.username,
      password: tokenVal
    },
  });

  const consumer = kafka.consumer({ groupId: 'my-group1' })
  await consumer.connect()
  // Refer to the documentation or the advertisement API response for the topic name format.
  await consumer.subscribe({ topic: '<subaccountid>.com.sap.dsc.ac.equipment', fromBeginning: true })

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        key: message.key.toString(),
        value: message.value,
        headers: message.headers,
      })
      console.log("JSON " + message.value.toJSON());
      console.log("STRR " + message.value.toString());
      // Decode the base64-encoded Avro payload with the avro-js helper.
      avro().avroConvert(message.value.toString());
    },
  })
}
This example also uses the SASL_SSL protocol to connect to the Kafka instance. The root certificate can likewise be downloaded using the APIs exposed in the service key of the instance:
curl https://kafka-service-broker.cf.<landscape>.hana.ondemand.com/certs/rootCA/current > kafka.crt
The Node.js example uses the following key dependencies:
const avro = require('avro-js');
const { Kafka } = require("kafkajs");
Python:
def main():
    env = AppEnv()
    kafka_env = env.get_service(label='kafka')
    # The OAuth token is used as the SASL password.
    token = getToken(kafka_env)
    # Refer to the documentation or the advertisement API response for the topic name format.
    consumer = KafkaConsumer('<subaccountid>.com.sap.dsc.ac.equipment',
                             bootstrap_servers=kafka_env.credentials[u'cluster'][u'brokers'].split(','),
                             auto_offset_reset='earliest',
                             enable_auto_commit=True,
                             client_id='py-kconsumer',
                             group_id='Pyconn',
                             security_protocol='SASL_SSL',
                             ssl_cafile='./ssl/kafka.crt',
                             sasl_mechanism='PLAIN',
                             sasl_plain_username=kafka_env.credentials[u'username'],
                             sasl_plain_password=token,
                             api_version=(2, 5, 1))
    print("subscribed")
    for message in consumer:
        print(message)
The base64-decoded message payload, written to a file, can be converted using the avro Python library as follows:
reader = DataFileReader(open("./message.avro", "rb"), DatumReader())
for data in reader:
    print(data)
reader.close()
Key dependencies used:
from cfenv import AppEnv
from kafka import KafkaConsumer
import avro.schema
from avro.datafile import DataFileReader
from avro.io import DatumReader
In addition, customers can also leverage the Kafka Adapter provided by Cloud Platform Integration.
Conclusion
Kafka event streaming is quite powerful in terms of getting real-time or near real-time updates for your data. AIN uses it to provide a pathway for customers who wish to integrate with a myriad of disparate systems, helping keep data consistent and efficiently synchronised across distributed systems. Kafka in conjunction with Apache Avro makes integration scenarios more stable, providing better compatibility across releases and upgrades.