Consider a simplified version of the arcade game Pac-Man. An agent moves on an open grid with randomly placed pellets, and a ghost periodically moves across the grid. The agent receives large negative rewards (and loses the game) for touching a wall or the ghost. It receives smaller positive rewards for touching (and thus consuming) pellets.
The goal of this assignment is to train an agent to play this game using Q-learning.
In Q-learning, the agent estimates how good it is to take action a in state s by computing a Q-value Q(s,a).
We will compute Q-values like this:
Q(s,a) = w(f1,a)*v1 + w(f2,a)*v2 + ...
In this equation, each f represents a feature name, each v represents the value of a feature in state s, and w is a mapping of (f,a) pairs to weights.
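For example, if the weights are stored in a dictionary keyed by (feature, action) pairs and a state is a dictionary of feature values (as in the starter code below), the Q-value computation might look like the following sketch. The helper name q_value and the sample values are illustrative, not part of the starter code.

import collections

w = collections.defaultdict(float)            # w[(f, a)] defaults to 0.0

def q_value(w, s, a):
    # Linear Q-value: sum of weight * feature value over the features in s.
    return sum(w[(f, a)] * v for (f, v) in s.items())

# Tiny usage example with one feature and one action:
s = {'pellets left': 0.7}
a = (0, 1)
w[('pellets left', a)] = 2.0
assert q_value(w, s, a) == 2.0 * 0.7          # only one term contributes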
To learn the weight mapping, we will use this algorithm:
for each training episode
    until the episode ends
        let s be the current environment state
        with probability epsilon, let a be a random action available in s
        otherwise, let a be the action with the highest Q-value in s
        take action a
        let s' be the resulting state
        let r be the resulting reward
        let a' be the action with the highest Q-value in s'
        compute delta = r - Q(s,a) + gamma*Q(s',a')
        for each feature f in s:
            let v be the value of f in s
            add alpha*delta*v to the weight w(f,a)
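As a rough sketch of one such update step in Python, reusing the hypothetical q_value helper and weight map from the earlier example (the function name and signature are illustrative, not the interface the starter code expects):

def update_weights(w, s, a, r, sp, next_actions, alpha=0.01, gamma=0.99):
    # One Q-learning step for the linear Q-value approximation sketched above.
    # Assumes next_actions is non-empty; the terminal case is covered in the note below.
    best_next = max(q_value(w, sp, ap) for ap in next_actions)   # Q(s',a') for the best a'
    delta = r - q_value(w, s, a) + gamma * best_next
    for (f, v) in s.items():
        w[(f, a)] += alpha * delta * v                           # nudge each active weight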
An Environment class and an Agent class have been started for you below. You can run this code, but the agent will just move randomly, even after training for 1000 episodes. Once you provide the crucial missing parts indicated in the comments, you will be able to watch it behave more intelligently. Adjust the height of your console window to match the grid height, so that each display appears in the same place.
Algorithm notes:

When the resulting state s' is terminal, there is no next action a' to evaluate (the environment below returns None for the available actions once the episode has ended), so delta in this special case should just be r - Q(s,a).
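In code, that special case might be handled with a guard like the following sketch, again using the hypothetical q_value helper from above:

if actions is None:
    # s' is terminal, so there is no a' to evaluate.
    delta = r - q_value(w, s, a)
else:
    best_next = max(q_value(w, sp, ap) for ap in actions)
    delta = r - q_value(w, s, a) + gamma * best_next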
Python notes:

collections.defaultdict(float) is useful for the weight map (the starter code below already creates one). It behaves just like a regular dictionary, except that it gives every weight a default value of 0 until you assign it otherwise.

import time
import random
import collections

class Environment(object):
    """A grid world with pellets to collect and an enemy to avoid."""

    def __init__(self, size, density):
        """Environments have fixed size and pellet counts."""
        self.size = size
        self.density = density
        self.directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]  # right, down, left, up

    def initialize(self):
        """Place pacman, pellets, and ghost at random locations."""
        locations = list()
        for r in range(1, self.size-1):
            for c in range(1, self.size-1):
                locations.append((r, c))
        random.shuffle(locations)
        self.pacman = locations.pop()
        self.pellets = set()
        for count in range(self.density):
            self.pellets.add(locations.pop())
        self.new_ghost()
        self.next_reward = 0

    def new_ghost(self):
        """Place a ghost at one end of pacman's row or column."""
        (r, c) = self.pacman
        locations = [(r, 0), (0, c), (r, self.size-1), (self.size-1, c)]
        choice = random.choice(range(len(locations)))
        self.ghost = locations[choice]
        self.ghost_action = self.directions[choice]

    def display(self):
        """Print the environment."""
        for r in range(self.size):
            for c in range(self.size):
                if (r, c) == self.ghost:
                    print 'G',
                elif (r, c) == self.pacman:
                    print 'O',
                elif (r, c) in self.pellets:
                    print '.',
                elif r == 0 or r == self.size-1:
                    print 'X',
                elif c == 0 or c == self.size-1:
                    print 'X',
                else:
                    print ' ',
            print
        print

    def actions(self):
        """Return the actions the agent may try to take."""
        if self.terminal():
            return None
        else:
            return self.directions

    def terminal(self):
        """Return whether the episode is over."""
        if self.next_reward == -100:
            return True
        elif len(self.pellets) == 0:
            return True
        else:
            return False

    def reward(self):
        """Return the reward earned during the last update."""
        return self.next_reward

    def update(self, action):
        """Adjust the environment given the agent's choice of action."""
        pacman = self.pacman
        ghost = self.ghost
        # Pacman moves as chosen
        (r, c) = self.pacman
        (dr, dc) = action
        self.pacman = (r+dr, c+dc)
        # Ghost moves in its direction
        (r, c) = self.ghost
        (dr, dc) = self.ghost_action
        self.ghost = (r+dr, c+dc)
        # Ghost is replaced when it leaves
        (r, c) = self.ghost
        if r == 0 or r == self.size-1:
            self.new_ghost()
        elif c == 0 or c == self.size-1:
            self.new_ghost()
        (r, c) = self.pacman
        (gr, gc) = self.ghost
        # Negative reward for hitting the ghost
        if self.pacman == self.ghost:
            self.next_reward = -100
        elif (pacman, ghost) == (self.ghost, self.pacman):
            self.next_reward = -100
        # Negative reward for hitting a wall
        elif r == 0 or r == self.size-1:
            self.next_reward = -100
        elif c == 0 or c == self.size-1:
            self.next_reward = -100
        # Positive reward for consuming a pellet
        elif self.pacman in self.pellets:
            self.next_reward = 10
            self.pellets.remove(self.pacman)
        else:
            self.next_reward = 0

    def state(self):
        """Return a description of the state of the environment."""
        s = dict()
        # Baseline feature noting how many pellets are left
        s['pellets left'] = len(self.pellets) / float(self.density)
        # YOU ADD MORE FEATURES
        return s

class Agent(object):
    """Learns to act within the environment."""

    def __init__(self):
        """Establish initial weights and learning parameters."""
        self.w = collections.defaultdict(float)  # Each w((f,a)) starts at 0
        self.epsilon = 0.05  # Exploration rate
        self.gamma = 0.99    # Discount factor
        self.alpha = 0.01    # Learning rate

    def choose(self, s, actions):
        """Return an action to try in this state."""
        p = random.random()
        if p < self.epsilon:
            return random.choice(actions)
        else:
            return self.policy(s, actions)

    def policy(self, s, actions):
        """Return the best action for this state."""
        max_value = max([self.Q(s, a) for a in actions])
        max_actions = [a for a in actions if self.Q(s, a) == max_value]
        return random.choice(max_actions)

    def Q(self, s, a):
        """Return the estimated Q-value of this action in this state."""
        return 0  # YOU CHANGE THIS

    def observe(self, s, a, sp, r, actions):
        """Update weights based on this observed step."""
        # YOU FILL THIS IN

def main():
    """Train an agent and then watch it act."""
    environment = Environment(20, 10)
    agent = Agent()
    for episode in range(1000):
        environment.initialize()
        while not environment.terminal():
            s = environment.state()
            actions = environment.actions()
            a = agent.choose(s, actions)
            environment.update(a)
            sp = environment.state()
            r = environment.reward()
            actions = environment.actions()
            agent.observe(s, a, sp, r, actions)
    environment.initialize()
    environment.display()
    while not environment.terminal():
        s = environment.state()
        actions = environment.actions()
        a = agent.policy(s, actions)
        environment.update(a)
        time.sleep(0.25)
        environment.display()

if __name__ == '__main__':
    main()
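One of the parts you must provide is the extra features in state(). Features scaled to roughly the 0-to-1 range, like the baseline 'pellets left' feature, tend to behave well with a small learning rate. The helper below is a purely illustrative sketch of such features, not a required or recommended set; the name extra_features and its arguments are hypothetical.

def extra_features(pacman, ghost, pellets, size):
    """Hypothetical candidate features, each scaled to roughly [0, 1]."""
    (r, c) = pacman
    (gr, gc) = ghost
    features = dict()
    # Scaled Manhattan distance to the ghost (larger means farther away).
    features['ghost distance'] = (abs(r - gr) + abs(c - gc)) / float(2 * size)
    # Scaled Manhattan distance to the nearest remaining pellet, if any.
    if pellets:
        nearest = min(abs(r - pr) + abs(c - pc) for (pr, pc) in pellets)
        features['nearest pellet'] = nearest / float(2 * size)
    return features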