原文地址:https://isaac-sim.github.io/IsaacLab/source/tutorials/03_envs/create_direct_rl_env.html

从环境中获取信息(观察)

获取joint(铰链)信息:位置和速度

joint会被包含在Articulation(关节)中,一个Articulation可能会包含1个或多个的joint对象,可以通过Articulation.find_joints()方法获得joint在当前Articulation中的索引(index)数据。

find_joints的返回值是这样的:tuple[list[joint索引], list[joint名字]]

find_joints的函数声明如下:

def find_joints( self, name_keys: str | Sequence[str], joint_subset: list[str] | None = None, preserve_order: bool = False ) -> tuple[list[int], list[str]]

在Articulation内部有一个属性私有变量_data: ArticulationData,该变量通过方法def data(self) -> ArticulationData获取,在ArticulationData中存放着几个关节重要的数据:位置ArticulationData._joint_pos,速度ArticulationData._joint_vel,加速度ArticulationData._joint_acc

ArticulationData有几个@property装饰器函数,用于获取上述的三个属性,这样可以用过属性名的方式直接访问到这些数据。

下面介绍下这三个方法的返回值:

joint_pos返回torch.Size([num_instances, num_joints])

joint_vel返回torch.Size([num_instances, num_joints])

joint_acc返回torch.Size([num_instances, num_joints])


是时候讲解下DirectRLEnv(gym.Env)._get_observations(self) -> VecEnvObs方法了,该方法带有@abstractmethod被定义成抽象方法,所以我们在继承DirectRLEnv类后必须在自己的类中实现_get_observations方法。

我们在_get_observations方法中计算并返回观测值,这会用到上面提到的ArticulationData以及如何通过joint索引从中获取实际数据。

cartpole_env.py的代码中有如下实现:

