04_training.py =============== We will create a ``04_training.py``, which contains the logic for the trainer of the network. The trainer provides random samples of wind speed and target distance, the trebuchet should aim at. After running the information through the forward pass the trainer will evaluate the result of the trebuchet and provide a gradient of the difference between actual and achieved target as feedback. We first import a necessary libraries. .. code-block:: python import socket import numpy as np import uuid import torch from torch import nn from swergio import Client, Trigger, MESSAGE_TYPE, MODEL_STATUS from swergio_toolbox.objects import MutableNumber We set the component name to *trainer*, which is use in the communication. We need to specify the IP and the port of the server as well as the message format and the header length. All of these information have to stay the same across the server and all clients. .. code-block:: python COMPONENT_NAME = 'trainer' PORT = 8080 SERVER = socket.gethostbyname(socket.gethostname()) FORMAT = 'utf-8' HEADER_LENGTH = 10 For the trainer we first define a constant variable TRAIN_STEPS with the number of total training steps we want to run through once the script is executed. We then initiate a mutable number as step counter to track how many steps we've already been through. This variable needs to be mutable since we will update the value in side the handler function context. Including the step counter we can initialize the swergio client. Finally we define a loss function, which we will use to calculate the gradient between actual and achieved target as well as the memory dictionary, which we will use to store the send targets until we get the results. .. code-block:: python TRAIN_STEPS = 1000 step_counter = MutableNumber(0) client = Client(COMPONENT_NAME,SERVER,PORT,FORMAT,HEADER_LENGTH,step_counter = step_counter) l_fn = nn.MSELoss() memory = {} We define a function to generate a new message containing a random sampled wind speeds and target for the trebuchet. The random target is can be between 20 and 100. While the wind speed is sampled from a normal distribution with factor 5 and can therefore be negative or positive. After constructing the message as dictionary and adding the samples to the *DATA* entry, the function also stores the targe value in the component memory dict. This value is retrieved and used once we get the results from the trebuchet. To identify the origin of the message throughout the whole process, we set a ROOT_ID which will be passed from message to message. .. code-block:: python def new_msg(): nr_of_samples = 100 target = np.random.uniform(20,100,nr_of_samples) wind = np.random.randn(nr_of_samples)*5 #np.random.uniform(0,5,nr_of_samples) data_id = uuid.uuid4().hex d = {'ROOT_ID':data_id, 'ID':uuid.uuid4().hex, 'TYPE': MESSAGE_TYPE.DATA.FORWARD.id, 'DATA': np.stack([wind,target],axis = 1).tolist(), 'TO_ROOM': 'input', 'MODEL_STATUS': MODEL_STATUS.TRAIN.id } memory[data_id] = target return d Once the samples are processed by the trebuchet, the trainer needs to give feedback how accurate the results are. We therefore use a handler that is triggered by incoming messages in the *output* room from type DATA.FORWARD. We extract the achieved target from the *DATA* entry in the message and retrieve the actual target from our internal memory based on the ROOT_ID of the message (same we provide in new_msg). After cleaning up the memory, we use both values to calculate a loss between the actual and achieved target. With the pytorch autograd functions we can also calculate the gradient for this loss in respect to the achieved target. This gradient information we will pass down to the trebuchet as our feedback by adding it to the message we send back to the *output* room as DATA.GRADIENT type. .. code-block:: python def backward(msg): pred = torch.FloatTensor(msg["DATA"]) if msg["ROOT_ID"] not in memory.keys(): return y = memory[msg["ROOT_ID"]] y = torch.tensor(y).float() memory.pop(msg["ROOT_ID"]) l = l_fn(pred,y) l_gradient = torch.autograd.functional.jacobian(lambda x: l_fn(x,y),pred) l_gradient = l_gradient.tolist() return {"ID":msg["ID"],"DATA":l_gradient} client.add_eventHandler(backward,MESSAGE_TYPE.DATA.GRADIENT,responseRooms='output',trigger=Trigger(MESSAGE_TYPE.DATA.FORWARD,'output')) To automatically send a new batch of sample data to the network once the feedback of the prior batch is handled, we create a separate handler. The handler function itself just updates the step counter and checks if we did not exceed the total training steps. If that's the case it will send a new generated message including a new batch. The handler is triggered once it receives a DATA.GRADIENT message in the *input* room, which will be the final gradient send by the control model. The response message is sent to the *input* room, but as a new DATA.FORWARD message. .. code-block:: python def new_data(msg, step_counter): step_counter += 1 if step_counter.value < TRAIN_STEPS: d = new_msg() return d client.add_eventHandler(new_data,MESSAGE_TYPE.DATA.FORWARD,responseRooms='input',trigger=Trigger(MESSAGE_TYPE.DATA.GRADIENT,'input')) At last the script will send the first message with a sampled batch to the *input* room. Once we sent the message the client will start listening for responses from the other components. .. code-block:: python client.send(new_msg()) client.listen()