Planning preparation

Select the option External Algorithm


The end-to-end process

Steps at high level

HTTP server

Integration with Google Vertex AI

  1. Using the REST-based APIs from Google.
  2. Using a gRPC/gax-based client for communication.
  3. Using the SDKs provided by Google.

If you are using middleware, check whether option 2 is available; if it is, either option 1 or option 2 could be a valid approach. If your middleware does not support gRPC/gax-based communication, you are probably left with option 1. I went with option 3 and used the SDKs provided by Google to make the basic calls needed for the integration with the platform. We leveraged the Python SDK, which was able to transfer the data between the SAP and Google Cloud platforms. We expect the reader to have a basic understanding of the Python language; however, the steps detailed in this section are fairly generic and not specific to Python.

  • Install Python version 3 or above.
  • Set up an isolated Python environment. It is highly recommended to give each project its own virtual environment when writing code locally in Python; this can be done using the venv module.
  • Install the packages needed for your project. The following Python packages are needed: requests, pandas, google-cloud-storage, google-cloud-aiplatform and google-cloud-bigquery (see the example commands after this list).
  • Now you need a JSON key file which contains the application credentials for your service account. This key file is used to set up an environment variable called “GOOGLE_APPLICATION_CREDENTIALS”. Once you have downloaded the JSON key file, create an environment variable with that name and set its value to the path of the key file. You can follow the detailed steps defined here.
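For example, the environment setup on a Linux or macOS shell could look like this; the virtual environment name and the key file path are placeholders:

python3 -m venv ibp-vertex-env
source ibp-vertex-env/bin/activate
pip install requests pandas google-cloud-storage google-cloud-aiplatform google-cloud-bigquery
export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account-key.json"
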
  1. Create a dataset in BigQuery and a table to store training data.
  2. A separate table for scoring is also needed in the dataset.
  3. Upload the data to the new tables.
  4. Create a dataset in Google Vertex AI, which is needed to create a model.
  5. Train the model using the data from the BigQuery table.
  6. Start a batch prediction job on the model using scoring data from BigQuery.
  7. Once the results are available, download the data, format it and send it to SAP IBP.

BigQuery Tables

def create_bq_dataset(dataset_id) -> None:    
    from google.cloud import bigquery 

    # Construct a BigQuery client object.
    bq_client = bigquery.Client()
    
    ds = bq_client.create_dataset(dataset_id)
    print(
        "Created dataset {}.{}".format(ds.project, ds.dataset_id)
    )
# [END Create dataset in Big Query]
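
A hypothetical call, using the dataset ID that also appears in the later snippets, would simply be:

# Creates the dataset "ibp_eft" in the project configured for the BigQuery client.
create_bq_dataset("ibp_eft")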

Once you have a dataset, we create a table in that dataset to upload data to the Google Cloud Platform. You need a fair understanding of the data you want to send, as it is reflected in the table schema. I am sending the product, location and customer IDs as strings. I have a timestamp column which directly matches the planning level defined in my planning view in SAP IBP; its data type is TIMESTAMP. Additionally, a column of type INTEGER is needed to hold the values of the consensus demand quantity, which is taken as the example key figure for my forecast. This column stores the historic values for the respective product, and it is also the column for which Google Vertex AI will calculate the forecast predictions and write the results later. The following Python snippet was used to create the BigQuery table. I passed a parameter called dataset_id with the value “ibp_eft”; my table name was “demand_qty_py01”.

def create_table(table_name, dataset_id) -> None:    
    from google.cloud import bigquery 
    schema = [
        bigquery.SchemaField("prdid", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("locid", "STRING"),
        bigquery.SchemaField("custid", "STRING"),
        bigquery.SchemaField("timestamp", "TIMESTAMP"), 
        bigquery.SchemaField("cdemandQty", "INTEGER"), 
    ] 

    table_id = str(project + "." + dataset_id + "." + table_name)
    table = bigquery.Table(table_id, schema=schema)        

    # Construct a BigQuery client object.
    bq_client = bigquery.Client()
    
    table = bq_client.create_table(table)  # Make an API request.
    print(
        "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
    )
# [END bigquery_create_table]

The above snippet is self-explanatory for an experienced Python programmer. We are importing the BigQuery package, creating a BigQuery client and then using the SDK API create_table to create a BigQuery table in that dataset. If you look carefully at the variable table_id, you can see that it is a combination of the project name which you created in the beginning, the dataset name and the table name. After you have run the snippet, the call returns successfully if the BigQuery service was able to parse your payload. This does not mean that the table has been created yet: it is an asynchronous process, and the BigQuery service takes a few more seconds after your call returns successfully to actually create the table in your project space. This means that if you try to send the data immediately after the above call returns, your operation would probably fail. It is better to wait a few more seconds and then use the table name to upload the data. The following Python snippet was used to send data to the BigQuery table. I passed a parameter called dataset_id with the value “ibp_eft”; my table name was “demand_qty_py01”. The ibp_data variable contained the rows, in JSON format, that I wanted to upload to the table.

def insert_bq_table(table_name, dataset_id, ibp_data) -> None:      
    from google.cloud import bigquery 
    # Set the table ID
    table_id = str(project + "." + dataset_id + "." + table_name)

    # Construct a BigQuery client object.
    bq_client = bigquery.Client()

    # Make an API request.
    errors = bq_client.insert_rows_json(table_id, ibp_data)  
    if errors == []:
        print("IBP data is inserted.")
    else:
        print("Errors while inserting rows: {}".format(errors))
# [END bigquery_table_insert_rows]
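
Because the table creation is asynchronous, one simple way to bridge the gap described above is to poll for the table before inserting rows. Below is a minimal sketch of such a check; it is not part of the original integration, and it assumes the same module-level project variable as the other snippets (the retry count and delay are arbitrary choices):

def wait_for_table(table_name, dataset_id, retries=10, delay=2) -> bool:
    import time
    from google.cloud import bigquery
    from google.api_core.exceptions import NotFound

    table_id = str(project + "." + dataset_id + "." + table_name)
    bq_client = bigquery.Client()

    for _ in range(retries):
        try:
            # get_table raises NotFound until the table actually exists.
            bq_client.get_table(table_id)
            return True
        except NotFound:
            time.sleep(delay)
    return False
# [END wait for BigQuery table]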

Vertex AI Models and training

async def create_vi_dataset(dataset_display_name, bigquery_source_uri):
    from google.cloud import aiplatform_v1
    from google.protobuf import json_format
    from google.protobuf.struct_pb2 import Value

    # The AI Platform services require regional API endpoints.
    # client_options (with the regional api_endpoint) and parent are set at module level.
    vi_ds_client = aiplatform_v1.DatasetServiceAsyncClient(client_options=client_options)
    
    metadata_dict = {"input_config": {"bigquery_source": {"uri": bigquery_source_uri}}}
    metadata = json_format.ParseDict(metadata_dict, Value())

    # Initialize request argument(s)
    dataset = aiplatform_v1.Dataset()
    dataset.display_name = dataset_display_name
    dataset.metadata_schema_uri = "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml"
    dataset.metadata =  metadata

    request = aiplatform_v1.CreateDatasetRequest(
        parent=parent,
        dataset=dataset,
    )

    # Make the request
    operation = await vi_ds_client.create_dataset(request=request)
    print("Waiting for dataset to be created to complete...")
    
    response = await operation.result()
    print(response) 
# [END vertex_create_dataset] 

In this snippet you can see that we are using the Python SDK for Google Vertex AI. The package aiplatform_v1 contains different types of clients. We are using the Dataset service client to create a dataset in Vertex AI. It needs the URI of the BigQuery table, which has the following format:

"bq://" + project + "." + dataset_id + "." + inputTableName
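
Since create_vi_dataset is an async function, it has to be driven from an event loop. A hypothetical call using the BigQuery table from the earlier steps could look like this; the display name is made up:

import asyncio

bigquery_source_uri = "bq://" + project + "." + "ibp_eft" + "." + "demand_qty_py01"
asyncio.run(create_vi_dataset("ibp_demand_dataset", bigquery_source_uri))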

Sample table with product, location, customer and demand with planning level
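
For illustration, the corresponding rows (the ibp_data payload passed to insert_rows_json above) might look like the following; the location and customer IDs, the year and the exact timestamp format are made-up sample values:

ibp_data = [
    {"prdid": "A", "locid": "L1", "custid": "C1", "timestamp": "2023-02-01 00:00:00", "cdemandQty": 10},
    {"prdid": "B", "locid": "L1", "custid": "C1", "timestamp": "2023-02-01 00:00:00", "cdemandQty": 11},
]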

In the above table you can see two different products, A and B, which have a demand of 10 and 11 respectively on the 1st of February. If we consider the JSON structure above, prdid would be the time series identifier column, timestamp would be the time column and cdemandQty would be the target column for the model in Vertex AI. Below is a Python function which builds this training pipeline structure, creates a pipeline service client from the Google Vertex AI SDK and then creates the training pipeline using the API.

async def create_vi_model(pipeline_display_name, model_display_name, dataset_id, p, l):
    from google.cloud import aiplatform_v1
    from google.protobuf import json_format
    from google.protobuf.struct_pb2 import Value

    # set the columns used for training and their data types
    transformations = [
        {"timestamp": {"column_name": "timestamp", "invalidValuesAllowed": "false"}},
        {"auto": {"column_name": "cdemandQty"}},
    ]

    data_granularity = {"unit": "day", "quantity": 1}

    # the inputs should be formatted according to the training_task_definition yaml file
    training_task_inputs_dict = {
        # required inputs
        "targetColumn": "cdemandQty",
        "timeSeriesIdentifierColumn": "prdid",
        "timeColumn": "timestamp",
        "transformations": transformations,
        "dataGranularity": data_granularity,
        "optimizationObjective": "minimize-rmse",
        "trainBudgetMilliNodeHours": 8000, 
        "unavailableAtForecastColumns": ['cdemandQty', 'locid', 'custid'],
        "availableAtForecastColumns": ['timestamp'],
        "forecastHorizon": 10,
    }

    training_task_inputs = json_format.ParseDict(training_task_inputs_dict, Value())

    
    # The AI Platform services require regional API endpoints.    
    vi_pipe_client = aiplatform_v1.PipelineServiceClient(client_options=client_options) 

    training_pipeline = {
        "display_name": pipeline_display_name,
        "training_task_definition": "gs://google-cloud-aiplatform/schema/trainingjob/definition/automl_forecasting_1.0.0.yaml",
        "training_task_inputs": training_task_inputs,
        "input_data_config": {
            "dataset_id": dataset_id,
            "fraction_split": {
                "training_fraction": 0.8,
                "validation_fraction": 0.1,
                "test_fraction": 0.1,
            },
        },
        "model_to_upload": {"display_name": model_display_name},
    }
    parent = f"projects/{p}/locations/{l}"

    print("Waiting model creation...")

    response = vi_pipe_client.create_training_pipeline(
        parent=parent, training_pipeline=training_pipeline
    )
    print("response:", response) 
# [END create model on VI] 
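
The function above can be invoked like the dataset call. The display names below are placeholders, and note that the dataset_id passed here should be the numeric ID of the Vertex AI dataset created in the previous step (the last segment of its resource name), not the BigQuery dataset name:

import asyncio

# Hypothetical invocation; project and location are assumed module-level values.
asyncio.run(create_vi_model(
    pipeline_display_name="ibp_demand_pipeline",
    model_display_name="ibp_demand_model",
    dataset_id="1234567890",   # Vertex AI dataset ID (placeholder)
    p=project,
    l=location,
))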

Batch prediction on Vertex AI

async def create_batch_prediction(batch_prediction_display_name, model_name, predictions_format, source_folder, target_folder, p, l):
    from google.cloud import aiplatform_v1beta1
    from google.protobuf import json_format
    from google.protobuf.struct_pb2 import Value

    # the batch prediction job definition; the dict is passed directly to the Job service client
    batch_prediction_job = {
        "display_name": batch_prediction_display_name, 
        "model": model_name,
        "input_config": {
            "instances_format": predictions_format,
            "bigquery_source": {"input_uri": source_folder},
        },
        "output_config": {
            "predictions_format": predictions_format,
            "bigquery_destination": {"output_uri": target_folder},
        },
    } 

    batch_prediction_inputs = json_format.ParseDict(batch_prediction_job, Value())

    
    # The AI Platform services require regional API endpoints.    
    vi_job_client = aiplatform_v1beta1.JobServiceClient(client_options=client_options)  

    parent = f"projects/{p}/locations/{l}"

    print("Waiting model creation...")

    response = vi_job_client.create_batch_prediction_job(
        parent=parent, batch_prediction_job=batch_prediction_job 
    )
    print("response:", response) 
# [END Start Batch prediction job VI] 
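
To put the pieces together, a rough driver for the whole flow could look like the sketch below. Every display name, table name, dataset ID and model resource name is a placeholder, project and location are assumed module-level values, and in a real run you would wait for each asynchronous step (table creation, dataset creation, training pipeline, batch prediction job) to finish before starting the next. Downloading the prediction results and returning them to SAP IBP is covered in the next section.

import asyncio

def run_forecast_flow(ibp_data):
    # BigQuery: dataset, table and data upload (names are illustrative).
    create_bq_dataset("ibp_eft")
    create_table("demand_qty_py01", "ibp_eft")
    wait_for_table("demand_qty_py01", "ibp_eft")   # polling sketch from the BigQuery section
    insert_bq_table("demand_qty_py01", "ibp_eft", ibp_data)

    # Vertex AI: dataset and model training.
    source_uri = "bq://" + project + ".ibp_eft.demand_qty_py01"
    asyncio.run(create_vi_dataset("ibp_demand_dataset", source_uri))
    asyncio.run(create_vi_model("ibp_demand_pipeline", "ibp_demand_model",
                                "1234567890", project, location))  # Vertex dataset ID placeholder

    # Vertex AI: batch prediction; the model resource name comes from the finished training pipeline.
    model_name = f"projects/{project}/locations/{location}/models/1111111111"
    asyncio.run(create_batch_prediction(
        "ibp_demand_batch_prediction", model_name, "bigquery",
        "bq://" + project + ".ibp_eft.scoring_py01",   # scoring table (placeholder)
        "bq://" + project + ".ibp_eft",                # results dataset (placeholder)
        project, location))
# [END end-to-end sketch]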

Prediction results to Planning View

                # This block runs inside the loop over the planning objects received from IBP.
                if len(results.keys()):

                    # Output Key Figures
                    for key_figure_name, key_figure_result in results.items():
                        key_figure_data = {
                            "RequestID": request_id,
                            "GroupID": planning_object["GroupID"],
                            "SemanticKeyFigure": key_figure_name,
                            "ResultData": key_figure_result,
                        }
                        output["_AlgorithmDataOutput"].append(key_figure_data)

                    # Message
                    message = {
                        "RequestID": request_id,
                        "GroupID": planning_object["GroupID"],
                        "MessageSequence": 1,
                        "MessageType": "I",
                        "MessageText": "Okay",
                    }
                    output["_Message"].append(message)

            # Header message
            if len(results.keys()):
                msg_header = {
                    "RequestID": request_id,
                    "GroupID": -1,
                    "MessageSequence": 1,
                    "MessageType": "I",
                    "MessageText": f"{algorithm_name}",
                }
                output["_Message"].append(msg_header)
            else:
                msg_header = {
                    "RequestID": request_id,
                    "GroupID": -1,
                    "MessageSequence": 1,
                    "MessageType": "E",
                    "MessageText": f"Forecast calculation failed! Algorithm: {algorithm_name} .",
                }
                output["_Message"].append(msg_header)

            output_json = json.dumps(output)

            token_request = requests.get(RESULT_URL, headers={"x-csrf-token": "fetch", "accept": "application/json"},
                                         cookies=cookies, verify=False)

            if token_request.status_code == 200:

                result_send_post = requests.post(RESULT_URL, str(output_json), cookies=cookies,
                                                headers={"x-csrf-token": token_request.headers["x-csrf-token"],
                                                        "Content-Type": "application/json", "OData-Version": "4.0"}, verify=False)

                print(
                    f"Forecast result for id {request_id} sent back to IBP system! Status code: {result_send_post.status_code}.", flush=True)


Conclusion

Note: This blog was originally published on Medium.
