这是indexloc提供的服务,不要输入任何密码
Skip to content

Implements set_env_attr and get_env_attr for vector environments #478

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion test/base/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,25 @@ def test_vecenv(size=10, num=8, sleep=0.001):
for i, v in enumerate(venv):
print(f'{type(v)}: {t[i]:.6f}s')

def assert_get(v, expected):
assert v.get_env_attr("size") == expected
assert v.get_env_attr("size", id=0) == [expected[0]]
assert v.get_env_attr("size", id=[0, 1, 2]) == expected[:3]

for v in venv:
assert v.size == list(range(size, size + num))
assert_get(v, list(range(size, size + num)))
assert v.env_num == num
assert v.action_space == [Discrete(2)] * num

v.set_env_attr("size", 0)
assert_get(v, [0] * num)

v.set_env_attr("size", 1, 0)
assert_get(v, [1] + [0] * (num - 1))

v.set_env_attr("size", 2, [1, 2, 3])
assert_get(v, [1] + [2] * 3 + [0] * (num - 4))

for v in venv:
v.close()

Expand Down
22 changes: 11 additions & 11 deletions test/throughput/test_batch_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@pytest.fixture(scope="module")
def data():
print("Initialising data...")
print("Initializing data...")
np.random.seed(0)
batch_set = [
Batch(
Expand All @@ -19,16 +19,16 @@ def data():
'b1': (3.14, 3.14),
'b2': np.arange(1e3)
},
c=i
c=i,
) for i in np.arange(int(1e4))
]
batch0 = Batch(
a=np.ones((3, 4), dtype=np.float64),
b=Batch(
c=np.ones((1, ), dtype=np.float64),
d=torch.ones((3, 3, 3), dtype=torch.float32),
e=list(range(3))
)
e=list(range(3)),
),
)
batchs1 = [copy.deepcopy(batch0) for _ in np.arange(1e4)]
batchs2 = [copy.deepcopy(batch0) for _ in np.arange(1e4)]
Expand All @@ -39,25 +39,25 @@ def data():
indexs = np.random.choice(batch_len, size=batch_len // 10, replace=False)
slice_dict = {
'obs': [np.arange(20) for _ in np.arange(batch_len // 10)],
'reward': np.arange(batch_len // 10)
'reward': np.arange(batch_len // 10),
}
dict_set = [
{
'obs': np.arange(20),
'info': "this is info",
'reward': 0
'reward': 0,
} for _ in np.arange(1e2)
]
batch4 = Batch(
a=np.ones((10000, 4), dtype=np.float64),
b=Batch(
c=np.ones((1, ), dtype=np.float64),
d=torch.ones((1000, 1000), dtype=torch.float32),
e=np.arange(1000)
)
e=np.arange(1000),
),
)

print("Initialised")
print("Initialized")
return {
'batch_set': batch_set,
'batch0': batch0,
Expand All @@ -67,7 +67,7 @@ def data():
'indexs': indexs,
'dict_set': dict_set,
'slice_dict': slice_dict,
'batch4': batch4
'batch4': batch4,
}


Expand Down Expand Up @@ -106,7 +106,7 @@ def test_set_attr(data):

def test_numpy_torch_convert(data):
"""Test conversion between numpy and torch."""
for _ in np.arange(1e5):
for _ in np.arange(1e4): # not sure what's wrong in torch==1.10.0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here

data['batch4'].to_torch()
data['batch4'].to_numpy()

Expand Down
51 changes: 45 additions & 6 deletions tianshou/env/venvs.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,56 @@ def __getattribute__(self, key: str) -> Any:
if key in [
'metadata', 'reward_range', 'spec', 'action_space', 'observation_space'
]: # reserved keys in gym.Env
return self.__getattr__(key)
return self.get_env_attr(key)
else:
return super().__getattribute__(key)

def __getattr__(self, key: str) -> List[Any]:
"""Fetch a list of env attributes.
def get_env_attr(
self,
key: str,
id: Optional[Union[int, List[int], np.ndarray]] = None
) -> List[Any]:
"""Get an attribute from the underlying environments.

If id is an int, retrieve the attribute denoted by key from the environment
underlying the worker at index id. The result is returned as a list with one
element. Otherwise, retrieve the attribute for all workers at indices id and
return a list that is ordered correspondingly to id.

:param str key: The key of the desired attribute.
:param id: Indice(s) of the desired worker(s). Default to None for all env_id.

:return list: The list of environment attributes.
"""
self._assert_is_not_closed()
id = self._wrap_id(id)
if self.is_async:
self._assert_id(id)

This function tries to retrieve an attribute from each individual wrapped
environment, if it does not belong to the wrapping vector environment class.
return [self.workers[j].get_env_attr(key) for j in id]

def set_env_attr(
self,
key: str,
value: Any,
id: Optional[Union[int, List[int], np.ndarray]] = None
) -> None:
"""Set an attribute in the underlying environments.

If id is an int, set the attribute denoted by key from the environment
underlying the worker at index id to value.
Otherwise, set the attribute for all workers at indices id.

:param str key: The key of the desired attribute.
:param Any value: The new value of the attribute.
:param id: Indice(s) of the desired worker(s). Default to None for all env_id.
"""
return [getattr(worker, key) for worker in self.workers]
self._assert_is_not_closed()
id = self._wrap_id(id)
if self.is_async:
self._assert_id(id)
for j in id:
self.workers[j].set_env_attr(key, value)

def _wrap_id(
self,
Expand Down
8 changes: 6 additions & 2 deletions tianshou/env/worker/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ def __init__(self, env_fn: Callable[[], gym.Env]) -> None:
self._env_fn = env_fn
self.is_closed = False
self.result: Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
self.action_space = getattr(self, "action_space") # noqa: B009
self.action_space = self.get_env_attr("action_space") # noqa: B009

@abstractmethod
def __getattr__(self, key: str) -> Any:
def get_env_attr(self, key: str) -> Any:
pass

@abstractmethod
def set_env_attr(self, key: str, value: Any) -> None:
pass

@abstractmethod
Expand Down
5 changes: 4 additions & 1 deletion tianshou/env/worker/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ def __init__(self, env_fn: Callable[[], gym.Env]) -> None:
self.env = env_fn()
super().__init__(env_fn)

def __getattr__(self, key: str) -> Any:
def get_env_attr(self, key: str) -> Any:
return getattr(self.env, key)

def set_env_attr(self, key: str, value: Any) -> None:
setattr(self.env, key, value)

def reset(self) -> Any:
return self.env.reset()

Expand Down
18 changes: 15 additions & 3 deletions tianshou/env/worker/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,27 @@
pass


class _SetAttrWrapper(gym.Wrapper):

def set_env_attr(self, key: str, value: Any) -> None:
setattr(self.env, key, value)

def get_env_attr(self, key: str) -> Any:
return getattr(self.env, key)


class RayEnvWorker(EnvWorker):
"""Ray worker used in RayVectorEnv."""

def __init__(self, env_fn: Callable[[], gym.Env]) -> None:
self.env = ray.remote(gym.Wrapper).options(num_cpus=0).remote(env_fn())
self.env = ray.remote(_SetAttrWrapper).options(num_cpus=0).remote(env_fn())
super().__init__(env_fn)

def __getattr__(self, key: str) -> Any:
return ray.get(self.env.__getattr__.remote(key))
def get_env_attr(self, key: str) -> Any:
return ray.get(self.env.get_env_attr.remote(key))

def set_env_attr(self, key: str, value: Any) -> None:
ray.get(self.env.set_env_attr.remote(key, value))

def reset(self) -> Any:
return ray.get(self.env.reset.remote())
Expand Down
7 changes: 6 additions & 1 deletion tianshou/env/worker/subproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ def _encode_obs(
p.send(env.seed(data) if hasattr(env, "seed") else None)
elif cmd == "getattr":
p.send(getattr(env, data) if hasattr(env, data) else None)
elif cmd == "setattr":
setattr(env, data["key"], data["value"])
else:
p.close()
raise NotImplementedError
Expand Down Expand Up @@ -140,10 +142,13 @@ def __init__(
self.child_remote.close()
super().__init__(env_fn)

def __getattr__(self, key: str) -> Any:
def get_env_attr(self, key: str) -> Any:
self.parent_remote.send(["getattr", key])
return self.parent_remote.recv()

def set_env_attr(self, key: str, value: Any) -> None:
self.parent_remote.send(["setattr", {"key": key, "value": value}])

def _decode_obs(self) -> Union[dict, tuple, np.ndarray]:

def decode_obs(
Expand Down