def _get_observations(self) -> torch.Dict[str, torch.Tensor | torch.Dict[str, torch.Tensor]]:
	obs = torch.cat(
		(
			self.joint_pos[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
			self.joint_vel[:, self._pole_dof_idx[0]].unsqueeze(dim=1),
			self.joint_pos[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
			self.joint_vel[:, self._cart_dof_idx[0]].unsqueeze(dim=1),
		),
		dim=-1,
	)
	observations = {"policy": obs}
	return observations

上述代码中的_pole_dof_idx里边存放的是杆子的joint对应的索引数据,_cart_dof_idx存放的是小车的joint对应的索引数据,这里介绍下获取杆子位置的代码,获取杆子速度和小车位置和速度的代码都一样。

self.joint_pos就的返回数据类型是:torch.Size([num_instances, num_joints])形状的张量,所以self.joint_pos[:, self._pole_dof_idx[0]]的意思就是从self.joint_pos中获取索引为self._pole_dof_idx[0]的所有杆子的位置信息

通过在_get_observations函数中增加了print函数我把数据打印出来

print("[INFO]: _pole_dof_idx -> ", self._pole_dof_idx)
print("[INFO]: joint_pos -> ", self.joint_pos)
print("[INFO]: pole_joint_pos -> ", self.joint_pos[:, self._pole_dof_idx[0]])

打印出的数据如下:

[INFO]: _pole_dof_idx ->  [1]
[INFO]: joint_pos ->  tensor([[-0.0995, -0.0243],
        [-0.5815,  0.0256],
        [-0.5531,  0.4727],
        ...,
        [ 0.4905, -0.7841],
        [-0.4129,  0.4739],
        [ 0.4791, -0.8703]], device='cuda:0')
[INFO]: pole_joint_pos ->  tensor([-0.0243,  0.0256,  0.4727,  ..., -0.7841,  0.4739, -0.8703],
       device='cuda:0')

由于self._pole_dof_idx[0] == 1,所以第二列的数据存储的就是杆子的位置数据了。位于代码中用了self._pole_dof_idx[0],因为self._pole_dof_idx的表中只存储了一个joint的索引值,也就是当前杆子对应的joint索引值。

如果大家感兴趣的话可以把小车的数据也打印出来看下在joint_pos中的第一列数据是否是小车的位置信息。

Pytorch补充:

torch.unsqueeze(_input_, _dim_)函数

用来将_input_的数据增加一个维度,以打印信息的pole_joint_pos数据为例,当dim=1时,一维数组会变成二维张量,如下所示:

tensor([[-0.0243],  [0.0256],  [0.4727],  ..., [-0.7841],  [0.4739], [-0.8703]],
       device='cuda:0')

数据变成了torch.Size([N, 1]),也就是N行一列的数据

torch.cat(_tensors_, _dim=0_, _*_, _out=None_) → [Tensor]函数

用来按照指定维度拼接多个张量,在本例中torch.catdim=-1,所以按照张量的最后一个维度进行拼接

最终_get_observations中的obs变量存储这一个torch.Size([N, 4])形状的张量数据:

杆子位置杆子速度小车位置小车速度

奖励函数 _get_rewards 分析

存活奖励rew_alive

在类class DirectRLEnv(gym.Env)中有以下变量 self.reset_terminated = torch.zeros(self.num_envs, device=self.device, dtype=torch.bool)

当重置时该变量被设置为true, 否则为false

在计算存活奖励:rew_alive时的代码如下:rew_alive = rew_scale_alive * (1.0 - reset_terminated.float())

torch.bool强转为torch.float类型,当重置发生时rew_scale_alive * (1.0 - 1.0),所以重置时的存活奖励就是0

终止奖励(惩罚)rew_termination

当代理不稳定或处于不安全的状态时触发,另外如果代理能够长时间稳定运行也会希望终止回合并开始新的回合,这样代理可以学会从不同的起始配置启动

所以终止分两种:

  • 时间限制条件
  • 终止条件

计算终止奖励的代码如下:

rew_termination = rew_scale_terminated * reset_terminated.float()

速度 & 位置 范围限制奖励

当小车与倒立摆的速度与位置在范围内时能够获得的奖励

相关的奖励有三个,看下面的代码:

rew_pole_pos = rew_scale_pole_pos * torch.sum(torch.square(pole_pos), dim=-1)
rew_cart_vel = rew_scale_cart_vel * torch.sum(torch.abs(cart_vel), dim=-1)
rew_pole_vel = rew_scale_pole_vel * torch.sum(torch.abs(pole_vel), dim=-1)

Pytorch补充:

torch.sum函数:

返回输入张量中所有元素的总和,dim=-1表示的是最后一个维度

torch.square函数:

计算所有元素的平方值,并返回新值的张量

torch.abs函数:

计算所有元素的绝对值,并返回新值的张量


设计终止条件_get_dones

在超时或者超出范围时我们需要重置环境,在DirectRLEnv中有一个抽象方法用于配置终止条件,这个方法返回两个Tensor组成的TupleTuple中的第一个Tensor存储了终止条件,第二个Tensor存储了超时信息,每个张量的形状为:torch.Size([num_envs])

@abstractmethod
def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]

cartpole_env.py中的实现如下:

def _get_dones(self) -> tuple[torch.Tensor, torch.Tensor]:
	self.joint_pos = self.cartpole.data.joint_pos
	self.joint_vel = self.cartpole.data.joint_vel
	time_out = self.episode_length_buf >= self.max_episode_length - 1
	out_of_bounds = torch.any(torch.abs(self.joint_pos[:, self._cart_dof_idx]) > self.cfg.max_cart_pos, dim=1)
	out_of_bounds = out_of_bounds | torch.any(torch.abs(self.joint_pos[:, self._pole_dof_idx]) > math.pi / 2, dim=1)
	return out_of_bounds, time_out

超时条件(time_out)计算

将当前所有环境的episode缓冲长度(dtype=torch.long)与最大允许的episode长度进行比较,并将结果存储到一个形状为torch.Size([num_envs]),数据类型为torch.bool的张量中。 下面是DirectRLEnv.max_episode_length的计算方法:

@property
def max_episode_length_s(self) -> float:
	"""Maximum episode length in seconds."""
	return self.cfg.episode_length_s

@property
def max_episode_length(self):
	"""The maximum episode length in steps adjusted from s."""
	return math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))
  • self.cfg.sim.dt:在SimulationCfg.dt中定义,是物理时间步长(秒),self.cfg.sim.dt * self.cfg.decimation用来计算控制动作而执行频率。
  • self.cfg.episode_length_s:episode的最大长度(秒)
  • 所以math.ceil(self.max_episode_length_s / (self.cfg.sim.dt * self.cfg.decimation))是计算出最大的物理时间步数并向上取整

小车和杆子的活动界限计算

  • torch.abs(self.joint_pos[:, self._cart_dof_idx]):获取小车位置信息的绝对值,返回的张量形状为:torch.Size([N, 1])
  • 然后判断小车位置的绝对值是否大于self.cfg.max_cart_pos值,这时候会返回一个形状为:torch.Size([N, 1])dtype=bool的张量
  • 最后通过torch.any函数设置dim=1测试所有列的是否为True,并返回形状为torch.Size([N])的张量