diff --git a/.github/workflows/profile.yml b/.github/workflows/profile.yml
new file mode 100644
index 000000000..3bdd8ea8a
--- /dev/null
+++ b/.github/workflows/profile.yml
@@ -0,0 +1,22 @@
+name: Data Profile
+
+on: [push, pull_request]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python 3.8
+      uses: actions/setup-python@v2
+      with:
+        python-version: 3.8
+    - name: Upgrade pip
+      run: |
+        python -m pip install --upgrade pip setuptools wheel
+    - name: Install dependencies
+      run: |
+        pip install ".[dev]" --upgrade
+    - name: Test with pytest
+      run: |
+        pytest test/throughput --durations=0 -v
diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml
index c1e3604ed..84ab74f65 100644
--- a/.github/workflows/pytest.yml
+++ b/.github/workflows/pytest.yml
@@ -28,8 +28,9 @@ jobs:
       run: |
         pip install ".[dev]" --upgrade
     - name: Test with pytest
+      # ignore test/throughput, which only profiles the code
       run: |
-        pytest test --cov tianshou --cov-report=xml --durations 0 -v
+        pytest test --ignore-glob='*profile.py' --cov=tianshou --cov-report=xml --durations=0 -v
     - name: Upload coverage to Codecov
       uses: codecov/codecov-action@v1
       with:
diff --git a/test/throughput/__init__.py b/test/throughput/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/throughput/test_batch_profile.py b/test/throughput/test_batch_profile.py
new file mode 100644
index 000000000..c36dd5f6f
--- /dev/null
+++ b/test/throughput/test_batch_profile.py
@@ -0,0 +1,120 @@
+import copy
+import pickle
+
+import numpy as np
+import pytest
+import torch
+
+from tianshou.data import Batch
+
+
+@pytest.fixture(scope="module")
+def data():
+    print("Initialising data...")
+    np.random.seed(0)
+    batch_set = [Batch(a=[j for j in np.arange(1e3)],
+                       b={'b1': (3.14, 3.14), 'b2': np.arange(1e3)},
+                       c=i) for i in np.arange(int(1e4))]
+    batch0 = Batch(
+        a=np.ones((3, 4), dtype=np.float64),
+        b=Batch(
+            c=np.ones((1,), dtype=np.float64),
+            d=torch.ones((3, 3, 3), dtype=torch.float32),
+            e=list(range(3))
+        )
+    )
+    batches1 = [copy.deepcopy(batch0) for _ in np.arange(1e4)]
+    batches2 = [copy.deepcopy(batch0) for _ in np.arange(1e4)]
+    batch_len = int(1e4)
+    batch3 = Batch(obs=[np.arange(20) for _ in np.arange(batch_len)],
+                   reward=np.arange(batch_len))
+    indexes = np.random.choice(batch_len,
+                               size=batch_len // 10, replace=False)
+    slice_dict = {'obs': [np.arange(20)
+                          for _ in np.arange(batch_len // 10)],
+                  'reward': np.arange(batch_len // 10)}
+    dict_set = [{'obs': np.arange(20), 'info': "this is info", 'reward': 0}
+                for _ in np.arange(1e2)]
+    batch4 = Batch(
+        a=np.ones((10000, 4), dtype=np.float64),
+        b=Batch(
+            c=np.ones((1,), dtype=np.float64),
+            d=torch.ones((1000, 1000), dtype=torch.float32),
+            e=np.arange(1000)
+        )
+    )
+
+    print("Initialised")
+    return {'batch_set': batch_set,
+            'batch0': batch0,
+            'batches1': batches1,
+            'batches2': batches2,
+            'batch3': batch3,
+            'indexes': indexes,
+            'dict_set': dict_set,
+            'slice_dict': slice_dict,
+            'batch4': batch4
+            }
+
+
+def test_init(data):
+    """Test Batch __init__()."""
+    for _ in np.arange(10):
+        _ = Batch(data['batch_set'])
+
+
+def test_get_item(data):
+    """Test getting items by index."""
+    for _ in np.arange(1e5):
+        _ = data['batch3'][data['indexes']]
+
+
+def test_get_attr(data):
+    """Test getting values by attribute."""
+    for _ in np.arange(1e6):
+        data['batch3'].get('obs')
+        data['batch3'].get('reward')
+        _, _ = data['batch3'].obs, data['batch3'].reward
+
+
+def test_set_item(data):
+    """Test setting items by index."""
+    for _ in np.arange(1e4):
+        data['batch3'][data['indexes']] = data['slice_dict']
+
+
+def test_set_attr(data):
+    """Test setting values by attribute."""
+    for _ in np.arange(1e4):
+        data['batch3'].c = np.arange(1e3)
+        data['batch3'].obs = data['dict_set']
+
+
+def test_numpy_torch_convert(data):
+    """Test conversion between numpy and torch."""
+    for _ in np.arange(1e5):
+        data['batch4'].to_torch()
+        data['batch4'].to_numpy()
+
+
+def test_pickle(data):
+    for _ in np.arange(1e4):
+        pickle.loads(pickle.dumps(data['batch4']))
+
+
+def test_cat(data):
+    """Test Batch.cat and cat_."""
+    for i in range(10000):
+        Batch.cat((data['batch0'], data['batch0']))
+        data['batches1'][i].cat_(data['batch0'])
+
+
+def test_stack(data):
+    """Test Batch.stack and stack_."""
+    for i in range(10000):
+        Batch.stack((data['batch0'], data['batch0']))
+        data['batches2'][i].stack_([data['batch0']])
+
+
+if __name__ == '__main__':
+    pytest.main(["-s", "-k batch_profile", "--durations=0", "-v"])
diff --git a/test/throughput/test_buffer_profile.py b/test/throughput/test_buffer_profile.py
new file mode 100644
index 000000000..aec32682a
--- /dev/null
+++ b/test/throughput/test_buffer_profile.py
@@ -0,0 +1,81 @@
+import numpy as np
+import pytest
+
+from tianshou.data import (ListReplayBuffer, PrioritizedReplayBuffer,
+                           ReplayBuffer)
+
+
+@pytest.fixture(scope="module")
+def data():
+    np.random.seed(0)
+    obs = {'observable': np.random.rand(100, 100),
+           'hidden': np.random.randint(1000, size=200)}
+    info = {'policy': "dqn", 'base': np.arange(10)}
+    add_data = {'obs': obs, 'rew': 1., 'act': np.random.rand(30),
+                'done': False, 'obs_next': obs, 'info': info}
+    buffer = ReplayBuffer(int(1e3), stack_num=100)
+    buffer2 = ReplayBuffer(int(1e4), stack_num=100)
+    indexes = np.random.choice(int(1e3), size=3, replace=False)
+    return {
+        'add_data': add_data,
+        'buffer': buffer,
+        'buffer2': buffer2,
+        'slice': slice(-3000, -1000, 2),
+        'indexes': indexes
+    }
+
+
+def test_init():
+    for _ in np.arange(1e5):
+        _ = ReplayBuffer(1e5)
+        _ = PrioritizedReplayBuffer(
+            size=int(1e5), alpha=0.5,
+            beta=0.5, repeat_sample=True)
+        _ = ListReplayBuffer()
+
+
+def test_add(data):
+    buffer = data['buffer']
+    for _ in np.arange(1e5):
+        buffer.add(**data['add_data'])
+
+
+def test_update(data):
+    buffer = data['buffer']
+    buffer2 = data['buffer2']
+    for _ in np.arange(1e2):
+        buffer2.update(buffer)
+
+
+def test_getitem_slice(data):
+    Slice = data['slice']
+    buffer = data['buffer']
+    for _ in np.arange(1e3):
+        _ = buffer[Slice]
+
+
+def test_getitem_indexes(data):
+    indexes = data['indexes']
+    buffer = data['buffer']
+    for _ in np.arange(1e2):
+        _ = buffer[indexes]
+
+
+def test_get(data):
+    indexes = data['indexes']
+    buffer = data['buffer']
+    for _ in np.arange(3e2):
+        buffer.get(indexes, 'obs')
+        buffer.get(indexes, 'rew')
+        buffer.get(indexes, 'done')
+        buffer.get(indexes, 'info')
+
+
+def test_sample(data):
+    buffer = data['buffer']
+    for _ in np.arange(1e1):
+        buffer.sample(int(1e2))
+
+
+if __name__ == '__main__':
+    pytest.main(["-s", "-k buffer_profile", "--durations=0", "-v"])
diff --git a/test/throughput/test_collector_profile.py b/test/throughput/test_collector_profile.py
new file mode 100644
index 000000000..7d24d27fb
--- /dev/null
+++ b/test/throughput/test_collector_profile.py
@@ -0,0 +1,167 @@
+import gym
+import numpy as np
+import pytest
+from gym.spaces.discrete import Discrete
+from gym.utils import seeding
+
+from tianshou.data import Batch, Collector, ReplayBuffer
+from tianshou.env import VectorEnv, SubprocVectorEnv
+from tianshou.policy import BasePolicy
+
+
+class SimpleEnv(gym.Env):
+    """A minimal custom environment, used to minimize data
+    collection time when profiling the collector."""
+
+    def __init__(self):
+        self.action_space = Discrete(200)
+        self._fake_data = np.ones((10, 10, 1))
+        self.seed(0)
+        self.reset()
+
+    def reset(self):
+        self._index = 0
+        self.done = np.random.randint(3, high=200)
+        return {'observable': np.zeros((10, 10, 1)),
+                'hidden': self._index}
+
+    def step(self, action):
+        if self._index == self.done:
+            raise ValueError('step after done!')
+        self._index += 1
+        return {'observable': self._fake_data, 'hidden': self._index}, -1, \
+            self._index == self.done, {}
+
+    def seed(self, seed=None):
+        self.np_random, seed = seeding.np_random(seed)
+        return [seed]
+
+
+class SimplePolicy(BasePolicy):
+    """A minimal custom policy, used to minimize
+    data collection time."""
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+    def learn(self, batch, **kwargs):
+        return super().learn(batch, **kwargs)
+
+    def forward(self, batch, state=None, **kwargs):
+        return Batch(act=np.array([30] * len(batch)), state=None, logits=None)
+
+
+@pytest.fixture(scope="module")
+def data():
+    np.random.seed(0)
+    env = SimpleEnv()
+    env.seed(0)
+    env_vec = VectorEnv(
+        [lambda: SimpleEnv() for _ in range(100)])
+    env_vec.seed(np.random.randint(1000, size=100).tolist())
+    env_subproc = SubprocVectorEnv(
+        [lambda: SimpleEnv() for _ in range(8)])
+    env_subproc.seed(np.random.randint(1000, size=100).tolist())
+    env_subproc_init = SubprocVectorEnv(
+        [lambda: SimpleEnv() for _ in range(8)])
+    env_subproc_init.seed(np.random.randint(1000, size=100).tolist())
+    buffer = ReplayBuffer(50000)
+    policy = SimplePolicy()
+    collector = Collector(policy, env, ReplayBuffer(50000))
+    collector_vec = Collector(policy, env_vec, ReplayBuffer(50000))
+    collector_subproc = Collector(policy, env_subproc, ReplayBuffer(50000))
+    return {
+        "env": env,
+        "env_vec": env_vec,
+        "env_subproc": env_subproc,
+        "env_subproc_init": env_subproc_init,
+        "policy": policy,
+        "buffer": buffer,
+        "collector": collector,
+        "collector_vec": collector_vec,
+        "collector_subproc": collector_subproc
+    }
+
+
+def test_init(data):
+    for _ in range(5000):
+        c = Collector(data["policy"], data["env"], data["buffer"])
+        c.close()
+
+
+def test_reset(data):
+    for _ in range(5000):
+        data["collector"].reset()
+
+
+def test_collect_st(data):
+    for _ in range(50):
+        data["collector"].collect(n_step=1000)
+
+
+def test_collect_ep(data):
+    for _ in range(50):
+        data["collector"].collect(n_episode=10)
+
+
+def test_sample(data):
+    for _ in range(5000):
+        data["collector"].sample(256)
+
+
+def test_init_vec_env(data):
+    for _ in range(5000):
+        c = Collector(data["policy"], data["env_vec"], data["buffer"])
+        c.close()
+
+
+def test_reset_vec_env(data):
+    for _ in range(5000):
+        data["collector_vec"].reset()
+
+
+def test_collect_vec_env_st(data):
+    for _ in range(50):
+        data["collector_vec"].collect(n_step=1000)
+
+
+def test_collect_vec_env_ep(data):
+    for _ in range(50):
+        data["collector_vec"].collect(n_episode=10)
+
+
+def test_sample_vec_env(data):
+    for _ in range(5000):
+        data["collector_vec"].sample(256)
+
+
+def test_init_subproc_env(data):
+    for _ in range(5000):
+        c = Collector(data["policy"], data["env_subproc_init"], data["buffer"])
+        # TODO: this should be c.close() in theory, but subproc_env
+        # does not currently support that.
+        c.reset()
+
+
+def test_reset_subproc_env(data):
+    for _ in range(5000):
+        data["collector_subproc"].reset()
+
+
+def test_collect_subproc_env_st(data):
+    for _ in range(50):
+        data["collector_subproc"].collect(n_step=1000)
+
+
+def test_collect_subproc_env_ep(data):
+    for _ in range(50):
+        data["collector_subproc"].collect(n_episode=10)
+
+
+def test_sample_subproc_env(data):
+    for _ in range(5000):
+        data["collector_subproc"].sample(256)
+
+
+if __name__ == '__main__':
+    pytest.main(["-s", "-k collector_profile", "--durations=0", "-v"])
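
Note: the throughput suite added above can also be run locally the same way the new Data Profile workflow runs it. The sketch below is only illustrative and is not part of this diff; it assumes a development install of tianshou (pip install ".[dev]") and the repository root as the working directory, and the file name run_throughput.py is hypothetical.

    # run_throughput.py -- illustrative helper, not part of the diff above.
    import sys

    import pytest

    if __name__ == "__main__":
        # Mirror the "Test with pytest" step of the Data Profile workflow:
        # --durations=0 reports the runtime of every test (the quantity of
        # interest here), and -v lists each profiling test as it runs.
        sys.exit(pytest.main(["test/throughput", "--durations=0", "-v"]))

Running pytest through pytest.main here matches the __main__ blocks of the new test files, so the same command works whether the suite is invoked from CI, from this helper, or from an individual test file.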