PyTorch DQN, Double DQN and Dueling DQN on highway-env
This article assumes the reader already knows the basics of DQN, Double DQN and Dueling DQN; the code below builds on that foundation.
DQN introduction links
Mofan DQN
Zhihu plain-language DQN
Double DQN
Mofan Double DQN
Zhihu plain-language Double DQN
Dueling DQN
Mofan Dueling DQN
Zhihu Dueling DQN
HighwayEnv link
Note: in Mofan's code, target_q_value is always computed from the Q-value outputs of all actions, so the loss is computed over a matrix with one column per action.
Official implementations instead compute target_q_value only from the column of the action that was actually taken, so the final loss is computed over a single-column matrix; this is spelled out in section 2.2.
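As a rough, self-contained sketch of that difference (the Q-values, actions and target numbers below are made up for illustration only):

import torch
import torch.nn.functional as F

q_eval = torch.tensor([[1.0, 2.0, 3.0],
                       [4.0, 5.0, 6.0]])   # eval_net(s): shape (batch, n_actions)
a = torch.tensor([[2], [0]])               # actions actually taken

# Mofan-style: the target keeps the full (batch, n_actions) shape; only the
# chosen action's column is changed, the other columns are copied from q_eval.
q_target_full = q_eval.clone()
q_target_full[0, 2] = 3.5
q_target_full[1, 0] = 4.2
loss_all_actions = F.mse_loss(q_eval, q_target_full)

# Official-style (used in this article): compare only the chosen column,
# so both tensors have shape (batch, 1).
q_taken = q_eval.gather(1, a)              # tensor([[3.], [4.]])
q_target = torch.tensor([[3.5], [4.2]])
loss_single_column = F.smooth_l1_loss(q_taken, q_target)
print(loss_all_actions.item(), loss_single_column.item())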
Main workflow
1 Build eval_net and target_net
Vanilla DQN:
eval_net = (e.g. input_dim*256*256*out_dim)
target_net = Same(eval_net)
Dueling DQN:
Split the Q-value output into the sum of a value term and an advantage term:
advantage_net = (e.g. input_dim*256*256*out_dim)
value_net = (e.g. input_dim*256*256*1)
eval_net = value_net + ( advantage_net - mean(advantage_net) )
target_net = Same(eval_net)
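A minimal, runnable sketch of that combination (the 5-dimensional input, the batch of 4 states and the per-state mean over the action dimension are illustrative assumptions; the DuelingNet class further below subtracts a global mean instead):

import torch
import torch.nn as nn

input_dim, out_dim = 5, 3   # hypothetical feature and action counts
value_net = nn.Sequential(nn.Linear(input_dim, 256), nn.ReLU(),
                          nn.Linear(256, 256), nn.ReLU(),
                          nn.Linear(256, 1))
advantage_net = nn.Sequential(nn.Linear(input_dim, 256), nn.ReLU(),
                              nn.Linear(256, 256), nn.ReLU(),
                              nn.Linear(256, out_dim))

x = torch.randn(4, input_dim)                    # a batch of 4 states
v = value_net(x)                                 # shape (4, 1)
adv = advantage_net(x)                           # shape (4, 3)
q = v + (adv - adv.mean(dim=1, keepdim=True))    # dueling Q-values, shape (4, 3)
print(q.shape)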
Prepare the environment inputs
s: current state (observation from the environment)
a: action taken in the current state
r: reward received
d: done flag (whether the episode ended in success or failure)
s_: next state
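For illustration, one transition can be flattened into a single replay-buffer row, in the spirit of the store_transition method shown later (the 2 x 5 observation shape and the tiny buffer size are made-up examples):

import numpy as np

n_features = 10                      # e.g. a flattened 2 x 5 observation
memory_size = 4                      # tiny buffer, just for the example
memory = np.zeros((memory_size, n_features * 2 + 3), dtype=np.float32)

s = np.random.rand(2, 5)             # current observation s
a, r, done = 1, 0.5, 0.0             # action, reward, done flag
s_ = np.random.rand(2, 5)            # next observation s_

row = np.hstack((s.flatten(), [a, r, done], s_.flatten()))
memory[0, :] = row                   # one row = [ s | a, r, done | s_ ]
print(memory[0, :n_features])        # recovers the flattened s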
2 Compute current_q_value and target_q_value
2.1 Get current_q_value from the current state s and the action a
current_q_value = self.eval_net(s).gather(1,a)
gather(1, a) selects, for each row, the value in the column indexed by the action a
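A tiny example of what gather(1, a) returns (numbers are arbitrary):

import torch

q = torch.tensor([[0.1, 0.9, 0.3],
                  [0.7, 0.2, 0.5]])   # Q-values for 2 states x 3 actions
a = torch.tensor([[1], [0]])          # action index chosen in each state
print(q.gather(1, a))                 # tensor([[0.9000], [0.7000]])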
2.2 From the next state s_, pick the largest Q-value among all action outputs as Q_next
# Compute the next Q-values using the target network
next_q_values = self.target_net(b_s_)

# Double DQN mainly fixes Q-value overestimation: Q_next = target_net(s_, argmax(eval_net(s_)))
# Vanilla DQN: Q_next = target_net(s_).max()
if self.double_q:
    # Double DQN enabled:
    # compute the eval_net Q-values for the next state s_
    next_eval_values = self.eval_net(b_s_)
    # pick the action with the highest eval_net Q-value
    actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
    # evaluate that action with the target network to get the target value
    next_q_values = next_q_values.gather(1, actions)
else:
    # vanilla DQN: simply take the maximum of next_q_values
    next_q_values, _ = next_q_values.max(dim=1)
    next_q_values = next_q_values.reshape(-1, 1)
2.3 Compute target_q_value from the reward r, the done flag, and gamma
target_q_values = r + (1 - d) * self.gamma * next_q_values
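A quick numeric check of this 1-step TD target (the reward, done flag, gamma and next-Q values are made up):

import torch

gamma = 0.8
r = torch.tensor([[1.0], [1.0]])
d = torch.tensor([[0.0], [1.0]])       # the second transition ends the episode
next_q = torch.tensor([[2.5], [2.5]])

target_q = r + (1 - d) * gamma * next_q
print(target_q)                         # tensor([[3.], [1.]]) -- a terminal step keeps only the reward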
3 Compute the loss with SmoothL1Loss
Official implementations now compute the loss with SmoothL1Loss / Huber loss on the Q-value of the chosen action.
https://zhuanlan.zhihu.com/p/83131026
loss = F.smooth_l1_loss(current_q_values, target_q_values)
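To see why Smooth L1 (Huber loss with delta = 1) is less sensitive to outliers than MSE, here is a small comparison with arbitrary numbers:

import torch
import torch.nn.functional as F

pred = torch.tensor([0.0, 0.0])
target = torch.tensor([0.5, 10.0])      # the second element is an outlier

# Element-wise Smooth L1: 0.5 * x^2 if |x| < 1, else |x| - 0.5
print(F.smooth_l1_loss(pred, target, reduction='none'))  # tensor([0.1250, 9.5000])
print(F.mse_loss(pred, target, reduction='none'))        # tensor([  0.2500, 100.0000])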
Compute the gradients and update the network
self.optimizer.zero_grad()  # reset gradients to zero
loss.backward()  # back-propagate the gradients
nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm)  # gradient clipping
self.optimizer.step()
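One detail the steps above rely on: target_net is kept in sync with eval_net by a periodic hard copy, as the full code below does every replace_target_iter training steps. A small stand-alone sketch (the tiny linear layers stand in for the real networks):

import torch.nn as nn

eval_net = nn.Linear(4, 2)          # stand-in for the real eval network
target_net = nn.Linear(4, 2)        # stand-in for the real target network
replace_target_iter = 200

for learn_step_counter in range(1000):
    if learn_step_counter % replace_target_iter == 0:
        # hard update: copy eval_net's parameters into target_net
        target_net.load_state_dict(eval_net.state_dict())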
DQN code
import numpy as np
import os
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import torch as th
import torch.nn as nn
from torch.autograd import Variable
import torch.nn.functional as F


# Build a multi-layer fully-connected network (adapted from stable-baselines3)
def create_mlp_net(
    input_dim: int,        # dimension of the input vector
    output_dim: int,       # dimension of the output
    net_arch: List[int],
    activation_fn: Type[nn.Module] = nn.ReLU,
    squash_output: bool = False,
):
    modules = [nn.Linear(input_dim, net_arch[0]), activation_fn()]
    for idx in range(len(net_arch) - 1):
        modules.append(nn.Linear(net_arch[idx], net_arch[idx + 1]))
        modules.append(activation_fn())
    if output_dim > 0:
        last_layer_dim = net_arch[-1] if len(net_arch) > 0 else input_dim
        modules.append(nn.Linear(last_layer_dim, output_dim))
    return nn.Sequential(*modules)


class DuelingNet(nn.Module):
    def __init__(
        self,
        input_dim: int,
        output_dim: int,       # dimension of the output
        net_arch: List[int],
        activation_fn: Type[nn.Module] = nn.ReLU,
    ):
        super(DuelingNet, self).__init__()
        self.value_net = create_mlp_net(input_dim, 1, net_arch, activation_fn)
        self.advantage_net = create_mlp_net(input_dim, output_dim, net_arch, activation_fn)

    def forward(self, x):
        value_out = self.value_net(x)
        advantage_out = self.advantage_net(x)
        average_advantage = advantage_out - th.mean(advantage_out)
        q_value = value_out + average_advantage
        return q_value


class DQN:
    def __init__(
        self,
        env,
        learning_rate=0.005,
        reward_decay=0.9,
        e_greedy=0.9,
        e_greedy_increment=None,
        target_update_interval=200,
        memory_size=3000,
        batch_size=32,
        output_graph=False,
        DOUBLE_DQN=False,   # Double DQN: mitigates Q-value overestimation
        DUELING_DQN=False,  # Dueling DQN: build the Q-value from value + advantage
    ):
        self.env = env
        self.n_actions = env.action_space.n
        self.n_features = env.observation_space.shape[0] * env.observation_space.shape[1]
        self.lr = learning_rate
        self.gamma = reward_decay
        self.epsilon_max = e_greedy
        self.replace_target_iter = target_update_interval
        self.memory_size = memory_size
        self.batch_size = batch_size
        self.double_q = DOUBLE_DQN
        self.dueling_q = DUELING_DQN
        self.epsilon_increment = e_greedy_increment
        self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max
        self.max_grad_norm = 10
        self.learn_step_counter = 0
        # allocate memory_size rows, each storing one [s, a, r, done, s_] transition
        self.memory = np.zeros((self.memory_size, self.n_features * 2 + 3), dtype=np.float32)
        self.memory_counter = 0
        # build the networks
        if self.dueling_q:
            self.eval_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = DuelingNet(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
        else:
            self.eval_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
            self.target_net = create_mlp_net(self.n_features, self.n_actions, [256, 256], activation_fn=nn.ReLU)
        print("model------->")
        print(self.eval_net)
        # ------- Define the optimizer ------- #
        self.optimizer = th.optim.Adam(self.eval_net.parameters(), learning_rate)
        # ------- Define the loss function ------- #
        self.loss_func = nn.SmoothL1Loss()
        self.loss = 0.0

    def store_transition(self, s, a, r, s_, done):
        transition = np.hstack((s.flatten(), [a, r, done], s_.flatten()))
        index = self.memory_counter % self.memory_size
        self.memory[index, :] = transition
        self.memory_counter += 1

    def choose_rlnet_action(self, observation):
        s = th.unsqueeze(th.FloatTensor(observation.flatten()), 0)  # add a batch dimension
        q_values = self.eval_net(s)
        action = q_values.argmax(dim=1).reshape(-1)
        return action.item()

    def choose_action(self, observation, determinstic=False):
        if determinstic:
            return self.choose_rlnet_action(observation)
        else:
            if np.random.uniform() > self.epsilon:  # explore: random action
                return np.random.randint(0, self.n_actions)
            else:
                return self.choose_rlnet_action(observation)

    def save_model(self):
        if os.path.exists('torch_dqn_highway_model.pkl'):
            os.system("rm -rf torch_dqn_highway_model.pkl")
        th.save(self.eval_net, 'torch_dqn_highway_model.pkl')
        print(" model saved !!")

    def load_model(self):
        print('load model')
        self.eval_net = th.load('torch_dqn_highway_model.pkl')

    def train_sample(self, sample_index):
        # hard update of the target network parameters
        if self.learn_step_counter % self.replace_target_iter == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())
            print('eval_net ---> target_net: target_net_params_updated!')
        batch_memory = self.memory[sample_index, :]
        # keep all inputs with the same batch_size dimension
        b_s = Variable(th.FloatTensor(batch_memory[:, :self.n_features]))
        b_a = Variable(th.LongTensor(batch_memory[:, self.n_features].astype(int).reshape(-1, 1)))
        b_r = Variable(th.FloatTensor(batch_memory[:, self.n_features + 1].reshape(-1, 1)))
        b_d = Variable(th.FloatTensor(batch_memory[:, self.n_features + 2]).reshape(-1, 1))
        b_s_ = Variable(th.FloatTensor(batch_memory[:, -self.n_features:]))
        # target computation, following stable-baselines3's DQN
        with th.no_grad():
            # Compute the next Q-values using the target network
            next_q_values = self.target_net(b_s_)
            if self.double_q:
                # Double DQN enabled: compute the eval_net Q-values for s_
                next_eval_values = self.eval_net(b_s_)
                # pick the action with the highest eval_net Q-value
                actions = next_eval_values.argmax(dim=1).reshape(-1, 1)
                # evaluate that action with the target network
                next_q_values = next_q_values.gather(1, actions)
            else:
                # Follow greedy policy: use the one with the highest value
                next_q_values, _ = next_q_values.max(dim=1)
                next_q_values = next_q_values.reshape(-1, 1)
            # 1-step TD target
            target_q_values = b_r + (1 - b_d) * self.gamma * next_q_values
        current_q_values = self.eval_net(b_s).gather(1, b_a)
        # Compute Huber loss (less sensitive to outliers); with delta = 1 it equals Smooth L1 loss
        loss = F.smooth_l1_loss(current_q_values, target_q_values)
        self.loss = loss.item()
        self.optimizer.zero_grad()  # reset the gradients to zero
        loss.backward()
        nn.utils.clip_grad_norm_(self.eval_net.parameters(), self.max_grad_norm)
        self.optimizer.step()
        self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
        self.learn_step_counter += 1

    def train(self):
        if self.memory_counter > self.memory_size:
            sample_index = np.random.choice(self.memory_size, size=self.batch_size)
        else:
            sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
        self.train_sample(sample_index)

    def learn(self, learn_start, total_timesteps):
        eposide_count = 0
        try:
            while True:
                s = self.env.reset()
                print("\nnew eposide------>")
                while True:
                    a = self.choose_action(s)
                    s_, r, done, info = self.env.step(a)
                    self.store_transition(s, a, r, s_, done)
                    if self.memory_counter > learn_start:
                        self.train()
                    s = s_
                    if done or self.memory_counter > total_timesteps:
                        break
                eposide_count += 1
                if self.memory_counter > learn_start:
                    print("eposides_count :", eposide_count)
                    print("time_steps :", self.memory_counter)
                    print("epsilon :", self.epsilon)
                    print("loss :", self.loss)
                    print("learning progress:", float(self.memory_counter) / total_timesteps)
                if self.memory_counter > total_timesteps:
                    print("learning stop !!")
                    break
        except KeyboardInterrupt:
            print("KeyboardInterrupt, learning stop")
        self.save_model()

    def test(self):
        global stop_flag
        self.load_model()
        s = self.env.reset()
        stop_flag = False
        try:
            while True:
                a = self.choose_action(s, determinstic=True)
                print("action type:", self.env.action_type.actions[a])
                s_, r, done, info = self.env.step(a)
                s = s_
                self.env.render()
                if done:
                    s = self.env.reset()
        except KeyboardInterrupt:
            print("KeyboardInterrupt, stop")
Highway main program
#!/usr/bin/python3
import sys

DUELING_DQN = False
DOUBLE_DQN = False

if len(sys.argv) > 1:
    if sys.argv[1] == '-h' or sys.argv[1] == '--help':
        print(
'''
-dueling enable dueling dqn
-double enable double dqn
-all enable double and dueling dqn
'''
        )
        exit(0)
    elif sys.argv[1] == '-dueling':
        print('enable DUELING_DQN')
        DUELING_DQN = True
    elif sys.argv[1] == '-double':
        print('enable DOUBLE_DQN')
        DOUBLE_DQN = True
    elif sys.argv[1] == '-all':
        print('enable DOUBLE_DQN DUELING_DQN')
        DUELING_DQN = True
        DOUBLE_DQN = True
    else:
        print('use default dqn model')

import gym
import highway_env
from dqn import DQN
import time

env = gym.make("highway-v0")
config = {
    'action': {
        'type': 'DiscreteMetaAction',
    },
    'observation': {
        'type': 'Kinematics',     # highway-env's default observation type, added so the override below takes effect
        'vehicles_count': 20,
    },
    'manual_control': False,
    'simulation_frequency': 15,
    'policy_frequency': 5,
    'duration': 30000,  # number of steps after which the episode is considered over
}
env.config.update(config)
env.reset()

rl_model = DQN(
    env,
    memory_size=15000,
    batch_size=32,
    e_greedy_increment=0.0001,
    e_greedy=0.8,
    learning_rate=5e-4,
    reward_decay=0.8,
    target_update_interval=50,
    DOUBLE_DQN=DOUBLE_DQN,
    DUELING_DQN=DUELING_DQN,
)
t1 = time.time()
# use ctrl-c to stop
rl_model.learn(learn_start = 200, total_timesteps = 2e4)
print('Training time: ', time.time() - t1)
# use ctrl-c to stop
rl_model.test()
Results
After roughly 20,000+ training steps the agent shows basic avoidance maneuvers; after a few hundred thousand steps it can drive with almost no collisions.
