At Cmotions we love to learn from each other, which is why we regularly collaborate on internal projects in which we can express our creativity, curiosity and eagerness to learn. In this article we want to share what we did in the ‘Snake’ project, where we taught the computer to play Snake using Reinforcement Learning. Never heard of Reinforcement Learning before? Not to worry, we wrote an article explaining this and more! When reading this article, we assume you've either read that previous article or already have experience with Reinforcement Learning, so we won't explain the basics here. Here we focus on the code we have written to teach our computer to play Snake (remember those good old days with your Nokia mobile phone?).
Small reminder: how to play Snake¶
The goal is simple: as a snake, you navigate through a square playground looking for food. As you eat more food, the difficulty increases, because your snake grows longer and you cannot crash into yourself. Snake is a good choice for a first introduction to reinforcement learning, because it lets us define the environment, agent, states, actions and rewards relatively simply.
Our game environment can be defined as a grid of points, with a piece of food at a random coordinate in that grid. In our version of Snake there are no walls, which means that the only way the snake can die is by colliding with its own body. Our agent - the snake - is encoded by a list of coordinates that are covered by the snake, and the state is the current representation of the environment. For this article, we have experimented with different representations of the state, incorporating different sources of information such as the direction of the snake and the distance to the food. Additionally, we have experimented with adding spatial features extracted by convolutional filters. Convolutional filters extract spatial information from an image by multiplying regions of pixels in an input with weights that are learned by a neural network. This results in a feature map that encodes spatial information about the playground.
As the snake navigates through the environment, it can move up, down, left or right; these are the actions the agent can take. However, according to the rules of the game, the snake cannot move in the direction opposite to its current direction. Therefore, given the state, we can only take three out of four actions. This subset of actions is what we call the action space. Our goal is, given a certain state, to choose the action that maximizes the future reward, which can consist of multiple components. First, our snake is rewarded when it eats food and the score increases. Conversely, the snake is negatively rewarded when it collides with its own body. Additionally, we included a small penalty when the snake moves further away from the food, and a small positive reward when it moves closer to the food. This encourages the snake to actually pursue eating food instead of only avoiding a collision.
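To make this concrete, here is a minimal sketch of such a reward function; the helper manhattan and the function reward_for_step are purely illustrative and not part of our final implementation, which follows later in this article.
%%script false --no-raise-error
def manhattan(a, b):
    # illustrative helper: Manhattan distance between two grid points
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def reward_for_step(old_head, new_head, food, collided):
    # sketch of the reward scheme described above, not our final implementation
    if collided:
        return -1      # colliding with the body ends the game
    if new_head == food:
        return 1       # eating food is the main objective
    if manhattan(new_head, food) < manhattan(old_head, food):
        return 0.001   # small nudge for moving closer to the food
    return -0.001      # small penalty for moving away from the food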
Now that we know what the actions, rewards and states in our situation are, it’s time to have a look at how we are going to incorporate this in a model that can be trained.
The models we implemented¶
Since we were interested in different setups for Reinforcement Learning, and how those would influence both performance and efficiency, we decided to train two different versions of agents to play Snake: the first model a bit simpler, the second a more advanced version.
We will describe both models very briefly here, but please read our previous article for more in-depth information on the models.
Model V1
The features used in the first model:
- X-coordinate of the food minus the x-coordinate of the snake's head.
- Y-coordinate of the food minus the y-coordinate of the snake's head.
- Dummy variable: whether there is a snake cell below the head
- Dummy variable: whether there is a snake cell above the head
- Dummy variable: whether there is a snake cell left of the head
- Dummy variable: whether there is a snake cell right of the head
For this first version of the model we use fairly simple input features: which direction the food is in, and whether the snake's body is near its head. We do not want to overcomplicate our model, so we choose to build a neural network consisting of two layers with 16 neurons each, ending with a dense layer that forecasts the reward for each action. This neural network will then be used as our Q function. For this first version of the model we find that after around 500.000 training iterations, the rewards no longer improve. This takes around 30 to 60 minutes of training, depending on the processor speed.
Model V2
The second model builds on the first and mainly improves on it by using a convolutional neural network to understand the full picture of the game, so that it can make strategic choices and the snake will not trap itself. We do this by creating a combined network: the input features from the first model, which give very precise information about dangers and directions, are combined with a snapshot image of the game that gives the snake more strategic insight into the state of the game. This snapshot image is the current state of the game represented as an array; each point in the array contains a tuple of length three that indicates whether there is a snake body, snake head or food at that point.
Once we have our array containing the snake's game snapshot, we pass it through two consecutive convolutional layers, each containing 8 filters with a kernel of size=(4, 4) and strides=2, where the padding is 'same' to make sure that we also account for the border pixels in our model. After the input has gone through both convolutional layers, we will have extracted signals about the game. Flattening this array of signals allows us to use them in a normal dense layer. So, after flattening the array, we use two dense layers of 256 and 64 neurons to further extract signals about the state of the game. For the normal features we use one dense layer of 16 neurons to extract signals about immediate threats and the position of the food. After that, we concatenate the normal signals and our convolutional signals so we can use them simultaneously in a dense layer, which is finally used to predict the Q-values for each action.
For this second version of the model, we find that it converges after approximately 5 million training iterations, which takes around 4-7 hours of training, depending on the processor speed.
Snake Game code¶
Now it's time to get our hands dirty and define our Snake game in Python. We start with the basics: defining the environment and the variables we need for that definition. A bit further in the script we explain the two different models we define and train. But first things first: before we start, we load all the necessary libraries and define a dictionary that can be used to translate the directions of the snake into action numbers. Furthermore, we create Point, a named tuple that we will later use to identify the food and the head of the snake. Finally, we define the block size as 20, which is the size of a pixel in our Snake game. These variables will then be used to build our Snake game.
%%script false --no-raise-error
import numpy as np
import tensorflow as tf
import keras
from keras.layers import Conv2D, Dense, Flatten, ReLU, MaxPool2D, concatenate
from keras.optimizers import Adam
import time
import random
import pygame
from collections import namedtuple
direction_dict = {'Right': 0, 'Left': 1, 'Up': 2, 'Down': 3}
Point = namedtuple('Point', 'x, y')
BLOCK_SIZE = 20
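A quick illustrative check of these helpers (the values shown assume the definitions above):
%%script false --no-raise-error
head = Point(320, 240)         # a grid coordinate in pixels
print(head.x, head.y)          # 320 240
print(direction_dict['Up'])    # 2: the action number for moving up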
Game Class¶
Below, we create a class SnakeGameAI in which we encode the features of our game. First, we initialize the class by defining the width and height of the Snake game, given that each block (pixel) of the game has a size of 20 (the block size). Next we set our variable game_over to False, which can later be used in other functions. Finally, we call our reset function, which is given in the next code chunk.
%%script false --no-raise-error
class SnakeGameAI:
def __init__(self, w=640, h=480):
self.w = w
self.h = h
self.game_over = False
self.reset()
The reset() method will be used to reset the game (no surprise here), which means we set the direction and location of the snake, which starts exactly in the middle of our map, and we place the food using the _place_food() method. Moreover, the frame_iteration attribute, which counts how many iterations the current game has had, is set to zero. The body of the snake is defined by a list of points containing the head and the rest of the body of the snake.
%%script false --no-raise-error
def reset(self):
# init game state
self.direction = 0 # we start moving to the right
self.head = Point(self.w / 2, self.h / 2) # our snake starts in the middle of the field (width/2 and height/2)
        # the snake's body begins left of the head
self.snake = [self.head,
Point(self.head.x - BLOCK_SIZE, self.head.y),
Point(self.head.x - (2 * BLOCK_SIZE), self.head.y)]
self.score = 0
self.food = None
self._place_food()
self.frame_iteration = 0
self.game_over = False
In the _place_food() method we choose a random value for x and y based on the total width and height of the map, and define the food as this point on the map. If the food lands within the snake, we place new food, so the food is never placed inside the snake's body.
%%script false --no-raise-error
def _place_food(self):
x = random.randint(0, (self.w - BLOCK_SIZE) // BLOCK_SIZE) * BLOCK_SIZE
y = random.randint(0, (self.h - BLOCK_SIZE) // BLOCK_SIZE) * BLOCK_SIZE
self.food = Point(x, y)
if self.food in self.snake:
self._place_food()
Now we can define the play_step() method, in which we let the snake play one iteration given an action. We start by increasing frame_iteration by one, so that we keep track of the number of iterations in the game. Next we call the _move() method with the action for the snake that determines the direction. Then we use the is_collision() function to determine whether the snake died or not. Finally, we return three values: the reward, a boolean done to indicate whether the snake died, and self.score, which is the current game's score.
%%script false --no-raise-error
def play_step(self, action):
self.frame_iteration += 1
# Move
reward = self._move(action) # update the head
# Check if game over
if self.is_collision():
done = True
reward = -1
self.reset()
else:
done = False
# Return game over and score
return reward, done, self.score
The is_collision() function checks whether the head of the snake is within the body of the snake, or in other words, whether the head is in the list of all the snake's points, where we skip the first index as this is the snake's head itself.
%%script false --no-raise-error
def is_collision(self):
    # there are no walls, so the snake can only die by hitting its own body
    if self.head in self.snake[1:]:
        self.game_over = True
        return True
    else:
        return False
Now we describe the _move() function. First we set our reward variable to zero and set the direction to the chosen action. Next, we calculate the current absolute (Manhattan) distance to the food based on the position of the snake's head. Based on the given direction (the chosen action) we calculate the new x and y coordinates of the snake's head, making sure that if the snake moves out of the bounds of the game it re-enters on the other side of the map. As we have just obtained the new location of the snake, we insert the new position at the front of the snake's body. To help our model converge more smoothly, we check whether the new location of the snake is closer to the food than the previously calculated distance_to_food_. If the snake moved closer to or further away from the food, we change the reward to 0.001 or -0.001 respectively. Next, we check if the snake has eaten the food, which changes the reward to 1 and places new food. If the snake did not eat the food, we remove the last part of the snake's body, as it did not grow but only moved. Finally, we return the reward obtained by the given action.
%%script false --no-raise-error
def _move(self, action):
reward = 0
self.direction = action
distance_to_food_x = abs(self.food.x - self.head.x)
distance_to_food_y = abs(self.food.y - self.head.y)
distance_to_food_ = distance_to_food_x + distance_to_food_y
x = self.head.x
y = self.head.y
if self.direction == 0:
x += BLOCK_SIZE
elif self.direction == 1:
x -= BLOCK_SIZE
elif self.direction == 2:
y += BLOCK_SIZE
elif self.direction == 3:
y -= BLOCK_SIZE
# cross the border of the map and enter on the other side
    # cross the border of the map and enter on the other side
    # (>= and w - BLOCK_SIZE keep the head on the grid; with > and w the head
    # could sit one block outside the map for a single step)
    if x >= self.w:
        x = 0
    if x < 0:
        x = self.w - BLOCK_SIZE
    if y >= self.h:
        y = 0
    if y < 0:
        y = self.h - BLOCK_SIZE
# moving head of snake
self.head = Point(x, y)
self.snake.insert(0, self.head)
# reward for going in the right direction
distance_to_food_x = abs(self.food.x - self.head.x)
distance_to_food_y = abs(self.food.y - self.head.y)
distance_to_food = distance_to_food_x + distance_to_food_y
if (distance_to_food_ - distance_to_food) > 0:
reward = 0.001
else:
reward = -0.001
if self.head == self.food:
self.score += 1
reward = 1
self._place_food()
else:
self.snake.pop()
return reward
Next, we define the get_action_space() function, which returns all possible actions. As the snake is not able to make a 180-degree turn, we have to return the possible actions given the current direction of the snake. Using the direction dictionary we define the possible choices as a list of all directions minus the opposite of the current direction. Then, using the direction dictionary again, we transform the choices into the action space, where the full action space [0, 1, 2, 3] corresponds to the choices [Right, Left, Up, Down].
%%script false --no-raise-error
def get_action_space(self):
choices = ['Right', 'Left', 'Up', 'Down']
# if direction right, can't go left
if self.direction == direction_dict['Right']:
choices = [x for x in choices if x != 'Left']
# if direction left, can't go right
if self.direction == direction_dict['Left']:
choices = [x for x in choices if x != 'Right']
#if direction up, can't go down
if self.direction == direction_dict['Up']:
choices = [x for x in choices if x != 'Down']
#if direction down, can't go up
if self.direction == direction_dict['Down']:
choices = [x for x in choices if x != 'Up']
action_space = [direction_dict[x] for x in choices]
return action_space
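As a quick illustration (assuming the methods of SnakeGameAI shown in this article have been assembled into one class): right after a reset the snake moves to the right, so 'Left' is excluded from the action space.
%%script false --no-raise-error
game = SnakeGameAI()
print(game.get_action_space())  # [0, 2, 3]: Right, Up and Down, but not Left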
In order to use the current state of the game we define the function get_conv_state(), which takes a snapshot of the game's current state by creating an array representing all blocks of the map, indicating for each block whether it contains a snake cell, the snake's head or food. This is the snapshot image we described for the V2 model above.
%%script false --no-raise-error
def get_conv_state(self):
    # one channel per entity: 0 = snake body, 1 = snake head, 2 = food
    state = np.zeros((int(self.w / BLOCK_SIZE), int(self.h / BLOCK_SIZE), 3))
    # the -1 offset shifts every index by one cell; thanks to Python's negative
    # indexing this is a consistent relabeling on a map that wraps around anyway
    for snake_cell in self.snake:
        state[int(snake_cell.x / BLOCK_SIZE) - 1, int(snake_cell.y / BLOCK_SIZE) - 1, 0] = 1
    state[int(self.head.x / BLOCK_SIZE) - 1, int(self.head.y / BLOCK_SIZE) - 1, 1] = 1
    state[int(self.food.x / BLOCK_SIZE) - 1, int(self.food.y / BLOCK_SIZE) - 1, 2] = 1
    return state
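A small sanity check of this snapshot, again assuming the fully assembled class; the default game of 640 by 480 pixels gives a 32 by 24 grid.
%%script false --no-raise-error
game = SnakeGameAI()
state = game.get_conv_state()
print(state.shape)               # (32, 24, 3)
print(int(state[..., 0].sum()))  # 3: the snake starts with three cells (head included)
print(int(state[..., 2].sum()))  # 1: there is exactly one piece of food on the grid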
For the first version of our model we define get_state() as the method that creates the input features (state) that will be used as the input for our model. The input features are stored in an array containing the difference between the location of the food and the snake's head for both the x and y axis, and dummy variables for whether there is a snake's body below, above, left or right of its head.
def get_state(self):
# state = [difference in location food and head x axis,
# difference in location food and head y axis,
# dummy if there is snake cell below head
# dummy if there is snake cell above head
# dummy if there is snake cell left of head
# dummy if there is snake cell right of head]
state = [int(self.food.x / BLOCK_SIZE) - int(self.head.x / BLOCK_SIZE),
int(self.food.y / BLOCK_SIZE) - int(self.head.y / BLOCK_SIZE),
int(any([(snake_cell.y == self.head.y - BLOCK_SIZE) for snake_cell in self.snake if
snake_cell.x == self.head.x])),
int(any([(snake_cell.y == self.head.y + BLOCK_SIZE) for snake_cell in self.snake if
snake_cell.x == self.head.x])),
int(any([(snake_cell.x == self.head.x - BLOCK_SIZE) for snake_cell in self.snake if
snake_cell.y == self.head.y])),
int(any([(snake_cell.x == self.head.x + BLOCK_SIZE) for snake_cell in self.snake if
snake_cell.y == self.head.y]))]
return np.array(state)
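For illustration, at the start of a game the first two entries depend on where the food happened to be placed, but the 'left of head' dummy is always 1, because the snake starts with its body directly left of its head.
%%script false --no-raise-error
game = SnakeGameAI()
s = game.get_state()
print(s.shape)  # (6,)
print(s[4])     # 1: a snake cell sits directly left of the head at the start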
We start training our model by exploring actions, i.e. choosing random actions, so the model gathers lots of data about suboptimal choices. To help the model converge quicker, we also want to show our model what good choices might look like, which is why we created the get_example_action() function. To do so, we check in which direction the food is placed, and whether a snake's body cell is blocking that direction. If there is an action that brings the snake closer to the food without dying, we choose that action; otherwise we take a random action from the current action space.
%%script false --no-raise-error
def get_example_action(self):
actions = []
state = self.get_state()
# if food is on the right and no snake cell on right
if state[0] > 0 and state[5] == 0:
actions.append(0)
# if food is on the left and no snake cell on left
elif state[0] < 0 and state[4] == 0:
actions.append(1)
# if food is up and no snake cell up
if state[1] > 0 and state[3] == 0:
actions.append(2)
# if food is down and no snake cell down
elif state[1] < 0 and state[2] == 0:
actions.append(3)
if len(actions) == 0:
actions = self.get_action_space()
return random.choice(actions)
Replay Buffer¶
In order to store and sample the observations from the states and rewards gained by playing the Snake game, we create a ReplayBuffer class. Note that we already incorporate the convolutional state memory, which we will later use for the v2 version. First we create zero arrays of the appropriate size given the memory size and the dimensions of the observations. As the training of the model needs the current state, new state, action, reward and the boolean that indicates whether the game is done, we create such an array for each of these.
To store observations we create the store_transition function, which stores all the needed observations about an action in our arrays. Furthermore, we create the sample_buffer function, which draws a random batch of observations from our memory that can be used to train our model.
class ReplayBuffer(object):
    # input_dims_conv gets a default value, so the v1 agent can create a
    # buffer without specifying a convolutional state
    def __init__(self, max_size, input_dims, input_dims_conv=(1,)):
        self.mem_size = max_size
        self.mem_cntr = 0
        self.conv_state_memory = np.zeros((self.mem_size, *input_dims_conv),
                                          dtype=np.float32)
        self.state_memory = np.zeros((self.mem_size, input_dims),
                                     dtype=np.float32)
        self.new_conv_state_memory = np.zeros((self.mem_size, *input_dims_conv),
                                              dtype=np.float32)
        self.new_state_memory = np.zeros((self.mem_size, input_dims),
                                         dtype=np.float32)
        self.action_memory = np.zeros(self.mem_size, dtype=np.int32)
        self.reward_memory = np.zeros(self.mem_size, dtype=np.float32)
        self.terminal_memory = np.zeros(self.mem_size, dtype=bool)
    def store_transition(self, state, action, reward, state_, done,
                         conv_state=None, conv_state_=None):
        index = self.mem_cntr % self.mem_size  # if the maximum memory size is exceeded, the snake starts to forget the oldest memories
        self.state_memory[index] = state
        self.new_state_memory[index] = state_
        if conv_state is not None:  # only the v2 model stores a convolutional state
            self.conv_state_memory[index] = conv_state
            self.new_conv_state_memory[index] = conv_state_
        self.action_memory[index] = action
        self.reward_memory[index] = reward
        self.terminal_memory[index] = done
        self.mem_cntr += 1
def sample_buffer(self, batch_size):
max_mem = min(self.mem_cntr, self.mem_size)
batch = np.random.choice(max_mem, batch_size, replace=False)
states = self.state_memory[batch]
conv_states = self.conv_state_memory[batch]
actions = self.action_memory[batch]
rewards = self.reward_memory[batch]
states_ = self.new_state_memory[batch]
conv_states_ = self.new_conv_state_memory[batch]
terminal = self.terminal_memory[batch]
return conv_states, states, actions, rewards, conv_states_, states_, terminal
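A short illustrative round trip through the buffer, using the v1 settings (six input features, no convolutional state) and the adjusted signature above:
%%script false --no-raise-error
buffer = ReplayBuffer(max_size=1000, input_dims=6)
for _ in range(64):
    s = np.random.randn(6).astype(np.float32)  # a dummy six-feature state
    buffer.store_transition(state=s, action=0, reward=0.0, state_=s, done=False)
conv_states, states, actions, rewards, conv_states_, states_, dones = buffer.sample_buffer(32)
print(states.shape)   # (32, 6)
print(actions.shape)  # (32,)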
The Agent¶
Here we define the agent by creating the class Agent, which describes our model parameters, contains a replay buffer and holds the models q_eval and q_next based on the TensorFlow model class that we define later. Note that we have a different replay buffer and different q_eval and q_next for each model version.
class Agent:
    # input_dims_conv is an optional keyword argument, as only the v2 model
    # uses a convolutional state
    def __init__(self, gamma, epsilon, lr, n_actions, input_dims,
                 mem_size, batch_size, eps_min=0.01, eps_dec=5e-7,
                 replace=1000, model_version='v1', input_dims_conv=None):
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.n_actions = n_actions
        self.input_dims = input_dims
        self.input_dims_conv = input_dims_conv
        self.batch_size = batch_size
        self.eps_min = eps_min
        self.eps_dec = eps_dec
        self.replace_target_cnt = replace
        self.action_space = [i for i in range(n_actions)]
        self.learn_step_counter = 0
        self.model_version = model_version
        if self.model_version == 'v1':
            self.memory = ReplayBuffer(mem_size, input_dims)
            self.q_eval = DeepQNetwork(input_dims, n_actions)
            self.q_eval.compile(optimizer=Adam(learning_rate=lr))
            self.q_next = DeepQNetwork(input_dims, n_actions)
            self.q_next.compile(optimizer=Adam(learning_rate=lr))
        else:
            self.memory = ReplayBuffer(mem_size, input_dims, input_dims_conv)
            self.q_eval = DeepQNetwork(input_dims_conv, input_dims, n_actions)
            self.q_eval.compile(optimizer=Adam(learning_rate=lr))
            self.q_next = DeepQNetwork(input_dims_conv, input_dims, n_actions)
            self.q_next.compile(optimizer=Adam(learning_rate=lr))
In order to save and load our models, we create the save_models() and load_models() functions, which save and load the weights of our network.
def save_models(self):
self.q_eval.save_weights('model'+self.model_version+'.h5')
print('... models saved successfully ...')
def load_models(self):
self.q_eval.load_weights('model'+self.model_version+'.h5')
self.q_next.load_weights('model'+self.model_version+'.h5')
print('... models loaded successfully ...')
To use the replay buffer from our Agent class, we create the store_transition and sample_memory functions, which use the functions already defined in our replay buffer. Note that when sampling data we convert our observations to tensors, so we can use them to train our model.
def store_transition(self, state, action, reward, state_, done, conv_state=None, conv_state_=None):
    self.memory.store_transition(state=state, action=action, reward=reward,
                                 state_=state_, done=done,
                                 conv_state=conv_state, conv_state_=conv_state_)
def sample_memory(self):
conv_state, state, action, reward, new_conv_state, new_state, done = self.memory.sample_buffer(self.batch_size)
conv_states = tf.convert_to_tensor(conv_state)
states = tf.convert_to_tensor(state)
rewards = tf.convert_to_tensor(reward)
dones = tf.convert_to_tensor(done)
actions = tf.convert_to_tensor(action, dtype=tf.int32)
conv_states_ = tf.convert_to_tensor(new_conv_state)
states_ = tf.convert_to_tensor(new_state)
return conv_states, states, actions, rewards, conv_states_, states_, dones
As we want our Agent to choose actions for us, we create the choose_action function. Based on epsilon, the agent decides whether it will explore a random action, to learn whether doing something different from what the current model prescribes leads to higher rewards, or use the model to choose the best action based on its information.
def choose_action(self, state, action_space):
if np.random.random() > self.epsilon:
if self.model_version=='v1':
state = tf.convert_to_tensor([state])
else:
state = (tf.convert_to_tensor([state[0]]), tf.convert_to_tensor([state[1]]))
# evaluate all actions and pick the one with the highest estimated Q value
actions = self.q_eval(state)
sorted_actions = tf.argsort(actions, axis=1).numpy()[0]
action = sorted_actions[-1]
if action in action_space:
return action
else:
action = sorted_actions[-2]
return action
else:
action = np.random.choice(action_space)
return action
As we use a Double Deep Q-Network to keep the training of our model stable, we have to replace our q_next model with our q_eval model after replace_target_cnt training iterations. Therefore we create the replace_target_network function, which does so whenever learn_step_counter modulo replace_target_cnt is zero.
def replace_target_network(self):
if self.learn_step_counter % self.replace_target_cnt == 0:
self.q_next.set_weights(self.q_eval.get_weights())
At the beginning of training we are exploring, which means we try out random actions to see what the reward is for each action. After each training iteration, however, we want to decrease the epsilon that represents the probability of exploring. We define the function decrement_epsilon() to decrease epsilon as long as it is larger than the minimum value eps_min.
def decrement_epsilon(self):
self.epsilon = self.epsilon - self.eps_dec \
if self.epsilon > self.eps_min else self.eps_min
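To get a feel for this schedule: with the values we use later (epsilon starting at 1, eps_dec of 1e-5 and eps_min of 0.1), a quick back-of-the-envelope calculation shows how long the agent keeps exploring.
%%script false --no-raise-error
epsilon, eps_min, eps_dec = 1.0, 0.1, 1e-5
steps_to_min = (epsilon - eps_min) / eps_dec
print(round(steps_to_min))  # 90000: learn() calls before epsilon bottoms out at eps_min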
The last function of our Agent class is the learn() function, which is used to train the model. This function only does something once mem_cntr is larger than the batch size, as we need at least one batch to train the model. We sample data with our sample_memory() function. Based on the model's version we define the input for the models. Then, using the Bellman equation described in our previous article [LINK], we calculate the Q-values and use an MSE loss to update the weights of the model that predicts the Q-values for each action given our state.
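Concretely, for a sampled transition $(s_i, a_i, r_i, s'_i, d_i)$ the target and loss used in the code below read:

$$ y_i = r_i + \gamma \, (1 - d_i) \max_{a'} Q_{\text{next}}(s'_i, a') $$

$$ L = \frac{1}{B} \sum_{i=1}^{B} \left( Q_{\text{eval}}(s_i, a_i) - y_i \right)^2 $$

where $d_i$ is 1 when the episode ended with this transition and $B$ is the batch size.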
def learn(self):
if self.memory.mem_cntr < self.batch_size:
return
self.replace_target_network()
conv_states, states, actions, rewards, conv_states_, states_, dones = self.sample_memory()
indices = tf.range(self.batch_size, dtype=tf.int32)
action_indices = tf.stack([indices, actions], axis=1)
if self.model_version == 'v1':
eval_input = states
next_input = states_
else:
eval_input = (conv_states, states)
next_input = (conv_states_, states_)
with tf.GradientTape() as tape:
q_pred = tf.gather_nd(self.q_eval(eval_input), indices=action_indices)
q_next = self.q_next(next_input)
max_actions = tf.math.argmax(q_next, axis=1, output_type=tf.int32)
max_action_idx = tf.stack([indices, max_actions], axis=1)
        q_target = rewards + \
            self.gamma * tf.gather_nd(q_next, indices=max_action_idx) * \
            (1 - tf.cast(dones, tf.float32))  # zero out the future term for terminal transitions
loss = keras.losses.MSE(q_pred, q_target)
params = self.q_eval.trainable_variables
grads = tape.gradient(loss, params)
self.q_eval.optimizer.apply_gradients(zip(grads, params))
self.learn_step_counter += 1
self.decrement_epsilon()
Learning and Discount Rate¶
Now we are ready to decide on the learning rate and discount rate for our first model. As we do not want the model to converge too quickly to a suboptimal solution, we choose a fairly small learning rate of $ \alpha=0.001 $. Furthermore, we want our snake to have a long-term view, so we set gamma close to one. However, it should not be too close to one, as the present steps are still slightly more important than those in the far future (and the snake cannot die of overeating anyway). Additionally, the model trains better with smaller Q-values, so discounting future reward helps the training process. Therefore, we choose a discount rate of $ \gamma=0.9 $ as a trade-off between the two. Note that these values were chosen somewhat arbitrarily and are not necessarily optimal.
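To make the effect of this discount rate tangible: with $ \gamma=0.9 $, a reward that lies $ k $ steps in the future is weighted by $ \gamma^k $.
%%script false --no-raise-error
gamma = 0.9
for k in (1, 5, 10, 20, 50):
    print(k, round(gamma ** k, 4))  # 0.9, 0.5905, 0.3487, 0.1216, 0.0052
# a reward 20 steps ahead still counts for roughly 12% of its value
With these rates in place, we define the network for our first model below: two dense layers of 16 neurons, followed by an output layer that produces one Q-value per action.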
%%script false --no-raise-error
class DeepQNetwork(keras.Model):
def __init__(self, input_dims, n_actions):
super(DeepQNetwork, self).__init__()
self.fc1 = Dense(16, activation='relu', input_shape=(None, input_dims))
self.fc2 = Dense(16, activation='relu')
self.action_layer = Dense(n_actions, activation=None)
def call(self, state):
x = self.fc1(state)
x = self.fc2(x)
x = self.action_layer(x)
return x
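As a quick illustrative check, a dummy forward pass through this network (assuming the imports from the start of this notebook):
%%script false --no-raise-error
net = DeepQNetwork(input_dims=6, n_actions=4)
q_values = net(tf.zeros((1, 6)))  # one dummy state of six features
print(q_values.shape)             # (1, 4): one estimated Q-value per action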
Parameter settings¶
Now that we have defined all necessary classes and functions, we can initiate our game environment with our SnakeGameAI class. First, we set all our parameters. We set our gamma to 0.9, which refers to the discount rate from the Bellman equation. As we have not trained the model yet, we start with exploring only, hence an epsilon of 1. The learning rate is set to 0.001, and furthermore we set the memory size to 100.000 observations and the batch size to 32.
Now we create our DQN agent by initiating our Agent class with these parameters. Note that this is for our first model version, so we do not have to specify the model's version.
game_env = SnakeGameAI()
gamma = 0.9
epsilon = 1
lr = 0.001
n_actions = 4
mem_size = 100000
block_size = 20
input_dims = 6
batch_size = 32
dqn_agent = Agent(gamma, epsilon, lr, n_actions, input_dims,
mem_size, batch_size, eps_min=0.1, eps_dec=1e-5,
replace=200)
Training the first model¶
Once the model is set up, we want to train it until it converges to an optimum, meaning that we no longer see the average reward improve after a certain number of training iterations. To do so, we train per 100.000 iterations and print the average rewards, the number of deaths and the max score per 10.000 iterations. Furthermore, we help the model converge quicker by playing example actions for 20% of the iterations (when the loop number modulo 1000 is higher than 800). This way, the model has enough data about how to gain rewards, so that these actions gain higher Q-values.
For our first version of the model we find that after running this loop five times, we don't see the rewards improve anymore. This takes around 30-60 minutes of training, depending on the processor speed.
%%script false --no-raise-error
loop_nr = 0
reward_list = []
max_score = 0
while loop_nr < 100000:
    loop_nr += 1
    state = game_env.get_state()
    if loop_nr % 1000 > 800:
        # for a proportion of the time, help the algorithm by playing example actions;
        # we pick the example action before the step, so the stored transition
        # matches the action that was actually taken
        action = game_env.get_example_action()
    else:
        action = dqn_agent.choose_action(state, game_env.get_action_space())
    reward, done, score = game_env.play_step(action)
    max_score = max(max_score, score)
    new_state = game_env.get_state()
    dqn_agent.store_transition(state=state, action=action, reward=reward, state_=new_state, done=done)
    dqn_agent.learn()
    reward_list.append(reward)
if loop_nr % 10000 == 0:
# per 10000 training iterations check the average rewards and scores of the game
print('avg rewards: {}'.format(np.mean(reward_list)))
        print('food eaten: {}'.format(sum([1 for x in reward_list if x == 1])))
        print('died: {}'.format(sum([1 for x in reward_list if x == -1])))
print('max score: {}'.format(max_score))
print('current eps: {}'.format(dqn_agent.epsilon))
reward_list = []
dqn_agent.save_models()
Initializing the second model¶
%%script false --no-raise-error
class DeepQNetwork(keras.Model):
def __init__(self, input_conv_dims, input_dims, n_actions):
super(DeepQNetwork, self).__init__()
input_conv_shape = (None, input_conv_dims[0], input_conv_dims[1], input_conv_dims[2])
self.conv1 = Conv2D(8, (4, 4), strides=2, activation='relu', padding='same', input_shape=input_conv_shape)
self.conv2 = Conv2D(8, (4, 4), strides=2, activation='relu', padding='same')
self.flat = Flatten()
self.fc1 = Dense(256, activation='relu')
self.fc2 = Dense(64, activation='relu')
self.fc3 = Dense(16, activation='relu', input_shape=(None, input_dims))
self.fc4 = Dense(16, activation='relu')
self.action_layer = Dense(n_actions, activation=None)
def call(self, states):
conv_state, state = states
#conv block
x1 = self.conv1(conv_state)
x1 = self.conv2(x1)
x1 = self.flat(x1)
x1 = self.fc1(x1)
x1 = self.fc2(x1)
#features block
x2 = self.fc3(state)
#final block
x = concatenate([x1, x2])
x = self.fc4(x)
x = self.action_layer(x)
return x
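Again a quick illustrative check: a dummy forward pass through the multi-input network, using the 32 by 24 grid of the default game.
%%script false --no-raise-error
net = DeepQNetwork(input_conv_dims=(32, 24, 3), input_dims=6, n_actions=4)
q_values = net((tf.zeros((1, 32, 24, 3)), tf.zeros((1, 6))))  # (snapshot, features)
print(q_values.shape)  # (1, 4): one Q-value per action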
Now that we use our multi-input (v2) model, we have to specify the input dimensions of the convolutional part, which are given by the shape (game width / block size, game height / block size, 3). Next, we initiate our Agent class, where we also specify the model_version to be 'v2' and pass the convolutional input dimensions. Finally, we build our model using the correct input shapes, given that we have a multi-input model.
%%script false --no-raise-error
game_env = SnakeGameAI()
gamma = 0.9
epsilon = 1
lr = 0.001
n_actions = 4
mem_size = 100000
block_size = 20
input_dims = 6
input_dims_conv = (int(game_env.w / block_size), int(game_env.h / block_size), 3)
batch_size = 32
dqn_agent = Agent(gamma, epsilon, lr, n_actions, input_dims,
                  mem_size, batch_size, eps_min=0.1, eps_dec=1e-5,
                  replace=200, model_version='v2', input_dims_conv=input_dims_conv)
dqn_agent.q_next.build(input_shape=[(None, ) + input_dims_conv, (None, input_dims)])
dqn_agent.q_eval.build(input_shape=[(None, ) + input_dims_conv, (None, input_dims)])
Training the second model¶
Now we train our multi-input model in the same way, until it converges to an optimum where the average reward no longer improves. As we have a much larger network containing convolutional filters, the algorithm takes a lot longer to train. Therefore, we set the training loop to 500.000 iterations and print the average rewards, the number of deaths and the max score per 50.000 iterations. Again we help the model by playing example actions for 20% of the iterations. For the second version of the model we find that it converges after approximately 5.000.000 training iterations, which takes around 4-5 hours of training, depending on the processor speed.
%%script false --no-raise-error
loop_nr = 0
reward_list = []
max_score = 0
while loop_nr < 500000:
    loop_nr += 1
    state = (game_env.get_conv_state(), game_env.get_state())
    if loop_nr % 1000 > 800:
        # again, pick the example action before playing the step
        action = game_env.get_example_action()
    else:
        action = dqn_agent.choose_action(state, game_env.get_action_space())
    reward, done, score = game_env.play_step(action)
    max_score = max(max_score, score)
    new_state = (game_env.get_conv_state(), game_env.get_state())
    dqn_agent.store_transition(conv_state=state[0], state=state[1], action=action,
                               reward=reward, conv_state_=new_state[0], state_=new_state[1], done=done)
    dqn_agent.learn()
    reward_list.append(reward)
if loop_nr % 50000 == 0:
print('avg rewards: {}'.format(np.mean(reward_list)))
        print('food eaten: {}'.format(sum([1 for x in reward_list if x == 1])))
        print('died: {}'.format(sum([1 for x in reward_list if x == -1])))
print('max score: {}'.format(max_score))
print('current eps: {}'.format(dqn_agent.epsilon))
reward_list = []
dqn_agent.save_models()
Comparing the models¶
Finally, we come to the most important part of the project, where we assess the performance of our models. To validate this performance, we let each model play 100 games until it dies. Then we compare the average score and the maximum score over these 100 games. To make sure that we are not exploring actions, i.e. randomly taking actions, we set the epsilon and the minimum epsilon to zero. The code chunk for the validation of the model is then given by:
model_v = 'v1' # or v2
dqn_agent.eps_min = 0.0
dqn_agent.epsilon = 0.0
rewards_list = []
score_list = []
game_env.reset()
dones_count = 0
max_score = 0
current_game_score = 0
while dones_count < 100:
    if model_v == 'v1':
        state = game_env.get_state()
    elif model_v == 'v2':
        state = (game_env.get_conv_state(), game_env.get_state())
    else:
        print('no model version specified')
        break
action = dqn_agent.choose_action(state, game_env.get_action_space())
reward, done, score = game_env.play_step(action)
max_score = max(max_score, score)
current_game_score = max(current_game_score, score)
if done:
dones_count += 1
score_list.append(current_game_score)
current_game_score = 0
rewards_list.append(reward)
print('avg score: {}'.format(np.mean(score_list)))
print('max score: {}'.format(max_score))
Conclusion¶
Over the 100 games played to validate the models, we find the following average and maximum scores for each model:

|  | v1 model | v2 model |
| --- | --- | --- |
| avg score | 22.28 | 27.16 |
| max score | 47 | 72 |
This shows that our second model clearly outperforms our first model, but keep in mind that it also takes a lot longer to train, while the first model's results aren't all that bad. We see a clear trade-off between performance and the time required to reach an optimum. In practice it is always difficult to decide which is more important: efficiency or accuracy. There is no single answer to that question that always applies, except for: it depends.
We really enjoyed diving into Reinforcement Learning, and we hope our previous article and the code in this notebook will help you get started with Reinforcement Learning as well!