Reinforcement learning is a framework for tackling sequential decision problems: what to do next in order to maximize a reward (which might be delayed) in a changing environment (which might react to our actions).

Examples of this include:

  • Game playing: which actions are critical to win a game?
  • Learning in a “small world”: what actions maximize pleasure / minimize pain?
  • Treatment of chronic diseases: how to adapt the treatment for a disease that develops resistance?

The unifying theme of the problems above can be abstracted as follows:

  • An agent receives a signal from the environment, selected by Nature.
  • The agent executes an action.
  • Given the agent’s action, Nature assigns a reward and draws a new state, which is announced to the agent.
  • The situation repeats until a terminal criterion is reached.
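
The loop above can be sketched in a few lines of Python. This is only an illustration: the `Corridor` class and the `run_episode` helper are made up for this example (they are not part of any library), and they assume a `reset()` / `step(action)` convention similar to the one we will meet below.

```python
import random

class Corridor:
    """Hypothetical toy environment: walk from cell 0 to the last cell."""
    def __init__(self, length=5):
        self.length = length
        self.state = 0

    def reset(self):
        # Nature selects the initial state and announces it to the agent
        self.state = 0
        return self.state

    def step(self, action):
        # action: 0 = left, 1 = right; Nature draws the new state and the reward
        move = 1 if action == 1 else -1
        self.state = min(max(self.state + move, 0), self.length - 1)
        done = self.state == self.length - 1
        reward = 1.0 if done else 0.0
        return self.state, reward, done

def run_episode(env, choose_action, max_steps=1000):
    """Repeat observe -> act -> reward until a terminal criterion is reached."""
    state = env.reset()
    total = 0.0
    for _ in range(max_steps):
        action = choose_action(state)
        state, reward, done = env.step(action)
        total += reward
        if done:
            break
    return total

random.seed(0)
print(run_episode(Corridor(), lambda s: random.choice([0, 1])))
```

Here the agent is just a function from states to actions, which is exactly the notion of policy used later on.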

An example: OpenAI Gym

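We’ll use OpenAI Gym for this.

It provides a collection of environments, from simple grid worlds such as FrozenLake to Atari games, that we can use for experimenting during this course. If you haven’t already, please install the library (pip install gym). Note that the code below follows the API of older Gym releases, where env.reset() returns the state and env.step() returns four values; newer releases changed these signatures.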

from tqdm import tqdm
import gym

#create a single game instance
env = gym.make("FrozenLake-v0")

Here, S is the initial state, and your aim is to reach G, without falling into the holes, H. The squares marked with F are frozen, which means you can step on them.

Note: The environment is non-deterministic: you can slip on the ice and end up in a different state than the one you intended.
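
To make the slipping concrete, here is a minimal sketch of how such a move could be simulated. It assumes, as far as we understand the FrozenLake implementation, that the agent moves in the intended direction or in one of the two perpendicular directions, each with probability 1/3. The names `MOVES` and `slippery_step` are ours, not Gym's.

```python
import random

# Action encoding used by FrozenLake: 0 = left, 1 = down, 2 = right, 3 = up
MOVES = {0: (0, -1), 1: (1, 0), 2: (0, 1), 3: (-1, 0)}  # (row change, column change)

def slippery_step(state, action, size=4, rng=random):
    """Return the next state index on a size x size grid with slipping."""
    # Assumption: intended direction or one of its two perpendicular
    # neighbours, each chosen with probability 1/3.
    actual = rng.choice([(action - 1) % 4, action, (action + 1) % 4])
    row, col = divmod(state, size)
    d_row, d_col = MOVES[actual]
    row = min(max(row + d_row, 0), size - 1)  # walls keep the agent on the grid
    col = min(max(col + d_col, 0), size - 1)
    return row * size + col

random.seed(1)
# Trying to go down from the start: we may end up in state 4 (down),
# state 1 (slipped right) or stay in state 0 (slipped left into the wall)
print([slippery_step(0, 1) for _ in range(10)])
```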

How to use the environment?

  • reset() returns the initial state / first observation.
  • render() displays the current environment state.
  • step(a) returns what happens after action a:
    • new observation: the new state.
    • reward: the reward corresponding to that action in that state.
    • is done: binary flag, True if the game is over.
    • info: Some auxiliary stuff, which we can ignore now.
The code below is self-explanatory. If you run the three blocks with print statements in a Jupyter notebook, it will make a lot of sense 🙂
print("The initial state: ", env.reset())
print(" and it looks like: ")
env.render()

The initial state:  0
 and it looks like: 

SFFF
FHFH
FFFH
HFFG

print("Now let's take an action: ")
new_state, reward, done, _ = env.step(1)
env.render()


idx_to_action = {
    0:"<", #left
    1:"v", #down
    2:">", #right
    3:"^" #up
}

Now let's take an action: 
  (Down)
SFFF
FHFH
FFFH
HFFG

 

Defining a policy
  • The environment has a 4×4 grid of states.
  • For each state there are 4 possible actions.

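A policy is a function from states to actions. It tells us what we should do in each state. In this case, any assignment of one of the 4 actions to each of the 16 states is a (deterministic) policy; a 16×4 array of action probabilities, one row per state, describes a stochastic one.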

We can implement policies as dictionaries.

import numpy as np
n_states = env.observation_space.n
n_actions = env.action_space.n

# Initialize random_policy:
def init_random_policy():
    random_policy  = {}
    for state in range(n_states):
        random_policy[state] = np.random.choice(n_actions)
    return random_policy
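
To get a feeling for the size of the search space, we can count the deterministic policies and map state indices back to grid cells. This is a small sketch: the helper `state_to_cell` is ours, and it assumes that states are numbered row by row from the top-left corner.

```python
n_states, n_actions = 16, 4  # 4x4 grid, four moves per state

# A deterministic policy picks one action per state, so there are
# n_actions ** n_states different policies.
n_policies = n_actions ** n_states
print(n_policies)  # 4294967296

def state_to_cell(state, n_cols=4):
    """Convert a state index (0..15) to (row, column), assuming row-major order."""
    return divmod(state, n_cols)

print(state_to_cell(0), state_to_cell(15))  # (0, 0) (3, 3)
```

With more than four billion candidates, sampling a few thousand policies at random only explores a tiny fraction of the space.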

We now need to define a function to evaluate our policy.

def evaluate(env, policy, max_episodes=100):
    """Return the average total reward of `policy` over `max_episodes` episodes."""
    tot_reward = 0
    for ep in range(max_episodes):
        state = env.reset()
        done = False
        ep_reward = 0

        # Play one episode, following the policy until the game is over
        while not done:
            action = policy[state]
            new_state, reward, done, _ = env.step(action)
            ep_reward += reward
            state = new_state
        tot_reward += ep_reward
    return tot_reward / max_episodes

Looking for the best policy: Random search

As a very first example, let’s try to find our policy by pure random search: we will sample many random policies, evaluate each of them, and keep the one with the best score.

FrozenLake-v0 defines “solving” as getting an average reward of 0.78 over 100 consecutive trials.

best_policy = None
best_score = -float('inf')

# Random search
for i in range(1,10000): #tip: you can use tqdm(range(1,10000)) for a progress bar
    policy = init_random_policy()
    score = evaluate(env,policy,100)
    if score > best_score:
        best_score = score
        best_policy = policy
    if i%5000 == 0:
        print("Best score:", best_score)
print("Best policy:")
print(best_policy)

Running the code above, we get:

Best score: 0.45
Best policy:
{0: 2, 1: 3, 2: 0, 3: 1, 4: 0, 5: 3, 6: 0, 7: 3, 8: 3, 9: 1, 10: 0, 11: 1, 12: 3, 13: 2, 14: 2, 15: 1}

# Let's see the policy in action
def play(env,policy, render=False):
    s = env.reset()
    d = False
    while not d:
        a = policy[s]
        print("*"*10)
        print("State: ",s)
        print("Action: ",idx_to_action[a])
        s, r, d, _ = env.step(a)
        if render:
            env.render()
        if d:
            print(r)

Let’s create a small function to print the policy in a nicer way:

def print_policy(policy):
    lake = "SFFFFHFHFFFHHFFG"
    arrows = [idx_to_action[policy[i]] 
               if lake[i] in 'SF' else '*' for i in range(n_states)]
    for i in range(0,16,4):
        print(''.join(arrows[i:i+4]))

 

We can then use the functions above to render the best policy found. Note that this policy might not choose the optimal action in every state: recall that there is some noise involved (you can slip on the ice), which can make a non-optimal action look optimal.

print_policy(best_policy)
play(env,best_policy)
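
How noisy is a score based on 100 episodes? As far as we know, each FrozenLake episode ends with a total reward of either 0 or 1, so the score is a proportion and the usual formula sqrt(p(1-p)/n) gives a rough standard error. The sketch below applies it to the score of 0.45 reported above; the helper name is ours.

```python
import math

def score_standard_error(p, n_episodes=100):
    """Standard error of an average of n_episodes independent 0/1 rewards."""
    return math.sqrt(p * (1.0 - p) / n_episodes)

se = score_standard_error(0.45)
print(round(se, 3))  # 0.05
```

An uncertainty of about 0.05 on each score is not negligible. Since we keep the maximum over thousands of noisy evaluations, the best score is likely to be optimistic, so it is worth re-evaluating best_policy with more episodes before trusting it.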

 

Using a different policy

Let’s try a different implementation of a random policy, which will be more useful later on.

# theta = 0.25*np.ones((n_states,n_actions))
def random_parameter_policy(theta):
    theta = theta/np.sum(theta, axis=1, keepdims=True) # ensure that the array is normalized
    policy  = {}
    probs = {}
    for state in range(n_states):
        probs[state] = np.array(theta[state,:])
        policy[state] = np.random.choice(n_actions, p = probs[state])
    return policy

 

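best_policy = None
best_score = -float('inf')
alpha = 1e-2
# Random search
for i in range(1,10000): 
    # theta is reset to uniform at every iteration, so each policy is
    # sampled uniformly at random
    theta = 0.25*np.ones((n_states,n_actions))
    policy = random_parameter_policy(theta)
    score = evaluate(env,policy,100)
    if score > best_score:
        best_score = score
        best_policy = policy
    # Placeholder for a parameter update: as written it shifts all entries
    # of theta equally and is overwritten by the reset above, so it does
    # not yet change the sampling probabilities
    theta = theta + alpha*(score-best_score)*np.ones((n_states,n_actions))
    if i%5000 == 0:
        print("Best score:", best_score)
print("Best policy:")
print(best_policy)
print("Best score:", best_score)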

The outcome is:

Best score: 0.39
Best policy:
{0: 0, 1: 1, 2: 0, 3: 1, 4: 0, 5: 1, 6: 0, 7: 2, 8: 3, 9: 1, 10: 0, 11: 1, 12: 1, 13: 2, 14: 3, 15: 1}
Best score: 0.53

 


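What’s the advantage of this? Perhaps not much right now, but this is the first step towards using more sophisticated techniques than random search. Note that the loop already has room for a “gradient update” of sorts, where the parameter theta is changed depending on how the score compares with the best score. As written, the update adds the same amount to every entry and theta is reset at each iteration, so the sampling probabilities stay uniform; a useful rule would change only the entries of the actions that were actually tried. This hints that we could use other update rules, perhaps taking a more sophisticated function of the game history as input.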

Another thing to notice is that we have effectively made our policy stochastic: at every stage the agent can choose its action at random. This has the effect of smoothing out the problem: we are now solving an optimization problem on a continuous space instead of a discrete one.
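
A minimal sketch of what a stochastic policy looks like in code, assuming theta holds one row of non-negative weights per state as in the example above: instead of fixing one action per state in advance, we sample an action each time a state is visited. The function name `sample_action` is ours, and only the standard library is used.

```python
import random

def sample_action(theta, state, rng=random):
    """Sample an action with probability proportional to theta[state]."""
    weights = theta[state]
    return rng.choices(range(len(weights)), weights=weights, k=1)[0]

# Uniform weights: every action is equally likely in every state
theta = [[0.25] * 4 for _ in range(16)]
random.seed(0)
print([sample_action(theta, 0) for _ in range(10)])

# Putting all the weight on one action recovers a deterministic choice
theta[0] = [0.0, 1.0, 0.0, 0.0]
print(sample_action(theta, 0))  # 1
```

The weights can take any non-negative real values, which is what makes the search space continuous.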

Your turn:

  • Beat the hill climbing / random search benchmark! Implement a different search method for the parameters.
  • Try the CartPole environment. In CartPole, the state is continuous (4 different parameters), so you need to do something along the lines of the parameterized random search example. Look at this blog post for inspiration.
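
As a possible starting point for the first exercise, here is a sketch of a simple hill climbing search over deterministic policies: change the action of one randomly chosen state and keep the change if the score does not get worse. The names `hill_climb` and `toy_score` are ours. So that the example runs without Gym, it uses a made-up stand-in objective; it is not a solution for FrozenLake.

```python
import random

def hill_climb(score_fn, n_states=16, n_actions=4, n_iters=2000, rng=random):
    """Local search: mutate one state's action, keep the change if not worse."""
    policy = {s: rng.randrange(n_actions) for s in range(n_states)}
    best = score_fn(policy)
    for _ in range(n_iters):
        candidate = dict(policy)
        candidate[rng.randrange(n_states)] = rng.randrange(n_actions)
        score = score_fn(candidate)
        if score >= best:
            policy, best = candidate, score
    return policy, best

# Stand-in objective for demonstration only: the fraction of states whose
# action matches an arbitrary target policy. With Gym you would pass
# something like `lambda p: evaluate(env, p, 100)` instead.
target = {s: s % 4 for s in range(16)}

def toy_score(policy):
    return sum(policy[s] == target[s] for s in policy) / len(policy)

random.seed(0)
policy, best = hill_climb(toy_score)
print(best)
```

With a noisy score such as the one returned by evaluate, a candidate can be accepted just because it was lucky, so it may help to use more episodes per evaluation or to re-evaluate the current policy from time to time.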

 

