How can I get the same performance from a feed-forward neural network as from the Q-Learning algorithm?

Problem description

To solve the Taxi-v2 task I used two approaches: the first uses Q-learning, the second uses deep Q-learning with the help of a feed-forward neural network. The second approach should leverage deep learning to compute the Q-values and should reach the same or better results than the first one. In practice, however, it gives much worse results when tested. I have tried many configurations of the feed-forward network, with different layers, losses, optimizers and batch sizes, but could not get it to perform better than the first approach.

How can I get the same performance from the feed-forward neural network (method 2) as from the Q-Learning algorithm (method 1)?
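For reference, both methods estimate the same action-value function: tabular Q-learning applies the update Q(s,a) ← (1-α)·Q(s,a) + α·(r + γ·max Q(s',a')) directly, and the network in method 2 is only meant to approximate the same quantity. A minimal sketch of a single update step with made-up numbers (only α and γ match the listings below; the rest is illustrative):

# One tabular Q-learning update step (values here are illustrative only)
alpha, gamma = 0.1, 0.6      # learning rate and discount factor, as in the code below
q_sa = 0.0                   # current estimate of Q(s, a)
reward = -1                  # reward observed after taking action a in state s
max_next = 2.5               # hypothetical max over Q(s', a') for the next state

new_q_sa = (1 - alpha) * q_sa + alpha * (reward + gamma * max_next)
print(new_q_sa)              # -> 0.05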

Method 1

Q-Learning implementation for Taxi-v2

import numpy as np
import random
from IPython.display import clear_output
import gym

enviroment = gym.make("Taxi-v2").env
enviroment.render()

print('Number of states: {}'.format(enviroment.observation_space.n))
print('Number of actions: {}'.format(enviroment.action_space.n))

alpha = 0.1
gamma = 0.6
epsilon = 0.1
q_table = np.zeros([enviroment.observation_space.n,enviroment.action_space.n])

num_of_episodes = 100000

for episode in range(0,num_of_episodes):
    # Reset the enviroment
    state = enviroment.reset()

    # Initialize variables
    reward = 0
    terminated = False
    
    while not terminated:
        # Take learned path or explore new actions based on the epsilon
        if random.uniform(0,1) < epsilon:
            action = enviroment.action_space.sample()
        else:
            action = np.argmax(q_table[state])

        # Take action    
        next_state,reward,terminated,info = enviroment.step(action) 
        
        # Recalculate
        q_value = q_table[state,action]
        max_value = np.max(q_table[next_state])
        new_q_value = (1 - alpha) * q_value + alpha * (reward + gamma * max_value)
        
        # Update Q-table
        q_table[state,action] = new_q_value
        state = next_state
        
    if (episode + 1) % 100 == 0:
        clear_output(wait=True)
        print("Episode: {}".format(episode + 1))
        enviroment.render()

print("**********************************")
print("Training is done!\n")
print("**********************************")

Evaluation test after training:

(The results show no errors, meaning it picked up the passengers at the right location and dropped them off correctly in all 100 episodes.)

total_epochs = 0
total_penalties = 0
num_of_episodes = 100

for _ in range(num_of_episodes):
    state = enviroment.reset()
    epochs = 0
    penalties = 0
    reward = 0
    
    terminated = False
    
    while not terminated:
        action = np.argmax(q_table[state])
        state,reward,terminated,info = enviroment.step(action)

        if reward == -10:
            penalties += 1

        epochs += 1

    total_penalties += penalties
    total_epochs += epochs

print("**********************************")
print("Results")
print("**********************************")
print("Epochs per episode: {}".format(total_epochs / num_of_episodes))
print("Penalties per episode: {}".format(total_penalties / num_of_episodes))

Method 2

Using a feed-forward neural network for the same problem, which is very similar to the first approach (the Agent class used below is defined at the end of the listing):
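For orientation, the following snippet is only an illustration and not part of the listing: a Taxi state is a single integer index, which is reshaped to shape (1, 1) and fed through an Embedding layer; the network then outputs one Q-value per action, and the greedy action is the argmax over them.

import numpy as np

state = enviroment.reset()                  # an integer index into the 500 discrete states
state = np.reshape(state, [1, 1])           # shape (1, 1), as expected by the Embedding layer
q_values = agent.q_network.predict(state)   # shape (1, 6): one Q-value per action
action = np.argmax(q_values[0])             # greedy action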

import numpy as np
import random
from IPython.display import clear_output
from collections import deque
import progressbar

import gym

from tensorflow.keras import Model,Sequential
from tensorflow.keras.layers import Dense,Embedding,Reshape
from tensorflow.keras.optimizers import Adam

enviroment = gym.make("Taxi-v2").env
enviroment.render()

print('Number of states: {}'.format(enviroment.observation_space.n))
print('Number of actions: {}'.format(enviroment.action_space.n))

optimizer = Adam(learning_rate=0.01)
agent = Agent(enviroment,optimizer)

batch_size = 32
num_of_episodes = 100
timesteps_per_episode = 1000
agent.q_network.summary()

for e in range(0,num_of_episodes):
    # Reset the enviroment
    state = enviroment.reset()
    state = np.reshape(state,[1,1])
    
    # Initialize variables
    reward = 0
    terminated = False
    
    bar = progressbar.ProgressBar(maxval=timesteps_per_episode/10,widgets=\
[progressbar.Bar('=','[',']'),' ',progressbar.Percentage()])
    bar.start()
    
    for timestep in range(timesteps_per_episode):
        # Run Action
        action = agent.act(state)
        
        # Take action    
        next_state,reward,terminated,info = enviroment.step(action)
        next_state = np.reshape(next_state,[1,1])
        agent.store(state,action,reward,next_state,terminated)
        
        state = next_state
        
        if terminated:
            agent.alighn_target_model()
            break
            
        if len(agent.expirience_replay) > batch_size:
            agent.retrain(batch_size)
        
        if timestep%10 == 0:
            bar.update(timestep/10 + 1)
    
    bar.finish()
    if (e + 1) % 10 == 0:
        print("**********************************")
        print("Episode: {}".format(e + 1))
        enviroment.render()
        print("**********************************")

Testing

total_epochs = 0
total_penalties = 0
num_of_episodes = 100

for _ in range(num_of_episodes):
    state = enviroment.reset()
    state = np.reshape(state,[1,1])
    epochs = 0
    penalties = 0
    reward = 0
    
    terminated = False
    
    while not terminated:
        action = agent.act(state)
        state,reward,terminated,info = enviroment.step(action)
        state = np.reshape(state,[1,1])
        if reward == -10:
            penalties += 1

        epochs += 1

    total_penalties += penalties
    total_epochs += epochs

print("**********************************")
print("Results")
print("**********************************")
print("Epochs per episode: {}".format(total_epochs / num_of_episodes))
print("Penalties per episode: {}".format(total_penalties / num_of_episodes))
class Agent:
    def __init__(self,enviroment,optimizer):
        
        # Initialize atributes
        self._state_size = enviroment.observation_space.n
        self._action_size = enviroment.action_space.n
        self._optimizer = optimizer
        
        self.expirience_replay = deque(maxlen=2000)
        
        # Initialize discount and exploration rate
        self.gamma = 0.6
        self.epsilon = 0.1
        
        # Build networks
        self.q_network = self._build_compile_model()
        self.target_network = self._build_compile_model()
        self.alighn_target_model()

    def store(self,state,action,reward,next_state,terminated):
        self.expirience_replay.append((state,action,reward,next_state,terminated))
    
    def _build_compile_model(self):
        model = Sequential()
        model.add(Embedding(self._state_size,10,input_length=1))
        model.add(Reshape((10,)))
        model.add(Dense(50,activation='relu'))
        model.add(Dense(50,activation='relu'))
        model.add(Dense(self._action_size,activation='linear'))
        
        model.compile(loss='mse',optimizer=self._optimizer)
        return model

    def alighn_target_model(self):
        self.target_network.set_weights(self.q_network.get_weights())
    
    def act(self,state):
        if np.random.rand() <= self.epsilon:
            return enviroment.action_space.sample()
        
        q_values = self.q_network.predict(state)
        return np.argmax(q_values[0])

    def retrain(self,batch_size):
        minibatch = random.sample(self.expirience_replay,batch_size)
        
        for state,action,reward,next_state,terminated in minibatch:
            
            target = self.q_network.predict(state)
            
            if terminated:
                target[0][action] = reward
            else:
                t = self.target_network.predict(next_state)
                target[0][action] = reward + self.gamma * np.amax(t)
            
            self.q_network.fit(state,target,epochs=1,verbose=0)

Solution

No effective solution to this problem has been found yet.
