In this lab, we will introduce the concept of the Markov Decision Process (MDP) and two algorithms for solving it. We will then introduce the Q-Learning and SARSA algorithms, and finally use Q-learning to train an agent to play the game "Flappy Bird".
A Markov decision process (MDP) is a stochastic process, i.e. a sequence of random states S1, S2, ... with the Markov property, in which a decision maker additionally chooses actions and receives rewards. It provides a mathematical framework for modeling decision making in situations where outcomes are partly random and partly under the control of the decision maker.
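As a concrete (and purely hypothetical) illustration, the dynamics of a tiny two-state MDP can be written down as transition probabilities per state and action, in the same P[state][action] = [(prob, next_state, reward, done), ...] format used by the Gridworld environment later in this lab:
# Hypothetical two-state MDP in the format used by env.P below
P = {
    0: {  # state 0
        0: [(1.0, 0, 0.0, False)],                       # action 0: stay in state 0, reward 0
        1: [(0.8, 1, 1.0, True), (0.2, 0, 0.0, False)],  # action 1: reach terminal state 1 with prob 0.8
    },
    1: {  # state 1 is terminal: every action keeps us there
        0: [(1.0, 1, 0.0, True)],
        1: [(1.0, 1, 0.0, True)],
    },
}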
There are two classic algorithms for solving an MDP: value iteration and policy iteration.
The value iteration algorithm is as follows:
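For reference, the core step of value iteration is the Bellman optimality backup, written here in standard notation (gamma is the discount factor):

V_{k+1}(s) = \max_{a} \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V_k(s') \right]

The loop stops once the largest change \max_s |V_{k+1}(s) - V_k(s)| falls below a small threshold theta, and the optimal policy then picks, in every state, the action that maximizes this one-step lookahead.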
Next, we will show example code for a Gridworld environment to see how value iteration works.
Please install gym before running the examples:
pip install gym==0.21.0
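If the installation succeeded, the imports used below should be available (note that the discrete.DiscreteEnv helper used here was removed in later gym releases, so the pinned version matters):
import gym
from gym.envs.toy_text import discrete   # present in gym 0.21.x; removed in later releases
print(gym.__version__)                    # expected: 0.21.0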
import numpy as np
import sys
from io import StringIO
from gym.envs.toy_text import discrete
# four actions in the game
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class GridworldEnv(discrete.DiscreteEnv):
"""
Grid World environment from Sutton's Reinforcement Learning book chapter 4.
You are an agent on an MxN grid and your goal is to reach the terminal
state at the top left or the bottom right corner.
For example, a 4x4 grid looks as follows:
T o o o
o x o o
o o o o
o o o T
x is your position and T are the two terminal states.
You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
Actions going off the edge leave you in your current state.
You receive a reward of -1 at each step until you reach a terminal state.
"""
metadata = {'render.modes': ['human', 'ansi']}
def __init__(self, shape=[4, 4]):
if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
raise ValueError('shape argument must be a list/tuple of length 2')
self.shape = shape
nS = np.prod(shape)
nA = 4
MAX_Y = shape[0]
MAX_X = shape[1]
P = {}
grid = np.arange(nS).reshape(shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
P[s] = {a: [] for a in range(nA)}
is_done = lambda s: s == 0 or s == (nS - 1)
reward = 0.0 if is_done(s) else -1.0
# We're stuck in a terminal state
if is_done(s):
P[s][UP] = [(1.0, s, reward, True)]
P[s][RIGHT] = [(1.0, s, reward, True)]
P[s][DOWN] = [(1.0, s, reward, True)]
P[s][LEFT] = [(1.0, s, reward, True)]
# Not a terminal state
else:
ns_up = s if y == 0 else s - MAX_X
ns_right = s if x == (MAX_X - 1) else s + 1
ns_down = s if y == (MAX_Y - 1) else s + MAX_X
ns_left = s if x == 0 else s - 1
P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]
it.iternext()
# Initial state distribution is uniform
isd = np.ones(nS) / nS
# We expose the model of the environment for educational purposes
# This should not be used in any model-free learning algorithm
self.P = P
super(GridworldEnv, self).__init__(nS, nA, P, isd)
def render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
grid = np.arange(self.nS).reshape(self.shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
if self.s == s:
output = " x "
elif s == 0 or s == self.nS - 1:
output = " T "
else:
output = " o "
if x == 0:
output = output.lstrip()
if x == self.shape[1] - 1:
output = output.rstrip()
outfile.write(output)
if x == self.shape[1] - 1:
outfile.write("\n")
it.iternext()
env = GridworldEnv()
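As an optional sanity check (this interaction snippet is not part of the original lab code; it assumes the class and action constants defined above), you can step through the environment directly using the old gym API, where step() returns (observation, reward, done, info):
state = env.reset()                               # sample a start state from the uniform initial distribution
env.render()                                      # 'x' marks the agent, 'T' the two terminal corners
next_state, reward, done, info = env.step(RIGHT)  # take one step to the right
print(state, next_state, reward, done)            # reward is -1 at each step until a terminal state is reached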
def value_iteration(env, theta=0.0001, discount_factor=1.0):
"""
Value Iteration Algorithm.
Args:
env: OpenAI env. env.P represents the transition probabilities of the environment.
env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
env.nS is the number of states in the environment.
env.nA is the number of actions in the environment.
theta: We stop evaluation once our value function change is less than theta for all states.
discount_factor: Gamma discount factor.
Returns:
A tuple (policy, V) of the optimal policy and the optimal value function.
"""
def one_step_lookahead(state, V):
"""
Given a state, calculate the expected value of each action via a one-step lookahead.
Args:
state: the state to consider, an integer index into the Gridworld
V: the current value function estimate V(s), a vector of length env.nS
Returns:
A vector of length env.nA containing the expected value of each action.
"""
A = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
A[a] += prob * (reward + discount_factor * V[next_state])
return A
V = np.zeros(env.nS)
while True:
delta = 0
for s in range(env.nS):
# Do a one-step lookahead to find the best action
A = one_step_lookahead(s, V)
best_action_value = np.max(A)
# Calculate delta across all states seen so far
delta = max(delta, np.abs(best_action_value - V[s]))
# Update the value function
V[s] = best_action_value
# Check if we can stop
if delta < theta:
break
# Create a deterministic policy using the optimal value function
policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
# One step lookahead to find the best action for this state
A = one_step_lookahead(s, V)
best_action = np.argmax(A)
# Always take the best action
policy[s, best_action] = 1.0
return policy, V
policy, v = value_iteration(env)
print("Policy Probability Distribution:")
print(policy)
print("")
print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")
print("Value Function:")
print(v)
print("")
print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")
Policy Probability Distribution:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]

Value Function:
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]

Reshaped Grid Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]
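Since the optimal value function for this Gridworld appears in the output above, a quick check against those printed values (optional, using only the result shown) can confirm the computation:
# Check the computed value function against the result printed above
expected_v = np.array([ 0, -1, -2, -3, -1, -2, -3, -2,
                       -2, -3, -2, -1, -3, -2, -1,  0])
np.testing.assert_array_almost_equal(v, expected_v, decimal=2)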
The policy iteration algorithm is as follows:
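For reference, one iteration of the algorithm alternates two steps, written here in standard notation (gamma is the discount factor): policy evaluation, which computes the value of the current policy pi,

V^{\pi}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V^{\pi}(s') \right],

and policy improvement, which makes the policy greedy with respect to that value function,

\pi'(s) = \arg\max_{a} \sum_{s'} P(s' \mid s, a) \left[ R(s, a, s') + \gamma V^{\pi}(s') \right].

The two steps are repeated until the policy no longer changes.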
Next, we will show example code for the same Gridworld environment to see how policy iteration works.
import numpy as np
import sys
from gym.envs.toy_text import discrete
env = GridworldEnv()
def policy_eval(policy, env, discount_factor=1.0, theta=0.00001):
"""
Evaluate a policy given an environment and a full description of the environment's dynamics.
Args:
policy: [S, A] shaped matrix representing the policy.
env: OpenAI env. env.P represents the transition probabilities of the environment.
env.P[s][a] is a list of transition tuples (prob, next_state, reward, done).
env.nS is the number of states in the environment.
env.nA is the number of actions in the environment.
theta: We stop evaluation once our value function change is less than theta for all states.
discount_factor: Gamma discount factor.
Returns:
Vector of length env.nS representing the value function.
"""
# Start with an all-zero value function
V = np.zeros(env.nS)
while True:
delta = 0
# For each state, perform a "full backup"
for s in range(env.nS):
v = 0
# Look at the possible next actions
for a, action_prob in enumerate(policy[s]):
# For each action, look at the possible next states...
for prob, next_state, reward, done in env.P[s][a]:
# Calculate the expected value
v += action_prob * prob * (reward + discount_factor * V[next_state])
# How much our value function changed (across any states)
delta = max(delta, np.abs(v - V[s]))
V[s] = v
# Stop evaluating once our value function change is below a threshold
if delta < theta:
break
return np.array(V)
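As an optional illustration (not part of the original lab code), policy_eval can be run on its own to evaluate the uniform random policy before any improvement takes place:
# Evaluate a uniform random policy on the Gridworld
random_policy = np.ones([env.nS, env.nA]) / env.nA   # every action equally likely in every state
v_random = policy_eval(random_policy, env)
print(v_random.reshape(env.shape))                   # the random policy wanders, so its values are far below the optimal ones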
def policy_improvement(env, policy_eval_fn=policy_eval, discount_factor=1.0):
"""
Policy Improvement Algorithm. Iteratively evaluates and improves a policy
until an optimal policy is found.
Args:
env: The OpenAI environment.
policy_eval_fn: Policy Evaluation function that takes 3 arguments:
policy, env, discount_factor.
discount_factor: gamma discount factor.
Returns:
A tuple (policy, V).
policy is the optimal policy, a matrix of shape [S, A] where each state s
contains a valid probability distribution over actions.
V is the value function for the optimal policy.
"""
def one_step_lookahead(state, V):
"""
Helper function to calculate the value of each action in a given state.
Args:
state: The state to consider (int)
V: The value to use as an estimator, Vector of length env.nS
Returns:
A vector of length env.nA containing the expected value of each action.
"""
A = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
A[a] += prob * (reward + discount_factor * V[next_state])
return A
# Start with a random policy
policy = np.ones([env.nS, env.nA]) / env.nA
while True:
# Evaluate the current policy
V = policy_eval_fn(policy, env, discount_factor)
# Will be set to false if we make any changes to the policy
policy_stable = True
# For each state...
for s in range(env.nS):
# The best action we would take under the current policy
chosen_a = np.argmax(policy[s])
# Find the best action by one-step lookahead
# Ties are resolved arbitrarily
action_values = one_step_lookahead(s, V)
best_a = np.argmax(action_values)
# Greedily update the policy
if chosen_a != best_a:
policy_stable = False
policy[s] = np.eye(env.nA)[best_a]
# If the policy is stable we've found an optimal policy. Return it
if policy_stable:
return policy, V
policy, v = policy_improvement(env)
print("Policy Probability Distribution:")
print(policy)
print("")
print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")
print("Value Function:")
print(v)
print("")
print("Reshaped Grid Value Function:")
print(v.reshape(env.shape))
print("")
Policy Probability Distribution:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 0. 0. 1.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 0. 1. 0.]
 [1. 0. 0. 0.]
 [0. 1. 0. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]

Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):
[[0 3 3 2]
 [0 0 0 2]
 [0 0 1 2]
 [0 1 1 0]]

Value Function:
[ 0. -1. -2. -3. -1. -2. -3. -2. -2. -3. -2. -1. -3. -2. -1.  0.]

Reshaped Grid Value Function:
[[ 0. -1. -2. -3.]
 [-1. -2. -3. -2.]
 [-2. -3. -2. -1.]
 [-3. -2. -1.  0.]]
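As a final sanity check (assuming the value_iteration function from the previous section is still defined), the two algorithms can be compared directly; both should arrive at the same optimal value function for this Gridworld:
# Cross-check: policy iteration and value iteration agree on the optimal value function
_, v_vi = value_iteration(env)
np.testing.assert_array_almost_equal(v, v_vi, decimal=2)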