Quite a number of blogs have been written about how to use SAP Data Intelligence for machine learning scenarios. Have a look, for example, at the excellent and very instructive blogs of Andreas Forster. With the 2nd generation operators of SAP Data Intelligence we now have some additional features that make it easier to create productive training and prediction pipelines. In the following I assume that a data scientist has already done the data exploration and wants to deploy the training and the prediction pipeline in an enterprise environment. I am using one of the “Drosophila” ML use cases: the price prediction of used cars. If you would like to build it yourself, you can download the “Ebay Used Car Sales Data” from Kaggle.
The ML process consists of 2 pipelines and 2 custom operators (built as a kind of template):
- Training Pipeline
  - Read training data (Standard: Table Consumer)
  - Train the model (Custom Operator)
  - Save the model as binary to an object store
- Prediction Pipeline
  - Read data (Standard: Table Consumer)
  - Read the model (currently a custom operator, but it will soon be replaced by a standard operator)
  - Predict (Custom Operator)
  - Save the result
The particular advantage of using generation 2 operators is that you can use the very convenient input and output operators of the Structured Data Operators, as you will see when we sew the pipelines together.
Preparation
Data
After downloading the data from Kaggle, I created a HANA table and imported the CSV file.
CREATE COLUMN TABLE "DEMO"."USEDCARS"(
    "CAR_ID" INTEGER,
    "BRAND" NVARCHAR(14),
    "MODEL" NVARCHAR(11),
    "VEHICLETYPE" NVARCHAR(10),
    "YEAROFREGISTRATION" INTEGER,
    "HP" INTEGER,
    "FUELTYPE" NVARCHAR(6),
    "GEARBOX" NVARCHAR(9),
    "KILOMETER" INTEGER,
    "PRICE" INTEGER
);
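For illustration, a minimal sketch of how the CSV file could be loaded into that table with pandas and the hdbcli driver might look like this. The connection details, the file name and the assumption that the CSV has already been renamed and reduced to the table's columns are mine; adjust them to your landscape and to how you prepared the Kaggle file.

import pandas as pd
from hdbcli import dbapi

# Hypothetical connection details - adjust to your environment
conn = dbapi.connect(address="<hana-host>", port=443, user="<user>",
                     password="<password>", encrypt=True)

# Hypothetical file name; assumed to contain exactly the columns of the table above
df = pd.read_csv("usedcars.csv")
df = df[["CAR_ID", "BRAND", "MODEL", "VEHICLETYPE", "YEAROFREGISTRATION",
         "HP", "FUELTYPE", "GEARBOX", "KILOMETER", "PRICE"]]

# Bulk insert the rows into the HANA table
cursor = conn.cursor()
cursor.executemany(
    'INSERT INTO "DEMO"."USEDCARS" VALUES (?,?,?,?,?,?,?,?,?,?)',
    df.values.tolist())
conn.commit()
cursor.close()
conn.close()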
Dockerfile
Because I am using Python packages that are not part of the standard Docker images, I have to create a new Dockerfile and build it.
ARG DEPENDENCY_BASE_IMAGE_VERSION=2107.0.1
FROM §/com.sap.datahub.linuxx86_64/sles:${DEPENDENCY_BASE_IMAGE_VERSION}
RUN groupadd -g 1972 vflow && useradd -g 1972 -u 1972 -m vflow
USER 1972:1972
WORKDIR /home/vflow
ENV HOME=/home/vflow
RUN python3.9 -m pip install scikit-learn
RUN python3.9 -m pip install boto3
It is very important to run the installation with python3.9 to make the image usable for generation 2 pipelines/operators. This is necessary until one of the next releases, when the whole of SAP Data Intelligence has switched to Python 3.9. You can tag the Docker image with
- sklearn
- boto3
Because the release DI Cloud 2113 provides no binary file reader, I have written an S3 binary file reader for the time being. Hopefully this will become obsolete with one of the next releases.
Train Model
Custom Training Operator
I have designed the training operator as a template so that it can be reused for specific training operators.
- Inport: generic table ‘*’ in order to consume data conveyed by the Structured Data Operators
- A receiving part that collects the data until the last batch has arrived. There is nothing data specific in this part; it results in a DataFrame.
- A training part that is specific to the task
- An output part that is again generic and can be reused unaltered. It sends the model as binary to the outport.
import pickle
import io
import pandas as pd
from sklearn.linear_model import LinearRegression

df = pd.DataFrame()

def on_input(msg_id, header, data):
    ### GENERIC PART (Receiving Batches)
    global df
    tbl = data.get()
    tbl_info = api.type_context.get_vtype(tbl.type_ref)
    col_names = list(tbl_info.columns.keys())
    if df.empty:
        df = pd.DataFrame(tbl, columns=col_names)
    else:
        df = pd.concat([df, pd.DataFrame(tbl, columns=col_names)])
    # In case of stream wait for other data
    if 'com.sap.headers.batch' in header and header['com.sap.headers.batch'][1] == False:
        return 1

    ### SPECIFIC (training)
    model = LinearRegression()
    model.fit(df[['YEAROFREGISTRATION', 'HP', 'KILOMETER']].values, df['PRICE'].values)

    ### GENERIC (output)
    model_binary = pickle.dumps(model)
    bstream = io.BytesIO(model_binary)
    bstream.seek(0)
    api.outputs.model.publish(bstream, -1, header)

api.set_port_callback("input", on_input)
You can download the operator from my personal GitHub and import it as a solution: ml_train-operator
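If you want to try the specific training part outside of SAP Data Intelligence first, a minimal local sketch could look like the following. The sample rows are made up and only illustrate the expected column layout.

import pandas as pd
from sklearn.linear_model import LinearRegression

# Made-up sample rows with the columns used by the training operator
df = pd.DataFrame(
    [[2010, 116, 90000, 7500],
     [2015, 150, 40000, 14500],
     [2008, 75, 150000, 3200]],
    columns=['YEAROFREGISTRATION', 'HP', 'KILOMETER', 'PRICE'])

# Same fit call as in the operator's specific part
model = LinearRegression()
model.fit(df[['YEAROFREGISTRATION', 'HP', 'KILOMETER']].values, df['PRICE'].values)
print(model.coef_, model.intercept_)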
Training Pipeline
With the training operator in place, the pipeline is pretty straightforward to create:
In any case, make sure to group the “Train Model” operator, because it needs to run in a separate Docker container that provides the “sklearn” package.
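To check the result of a training run, you can download the model binary from the object store and unpickle it locally. This is only a sketch; the file name usedcars_model.pkl is an assumption for whatever object key your pipeline writes.

import pickle

# Hypothetical local copy of the model binary written by the training pipeline
with open("usedcars_model.pkl", "rb") as f:
    model = pickle.load(f)

# Predict the price for one made-up car: year of registration, HP, kilometers
print(model.predict([[2012, 140, 60000]]))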
Prediction
Custom Operator Prediction
The custom operator for the prediction is a bit more complex because we have 2 inports:
- Model (Binary)
- Data
and we need to ensure that the model and all the batches have been loaded before the prediction is calculated. I have solved this by having separate functions that are called as callbacks by the ports and do the preparation. The orchestration is done via global variables: once the model is set and all the batches have been read, the actual prediction is calculated. In this case only the latter function, “process”, is specific, whereas “on_model” and “on_input” are generic.
import pickle
import io
import pandas as pd
from sklearn.linear_model import LinearRegression

#
# GLOBAL VARIABLES
#
df = pd.DataFrame()
all_data_loaded = False
model = None
data_header = None

#
# MODEL INPUT
#
def on_model(msg_id, header, data):
    # GENERIC PART
    global model
    bstream = io.BytesIO(data.get_reader().read(-1))
    model = pickle.load(bstream)
    if all_data_loaded:
        process()

#
# DATA INPUT
#
def on_input(msg_id, header, data):
    # GENERIC INPUT
    global df
    global all_data_loaded
    global data_header
    tbl = data.get()
    tbl_info = api.type_context.get_vtype(tbl.type_ref)
    col_names = list(tbl_info.columns.keys())
    if df.empty:
        df = pd.DataFrame(tbl, columns=col_names)
    else:
        df = pd.concat([df, pd.DataFrame(tbl, columns=col_names)])
    # In case of stream wait for other data
    data_header = header
    if 'com.sap.headers.batch' in header and header['com.sap.headers.batch'][1] == False:
        return 1
    all_data_loaded = True
    if model:
        process()

api.set_port_callback("model", on_model)
api.set_port_callback("data", on_input)

#
# Custom Process
#
def process():
    # PREDICTION
    df['PRICE'] = model.predict(df[['YEAROFREGISTRATION', 'HP', 'KILOMETER']])
    # GENERIC OUTPUT
    tbl = api.Table(df.values.tolist(), 'mycompany.used_cars')
    api.outputs.prediction.publish(tbl, header=data_header)
You can download the operator from my personal GitHub and import it as a solution: ml_predict_price
Custom Operator read_S3
The custom operator that reads from S3 is pretty straightforward, so I will not comment on it further. You can download and import the operator as a solution: read_s3.
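For orientation, the core of such an operator boils down to a few boto3 calls. This is only a sketch with hypothetical connection values and object key; the actual operator on GitHub reads these details from the operator configuration.

import io
import boto3

# Hypothetical connection values - in the real operator they come from the configuration
s3 = boto3.client(
    "s3",
    aws_access_key_id="<access key>",
    aws_secret_access_key="<secret key>",
    endpoint_url="https://<s3-endpoint>")

# Read the model binary and hand it over as a byte stream
obj = s3.get_object(Bucket="<bucket>", Key="models/usedcars_model.pkl")
bstream = io.BytesIO(obj["Body"].read())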
Prediction Pipeline
Now we have all the components for our prediction pipeline:
Again we have to group the custom operators, because they require non-standard Python packages that are not part of the Docker container that provides the code for the Structured Data Operators.
Conclusion
Again you can see how much the generation 2 operators ease our lives when creating pipelines. Of course, some preparation and learning is required, but once that is done you might be more productive and produce more robust data pipelines than before.