diff --git a/README.md b/README.md index 7226d7c52..c50321f98 100644 --- a/README.md +++ b/README.md @@ -158,13 +158,14 @@ Currently, the overall code of Tianshou platform is less than 2500 lines. Most o ```python result = collector.collect(n_step=n) ``` - -If you have 3 environments in total and want to collect 1 episode in the first environment, 3 for the third environment: +If you have 3 environments in total and want to collect 4 episodes: ```python -result = collector.collect(n_episode=[1, 0, 3]) +result = collector.collect(n_episode=4) ``` +Collector will collect exactly 4 episodes without any bias of episode length despite we only have 3 parallel environments. + If you want to train the given policy with a sampled batch: ```python @@ -194,7 +195,7 @@ train_num, test_num = 8, 100 gamma, n_step, target_freq = 0.9, 3, 320 buffer_size = 20000 eps_train, eps_test = 0.1, 0.05 -step_per_epoch, collect_per_step = 1000, 10 +step_per_epoch, collect_per_step = 1000, 8 writer = SummaryWriter('log/dqn') # tensorboard is also supported! ``` @@ -223,8 +224,8 @@ Setup policy and collectors: ```python policy = ts.policy.DQNPolicy(net, optim, gamma, n_step, target_update_freq=target_freq) -train_collector = ts.data.Collector(policy, train_envs, ts.data.ReplayBuffer(buffer_size)) -test_collector = ts.data.Collector(policy, test_envs) +train_collector = ts.data.Collector(policy, train_envs, ts.data.VectorReplayBuffer(buffer_size, train_num), exploration_noise=True) +test_collector = ts.data.Collector(policy, test_envs, exploration_noise=True) # because DQN uses epsilon-greedy method ``` Let's train it: @@ -252,7 +253,7 @@ Watch the performance with 35 FPS: ```python policy.eval() policy.set_eps(eps_test) -collector = ts.data.Collector(policy, env) +collector = ts.data.Collector(policy, env, exploration_noise=True) collector.collect(n_episode=1, render=1 / 35) ``` diff --git a/docs/api/tianshou.data.rst b/docs/api/tianshou.data.rst index fa1d5c738..555d35640 100644 --- a/docs/api/tianshou.data.rst +++ b/docs/api/tianshou.data.rst @@ -1,7 +1,29 @@ tianshou.data ============= -.. automodule:: tianshou.data + +Batch +----- + +.. automodule:: tianshou.data.batch + :members: + :undoc-members: + :show-inheritance: + + +Buffer +------ + +.. automodule:: tianshou.data.buffer + :members: + :undoc-members: + :show-inheritance: + + +Collector +--------- + +.. automodule:: tianshou.data.collector :members: :undoc-members: :show-inheritance: diff --git a/docs/api/tianshou.env.rst b/docs/api/tianshou.env.rst index 7201bae46..f7eec6998 100644 --- a/docs/api/tianshou.env.rst +++ b/docs/api/tianshou.env.rst @@ -1,11 +1,19 @@ tianshou.env ============ + +VectorEnv +--------- + .. automodule:: tianshou.env :members: :undoc-members: :show-inheritance: + +Worker +------ + .. automodule:: tianshou.env.worker :members: :undoc-members: diff --git a/docs/api/tianshou.utils.rst b/docs/api/tianshou.utils.rst index 3a293b1c1..b2ac6a976 100644 --- a/docs/api/tianshou.utils.rst +++ b/docs/api/tianshou.utils.rst @@ -6,6 +6,10 @@ tianshou.utils :undoc-members: :show-inheritance: + +Pre-defined Networks +-------------------- + .. 
automodule:: tianshou.utils.net.common :members: :undoc-members: diff --git a/docs/conf.py b/docs/conf.py index f7bcc562d..b981eb4d4 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -70,6 +70,7 @@ ] ) } +autodoc_member_order = "bysource" bibtex_bibfiles = ['refs.bib'] # -- Options for HTML output ------------------------------------------------- diff --git a/docs/tutorials/cheatsheet.rst b/docs/tutorials/cheatsheet.rst index 07155f356..fefb30934 100644 --- a/docs/tutorials/cheatsheet.rst +++ b/docs/tutorials/cheatsheet.rst @@ -144,7 +144,7 @@ And finally, :: test_processor = MyProcessor(size=100) - collector = Collector(policy, env, buffer, test_processor.preprocess_fn) + collector = Collector(policy, env, buffer, preprocess_fn=test_processor.preprocess_fn) Some examples are in `test/base/test_collector.py `_. @@ -156,7 +156,7 @@ RNN-style Training This is related to `Issue 19 `_. -First, add an argument "stack_num" to :class:`~tianshou.data.ReplayBuffer`: +First, add an argument "stack_num" to :class:`~tianshou.data.ReplayBuffer`, :class:`~tianshou.data.VectorReplayBuffer`, or other types of buffer you are using, like: :: buf = ReplayBuffer(size=size, stack_num=stack_num) @@ -206,14 +206,13 @@ The state can be a ``numpy.ndarray`` or a Python dictionary. Take "FetchReach-v1 It shows that the state is a dictionary which has 3 keys. It will stored in :class:`~tianshou.data.ReplayBuffer` as: :: - >>> from tianshou.data import ReplayBuffer + >>> from tianshou.data import Batch, ReplayBuffer >>> b = ReplayBuffer(size=3) - >>> b.add(obs=e.reset(), act=0, rew=0, done=0) + >>> b.add(Batch(obs=e.reset(), act=0, rew=0, done=0)) >>> print(b) ReplayBuffer( act: array([0, 0, 0]), - done: array([0, 0, 0]), - info: Batch(), + done: array([False, False, False]), obs: Batch( achieved_goal: array([[1.34183265, 0.74910039, 0.53472272], [0. , 0. , 0. ], @@ -234,7 +233,6 @@ It shows that the state is a dictionary which has 3 keys. It will stored in :cla 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00]]), ), - policy: Batch(), rew: array([0, 0, 0]), ) >>> print(b.obs.achieved_goal) @@ -278,7 +276,7 @@ For self-defined class, the replay buffer will store the reference into a ``nump >>> import networkx as nx >>> b = ReplayBuffer(size=3) - >>> b.add(obs=nx.Graph(), act=0, rew=0, done=0) + >>> b.add(Batch(obs=nx.Graph(), act=0, rew=0, done=0)) >>> print(b) ReplayBuffer( act: array([0, 0, 0]), @@ -299,6 +297,10 @@ But the state stored in the buffer may be a shallow-copy. To make sure each of y ... return copy.deepcopy(self.graph), reward, done, {} +.. note :: + + Please make sure this variable is numpy-compatible, e.g., np.array([variable]) will not result in an empty array. Otherwise, ReplayBuffer cannot create an numpy array to store it. + .. _marl_example: diff --git a/docs/tutorials/concepts.rst b/docs/tutorials/concepts.rst index a314cdedb..b3e126352 100644 --- a/docs/tutorials/concepts.rst +++ b/docs/tutorials/concepts.rst @@ -53,7 +53,7 @@ In short, you can define a :class:`~tianshou.data.Batch` with any key-value pair Buffer ------ -:class:`~tianshou.data.ReplayBuffer` stores data generated from interaction between the policy and environment. ReplayBuffer can be considered as a specialized form (or management) of Batch. It stores all the data in a batch with circular-queue style. +:class:`~tianshou.data.ReplayBuffer` stores data generated from interaction between the policy and environment. ReplayBuffer can be considered as a specialized form (or management) of :class:`~tianshou.data.Batch`. 
It stores all the data in a batch with circular-queue style. The current implementation of Tianshou typically uses 7 reserved keys in :class:`~tianshou.data.Batch`: @@ -209,7 +209,7 @@ The following code snippet illustrates its usage, including:
-Tianshou provides other type of data buffer such as :class:`~tianshou.data.ListReplayBuffer` (based on list), :class:`~tianshou.data.PrioritizedReplayBuffer` (based on Segment Tree and ``numpy.ndarray``), :class:`~tianshou.data.CachedReplayBuffer` (add different episodes' data but without losing chronological order). Check out :class:`~tianshou.data.ReplayBuffer` for more detail. +Tianshou provides other types of data buffers such as :class:`~tianshou.data.PrioritizedReplayBuffer` (based on Segment Tree and ``numpy.ndarray``) and :class:`~tianshou.data.VectorReplayBuffer` (adds data from different episodes without losing chronological order). Check out :class:`~tianshou.data.ReplayBuffer` for more detail. Policy @@ -339,14 +339,12 @@ Collector The :class:`~tianshou.data.Collector` enables the policy to interact with different types of environments conveniently. -:meth:`~tianshou.data.Collector.collect` is the main method of Collector: it let the policy perform (at least) a specified number of step ``n_step`` or episode ``n_episode`` and store the data in the replay buffer. - -Why do we mention **at least** here? For multiple environments, we could not directly store the collected data into the replay buffer, since it breaks the principle of storing data chronologically. - -The proposed solution is to add some cache buffers inside the collector. Once collecting **a full episode of trajectory**, it will move the stored data from the cache buffer to the main buffer. To satisfy this condition, the collector will interact with environments that may exceed the given step number or episode number. +:meth:`~tianshou.data.Collector.collect` is the main method of Collector: it lets the policy perform a specified number of steps ``n_step`` or episodes ``n_episode`` and store the data in the replay buffer, then returns the statistics of the collected data, such as the episodes' total rewards. The general explanation is listed in :ref:`pseudocode`. Other usages of the Collector are listed in the :class:`~tianshou.data.Collector` documentation. +There is another type of collector, :class:`~tianshou.data.AsyncCollector`, which supports the asynchronous environment setting (for environments that take a long time to step). However, AsyncCollector only supports collecting **at least** ``n_step`` or ``n_episode`` due to the nature of asynchronous environments. + Trainer ------- diff --git a/docs/tutorials/dqn.rst b/docs/tutorials/dqn.rst index faea6a869..361f79f3c 100644 --- a/docs/tutorials/dqn.rst +++ b/docs/tutorials/dqn.rst @@ -113,8 +113,8 @@ The collector is a key concept in Tianshou. It allows the policy to interact wit In each step, the collector will let the policy perform (at least) a specified number of steps or episodes and store the data in a replay buffer. :: - train_collector = ts.data.Collector(policy, train_envs, ts.data.ReplayBuffer(size=20000)) - test_collector = ts.data.Collector(policy, test_envs) + train_collector = ts.data.Collector(policy, train_envs, ts.data.VectorReplayBuffer(20000, 8), exploration_noise=True) + test_collector = ts.data.Collector(policy, test_envs, exploration_noise=True) Train Policy with a Trainer @@ -191,7 +191,7 @@ Watch the Agent's Performance policy.eval() policy.set_eps(0.05) - collector = ts.data.Collector(policy, env) + collector = ts.data.Collector(policy, env, exploration_noise=True) collector.collect(n_episode=1, render=1 / 35) @@ -206,8 +206,7 @@ Tianshou supports user-defined training code. 
Here is the code snippet: :: # pre-collect at least 5000 frames with random action before training - policy.set_eps(1) - train_collector.collect(n_step=5000) + train_collector.collect(n_step=5000, random=True) policy.set_eps(0.1) for i in range(int(1e6)): # total step @@ -215,11 +214,11 @@ Tianshou supports user-defined training code. Here is the code snippet: # once if the collected episodes' mean returns reach the threshold, # or every 1000 steps, we test it on test_collector - if collect_result['rew'] >= env.spec.reward_threshold or i % 1000 == 0: + if collect_result['rews'].mean() >= env.spec.reward_threshold or i % 1000 == 0: policy.set_eps(0.05) result = test_collector.collect(n_episode=100) - if result['rew'] >= env.spec.reward_threshold: - print(f'Finished training! Test mean returns: {result["rew"]}') + if result['rews'].mean() >= env.spec.reward_threshold: + print(f'Finished training! Test mean returns: {result["rews"].mean()}') break else: # back to training eps diff --git a/docs/tutorials/tictactoe.rst b/docs/tutorials/tictactoe.rst index ac9adc118..c656c1ee2 100644 --- a/docs/tutorials/tictactoe.rst +++ b/docs/tutorials/tictactoe.rst @@ -128,7 +128,7 @@ Tianshou already provides some builtin classes for multi-agent learning. You can >>> >>> # use collectors to collect a episode of trajectories >>> # the reward is a vector, so we need a scalar metric to monitor the training - >>> collector = Collector(policy, env, reward_metric=lambda x: x[0]) + >>> collector = Collector(policy, env) >>> >>> # you will see a long trajectory showing the board status at each timestep >>> result = collector.collect(n_episode=1, render=.1) @@ -180,7 +180,7 @@ So let's start to train our Tic-Tac-Toe agent! First, import some required modul from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer - from tianshou.data import Collector, ReplayBuffer + from tianshou.data import Collector, VectorReplayBuffer from tianshou.policy import BasePolicy, RandomPolicy, DQNPolicy, MultiAgentPolicyManager from tic_tac_toe_env import TicTacToeEnv @@ -199,27 +199,27 @@ The explanation of each Tianshou class/function will be deferred to their first help='a smaller gamma favors earlier win') parser.add_argument('--n-step', type=int, default=3) parser.add_argument('--target-update-freq', type=int, default=320) - parser.add_argument('--epoch', type=int, default=10) - parser.add_argument('--step-per-epoch', type=int, default=1000) + parser.add_argument('--epoch', type=int, default=20) + parser.add_argument('--step-per-epoch', type=int, default=500) parser.add_argument('--collect-per-step', type=int, default=10) parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128, 128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.1) - parser.add_argument('--board_size', type=int, default=6) - parser.add_argument('--win_size', type=int, default=4) - parser.add_argument('--win-rate', type=float, default=np.float32(0.9), + parser.add_argument('--board-size', type=int, default=6) + parser.add_argument('--win-size', type=int, default=4) + parser.add_argument('--win-rate', type=float, default=0.9, help='the expected winning rate') 
parser.add_argument('--watch', default=False, action='store_true', help='no training, watch the play of pre-trained models') - parser.add_argument('--agent_id', type=int, default=2, + parser.add_argument('--agent-id', type=int, default=2, help='the learned agent plays as the agent_id-th player. Choices are 1 and 2.') - parser.add_argument('--resume_path', type=str, default='', + parser.add_argument('--resume-path', type=str, default='', help='the path of agent pth file for resuming from a pre-trained agent') - parser.add_argument('--opponent_path', type=str, default='', + parser.add_argument('--opponent-path', type=str, default='', help='the path of opponent agent pth file for resuming from a pre-trained agent') parser.add_argument('--device', type=str, default='cuda' if torch.cuda.is_available() else 'cpu') @@ -240,11 +240,13 @@ Both agents are passed to :class:`~tianshou.policy.MultiAgentPolicyManager`, whi Here it is: :: - def get_agents(args=get_args(), - agent_learn=None, # BasePolicy - agent_opponent=None, # BasePolicy - optim=None, # torch.optim.Optimizer - ): # return a tuple of (BasePolicy, torch.optim.Optimizer) + def get_agents( + args=get_args(), + agent_learn=None, # BasePolicy + agent_opponent=None, # BasePolicy + optim=None, # torch.optim.Optimizer + ): # return a tuple of (BasePolicy, torch.optim.Optimizer) + env = TicTacToeEnv(args.board_size, args.win_size) args.state_shape = env.observation_space.shape or env.observation_space.n args.action_shape = env.action_space.shape or env.action_space.n @@ -279,9 +281,6 @@ With the above preparation, we are close to the first learned agent. The followi :: args = get_args() - # the reward is a vector, we need a scalar metric to monitor the training. - # we choose the reward of the learning agent - Collector._default_rew_metric = lambda x: x[args.agent_id - 1] # ======== a test function that tests a pre-trained agent and exit ====== def watch(args=get_args(), @@ -294,7 +293,7 @@ With the above preparation, we are close to the first learned agent. The followi policy.policies[args.agent_id - 1].set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + print(f'Final reward:{result["rews"].mean()}, length: {result["lens"].mean()}') if args.watch: watch(args) exit(0) @@ -313,9 +312,10 @@ With the above preparation, we are close to the first learned agent. The followi policy, optim = get_agents() # ======== collector setup ========= - train_collector = Collector(policy, train_envs, ReplayBuffer(args.buffer_size)) - test_collector = Collector(policy, test_envs) - train_collector.collect(n_step=args.batch_size) + buffer = VectorReplayBuffer(args.buffer_size, args.training_num) + train_collector = Collector(policy, train_envs, buffer, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) + train_collector.collect(n_step=args.batch_size * args.training_num) # ======== tensorboard logging setup ========= if not hasattr(args, 'writer'): @@ -347,13 +347,18 @@ With the above preparation, we are close to the first learned agent. The followi def test_fn(epoch, env_step): policy.policies[args.agent_id - 1].set_eps(args.eps_test) + # the reward is a vector, we need a scalar metric to monitor the training. 
+ # we choose the reward of the learning agent + def reward_metric(rews): + return rews[:, args.agent_id - 1] + # start training, this may require about three minutes result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.collect_per_step, args.test_num, args.batch_size, train_fn=train_fn, test_fn=test_fn, - stop_fn=stop_fn, save_fn=save_fn, writer=writer, - test_in_train=False) + stop_fn=stop_fn, save_fn=save_fn, reward_metric=reward_metric, + writer=writer, test_in_train=False) agent = policy.policies[args.agent_id - 1] # let's watch the match! @@ -476,7 +481,7 @@ By default, the trained agent is stored in ``log/tic_tac_toe/dqn/policy.pth``. Y .. code-block:: console - $ python test_tic_tac_toe.py --watch --resume_path=log/tic_tac_toe/dqn/policy.pth --opponent_path=log/tic_tac_toe/dqn/policy.pth + $ python test_tic_tac_toe.py --watch --resume-path log/tic_tac_toe/dqn/policy.pth --opponent-path log/tic_tac_toe/dqn/policy.pth Here is our output: diff --git a/examples/atari/atari_bcq.py b/examples/atari/atari_bcq.py index e9edb8310..e2b8f0778 100644 --- a/examples/atari/atari_bcq.py +++ b/examples/atari/atari_bcq.py @@ -111,7 +111,7 @@ def test_discrete_bcq(args=get_args()): exit(0) # collector - test_collector = Collector(policy, test_envs) + test_collector = Collector(policy, test_envs, exploration_noise=True) log_path = os.path.join(args.logdir, args.task, 'discrete_bcq') writer = SummaryWriter(log_path) @@ -130,7 +130,7 @@ def watch(): test_envs.seed(args.seed) print("Testing agent ...") test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, + result = test_collector.collect(n_episode=args.test_num, render=args.render) pprint.pprint(result) diff --git a/examples/atari/atari_c51.py b/examples/atari/atari_c51.py index c74da8f13..42cffa9be 100644 --- a/examples/atari/atari_c51.py +++ b/examples/atari/atari_c51.py @@ -8,7 +8,7 @@ from tianshou.policy import C51Policy from tianshou.env import SubprocVectorEnv from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from atari_network import C51 from atari_wrapper import wrap_deepmind @@ -33,7 +33,7 @@ def get_args(): parser.add_argument('--step-per-epoch', type=int, default=10000) parser.add_argument('--collect-per-step', type=int, default=10) parser.add_argument('--batch-size', type=int, default=32) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=10) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
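The hunks below, together with the matching changes in the other example scripts, all apply one migration: a single ``VectorReplayBuffer`` shared across the training environments, and collectors constructed with ``exploration_noise=True`` so that DQN's epsilon-greedy noise is added during collection. The following is a minimal, hypothetical CartPole sketch of that pattern; the names and hyper-parameters are illustrative and not taken from this patch.

```python
# Hypothetical sketch of the buffer/collector pattern used by the examples
# after this refactor; values are illustrative, not from this patch.
import gym
import torch
import tianshou as ts
from tianshou.utils.net.common import Net

train_num, test_num = 4, 2
train_envs = ts.env.DummyVectorEnv(
    [lambda: gym.make("CartPole-v0") for _ in range(train_num)])
test_envs = ts.env.DummyVectorEnv(
    [lambda: gym.make("CartPole-v0") for _ in range(test_num)])

env = gym.make("CartPole-v0")
state_shape = env.observation_space.shape or env.observation_space.n
action_shape = env.action_space.shape or env.action_space.n
net = Net(state_shape, action_shape, hidden_sizes=[128, 128])
optim = torch.optim.Adam(net.parameters(), lr=1e-3)
policy = ts.policy.DQNPolicy(net, optim, 0.9, 3, target_update_freq=320)

# one sub-buffer per training env; the total size is split across them
buffer = ts.data.VectorReplayBuffer(20000, train_num)
train_collector = ts.data.Collector(
    policy, train_envs, buffer, exploration_noise=True)
test_collector = ts.data.Collector(policy, test_envs, exploration_noise=True)

policy.set_eps(0.1)
# warm up every sub-buffer before the first gradient step
train_collector.collect(n_step=64 * train_num)
# collect() now reports per-episode arrays ("rews", "lens"), hence .mean()
result = test_collector.collect(n_episode=test_num)
print(f'reward: {result["rews"].mean()}, length: {result["lens"].mean()}')
```

Note also that the examples switch from per-environment episode lists (``n_episode=[1] * args.test_num``) to a plain integer (``n_episode=args.test_num``), and that they read the returned ``rews``/``lens`` arrays instead of the old pre-averaged ``rew``/``len`` scalars.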
@@ -84,17 +84,16 @@ def test_c51(args=get_args()): ).to(args.device) # load a previous policy if args.resume_path: - policy.load_state_dict(torch.load( - args.resume_path, map_location=args.device - )) + policy.load_state_dict(torch.load(args.resume_path, map_location=args.device)) print("Loaded agent from: ", args.resume_path) # replay buffer: `save_last_obs` and `stack_num` can be removed together # when you have enough RAM - buffer = ReplayBuffer(args.buffer_size, ignore_obs_next=True, - save_only_last_obs=True, stack_num=args.frames_stack) + buffer = VectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), ignore_obs_next=True, + save_only_last_obs=True, stack_num=args.frames_stack) # collector - train_collector = Collector(policy, train_envs, buffer) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buffer, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # log log_path = os.path.join(args.logdir, args.task, 'c51') writer = SummaryWriter(log_path) @@ -130,8 +129,7 @@ def watch(): policy.set_eps(args.eps_test) test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) + result = test_collector.collect(n_episode=args.test_num, render=args.render) pprint.pprint(result) if args.watch: @@ -139,7 +137,7 @@ def watch(): exit(0) # test train_collector and start filling replay buffer - train_collector.collect(n_step=args.batch_size * 4) + train_collector.collect(n_step=args.batch_size * args.training_num) # trainer result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, diff --git a/examples/atari/atari_dqn.py b/examples/atari/atari_dqn.py index 4f4c4f2df..559b0878e 100644 --- a/examples/atari/atari_dqn.py +++ b/examples/atari/atari_dqn.py @@ -8,7 +8,7 @@ from tianshou.policy import DQNPolicy from tianshou.env import SubprocVectorEnv from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from atari_network import DQN from atari_wrapper import wrap_deepmind @@ -30,7 +30,7 @@ def get_args(): parser.add_argument('--step-per-epoch', type=int, default=10000) parser.add_argument('--collect-per-step', type=int, default=10) parser.add_argument('--batch-size', type=int, default=32) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=10) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -80,17 +80,16 @@ def test_dqn(args=get_args()): target_update_freq=args.target_update_freq) # load a previous policy if args.resume_path: - policy.load_state_dict(torch.load( - args.resume_path, map_location=args.device - )) + policy.load_state_dict(torch.load(args.resume_path, map_location=args.device)) print("Loaded agent from: ", args.resume_path) # replay buffer: `save_last_obs` and `stack_num` can be removed together # when you have enough RAM - buffer = ReplayBuffer(args.buffer_size, ignore_obs_next=True, - save_only_last_obs=True, stack_num=args.frames_stack) + buffer = VectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), ignore_obs_next=True, + save_only_last_obs=True, stack_num=args.frames_stack) # collector - train_collector = Collector(policy, train_envs, buffer) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buffer, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # log log_path = os.path.join(args.logdir, args.task, 'dqn') writer = SummaryWriter(log_path) @@ -127,9 +126,10 @@ def watch(): test_envs.seed(args.seed) if args.save_buffer_name: print(f"Generate buffer with size {args.buffer_size}") - buffer = ReplayBuffer( - args.buffer_size, ignore_obs_next=True, - save_only_last_obs=True, stack_num=args.frames_stack) + buffer = VectorReplayBuffer( + args.buffer_size, buffer_num=len(test_envs), + ignore_obs_next=True, save_only_last_obs=True, + stack_num=args.frames_stack) collector = Collector(policy, test_envs, buffer) result = collector.collect(n_step=args.buffer_size) print(f"Save buffer into {args.save_buffer_name}") @@ -138,7 +138,7 @@ def watch(): else: print("Testing agent ...") test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, + result = test_collector.collect(n_episode=args.test_num, render=args.render) pprint.pprint(result) @@ -147,7 +147,7 @@ def watch(): exit(0) # test train_collector and start filling replay buffer - train_collector.collect(n_step=args.batch_size * 4) + train_collector.collect(n_step=args.batch_size * args.training_num) # trainer result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, diff --git a/examples/atari/atari_qrdqn.py b/examples/atari/atari_qrdqn.py index 08c34733c..ed356381f 100644 --- a/examples/atari/atari_qrdqn.py +++ b/examples/atari/atari_qrdqn.py @@ -8,7 +8,7 @@ from tianshou.policy import QRDQNPolicy from tianshou.env import SubprocVectorEnv from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from atari_network import QRDQN from atari_wrapper import wrap_deepmind @@ -31,7 +31,7 @@ def get_args(): parser.add_argument('--step-per-epoch', type=int, default=10000) parser.add_argument('--collect-per-step', type=int, default=10) parser.add_argument('--batch-size', type=int, default=32) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=10) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -82,17 +82,16 @@ def test_qrdqn(args=get_args()): ).to(args.device) # load a previous policy if args.resume_path: - policy.load_state_dict(torch.load( - args.resume_path, map_location=args.device - )) + policy.load_state_dict(torch.load(args.resume_path, map_location=args.device)) print("Loaded agent from: ", args.resume_path) # replay buffer: `save_last_obs` and `stack_num` can be removed together # when you have enough RAM - buffer = ReplayBuffer(args.buffer_size, ignore_obs_next=True, - save_only_last_obs=True, stack_num=args.frames_stack) + buffer = VectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), + ignore_obs_next=True, save_only_last_obs=True, stack_num=args.frames_stack) # collector - train_collector = Collector(policy, train_envs, buffer) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buffer, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # log log_path = os.path.join(args.logdir, args.task, 'qrdqn') writer = SummaryWriter(log_path) @@ -128,8 +127,7 @@ def watch(): policy.set_eps(args.eps_test) test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) + result = test_collector.collect(n_episode=args.test_num, render=args.render) pprint.pprint(result) if args.watch: @@ -137,7 +135,7 @@ def watch(): exit(0) # test train_collector and start filling replay buffer - train_collector.collect(n_step=args.batch_size * 4) + train_collector.collect(n_step=args.batch_size * args.training_num) # trainer result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, diff --git a/examples/atari/runnable/pong_a2c.py b/examples/atari/runnable/pong_a2c.py index ffed1694d..0b81cecd6 100644 --- a/examples/atari/runnable/pong_a2c.py +++ b/examples/atari/runnable/pong_a2c.py @@ -9,7 +9,7 @@ from tianshou.env import SubprocVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.discrete import Actor, Critic from atari import create_atari_environment, preprocess_fn @@ -75,8 +75,9 @@ def test_a2c(args=get_args()): ent_coef=args.ent_coef, max_grad_norm=args.max_grad_norm) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size), - preprocess_fn=preprocess_fn) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, buffer_num=len(train_envs)), + preprocess_fn=preprocess_fn, exploration_noise=True) test_collector = Collector(policy, test_envs, preprocess_fn=preprocess_fn) # log writer = SummaryWriter(os.path.join(args.logdir, args.task, 'a2c')) @@ -98,7 +99,8 @@ def stop_fn(mean_rewards): env = create_atari_environment(args.task) collector = Collector(policy, env, preprocess_fn=preprocess_fn) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/atari/runnable/pong_ppo.py b/examples/atari/runnable/pong_ppo.py index 35ed0e749..8ed04c21e 100644 --- a/examples/atari/runnable/pong_ppo.py +++ b/examples/atari/runnable/pong_ppo.py @@ -9,7 +9,7 @@ from tianshou.env import SubprocVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import 
onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.discrete import Actor, Critic from atari import create_atari_environment, preprocess_fn @@ -79,8 +79,9 @@ def test_ppo(args=get_args()): action_range=None) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size), - preprocess_fn=preprocess_fn) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, buffer_num=len(train_envs)), + preprocess_fn=preprocess_fn, exploration_noise=True) test_collector = Collector(policy, test_envs, preprocess_fn=preprocess_fn) # log writer = SummaryWriter(os.path.join(args.logdir, args.task, 'ppo')) @@ -102,7 +103,8 @@ def stop_fn(mean_rewards): env = create_atari_environment(args.task) collector = Collector(policy, env, preprocess_fn=preprocess_fn) result = collector.collect(n_step=2000, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/box2d/acrobot_dualdqn.py b/examples/box2d/acrobot_dualdqn.py index 071990069..444c357f8 100644 --- a/examples/box2d/acrobot_dualdqn.py +++ b/examples/box2d/acrobot_dualdqn.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer def get_args(): @@ -28,13 +28,12 @@ def get_args(): parser.add_argument('--step-per-epoch', type=int, default=1000) parser.add_argument('--collect-per-step', type=int, default=100) parser.add_argument('--batch-size', type=int, default=64) - parser.add_argument('--hidden-sizes', type=int, - nargs='*', default=[128]) + parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128]) parser.add_argument('--dueling-q-hidden-sizes', type=int, nargs='*', default=[128, 128]) parser.add_argument('--dueling-v-hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -72,10 +71,12 @@ def test_dqn(args=get_args()): target_update_freq=args.target_update_freq) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) - test_collector = Collector(policy, test_envs) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'dqn') writer = SummaryWriter(log_path) @@ -114,9 +115,9 @@ def test_fn(epoch, env_step): policy.set_eps(args.eps_test) test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/box2d/bipedal_hardcore_sac.py b/examples/box2d/bipedal_hardcore_sac.py index 58eb98ec9..0bf802d3b 100644 --- a/examples/box2d/bipedal_hardcore_sac.py +++ b/examples/box2d/bipedal_hardcore_sac.py @@ -10,7 +10,7 @@ from tianshou.utils.net.common import Net from tianshou.env import SubprocVectorEnv from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import ActorProb, Critic @@ -32,7 +32,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -125,7 +125,9 @@ def test_sac_bipedal(args=get_args()): # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -151,9 +153,10 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, + result = test_collector.collect(n_episode=args.test_num, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/box2d/lunarlander_dqn.py b/examples/box2d/lunarlander_dqn.py index 5c98fb779..de88aa315 100644 --- a/examples/box2d/lunarlander_dqn.py +++ b/examples/box2d/lunarlander_dqn.py @@ -9,7 +9,7 @@ from tianshou.policy import DQNPolicy from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.env import DummyVectorEnv, SubprocVectorEnv @@ -35,7 +35,7 @@ def get_args(): nargs='*', default=[128, 128]) parser.add_argument('--dueling-v-hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=10) + parser.add_argument('--training-num', type=int, default=16) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) @@ -73,10 +73,12 @@ def test_dqn(args=get_args()): target_update_freq=args.target_update_freq) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) - test_collector = Collector(policy, test_envs) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'dqn') writer = SummaryWriter(log_path) @@ -110,9 +112,9 @@ def test_fn(epoch, env_step): policy.set_eps(args.eps_test) test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/box2d/mcc_sac.py b/examples/box2d/mcc_sac.py index 47ca4e25c..14e5095e7 100644 --- a/examples/box2d/mcc_sac.py +++ b/examples/box2d/mcc_sac.py @@ -8,7 +8,7 @@ from tianshou.policy import SACPolicy from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.env import DummyVectorEnv from tianshou.exploration import OUNoise from tianshou.utils.net.common import Net @@ -34,7 +34,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', 
type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=5) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) @@ -94,7 +94,9 @@ def test_sac(args=get_args()): exploration_noise=OUNoise(0.0, args.noise_std)) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -119,9 +121,9 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/mujoco/mujoco_sac.py b/examples/mujoco/mujoco_sac.py index 0b09c81a5..ba9cd79a3 100644 --- a/examples/mujoco/mujoco_sac.py +++ b/examples/mujoco/mujoco_sac.py @@ -10,7 +10,7 @@ from tianshou.env import SubprocVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import ActorProb, Critic @@ -35,7 +35,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=256) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=4) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -108,7 +108,9 @@ def test_sac(args=get_args()): # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'sac') @@ -120,8 +122,7 @@ def watch(): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) + result = test_collector.collect(n_episode=args.test_num, render=args.render) pprint.pprint(result) def save_fn(policy): diff --git a/examples/mujoco/runnable/ant_v2_ddpg.py b/examples/mujoco/runnable/ant_v2_ddpg.py index 13192dbc9..b9a6e0118 100644 --- a/examples/mujoco/runnable/ant_v2_ddpg.py +++ b/examples/mujoco/runnable/ant_v2_ddpg.py @@ -10,7 +10,7 @@ from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer from tianshou.exploration import GaussianNoise -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import Actor, Critic @@ -30,7 +30,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=4) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) @@ -74,7 +74,9 @@ def test_ddpg(args=get_args()): reward_normalization=True, ignore_done=True) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log writer = SummaryWriter(args.logdir + '/' + 'ddpg') @@ -94,9 +96,9 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/mujoco/runnable/ant_v2_td3.py b/examples/mujoco/runnable/ant_v2_td3.py index 5ed45506a..2f8370217 100644 --- a/examples/mujoco/runnable/ant_v2_td3.py +++ b/examples/mujoco/runnable/ant_v2_td3.py @@ -10,7 +10,7 @@ from tianshou.utils.net.common import Net from tianshou.exploration import GaussianNoise from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import Actor, Critic @@ -33,7 +33,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
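The example-script hunks in this part of the patch only touch collector construction; the other half of the refactor, the new ``ReplayBuffer.add`` interface, is exercised by the ``test/base/test_buffer.py`` changes near the end of this diff. Below is a minimal, hypothetical sketch of that interface; the sizes and values are illustrative, not taken from this patch.

```python
# Hypothetical sketch: add() now takes a single Batch (instead of keyword
# arguments) and returns four numpy arrays (ptr, ep_rew, ep_len, ep_idx).
from tianshou.data import Batch, ReplayBuffer, VectorReplayBuffer

buf = ReplayBuffer(size=10)
for i in range(3):
    ptr, ep_rew, ep_len, ep_idx = buf.add(
        Batch(obs=i, act=0, rew=1.0, done=(i == 2), obs_next=i + 1))
# episode statistics become non-zero only on the step that ends an episode
assert ep_len[0] == 3 and ep_rew[0] == 3.0

# the vectorized buffer stores one transition per sub-buffer in a single call
vbuf = VectorReplayBuffer(12, buffer_num=3)  # 3 sub-buffers of size 4 each
batch = Batch(obs=[0, 0, 0], act=[0, 0, 0], rew=[1.0, 1.0, 1.0],
              done=[False, False, True], obs_next=[1, 1, 1])
ptr, ep_rew, ep_len, ep_idx = vbuf.add(batch, buffer_ids=[0, 1, 2])
print(ptr)  # e.g. [0 4 8]: each sub-buffer writes into its own index range
```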
@@ -82,7 +82,9 @@ def test_td3(args=get_args()): reward_normalization=True, ignore_done=True) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -103,9 +105,9 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/mujoco/runnable/halfcheetahBullet_v0_sac.py b/examples/mujoco/runnable/halfcheetahBullet_v0_sac.py index b669d264a..6f34ce0ad 100644 --- a/examples/mujoco/runnable/halfcheetahBullet_v0_sac.py +++ b/examples/mujoco/runnable/halfcheetahBullet_v0_sac.py @@ -11,7 +11,7 @@ from tianshou.utils.net.common import Net from tianshou.env import SubprocVectorEnv from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import ActorProb, Critic @@ -32,7 +32,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=4) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--log-interval', type=int, default=100) @@ -82,7 +82,9 @@ def test_sac(args=get_args()): reward_normalization=True, ignore_done=True) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -105,9 +107,10 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, + result = test_collector.collect(n_episode=args.test_num, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/examples/mujoco/runnable/point_maze_td3.py b/examples/mujoco/runnable/point_maze_td3.py index dbb612fc8..76271f40f 100644 --- a/examples/mujoco/runnable/point_maze_td3.py +++ b/examples/mujoco/runnable/point_maze_td3.py @@ -10,7 +10,7 @@ from tianshou.env import SubprocVectorEnv from tianshou.exploration import GaussianNoise from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import Actor, Critic from mujoco.register import reg @@ -35,7 +35,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - 
parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) @@ -87,7 +87,9 @@ def test_td3(args=get_args()): reward_normalization=True, ignore_done=True) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -111,9 +113,10 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, + result = test_collector.collect(n_episode=args.test_num, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/setup.cfg b/setup.cfg index 0a4742891..d485e6d06 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,14 @@ +[flake8] +exclude = + .git + log + __pycache__ + docs + build + dist + *.egg-info +max-line-length = 87 + [mypy] files = tianshou/**/*.py allow_redefinition = True diff --git a/test/base/env.py b/test/base/env.py index f0907f8e1..e71957a7e 100644 --- a/test/base/env.py +++ b/test/base/env.py @@ -10,15 +10,16 @@ class MyTestEnv(gym.Env): """ def __init__(self, size, sleep=0, dict_state=False, recurse_state=False, - ma_rew=0, multidiscrete_action=False, random_sleep=False): - assert not ( - dict_state and recurse_state), \ - "dict_state and recurse_state cannot both be true" + ma_rew=0, multidiscrete_action=False, random_sleep=False, + array_state=False): + assert dict_state + recurse_state + array_state <= 1, \ + "dict_state / recurse_state / array_state can be only one true" self.size = size self.sleep = sleep self.random_sleep = random_sleep self.dict_state = dict_state self.recurse_state = recurse_state + self.array_state = array_state self.ma_rew = ma_rew self._md_action = multidiscrete_action # how many steps this env has stepped @@ -36,6 +37,8 @@ def __init__(self, size, sleep=0, dict_state=False, recurse_state=False, "rand": Box(shape=(1, 2), low=0, high=1, dtype=np.float64)}) }) + elif array_state: + self.observation_space = Box(shape=(4, 84, 84), low=0, high=255) else: self.observation_space = Box(shape=(1, ), low=0, high=size - 1) if multidiscrete_action: @@ -72,6 +75,13 @@ def _get_state(self): 'dict': {"tuple": (np.array([1], dtype=np.int64), self.rng.rand(2)), "rand": self.rng.rand(1, 2)}} + elif self.array_state: + img = np.zeros([4, 84, 84], np.int) + img[3, np.arange(84), np.arange(84)] = self.index + img[2, np.arange(84)] = self.index + img[1, :, np.arange(84)] = self.index + img[0] = self.index + return img else: return np.array([self.index], dtype=np.float32) diff --git a/test/base/test_batch.py b/test/base/test_batch.py index 4553edff7..0898e154c 100644 --- a/test/base/test_batch.py +++ b/test/base/test_batch.py @@ -21,6 +21,8 @@ def test_batch(): assert not Batch(a=[1, 2, 3]).is_empty() b = Batch({'a': [4, 4], 'b': [5, 5]}, c=[None, None]) assert b.c.dtype == np.object + b = Batch(d=[None], e=[starmap], f=Batch) + assert b.d.dtype == b.e.dtype == np.object and b.f == Batch b = Batch() b.update() assert b.is_empty() diff --git 
a/test/base/test_buffer.py b/test/base/test_buffer.py index fba2007bc..3f65ad633 100644 --- a/test/base/test_buffer.py +++ b/test/base/test_buffer.py @@ -1,16 +1,18 @@ import os +import h5py import torch import pickle import pytest import tempfile -import h5py import numpy as np from timeit import timeit from tianshou.data.utils.converter import to_hdf5 from tianshou.data import Batch, SegmentTree, ReplayBuffer -from tianshou.data import ListReplayBuffer, PrioritizedReplayBuffer -from tianshou.data import ReplayBufferManager, CachedReplayBuffer +from tianshou.data import PrioritizedReplayBuffer +from tianshou.data import VectorReplayBuffer, CachedReplayBuffer +from tianshou.data import PrioritizedVectorReplayBuffer + if __name__ == '__main__': from env import MyTestEnv @@ -27,13 +29,12 @@ def test_replaybuffer(size=10, bufsize=20): action_list = [1] * 5 + [0] * 10 + [1] * 10 for i, a in enumerate(action_list): obs_next, rew, done, info = env.step(a) - buf.add(obs, [a], rew, done, obs_next, info) + buf.add(Batch(obs=obs, act=[a], rew=rew, + done=done, obs_next=obs_next, info=info)) obs = obs_next assert len(buf) == min(bufsize, i + 1) - with pytest.raises(ValueError): - buf._add_to_buffer('rew', np.array([1, 2, 3])) - assert buf.act.dtype == np.object - assert isinstance(buf.act[0], list) + assert buf.act.dtype == np.int + assert buf.act.shape == (bufsize, 1) data, indice = buf.sample(bufsize * 2) assert (indice < len(buf)).all() assert (data.obs < size).all() @@ -41,7 +42,9 @@ def test_replaybuffer(size=10, bufsize=20): b = ReplayBuffer(size=10) # neg bsz should return empty index assert b.sample_index(-1).tolist() == [] - b.add(1, 1, 1, 1, 'str', {'a': 3, 'b': {'c': 5.0}}) + ptr, ep_rew, ep_len, ep_idx = b.add( + Batch(obs=1, act=1, rew=1, done=1, obs_next='str', + info={'a': 3, 'b': {'c': 5.0}})) assert b.obs[0] == 1 assert b.done[0] assert b.obs_next[0] == 'str' @@ -51,25 +54,45 @@ def test_replaybuffer(size=10, bufsize=20): assert np.all(b.info.a[1:] == 0) assert b.info.b.c[0] == 5.0 and b.info.b.c.dtype == np.inexact assert np.all(b.info.b.c[1:] == 0.0) + assert ptr.shape == (1,) and ptr[0] == 0 + assert ep_rew.shape == (1,) and ep_rew[0] == 1 + assert ep_len.shape == (1,) and ep_len[0] == 1 + assert ep_idx.shape == (1,) and ep_idx[0] == 0 + # test extra keys pop up, the buffer should handle it dynamically + batch = Batch(obs=2, act=2, rew=2, done=0, obs_next="str2", + info={"a": 4, "d": {"e": -np.inf}}) + b.add(batch) + info_keys = ["a", "b", "d"] + assert set(b.info.keys()) == set(info_keys) + assert b.info.a[1] == 4 and b.info.b.c[1] == 0 + assert b.info.d.e[1] == -np.inf + # test batch-style adding method, where len(batch) == 1 + batch.done = 1 + batch.info.e = np.zeros([1, 4]) + batch = Batch.stack([batch]) + ptr, ep_rew, ep_len, ep_idx = b.add(batch, buffer_ids=[0]) + assert ptr.shape == (1,) and ptr[0] == 2 + assert ep_rew.shape == (1,) and ep_rew[0] == 4 + assert ep_len.shape == (1,) and ep_len[0] == 2 + assert ep_idx.shape == (1,) and ep_idx[0] == 1 + assert set(b.info.keys()) == set(info_keys + ["e"]) + assert b.info.e.shape == (b.maxsize, 1, 4) with pytest.raises(IndexError): b[22] - b = ListReplayBuffer() - with pytest.raises(NotImplementedError): - b.sample(0) def test_ignore_obs_next(size=10): # Issue 82 buf = ReplayBuffer(size, ignore_obs_next=True) for i in range(size): - buf.add(obs={'mask1': np.array([i, 1, 1, 0, 0]), - 'mask2': np.array([i + 4, 0, 1, 0, 0]), - 'mask': i}, - act={'act_id': i, - 'position_id': i + 3}, - rew=i, - done=i % 3 == 0, - info={'if': i}) + 
buf.add(Batch(obs={'mask1': np.array([i, 1, 1, 0, 0]), + 'mask2': np.array([i + 4, 0, 1, 0, 0]), + 'mask': i}, + act={'act_id': i, + 'position_id': i + 3}, + rew=i, + done=i % 3 == 0, + info={'if': i})) indice = np.arange(len(buf)) orig = np.arange(len(buf)) data = buf[indice] @@ -103,9 +126,10 @@ def test_stack(size=5, bufsize=9, stack_num=4, cached_num=3): obs = env.reset(1) for i in range(16): obs_next, rew, done, info = env.step(1) - buf.add(obs, 1, rew, done, None, info) - buf2.add(obs, 1, rew, done, None, info) - buf3.add([None, None, obs], 1, rew, done, [None, obs], info) + buf.add(Batch(obs=obs, act=1, rew=rew, done=done, info=info)) + buf2.add(Batch(obs=obs, act=1, rew=rew, done=done, info=info)) + buf3.add(Batch(obs=[obs, obs, obs], act=1, rew=rew, + done=done, obs_next=[obs, obs], info=info)) obs = obs_next if done: obs = env.reset(1) @@ -129,11 +153,16 @@ def test_stack(size=5, bufsize=9, stack_num=4, cached_num=3): def test_priortized_replaybuffer(size=32, bufsize=15): env = MyTestEnv(size) buf = PrioritizedReplayBuffer(bufsize, 0.5, 0.5) + buf2 = PrioritizedVectorReplayBuffer(bufsize, buffer_num=3, alpha=0.5, beta=0.5) obs = env.reset() action_list = [1] * 5 + [0] * 10 + [1] * 10 for i, a in enumerate(action_list): obs_next, rew, done, info = env.step(a) - buf.add(obs, a, rew, done, obs_next, info, np.random.randn() - 0.5) + batch = Batch(obs=obs, act=a, rew=rew, done=done, obs_next=obs_next, + info=info, policy=np.random.randn() - 0.5) + batch_stack = Batch.stack([batch, batch, batch]) + buf.add(Batch.stack([batch]), buffer_ids=[0]) + buf2.add(batch_stack, buffer_ids=[0, 1, 2]) obs = obs_next data, indice = buf.sample(len(buf) // 2) if len(buf) // 2 == 0: @@ -141,26 +170,36 @@ def test_priortized_replaybuffer(size=32, bufsize=15): else: assert len(data) == len(buf) // 2 assert len(buf) == min(bufsize, i + 1) + assert len(buf2) == min(bufsize, 3 * (i + 1)) + # check single buffer's data + assert buf.info.key.shape == (buf.maxsize,) + assert buf.rew.dtype == np.float + assert buf.done.dtype == np.bool_ data, indice = buf.sample(len(buf) // 2) buf.update_weight(indice, -data.weight / 2) - assert np.allclose( - buf.weight[indice], np.abs(-data.weight / 2) ** buf._alpha) + assert np.allclose(buf.weight[indice], np.abs(-data.weight / 2) ** buf._alpha) + # check multi buffer's data + assert np.allclose(buf2[np.arange(buf2.maxsize)].weight, 1) + batch, indice = buf2.sample(10) + buf2.update_weight(indice, batch.weight * 0) + weight = buf2[np.arange(buf2.maxsize)].weight + mask = np.isin(np.arange(buf2.maxsize), indice) + assert np.all(weight[mask] == weight[mask][0]) + assert np.all(weight[~mask] == weight[~mask][0]) + assert weight[~mask][0] < weight[mask][0] and weight[mask][0] < 1 def test_update(): buf1 = ReplayBuffer(4, stack_num=2) buf2 = ReplayBuffer(4, stack_num=2) for i in range(5): - buf1.add(obs=np.array([i]), act=float(i), rew=i * i, - done=i % 2 == 0, info={'incident': 'found'}) + buf1.add(Batch(obs=np.array([i]), act=float(i), rew=i * i, + done=i % 2 == 0, info={'incident': 'found'})) assert len(buf1) > len(buf2) buf2.update(buf1) assert len(buf1) == len(buf2) assert (buf2[0].obs == buf1[1].obs).all() assert (buf2[-1].obs == buf1[0].obs).all() - b = ListReplayBuffer() - with pytest.raises(NotImplementedError): - b.update(b) b = CachedReplayBuffer(ReplayBuffer(10), 4, 5) with pytest.raises(NotImplementedError): b.update(b) @@ -270,22 +309,17 @@ def sample_tree(): def test_pickle(): size = 100 vbuf = ReplayBuffer(size, stack_num=2) - lbuf = ListReplayBuffer() pbuf = 
PrioritizedReplayBuffer(size, 0.6, 0.4) rew = np.array([1, 1]) for i in range(4): - vbuf.add(obs=Batch(index=np.array([i])), act=0, rew=rew, done=0) - for i in range(3): - lbuf.add(obs=Batch(index=np.array([i])), act=1, rew=rew, done=0) + vbuf.add(Batch(obs=Batch(index=np.array([i])), act=0, rew=rew, done=0)) for i in range(5): - pbuf.add(obs=Batch(index=np.array([i])), - act=2, rew=rew, done=0, weight=np.random.rand()) + pbuf.add(Batch(obs=Batch(index=np.array([i])), + act=2, rew=rew, done=0, info=np.random.rand())) # save & load _vbuf = pickle.loads(pickle.dumps(vbuf)) - _lbuf = pickle.loads(pickle.dumps(lbuf)) _pbuf = pickle.loads(pickle.dumps(pbuf)) assert len(_vbuf) == len(vbuf) and np.allclose(_vbuf.act, vbuf.act) - assert len(_lbuf) == len(lbuf) and np.allclose(_lbuf.act, lbuf.act) assert len(_pbuf) == len(pbuf) and np.allclose(_pbuf.act, pbuf.act) # make sure the meta var is identical assert _vbuf.stack_num == vbuf.stack_num @@ -297,7 +331,6 @@ def test_hdf5(): size = 100 buffers = { "array": ReplayBuffer(size, stack_num=2), - "list": ListReplayBuffer(), "prioritized": PrioritizedReplayBuffer(size, 0.6, 0.4), } buffer_types = {k: b.__class__ for k, b in buffers.items()} @@ -311,9 +344,8 @@ def test_hdf5(): 'done': i % 3 == 2, 'info': {"number": {"n": i, "t": info_t}, 'extra': None}, } - buffers["array"].add(**kwargs) - buffers["list"].add(**kwargs) - buffers["prioritized"].add(weight=np.random.rand(), **kwargs) + buffers["array"].add(Batch(kwargs)) + buffers["prioritized"].add(Batch(kwargs)) # save paths = {} @@ -356,10 +388,11 @@ def test_hdf5(): def test_replaybuffermanager(): - buf = ReplayBufferManager([ReplayBuffer(size=5) for i in range(4)]) - ep_len, ep_rew = buf.add(obs=[1, 2, 3], act=[1, 2, 3], rew=[1, 2, 3], - done=[0, 0, 1], buffer_ids=[0, 1, 2]) - assert np.allclose(ep_len, [0, 0, 1]) and np.allclose(ep_rew, [0, 0, 3]) + buf = VectorReplayBuffer(20, 4) + batch = Batch(obs=[1, 2, 3], act=[1, 2, 3], rew=[1, 2, 3], done=[0, 0, 1]) + ptr, ep_rew, ep_len, ep_idx = buf.add(batch, buffer_ids=[0, 1, 2]) + assert np.all(ep_len == [0, 0, 1]) and np.all(ep_rew == [0, 0, 3]) + assert np.all(ptr == [0, 5, 10]) and np.all(ep_idx == [0, 5, 10]) with pytest.raises(NotImplementedError): # ReplayBufferManager cannot be updated buf.update(buf) @@ -373,7 +406,7 @@ def test_replaybuffermanager(): indice_next = buf.next(indice) assert np.allclose(indice_next, indice), indice_next assert np.allclose(buf.unfinished_index(), [0, 5]) - buf.add(obs=[4], act=[4], rew=[4], done=[1], buffer_ids=[3]) + buf.add(Batch(obs=[4], act=[4], rew=[4], done=[1]), buffer_ids=[3]) assert np.allclose(buf.unfinished_index(), [0, 5]) batch, indice = buf.sample(10) batch, indice = buf.sample(0) @@ -383,12 +416,14 @@ def test_replaybuffermanager(): indice_next = buf.next(indice) assert np.allclose(indice_next, indice), indice_next data = np.array([0, 0, 0, 0]) - buf.add(obs=data, act=data, rew=data, done=data, buffer_ids=[0, 1, 2, 3]) - buf.add(obs=data, act=data, rew=data, done=1 - data, + buf.add(Batch(obs=data, act=data, rew=data, done=data), + buffer_ids=[0, 1, 2, 3]) + buf.add(Batch(obs=data, act=data, rew=data, done=1 - data), buffer_ids=[0, 1, 2, 3]) assert len(buf) == 12 - buf.add(obs=data, act=data, rew=data, done=data, buffer_ids=[0, 1, 2, 3]) - buf.add(obs=data, act=data, rew=data, done=[0, 1, 0, 1], + buf.add(Batch(obs=data, act=data, rew=data, done=data), + buffer_ids=[0, 1, 2, 3]) + buf.add(Batch(obs=data, act=data, rew=data, done=[0, 1, 0, 1]), buffer_ids=[0, 1, 2, 3]) assert len(buf) == 20 indice = 
buf.sample_index(120000) @@ -416,9 +451,10 @@ def test_replaybuffermanager(): 15, 17, 17, 19, 19, ]) assert np.allclose(buf.unfinished_index(), [4, 14]) - ep_len, ep_rew = buf.add(obs=[1], act=[1], rew=[1], done=[1], - buffer_ids=[2]) - assert np.allclose(ep_len, [3]) and np.allclose(ep_rew, [1]) + ptr, ep_rew, ep_len, ep_idx = buf.add( + Batch(obs=[1], act=[1], rew=[1], done=[1]), buffer_ids=[2]) + assert np.all(ep_len == [3]) and np.all(ep_rew == [1]) + assert np.all(ptr == [10]) and np.all(ep_idx == [13]) assert np.allclose(buf.unfinished_index(), [4]) indice = list(sorted(buf.sample_index(0))) assert np.allclose(indice, np.arange(len(buf))) @@ -438,9 +474,9 @@ def test_replaybuffermanager(): assert buf.prev(-1) == buf.prev([buf.maxsize - 1])[0] assert buf.next(-1) == buf.next([buf.maxsize - 1])[0] batch = buf._meta - batch.info.n = np.ones(buf.maxsize) + batch.info = np.ones(buf.maxsize) buf.set_batch(batch) - assert np.allclose(buf.buffers[-1].info.n, [1] * 5) + assert np.allclose(buf.buffers[-1].info, [1] * 5) assert buf.sample_index(-1).tolist() == [] assert np.array([ReplayBuffer(0, ignore_obs_next=True)]).dtype == np.object @@ -449,8 +485,8 @@ def test_cachedbuffer(): buf = CachedReplayBuffer(ReplayBuffer(10), 4, 5) assert buf.sample_index(0).tolist() == [] # check the normal function/usage/storage in CachedReplayBuffer - ep_len, ep_rew = buf.add(obs=[1], act=[1], rew=[1], done=[0], - cached_buffer_ids=[1]) + ptr, ep_rew, ep_len, ep_idx = buf.add( + Batch(obs=[1], act=[1], rew=[1], done=[0]), buffer_ids=[1]) obs = np.zeros(buf.maxsize) obs[15] = 1 indice = buf.sample_index(0) @@ -458,21 +494,25 @@ def test_cachedbuffer(): assert np.allclose(buf.prev(indice), [15]) assert np.allclose(buf.next(indice), [15]) assert np.allclose(buf.obs, obs) - assert np.allclose(ep_len, [0]) and np.allclose(ep_rew, [0.0]) - ep_len, ep_rew = buf.add(obs=[2], act=[2], rew=[2], done=[1], - cached_buffer_ids=[3]) + assert np.all(ep_len == [0]) and np.all(ep_rew == [0.0]) + assert np.all(ptr == [15]) and np.all(ep_idx == [15]) + ptr, ep_rew, ep_len, ep_idx = buf.add( + Batch(obs=[2], act=[2], rew=[2], done=[1]), buffer_ids=[3]) obs[[0, 25]] = 2 indice = buf.sample_index(0) assert np.allclose(indice, [0, 15]) assert np.allclose(buf.prev(indice), [0, 15]) assert np.allclose(buf.next(indice), [0, 15]) assert np.allclose(buf.obs, obs) - assert np.allclose(ep_len, [1]) and np.allclose(ep_rew, [2.0]) + assert np.all(ep_len == [1]) and np.all(ep_rew == [2.0]) + assert np.all(ptr == [0]) and np.all(ep_idx == [0]) assert np.allclose(buf.unfinished_index(), [15]) assert np.allclose(buf.sample_index(0), [0, 15]) - ep_len, ep_rew = buf.add(obs=[3, 4], act=[3, 4], rew=[3, 4], - done=[0, 1], cached_buffer_ids=[3, 1]) - assert np.allclose(ep_len, [0, 2]) and np.allclose(ep_rew, [0, 5.0]) + ptr, ep_rew, ep_len, ep_idx = buf.add( + Batch(obs=[3, 4], act=[3, 4], rew=[3, 4], done=[0, 1]), + buffer_ids=[3, 1]) + assert np.all(ep_len == [0, 2]) and np.all(ep_rew == [0, 5.0]) + assert np.all(ptr == [25, 2]) and np.all(ep_idx == [25, 1]) obs[[0, 1, 2, 15, 16, 25]] = [2, 1, 4, 1, 4, 3] assert np.allclose(buf.obs, obs) assert np.allclose(buf.unfinished_index(), [25]) @@ -488,11 +528,15 @@ def test_cachedbuffer(): buf = CachedReplayBuffer(ReplayBuffer(0, sample_avail=True), 4, 5) data = np.zeros(4) rew = np.ones([4, 4]) - buf.add(obs=data, act=data, rew=rew, done=[0, 0, 1, 1], obs_next=data) - buf.add(obs=data, act=data, rew=rew, done=[0, 0, 0, 0], obs_next=data) - buf.add(obs=data, act=data, rew=rew, done=[1, 1, 1, 1], 
obs_next=data) - buf.add(obs=data, act=data, rew=rew, done=[0, 0, 0, 0], obs_next=data) - buf.add(obs=data, act=data, rew=rew, done=[0, 1, 0, 1], obs_next=data) + buf.add(Batch(obs=data, act=data, rew=rew, done=[0, 0, 1, 1])) + buf.add(Batch(obs=data, act=data, rew=rew, done=[0, 0, 0, 0])) + buf.add(Batch(obs=data, act=data, rew=rew, done=[1, 1, 1, 1])) + buf.add(Batch(obs=data, act=data, rew=rew, done=[0, 0, 0, 0])) + ptr, ep_rew, ep_len, ep_idx = buf.add( + Batch(obs=data, act=data, rew=rew, done=[0, 1, 0, 1])) + assert np.all(ptr == [1, -1, 11, -1]) and np.all(ep_idx == [0, -1, 10, -1]) + assert np.all(ep_len == [0, 2, 0, 2]) + assert np.all(ep_rew == [data, data + 2, data, data + 2]) assert np.allclose(buf.done, [ 0, 0, 1, 0, 0, 0, 1, 1, 0, 0, @@ -515,11 +559,11 @@ def test_multibuf_stack(): buf4 = CachedReplayBuffer( ReplayBuffer(bufsize, stack_num=stack_num, ignore_obs_next=True), cached_num, size) - # test if CachedReplayBuffer can handle super corner case: - # prio-buffer + stack_num + ignore_obs_next + sample_avail + # test if CachedReplayBuffer can handle corner case: + # buffer + stack_num + ignore_obs_next + sample_avail buf5 = CachedReplayBuffer( - PrioritizedReplayBuffer(bufsize, 0.6, 0.4, stack_num=stack_num, - ignore_obs_next=True, sample_avail=True), + ReplayBuffer(bufsize, stack_num=stack_num, + ignore_obs_next=True, sample_avail=True), cached_num, size) obs = env.reset(1) for i in range(18): @@ -530,10 +574,12 @@ def test_multibuf_stack(): done_list = [done] * cached_num obs_next_list = -obs_list info_list = [info] * cached_num - buf4.add(obs_list, act_list, rew_list, done_list, - obs_next_list, info_list) - buf5.add(obs_list, act_list, rew_list, done_list, - obs_next_list, info_list) + batch = Batch(obs=obs_list, act=act_list, rew=rew_list, + done=done_list, obs_next=obs_next_list, info=info_list) + buf5.add(batch) + buf4.add(batch) + assert np.all(buf4.obs == buf5.obs) + assert np.all(buf4.done == buf5.done) obs = obs_next if done: obs = env.reset(1) @@ -567,7 +613,6 @@ def test_multibuf_stack(): [1, 1, 1, 2], [1, 1, 1, 2], [6, 6, 6, 7], [6, 6, 6, 7], [11, 11, 11, 12], [11, 11, 11, 12], ]) - assert np.all(buf4.done == buf5.done) indice = buf5.sample_index(0) assert np.allclose(sorted(indice), [2, 7]) assert np.all(np.isin(buf5.sample_index(100), indice)) @@ -578,25 +623,14 @@ def test_multibuf_stack(): indice = buf5.sample_index(0) assert np.allclose(sorted(indice), [0, 1, 2, 5, 6, 7, 10, 15, 20]) batch, _ = buf5.sample(0) - assert np.allclose(buf5[np.arange(buf5.maxsize)].weight, 1) - buf5.update_weight(indice, batch.weight * 0) - weight = buf5[np.arange(buf5.maxsize)].weight - modified_weight = weight[[0, 1, 2, 5, 6, 7]] - assert modified_weight.min() == modified_weight.max() - assert modified_weight.max() < 1 - unmodified_weight = weight[[3, 4, 8]] - assert unmodified_weight.min() == unmodified_weight.max() - assert unmodified_weight.max() < 1 - cached_weight = weight[9:] - assert cached_weight.min() == cached_weight.max() == 1 # test Atari with CachedReplayBuffer, save_only_last_obs + ignore_obs_next buf6 = CachedReplayBuffer( ReplayBuffer(bufsize, stack_num=stack_num, save_only_last_obs=True, ignore_obs_next=True), cached_num, size) obs = np.random.rand(size, 4, 84, 84) - buf6.add(obs=[obs[2], obs[0]], act=[1, 1], rew=[0, 0], done=[0, 1], - obs_next=[obs[3], obs[1]], cached_buffer_ids=[1, 2]) + buf6.add(Batch(obs=[obs[2], obs[0]], act=[1, 1], rew=[0, 0], done=[0, 1], + obs_next=[obs[3], obs[1]]), buffer_ids=[1, 2]) assert buf6.obs.shape == (buf6.maxsize, 84, 84) 
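Every buffer test in this file follows the same migrated call pattern: `add()` now takes a single `Batch` holding the reserved keys and returns four arrays, `(ptr, ep_rew, ep_len, ep_idx)`, and the vectorized buffers route each row to a sub-buffer via `buffer_ids`. A minimal sketch of that usage, assuming the `tianshou.data` API this patch introduces (the vector-buffer values mirror the `VectorReplayBuffer(20, 4)` case exercised in `test_replaybuffermanager`):

```python
from tianshou.data import Batch, ReplayBuffer, VectorReplayBuffer

# single buffer: add() consumes a Batch and reports episode bookkeeping
buf = ReplayBuffer(size=5)
for i in range(3):
    ptr, ep_rew, ep_len, ep_idx = buf.add(
        Batch(obs=i, act=0, rew=1.0, done=(i == 2)))
# on the terminal step: ptr == [2], ep_rew == [3.0], ep_len == [3], ep_idx == [0]

# vector buffer: one flat buffer split into buffer_num sub-buffers of size 5
vbuf = VectorReplayBuffer(total_size=20, buffer_num=4)
batch = Batch(obs=[1, 2, 3], act=[1, 2, 3], rew=[1.0, 2.0, 3.0], done=[0, 0, 1])
ptr, ep_rew, ep_len, ep_idx = vbuf.add(batch, buffer_ids=[0, 1, 2])
# ptr == [0, 5, 10]: each transition lands in its own sub-buffer
# ep_len == [0, 0, 1]: only the third sub-buffer finished an episode
```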
assert np.allclose(buf6.obs[0], obs[0, -1]) assert np.allclose(buf6.obs[14], obs[2, -1]) @@ -607,7 +641,7 @@ def test_multibuf_stack(): def test_multibuf_hdf5(): size = 100 buffers = { - "vector": ReplayBufferManager([ReplayBuffer(size) for i in range(4)]), + "vector": VectorReplayBuffer(size * 4, 4), "cached": CachedReplayBuffer(ReplayBuffer(size), 4, size) } buffer_types = {k: b.__class__ for k, b in buffers.items()} @@ -621,10 +655,10 @@ def test_multibuf_hdf5(): 'done': i % 3 == 2, 'info': {"number": {"n": i, "t": info_t}, 'extra': None}, } - buffers["vector"].add(**Batch.stack([kwargs, kwargs, kwargs]), + buffers["vector"].add(Batch.stack([kwargs, kwargs, kwargs]), + buffer_ids=[0, 1, 2]) + buffers["cached"].add(Batch.stack([kwargs, kwargs, kwargs]), buffer_ids=[0, 1, 2]) - buffers["cached"].add(**Batch.stack([kwargs, kwargs, kwargs]), - cached_buffer_ids=[0, 1, 2]) # save paths = {} @@ -644,7 +678,7 @@ def test_multibuf_hdf5(): assert _buffers[k].stack_num == buffers[k].stack_num assert _buffers[k].maxsize == buffers[k].maxsize assert np.all(_buffers[k]._indices == buffers[k]._indices) - # check shallow copy in ReplayBufferManager + # check shallow copy in VectorReplayBuffer for k in ["vector", "cached"]: buffers[k].info.number.n[0] = -100 assert buffers[k].buffers[0].info.number.n[0] == -100 @@ -657,7 +691,7 @@ def test_multibuf_hdf5(): 'done': False, 'info': {"number": {"n": i}, 'Timelimit.truncate': True}, } - buffers[k].add(**Batch.stack([kwargs, kwargs, kwargs, kwargs])) + buffers[k].add(Batch.stack([kwargs, kwargs, kwargs, kwargs])) act = np.zeros(buffers[k].maxsize) if k == "vector": act[np.arange(5)] = np.array([0, 1, 2, 3, 5]) @@ -671,6 +705,8 @@ def test_multibuf_hdf5(): act[np.arange(3) + size * 3] = np.array([3, 5, 2]) act[size * 4] = 5 assert np.allclose(buffers[k].act, act) + info_keys = ["number", "extra", "Timelimit.truncate"] + assert set(buffers[k].info.keys()) == set(info_keys) for path in paths.values(): os.remove(path) diff --git a/test/base/test_collector.py b/test/base/test_collector.py index e7e11759f..b9d789193 100644 --- a/test/base/test_collector.py +++ b/test/base/test_collector.py @@ -1,9 +1,17 @@ +import tqdm +import pytest import numpy as np from torch.utils.tensorboard import SummaryWriter from tianshou.policy import BasePolicy from tianshou.env import DummyVectorEnv, SubprocVectorEnv -from tianshou.data import Collector, Batch, ReplayBuffer +from tianshou.data import Batch, Collector, AsyncCollector +from tianshou.data import ( + ReplayBuffer, + PrioritizedReplayBuffer, + VectorReplayBuffer, + CachedReplayBuffer, +) if __name__ == '__main__': from env import MyTestEnv @@ -12,7 +20,7 @@ class MyPolicy(BasePolicy): - def __init__(self, dict_state: bool = False, need_state: bool = True): + def __init__(self, dict_state=False, need_state=True): """ :param bool dict_state: if the observation of the environment is a dict :param bool need_state: if the policy needs the hidden state (for RNN) @@ -43,15 +51,13 @@ def __init__(self, writer): def preprocess_fn(self, **kwargs): # modify info before adding into the buffer, and recorded into tfb # if only obs exist -> reset - # if obs/act/rew/done/... 
exist -> normal step + # if obs_next/rew/done/info exist -> normal step if 'rew' in kwargs: - n = len(kwargs['obs']) info = kwargs['info'] - for i in range(n): - info[i].update(rew=kwargs['rew'][i]) + info.rew = kwargs['rew'] if 'key' in info.keys(): - self.writer.add_scalar('key', np.mean( - info['key']), global_step=self.cnt) + self.writer.add_scalar( + 'key', np.mean(info.key), global_step=self.cnt) self.cnt += 1 return Batch(info=info) else: @@ -61,10 +67,8 @@ def preprocess_fn(self, **kwargs): def single_preprocess_fn(**kwargs): # same as above, without tfb if 'rew' in kwargs: - n = len(kwargs['obs']) info = kwargs['info'] - for i in range(n): - info[i].update(rew=kwargs['rew'][i]) + info.rew = kwargs['rew'] return Batch(info=info) else: return Batch() @@ -79,110 +83,105 @@ def test_collector(): dum = DummyVectorEnv(env_fns) policy = MyPolicy() env = env_fns[0]() - c0 = Collector(policy, env, ReplayBuffer(size=100, ignore_obs_next=False), - logger.preprocess_fn) + c0 = Collector(policy, env, ReplayBuffer(size=100), logger.preprocess_fn) c0.collect(n_step=3) - assert np.allclose(c0.buffer.obs[:4, 0], [0, 1, 0, 1]) - assert np.allclose(c0.buffer[:4].obs_next[..., 0], [1, 2, 1, 2]) + assert len(c0.buffer) == 3 + assert np.allclose(c0.buffer.obs[:4, 0], [0, 1, 0, 0]) + assert np.allclose(c0.buffer[:].obs_next[..., 0], [1, 2, 1]) c0.collect(n_episode=3) - assert np.allclose(c0.buffer.obs[:10, 0], [0, 1, 0, 1, 0, 1, 0, 1, 0, 1]) - assert np.allclose(c0.buffer[:10].obs_next[..., 0], - [1, 2, 1, 2, 1, 2, 1, 2, 1, 2]) + assert len(c0.buffer) == 8 + assert np.allclose(c0.buffer.obs[:10, 0], [0, 1, 0, 1, 0, 1, 0, 1, 0, 0]) + assert np.allclose(c0.buffer[:].obs_next[..., 0], + [1, 2, 1, 2, 1, 2, 1, 2]) c0.collect(n_step=3, random=True) - c1 = Collector(policy, venv, ReplayBuffer(size=100, ignore_obs_next=False), - logger.preprocess_fn) - c1.collect(n_step=6) - assert np.allclose(c1.buffer.obs[:11, 0], - [0, 1, 0, 1, 2, 0, 1, 0, 1, 2, 3]) - assert np.allclose(c1.buffer[:11].obs_next[..., 0], - [1, 2, 1, 2, 3, 1, 2, 1, 2, 3, 4]) - c1.collect(n_episode=2) - assert np.allclose(c1.buffer.obs[11:21, 0], [0, 1, 2, 3, 4, 0, 1, 0, 1, 2]) - assert np.allclose(c1.buffer[11:21].obs_next[..., 0], - [1, 2, 3, 4, 5, 1, 2, 1, 2, 3]) - c1.collect(n_episode=3, random=True) - c2 = Collector(policy, dum, ReplayBuffer(size=100, ignore_obs_next=False), - logger.preprocess_fn) - c2.collect(n_episode=[1, 2, 2, 2]) - assert np.allclose(c2.buffer.obs_next[:26, 0], [ - 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, - 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]) - c2.reset_env() - c2.collect(n_episode=[2, 2, 2, 2]) - assert np.allclose(c2.buffer.obs_next[26:54, 0], [ - 1, 2, 1, 2, 3, 1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, - 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]) - c2.collect(n_episode=[1, 1, 1, 1], random=True) - + c1 = Collector( + policy, venv, + VectorReplayBuffer(total_size=100, buffer_num=4), + logger.preprocess_fn) + c1.collect(n_step=8) + obs = np.zeros(100) + obs[[0, 1, 25, 26, 50, 51, 75, 76]] = [0, 1, 0, 1, 0, 1, 0, 1] -def test_collector_with_exact_episodes(): - env_lens = [2, 6, 3, 10] - writer = SummaryWriter('log/exact_collector') - logger = Logger(writer) - env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0.1, random_sleep=True) - for i in env_lens] + assert np.allclose(c1.buffer.obs[:, 0], obs) + assert np.allclose(c1.buffer[:].obs_next[..., 0], [1, 2, 1, 2, 1, 2, 1, 2]) + c1.collect(n_episode=4) + assert len(c1.buffer) == 16 + obs[[2, 3, 27, 52, 53, 77, 78, 79]] = [0, 1, 2, 2, 3, 2, 3, 4] + assert np.allclose(c1.buffer.obs[:, 0], 
obs) + assert np.allclose(c1.buffer[:].obs_next[..., 0], + [1, 2, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]) + c1.collect(n_episode=4, random=True) + c2 = Collector( + policy, dum, + VectorReplayBuffer(total_size=100, buffer_num=4), + logger.preprocess_fn) + c2.collect(n_episode=7) + obs1 = obs.copy() + obs1[[4, 5, 28, 29, 30]] = [0, 1, 0, 1, 2] + obs2 = obs.copy() + obs2[[28, 29, 30, 54, 55, 56, 57]] = [0, 1, 2, 0, 1, 2, 3] + c2obs = c2.buffer.obs[:, 0] + assert np.all(c2obs == obs1) or np.all(c2obs == obs2) + c2.reset_env() + c2.reset_buffer() + assert c2.collect(n_episode=8)['n/ep'] == 8 + obs[[4, 5, 28, 29, 30, 54, 55, 56, 57]] = [0, 1, 0, 1, 2, 0, 1, 2, 3] + assert np.all(c2.buffer.obs[:, 0] == obs) + c2.collect(n_episode=4, random=True) - venv = SubprocVectorEnv(env_fns, wait_num=len(env_fns) - 1) - policy = MyPolicy() - c1 = Collector(policy, venv, - ReplayBuffer(size=1000, ignore_obs_next=False), - logger.preprocess_fn) - n_episode1 = [2, 0, 5, 1] - n_episode2 = [1, 3, 2, 0] - c1.collect(n_episode=n_episode1) - expected_steps = sum([a * b for a, b in zip(env_lens, n_episode1)]) - actual_steps = sum(venv.steps) - assert expected_steps == actual_steps - c1.collect(n_episode=n_episode2) - expected_steps = sum( - [a * (b + c) for a, b, c in zip(env_lens, n_episode1, n_episode2)]) - actual_steps = sum(venv.steps) - assert expected_steps == actual_steps + # test corner case + with pytest.raises(TypeError): + Collector(policy, dum, ReplayBuffer(10)) + with pytest.raises(TypeError): + Collector(policy, dum, PrioritizedReplayBuffer(10, 0.5, 0.5)) + with pytest.raises(TypeError): + c2.collect() def test_collector_with_async(): env_lens = [2, 3, 4, 5] writer = SummaryWriter('log/async_collector') logger = Logger(writer) - env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0.1, random_sleep=True) + env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0.001, random_sleep=True) for i in env_lens] venv = SubprocVectorEnv(env_fns, wait_num=len(env_fns) - 1) policy = MyPolicy() - c1 = Collector(policy, venv, - ReplayBuffer(size=1000, ignore_obs_next=False), - logger.preprocess_fn) - c1.collect(n_episode=10) - # check if the data in the buffer is chronological - # i.e. 
data in the buffer are full episodes, and each episode is - # returned by the same environment - env_id = c1.buffer.info['env_id'] - size = len(c1.buffer) - obs = c1.buffer.obs[:size] - done = c1.buffer.done[:size] - obs_ground_truth = [] - i = 0 - while i < size: - # i is the start of an episode - if done[i]: - # this episode has one transition - assert env_lens[env_id[i]] == 1 - i += 1 - continue - j = i - while True: - j += 1 - # in one episode, the environment id is the same - assert env_id[j] == env_id[i] - if done[j]: - break - j = j + 1 # j is the start of the next episode - assert j - i == env_lens[env_id[i]] - obs_ground_truth += list(range(j - i)) - i = j - obs_ground_truth = np.expand_dims( - np.array(obs_ground_truth), axis=-1) - assert np.allclose(obs, obs_ground_truth) + bufsize = 60 + c1 = AsyncCollector( + policy, venv, + VectorReplayBuffer(total_size=bufsize * 4, buffer_num=4), + logger.preprocess_fn) + ptr = [0, 0, 0, 0] + for n_episode in tqdm.trange(1, 30, desc="test async n_episode"): + result = c1.collect(n_episode=n_episode) + assert result["n/ep"] >= n_episode + # check buffer data, obs and obs_next, env_id + for i, count in enumerate( + np.bincount(result["lens"], minlength=6)[2:]): + env_len = i + 2 + total = env_len * count + indices = np.arange(ptr[i], ptr[i] + total) % bufsize + ptr[i] = (ptr[i] + total) % bufsize + seq = np.arange(env_len) + buf = c1.buffer.buffers[i] + assert np.all(buf.info.env_id[indices] == i) + assert np.all(buf.obs[indices].reshape(count, env_len) == seq) + assert np.all(buf.obs_next[indices].reshape( + count, env_len) == seq + 1) + # test async n_step, for now the buffer should be full of data + for n_step in tqdm.trange(1, 15, desc="test async n_step"): + result = c1.collect(n_step=n_step) + assert result["n/st"] >= n_step + for i in range(4): + env_len = i + 2 + seq = np.arange(env_len) + buf = c1.buffer.buffers[i] + assert np.all(buf.info.env_id == i) + assert np.all(buf.obs.reshape(-1, env_len) == seq) + assert np.all(buf.obs_next.reshape(-1, env_len) == seq + 1) + with pytest.raises(TypeError): + c1.collect() def test_collector_with_dict_state(): @@ -192,72 +191,227 @@ def test_collector_with_dict_state(): Logger.single_preprocess_fn) c0.collect(n_step=3) c0.collect(n_episode=2) + assert len(c0.buffer) == 10 env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0, dict_state=True) for i in [2, 3, 4, 5]] envs = DummyVectorEnv(env_fns) envs.seed(666) obs = envs.reset() assert not np.isclose(obs[0]['rand'], obs[1]['rand']) - c1 = Collector(policy, envs, ReplayBuffer(size=100), - Logger.single_preprocess_fn) - c1.collect(n_step=10) - c1.collect(n_episode=[2, 1, 1, 2]) + c1 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4), + Logger.single_preprocess_fn) + c1.collect(n_step=12) + result = c1.collect(n_episode=8) + assert result['n/ep'] == 8 + lens = np.bincount(result['lens']) + assert result['n/st'] == 21 and np.all(lens == [0, 0, 2, 2, 2, 2]) or \ + result['n/st'] == 20 and np.all(lens == [0, 0, 3, 1, 2, 2]) batch, _ = c1.buffer.sample(10) - print(batch) c0.buffer.update(c1.buffer) - assert np.allclose(c0.buffer[:len(c0.buffer)].obs.index[..., 0], [ - 0., 1., 2., 3., 4., 0., 1., 2., 3., 4., 0., 1., 2., 3., 4., 0., 1., - 0., 1., 2., 0., 1., 0., 1., 2., 3., 0., 1., 2., 3., 4., 0., 1., 0., - 1., 2., 0., 1., 0., 1., 2., 3., 0., 1., 2., 3., 4.]) - c2 = Collector(policy, envs, ReplayBuffer(size=100, stack_num=4), - Logger.single_preprocess_fn) - c2.collect(n_episode=[0, 0, 0, 10]) + assert len(c0.buffer) in [42, 43] + 
if len(c0.buffer) == 42: + assert np.all(c0.buffer[:].obs.index[..., 0] == [ + 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 1, 2, 0, 1, 2, + 0, 1, 2, 3, 0, 1, 2, 3, + 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, + ]), c0.buffer[:].obs.index[..., 0] + else: + assert np.all(c0.buffer[:].obs.index[..., 0] == [ + 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, + 0, 1, 0, 1, 0, 1, + 0, 1, 2, 0, 1, 2, 0, 1, 2, + 0, 1, 2, 3, 0, 1, 2, 3, + 0, 1, 2, 3, 4, 0, 1, 2, 3, 4, + ]), c0.buffer[:].obs.index[..., 0] + c2 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4, stack_num=4), + Logger.single_preprocess_fn) + c2.collect(n_episode=10) batch, _ = c2.buffer.sample(10) def test_collector_with_ma(): - def reward_metric(x): - return x.sum() env = MyTestEnv(size=5, sleep=0, ma_rew=4) policy = MyPolicy() c0 = Collector(policy, env, ReplayBuffer(size=100), - Logger.single_preprocess_fn, reward_metric=reward_metric) + Logger.single_preprocess_fn) # n_step=3 will collect a full episode - r = c0.collect(n_step=3)['rew'] - assert np.asanyarray(r).size == 1 and r == 4. - r = c0.collect(n_episode=2)['rew'] - assert np.asanyarray(r).size == 1 and r == 4. + r = c0.collect(n_step=3)['rews'] + assert len(r) == 0 + r = c0.collect(n_episode=2)['rews'] + assert r.shape == (2, 4) and np.all(r == 1) env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0, ma_rew=4) for i in [2, 3, 4, 5]] envs = DummyVectorEnv(env_fns) - c1 = Collector(policy, envs, ReplayBuffer(size=100), - Logger.single_preprocess_fn, reward_metric=reward_metric) - r = c1.collect(n_step=10)['rew'] - assert np.asanyarray(r).size == 1 and r == 4. - r = c1.collect(n_episode=[2, 1, 1, 2])['rew'] - assert np.asanyarray(r).size == 1 and r == 4. + c1 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4), + Logger.single_preprocess_fn) + r = c1.collect(n_step=12)['rews'] + assert r.shape == (2, 4) and np.all(r == 1), r + r = c1.collect(n_episode=8)['rews'] + assert r.shape == (8, 4) and np.all(r == 1) batch, _ = c1.buffer.sample(10) print(batch) c0.buffer.update(c1.buffer) - assert np.allclose(c0.buffer[:len(c0.buffer)].obs[..., 0], [ - 0., 1., 2., 3., 4., 0., 1., 2., 3., 4., 0., 1., 2., 3., 4., 0., 1., - 0., 1., 2., 0., 1., 0., 1., 2., 3., 0., 1., 2., 3., 4., 0., 1., 0., - 1., 2., 0., 1., 0., 1., 2., 3., 0., 1., 2., 3., 4.]) - rew = [0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, - 0, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, - 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1] - assert np.allclose(c0.buffer[:len(c0.buffer)].rew, - [[x] * 4 for x in rew]) - c2 = Collector(policy, envs, ReplayBuffer(size=100, stack_num=4), - Logger.single_preprocess_fn, reward_metric=reward_metric) - r = c2.collect(n_episode=[0, 0, 0, 10])['rew'] - assert np.asanyarray(r).size == 1 and r == 4. 
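The collector tests above replace per-environment episode lists (`n_episode=[1, 0, 3]`) and the scalar `result['rew']` with a plain integer `n_episode` and array-valued `result['rews']` / `result['lens']`. A rough end-to-end sketch of the new calling convention, assuming a gym `CartPole-v0` environment and the post-refactor `Collector`; `RandomDiscretePolicy` is a made-up stand-in for a real tianshou policy, not part of the library:

```python
import gym
import numpy as np
from tianshou.data import Batch, Collector, VectorReplayBuffer
from tianshou.env import DummyVectorEnv
from tianshou.policy import BasePolicy

class RandomDiscretePolicy(BasePolicy):
    """Illustration only: samples random CartPole actions."""
    def forward(self, batch, state=None, **kwargs):
        return Batch(act=np.random.randint(2, size=len(batch.obs)))
    def learn(self, batch, **kwargs):
        return {}

envs = DummyVectorEnv([lambda: gym.make("CartPole-v0") for _ in range(4)])
collector = Collector(RandomDiscretePolicy(), envs,
                      VectorReplayBuffer(total_size=2000, buffer_num=4))
result = collector.collect(n_episode=8)     # exactly 8 episodes across the 4 envs
print(result["n/ep"], result["n/st"])       # episode and step counts
print(result["rews"].mean(), result["lens"].mean())  # per-episode arrays, not scalars
```

Note that pairing a plain `ReplayBuffer` (or `PrioritizedReplayBuffer`) with a vectorized environment now raises `TypeError`, as the corner-case checks above assert.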
+ assert len(c0.buffer) in [42, 43] + if len(c0.buffer) == 42: + rew = [ + 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, + 0, 1, 0, 1, 0, 1, 0, 1, + 0, 0, 1, 0, 0, 1, + 0, 0, 0, 1, 0, 0, 0, 1, + 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, + ] + else: + rew = [ + 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, + 0, 1, 0, 1, 0, 1, + 0, 0, 1, 0, 0, 1, 0, 0, 1, + 0, 0, 0, 1, 0, 0, 0, 1, + 0, 0, 0, 0, 1, 0, 0, 0, 0, 1, + ] + assert np.all(c0.buffer[:].rew == [[x] * 4 for x in rew]) + assert np.all(c0.buffer[:].done == rew) + c2 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4, stack_num=4), + Logger.single_preprocess_fn) + r = c2.collect(n_episode=10)['rews'] + assert r.shape == (10, 4) and np.all(r == 1) batch, _ = c2.buffer.sample(10) +def test_collector_with_atari_setting(): + reference_obs = np.zeros([6, 4, 84, 84]) + for i in range(6): + reference_obs[i, 3, np.arange(84), np.arange(84)] = i + reference_obs[i, 2, np.arange(84)] = i + reference_obs[i, 1, :, np.arange(84)] = i + reference_obs[i, 0] = i + + # atari single buffer + env = MyTestEnv(size=5, sleep=0, array_state=True) + policy = MyPolicy() + c0 = Collector(policy, env, ReplayBuffer(size=100)) + c0.collect(n_step=6) + c0.collect(n_episode=2) + assert c0.buffer.obs.shape == (100, 4, 84, 84) + assert c0.buffer.obs_next.shape == (100, 4, 84, 84) + assert len(c0.buffer) == 15 + obs = np.zeros_like(c0.buffer.obs) + obs[np.arange(15)] = reference_obs[np.arange(15) % 5] + assert np.all(obs == c0.buffer.obs) + + c1 = Collector(policy, env, ReplayBuffer(size=100, ignore_obs_next=True)) + c1.collect(n_episode=3) + assert np.allclose(c0.buffer.obs, c1.buffer.obs) + with pytest.raises(AttributeError): + c1.buffer.obs_next + assert np.all(reference_obs[[1, 2, 3, 4, 4] * 3] == c1.buffer[:].obs_next) + + c2 = Collector( + policy, env, + ReplayBuffer(size=100, ignore_obs_next=True, save_only_last_obs=True)) + c2.collect(n_step=8) + assert c2.buffer.obs.shape == (100, 84, 84) + obs = np.zeros_like(c2.buffer.obs) + obs[np.arange(8)] = reference_obs[[0, 1, 2, 3, 4, 0, 1, 2], -1] + assert np.all(c2.buffer.obs == obs) + assert np.allclose(c2.buffer[:].obs_next, + reference_obs[[1, 2, 3, 4, 4, 1, 2, 2], -1]) + + # atari multi buffer + env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0, array_state=True) + for i in [2, 3, 4, 5]] + envs = DummyVectorEnv(env_fns) + c3 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4)) + c3.collect(n_step=12) + result = c3.collect(n_episode=9) + assert result["n/ep"] == 9 and result["n/st"] == 23 + assert c3.buffer.obs.shape == (100, 4, 84, 84) + obs = np.zeros_like(c3.buffer.obs) + obs[np.arange(8)] = reference_obs[[0, 1, 0, 1, 0, 1, 0, 1]] + obs[np.arange(25, 34)] = reference_obs[[0, 1, 2, 0, 1, 2, 0, 1, 2]] + obs[np.arange(50, 58)] = reference_obs[[0, 1, 2, 3, 0, 1, 2, 3]] + obs[np.arange(75, 85)] = reference_obs[[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] + assert np.all(obs == c3.buffer.obs) + obs_next = np.zeros_like(c3.buffer.obs_next) + obs_next[np.arange(8)] = reference_obs[[1, 2, 1, 2, 1, 2, 1, 2]] + obs_next[np.arange(25, 34)] = reference_obs[[1, 2, 3, 1, 2, 3, 1, 2, 3]] + obs_next[np.arange(50, 58)] = reference_obs[[1, 2, 3, 4, 1, 2, 3, 4]] + obs_next[np.arange(75, 85)] = reference_obs[[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]] + assert np.all(obs_next == c3.buffer.obs_next) + c4 = Collector( + policy, envs, + VectorReplayBuffer(total_size=100, buffer_num=4, stack_num=4, + ignore_obs_next=True, save_only_last_obs=True)) + c4.collect(n_step=12) + result = c4.collect(n_episode=9) + assert result["n/ep"] == 9 and result["n/st"] 
== 23 + assert c4.buffer.obs.shape == (100, 84, 84) + obs = np.zeros_like(c4.buffer.obs) + slice_obs = reference_obs[:, -1] + obs[np.arange(8)] = slice_obs[[0, 1, 0, 1, 0, 1, 0, 1]] + obs[np.arange(25, 34)] = slice_obs[[0, 1, 2, 0, 1, 2, 0, 1, 2]] + obs[np.arange(50, 58)] = slice_obs[[0, 1, 2, 3, 0, 1, 2, 3]] + obs[np.arange(75, 85)] = slice_obs[[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] + assert np.all(c4.buffer.obs == obs) + obs_next = np.zeros([len(c4.buffer), 4, 84, 84]) + ref_index = np.array([ + 1, 1, 1, 1, 1, 1, 1, 1, + 1, 2, 2, 1, 2, 2, 1, 2, 2, + 1, 2, 3, 3, 1, 2, 3, 3, + 1, 2, 3, 4, 4, 1, 2, 3, 4, 4, + ]) + obs_next[:, -1] = slice_obs[ref_index] + ref_index -= 1 + ref_index[ref_index < 0] = 0 + obs_next[:, -2] = slice_obs[ref_index] + ref_index -= 1 + ref_index[ref_index < 0] = 0 + obs_next[:, -3] = slice_obs[ref_index] + ref_index -= 1 + ref_index[ref_index < 0] = 0 + obs_next[:, -4] = slice_obs[ref_index] + assert np.all(obs_next == c4.buffer[:].obs_next) + + buf = ReplayBuffer(100, stack_num=4, ignore_obs_next=True, + save_only_last_obs=True) + c5 = Collector(policy, envs, CachedReplayBuffer(buf, 4, 10)) + result_ = c5.collect(n_step=12) + assert len(buf) == 5 and len(c5.buffer) == 12 + result = c5.collect(n_episode=9) + assert result["n/ep"] == 9 and result["n/st"] == 23 + assert len(buf) == 35 + assert np.all(buf.obs[:len(buf)] == slice_obs[[ + 0, 1, 0, 1, 2, 0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4, + 0, 1, 0, 1, 2, 0, 1, 0, 1, 2, 3, 0, 1, 2, 0, 1, 2, 3, 4]]) + assert np.all(buf[:].obs_next[:, -1] == slice_obs[[ + 1, 1, 1, 2, 2, 1, 1, 1, 2, 3, 3, 1, 2, 3, 4, 4, + 1, 1, 1, 2, 2, 1, 1, 1, 2, 3, 3, 1, 2, 2, 1, 2, 3, 4, 4]]) + assert len(buf) == len(c5.buffer) + + # test buffer=None + c6 = Collector(policy, envs) + result1 = c6.collect(n_step=12) + for key in ["n/ep", "n/st", "rews", "lens"]: + assert np.allclose(result1[key], result_[key]) + result2 = c6.collect(n_episode=9) + for key in ["n/ep", "n/st", "rews", "lens"]: + assert np.allclose(result2[key], result[key]) + + if __name__ == '__main__': test_collector() test_collector_with_dict_state() test_collector_with_ma() + test_collector_with_atari_setting() test_collector_with_async() - test_collector_with_exact_episodes() diff --git a/test/base/test_returns.py b/test/base/test_returns.py index a8bdc7c9d..5aba7f56a 100644 --- a/test/base/test_returns.py +++ b/test/base/test_returns.py @@ -20,50 +20,71 @@ def compute_episodic_return_base(batch, gamma): def test_episodic_returns(size=2560): fn = BasePolicy.compute_episodic_return + buf = ReplayBuffer(20) batch = Batch( done=np.array([1, 0, 0, 1, 0, 1, 0, 1.]), rew=np.array([0, 1, 2, 3, 4, 5, 6, 7.]), ) - batch = fn(batch, None, gamma=.1, gae_lambda=1) + for b in batch: + b.obs = b.act = 1 + buf.add(b) + batch = fn(batch, buf, buf.sample_index(0), None, gamma=.1, gae_lambda=1) ans = np.array([0, 1.23, 2.3, 3, 4.5, 5, 6.7, 7]) assert np.allclose(batch.returns, ans) + buf.reset() batch = Batch( done=np.array([0, 1, 0, 1, 0, 1, 0.]), rew=np.array([7, 6, 1, 2, 3, 4, 5.]), ) - batch = fn(batch, None, gamma=.1, gae_lambda=1) + for b in batch: + b.obs = b.act = 1 + buf.add(b) + batch = fn(batch, buf, buf.sample_index(0), None, gamma=.1, gae_lambda=1) ans = np.array([7.6, 6, 1.2, 2, 3.4, 4, 5]) assert np.allclose(batch.returns, ans) + buf.reset() batch = Batch( done=np.array([0, 1, 0, 1, 0, 0, 1.]), rew=np.array([7, 6, 1, 2, 3, 4, 5.]), ) - batch = fn(batch, None, gamma=.1, gae_lambda=1) + for b in batch: + b.obs = b.act = 1 + buf.add(b) + batch = fn(batch, buf, buf.sample_index(0), None, gamma=.1, 
gae_lambda=1) ans = np.array([7.6, 6, 1.2, 2, 3.45, 4.5, 5]) assert np.allclose(batch.returns, ans) + buf.reset() batch = Batch( done=np.array([0, 0, 0, 1., 0, 0, 0, 1, 0, 0, 0, 1]), - rew=np.array([ - 101, 102, 103., 200, 104, 105, 106, 201, 107, 108, 109, 202]) + rew=np.array([101, 102, 103., 200, 104, 105, 106, 201, 107, 108, 109, 202]), ) + for b in batch: + b.obs = b.act = 1 + buf.add(b) v = np.array([2., 3., 4, -1, 5., 6., 7, -2, 8., 9., 10, -3]) - ret = fn(batch, v, gamma=0.99, gae_lambda=0.95) + ret = fn(batch, buf, buf.sample_index(0), v, gamma=0.99, gae_lambda=0.95) returns = np.array([ 454.8344, 376.1143, 291.298, 200., 464.5610, 383.1085, 295.387, 201., 474.2876, 390.1027, 299.476, 202.]) assert np.allclose(ret.returns, returns) + buf.reset() if __name__ == '__main__': + buf = ReplayBuffer(size) batch = Batch( done=np.random.randint(100, size=size) == 0, rew=np.random.random(size), ) + for b in batch: + b.obs = b.act = 1 + buf.add(b) + indice = buf.sample_index(0) def vanilla(): return compute_episodic_return_base(batch, gamma=.1) def optimized(): - return fn(batch, gamma=.1) + return fn(batch, buf, indice, gamma=.1, gae_lambda=1.0) cnt = 3000 print('GAE vanilla', timeit(vanilla, setup=vanilla, number=cnt)) @@ -72,7 +93,7 @@ def optimized(): def target_q_fn(buffer, indice): # return the next reward - indice = (indice + 1 - buffer.done[indice]) % len(buffer) + indice = buffer.next(indice) return torch.tensor(-buffer.rew[indice], dtype=torch.float32) @@ -101,14 +122,15 @@ def compute_nstep_return_base(nstep, gamma, buffer, indice): def test_nstep_returns(size=10000): buf = ReplayBuffer(10) for i in range(12): - buf.add(obs=0, act=0, rew=i + 1, done=i % 4 == 3) + buf.add(Batch(obs=0, act=0, rew=i + 1, done=i % 4 == 3)) batch, indice = buf.sample(0) assert np.allclose(indice, [2, 3, 4, 5, 6, 7, 8, 9, 0, 1]) - # rew: [10, 11, 2, 3, 4, 5, 6, 7, 8, 9] + # rew: [11, 12, 3, 4, 5, 6, 7, 8, 9, 10] # done: [ 0, 1, 0, 1, 0, 0, 0, 1, 0, 0] # test nstep = 1 returns = to_numpy(BasePolicy.compute_nstep_return( - batch, buf, indice, target_q_fn, gamma=.1, n_step=1).pop('returns')) + batch, buf, indice, target_q_fn, gamma=.1, n_step=1 + ).pop('returns').reshape(-1)) assert np.allclose(returns, [2.6, 4, 4.4, 5.3, 6.2, 8, 8, 8.9, 9.8, 12]) r_ = compute_nstep_return_base(1, .1, buf, indice) assert np.allclose(returns, r_), (r_, returns) @@ -118,9 +140,9 @@ def test_nstep_returns(size=10000): assert np.allclose(returns_multidim, returns[:, np.newaxis]) # test nstep = 2 returns = to_numpy(BasePolicy.compute_nstep_return( - batch, buf, indice, target_q_fn, gamma=.1, n_step=2).pop('returns')) - assert np.allclose(returns, [ - 3.4, 4, 5.53, 6.62, 7.8, 8, 9.89, 10.98, 12.2, 12]) + batch, buf, indice, target_q_fn, gamma=.1, n_step=2 + ).pop('returns').reshape(-1)) + assert np.allclose(returns, [3.4, 4, 5.53, 6.62, 7.8, 8, 9.89, 10.98, 12.2, 12]) r_ = compute_nstep_return_base(2, .1, buf, indice) assert np.allclose(returns, r_) returns_multidim = to_numpy(BasePolicy.compute_nstep_return( @@ -129,9 +151,9 @@ def test_nstep_returns(size=10000): assert np.allclose(returns_multidim, returns[:, np.newaxis]) # test nstep = 10 returns = to_numpy(BasePolicy.compute_nstep_return( - batch, buf, indice, target_q_fn, gamma=.1, n_step=10).pop('returns')) - assert np.allclose(returns, [ - 3.4, 4, 5.678, 6.78, 7.8, 8, 10.122, 11.22, 12.2, 12]) + batch, buf, indice, target_q_fn, gamma=.1, n_step=10 + ).pop('returns').reshape(-1)) + assert np.allclose(returns, [3.4, 4, 5.678, 6.78, 7.8, 8, 10.122, 11.22, 12.2, 12]) r_ = 
compute_nstep_return_base(10, .1, buf, indice) assert np.allclose(returns, r_) returns_multidim = to_numpy(BasePolicy.compute_nstep_return( @@ -142,7 +164,7 @@ def test_nstep_returns(size=10000): if __name__ == '__main__': buf = ReplayBuffer(size) for i in range(int(size * 1.5)): - buf.add(obs=0, act=0, rew=i + 1, done=np.random.randint(3) == 0) + buf.add(Batch(obs=0, act=0, rew=i + 1, done=np.random.randint(3) == 0)) batch, indice = buf.sample(256) def vanilla(): diff --git a/test/continuous/test_ddpg.py b/test/continuous/test_ddpg.py index 52257342d..68d3fc433 100644 --- a/test/continuous/test_ddpg.py +++ b/test/continuous/test_ddpg.py @@ -11,7 +11,7 @@ from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer from tianshou.exploration import GaussianNoise -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import Actor, Critic @@ -25,14 +25,13 @@ def get_args(): parser.add_argument('--gamma', type=float, default=0.99) parser.add_argument('--tau', type=float, default=0.005) parser.add_argument('--exploration-noise', type=float, default=0.1) - parser.add_argument('--test-noise', type=float, default=0.1) parser.add_argument('--epoch', type=int, default=20) parser.add_argument('--step-per-epoch', type=int, default=2400) parser.add_argument('--collect-per-step', type=int, default=4) parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=4) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
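For the `test_returns.py` hunks above, the updated expected constants can be checked by hand from the one-step backup and the test's `target_q_fn`, which returns `-rew` of the successor index. A worked check of the first expected entry (`2.6`) under those assumptions:

```python
gamma = 0.1
# buffer index 2 holds rew=3 and done=False; its successor (index 3) holds rew=4,
# so target_q_fn yields Q(s') = -4 and the 1-step target is
ret = 3 + gamma * (-4) * (1 - 0)
print(ret)  # 2.6 -- the first value in the expected n_step=1 returns
```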
@@ -86,9 +85,10 @@ def test_ddpg(args=get_args()): estimation_step=args.n_step) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) - test_collector = Collector( - policy, test_envs, action_noise=GaussianNoise(sigma=args.test_noise)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) + test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'ddpg') writer = SummaryWriter(log_path) @@ -112,7 +112,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/continuous/test_ppo.py b/test/continuous/test_ppo.py index 1a48aaee9..45a59f425 100644 --- a/test/continuous/test_ppo.py +++ b/test/continuous/test_ppo.py @@ -11,7 +11,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import ActorProb, Critic @@ -24,7 +24,7 @@ def get_args(): parser.add_argument('--gamma', type=float, default=0.99) parser.add_argument('--epoch', type=int, default=20) parser.add_argument('--step-per-epoch', type=int, default=2400) - parser.add_argument('--collect-per-step', type=int, default=1) + parser.add_argument('--collect-per-step', type=int, default=16) parser.add_argument('--repeat-per-collect', type=int, default=2) parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, @@ -104,7 +104,9 @@ def dist(*logits): gae_lambda=args.gae_lambda) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'ppo') @@ -130,7 +132,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/continuous/test_sac_with_il.py b/test/continuous/test_sac_with_il.py index f35e18497..6e075bb5c 100644 --- a/test/continuous/test_sac_with_il.py +++ b/test/continuous/test_sac_with_il.py @@ -9,7 +9,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.policy import SACPolicy, ImitationPolicy from tianshou.utils.net.continuous import Actor, ActorProb, Critic @@ -33,7 +33,7 @@ def get_args(): nargs='*', default=[128, 128]) parser.add_argument('--imitation-hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) 
parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) @@ -92,7 +92,9 @@ def test_sac_with_il(args=get_args()): estimation_step=args.n_step) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -118,7 +120,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") # here we define an imitation collector with a trivial policy policy.eval() @@ -149,7 +152,8 @@ def stop_fn(mean_rewards): il_policy.eval() collector = Collector(il_policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/continuous/test_td3.py b/test/continuous/test_td3.py index 32c0efd43..c90a92aa4 100644 --- a/test/continuous/test_td3.py +++ b/test/continuous/test_td3.py @@ -11,7 +11,7 @@ from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer from tianshou.exploration import GaussianNoise -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.continuous import Actor, Critic @@ -34,7 +34,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=128) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
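The continuous-control scripts (DDPG and SAC above, TD3 below) converge on the same wiring: one `VectorReplayBuffer` with one sub-buffer per training env, `exploration_noise=True` only on the training collector, and a noise-free test collector. A condensed sketch of that pattern; `policy`, `train_envs`, and `test_envs` are assumed to be built exactly as in these scripts:

```python
from tianshou.data import Collector, VectorReplayBuffer

buffer_size = 20000  # illustrative value
train_collector = Collector(
    policy, train_envs,
    VectorReplayBuffer(buffer_size, buffer_num=len(train_envs)),
    exploration_noise=True)   # exploration noise only while gathering training data
test_collector = Collector(policy, test_envs)  # evaluation stays noise-free
```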
@@ -97,7 +97,9 @@ def test_td3(args=get_args()): estimation_step=args.n_step) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -123,7 +125,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/discrete/test_a2c_with_il.py b/test/discrete/test_a2c_with_il.py index 08759f92e..ea7e6b6ad 100644 --- a/test/discrete/test_a2c_with_il.py +++ b/test/discrete/test_a2c_with_il.py @@ -8,7 +8,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.discrete import Actor, Critic from tianshou.policy import A2CPolicy, ImitationPolicy from tianshou.trainer import onpolicy_trainer, offpolicy_trainer @@ -24,7 +24,7 @@ def get_args(): parser.add_argument('--gamma', type=float, default=0.9) parser.add_argument('--epoch', type=int, default=10) parser.add_argument('--step-per-epoch', type=int, default=1000) - parser.add_argument('--collect-per-step', type=int, default=10) + parser.add_argument('--collect-per-step', type=int, default=8) parser.add_argument('--repeat-per-collect', type=int, default=1) parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, @@ -79,7 +79,9 @@ def test_a2c_with_il(args=get_args()): max_grad_norm=args.max_grad_norm, reward_normalization=args.rew_norm) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'a2c') @@ -105,7 +107,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") policy.eval() # here we define an imitation collector with a trivial policy @@ -134,7 +137,8 @@ def stop_fn(mean_rewards): il_policy.eval() collector = Collector(il_policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/discrete/test_c51.py b/test/discrete/test_c51.py index 32a41d0df..684ce9696 100644 --- a/test/discrete/test_c51.py +++ b/test/discrete/test_c51.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer, PrioritizedReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer, PrioritizedVectorReplayBuffer def get_args(): @@ -29,7 +29,7 @@ def 
get_args(): parser.add_argument('--target-update-freq', type=int, default=320) parser.add_argument('--epoch', type=int, default=10) parser.add_argument('--step-per-epoch', type=int, default=1000) - parser.add_argument('--collect-per-step', type=int, default=10) + parser.add_argument('--collect-per-step', type=int, default=8) parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128, 128, 128]) @@ -75,15 +75,16 @@ def test_c51(args=get_args()): ).to(args.device) # buffer if args.prioritized_replay: - buf = PrioritizedReplayBuffer( - args.buffer_size, alpha=args.alpha, beta=args.beta) + buf = PrioritizedVectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), + alpha=args.alpha, beta=args.beta) else: - buf = ReplayBuffer(args.buffer_size) + buf = VectorReplayBuffer(args.buffer_size, buffer_num=len(train_envs)) # collector - train_collector = Collector(policy, train_envs, buf) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buf, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'c51') writer = SummaryWriter(log_path) @@ -124,7 +125,8 @@ def test_fn(epoch, env_step): policy.set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") def test_pc51(args=get_args()): diff --git a/test/discrete/test_dqn.py b/test/discrete/test_dqn.py index 1e9f08984..df02684c0 100644 --- a/test/discrete/test_dqn.py +++ b/test/discrete/test_dqn.py @@ -11,7 +11,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer, PrioritizedReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer, PrioritizedVectorReplayBuffer def get_args(): @@ -31,7 +31,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128, 128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
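The C51 hunk above and the DQN/QRDQN hunks below all swap the single prioritized buffer for a `PrioritizedVectorReplayBuffer` with one sub-buffer per training env. A small runnable sketch of that class, assuming the constructor arguments shown in these scripts:

```python
import numpy as np
from tianshou.data import Batch, PrioritizedVectorReplayBuffer

# 3 sub-buffers of 5 slots each, with the usual PER exponents alpha/beta
buf = PrioritizedVectorReplayBuffer(15, buffer_num=3, alpha=0.5, beta=0.5)
tr = Batch(obs=0, act=0, rew=1.0, done=0, obs_next=1)
buf.add(Batch.stack([tr, tr, tr]), buffer_ids=[0, 1, 2])   # one row per env
batch, indices = buf.sample(2)
print(batch.weight)                                        # importance-sampling weights
buf.update_weight(indices, np.abs(np.random.randn(len(indices))))  # e.g. new TD errors
```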
@@ -77,15 +77,16 @@ def test_dqn(args=get_args()): target_update_freq=args.target_update_freq) # buffer if args.prioritized_replay: - buf = PrioritizedReplayBuffer( - args.buffer_size, alpha=args.alpha, beta=args.beta) + buf = PrioritizedVectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), + alpha=args.alpha, beta=args.beta) else: - buf = ReplayBuffer(args.buffer_size) + buf = VectorReplayBuffer(args.buffer_size, buffer_num=len(train_envs)) # collector - train_collector = Collector(policy, train_envs, buf) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buf, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'dqn') writer = SummaryWriter(log_path) @@ -127,10 +128,11 @@ def test_fn(epoch, env_step): policy.set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") # save buffer in pickle format, for imitation learning unittest - buf = ReplayBuffer(args.buffer_size) + buf = VectorReplayBuffer(args.buffer_size, buffer_num=len(test_envs)) collector = Collector(policy, test_envs, buf) collector.collect(n_step=args.buffer_size) pickle.dump(buf, open(args.save_buffer_name, "wb")) diff --git a/test/discrete/test_drqn.py b/test/discrete/test_drqn.py index f3f00e69f..420f8e6cd 100644 --- a/test/discrete/test_drqn.py +++ b/test/discrete/test_drqn.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.trainer import offpolicy_trainer from tianshou.utils.net.common import Recurrent -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer def get_args(): @@ -30,7 +30,7 @@ def get_args(): parser.add_argument('--collect-per-step', type=int, default=10) parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--layer-num', type=int, default=3) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
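The DRQN hunk below pushes frame-stacking for RNN training into the shared buffer via `stack_num` (plus `ignore_obs_next=True` to save memory). A small sketch of what `stack_num` does to sampled observations, assuming the buffer API from this patch:

```python
from tianshou.data import Batch, ReplayBuffer

buf = ReplayBuffer(size=8, stack_num=4, ignore_obs_next=True)
for i in range(6):
    buf.add(Batch(obs=i, act=0, rew=0.0, done=(i == 5)))
# indexing returns length-4 frame stacks, padded by repetition at the episode start
print(buf[5].obs)  # [2 3 4 5]
print(buf[0].obs)  # [0 0 0 0]
```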
@@ -65,13 +65,14 @@ def test_drqn(args=get_args()): net, optim, args.gamma, args.n_step, target_update_freq=args.target_update_freq) # collector - train_collector = Collector( - policy, train_envs, ReplayBuffer( - args.buffer_size, stack_num=args.stack_num, ignore_obs_next=True)) + buffer = VectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), + stack_num=args.stack_num, ignore_obs_next=True) + train_collector = Collector(policy, train_envs, buffer, exploration_noise=True) # the stack_num is for RNN training: sample framestack obs - test_collector = Collector(policy, test_envs) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'drqn') writer = SummaryWriter(log_path) @@ -103,7 +104,8 @@ def test_fn(epoch, env_step): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/discrete/test_il_bcq.py b/test/discrete/test_il_bcq.py index e9e857ef1..3dd2b8d63 100644 --- a/test/discrete/test_il_bcq.py +++ b/test/discrete/test_il_bcq.py @@ -78,7 +78,7 @@ def test_discrete_bcq(args=get_args()): buffer = pickle.load(open(args.load_buffer_name, "rb")) # collector - test_collector = Collector(policy, test_envs) + test_collector = Collector(policy, test_envs, exploration_noise=True) log_path = os.path.join(args.logdir, args.task, 'discrete_bcq') writer = SummaryWriter(log_path) @@ -104,7 +104,8 @@ def stop_fn(mean_rewards): policy.set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == "__main__": diff --git a/test/discrete/test_pg.py b/test/discrete/test_pg.py index b26213c59..a413111b7 100644 --- a/test/discrete/test_pg.py +++ b/test/discrete/test_pg.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer def get_args(): @@ -22,7 +22,7 @@ def get_args(): parser.add_argument('--gamma', type=float, default=0.95) parser.add_argument('--epoch', type=int, default=10) parser.add_argument('--step-per-epoch', type=int, default=1000) - parser.add_argument('--collect-per-step', type=int, default=10) + parser.add_argument('--collect-per-step', type=int, default=8) parser.add_argument('--repeat-per-collect', type=int, default=2) parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, @@ -65,7 +65,9 @@ def test_pg(args=get_args()): reward_normalization=args.rew_norm) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'pg') @@ -91,7 +93,8 @@ def stop_fn(mean_rewards): policy.eval() 
collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/discrete/test_ppo.py b/test/discrete/test_ppo.py index e2e671c99..9862ea7f7 100644 --- a/test/discrete/test_ppo.py +++ b/test/discrete/test_ppo.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.utils.net.discrete import Actor, Critic @@ -91,7 +91,9 @@ def test_ppo(args=get_args()): value_clip=args.value_clip) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log log_path = os.path.join(args.logdir, args.task, 'ppo') @@ -117,7 +119,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/discrete/test_qrdqn.py b/test/discrete/test_qrdqn.py index 0d03fce97..006dd827b 100644 --- a/test/discrete/test_qrdqn.py +++ b/test/discrete/test_qrdqn.py @@ -10,7 +10,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer, PrioritizedReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer, PrioritizedVectorReplayBuffer def get_args(): @@ -31,7 +31,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128, 128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.) 
@@ -73,15 +73,16 @@ def test_qrdqn(args=get_args()): ).to(args.device) # buffer if args.prioritized_replay: - buf = PrioritizedReplayBuffer( - args.buffer_size, alpha=args.alpha, beta=args.beta) + buf = PrioritizedVectorReplayBuffer( + args.buffer_size, buffer_num=len(train_envs), + alpha=args.alpha, beta=args.beta) else: - buf = ReplayBuffer(args.buffer_size) + buf = VectorReplayBuffer(args.buffer_size, buffer_num=len(train_envs)) # collector - train_collector = Collector(policy, train_envs, buf) - test_collector = Collector(policy, test_envs) + train_collector = Collector(policy, train_envs, buf, exploration_noise=True) + test_collector = Collector(policy, test_envs, exploration_noise=True) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log log_path = os.path.join(args.logdir, args.task, 'qrdqn') writer = SummaryWriter(log_path) @@ -122,7 +123,8 @@ def test_fn(epoch, env_step): policy.set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") def test_pqrdqn(args=get_args()): diff --git a/test/discrete/test_sac.py b/test/discrete/test_sac.py index 3d3df6f2c..16ab54cb2 100644 --- a/test/discrete/test_sac.py +++ b/test/discrete/test_sac.py @@ -9,7 +9,7 @@ from tianshou.env import SubprocVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.policy import DiscreteSACPolicy from tianshou.utils.net.discrete import Actor, Critic @@ -32,7 +32,7 @@ def get_args(): parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128]) - parser.add_argument('--training-num', type=int, default=16) + parser.add_argument('--training-num', type=int, default=5) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.0) @@ -90,7 +90,9 @@ def test_discrete_sac(args=get_args()): ignore_done=args.ignore_done) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # train_collector.collect(n_step=args.buffer_size) # log @@ -117,7 +119,8 @@ def stop_fn(mean_rewards): policy.eval() collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") if __name__ == '__main__': diff --git a/test/modelbase/test_psrl.py b/test/modelbase/test_psrl.py index 29dfb6b8c..5a813874f 100644 --- a/test/modelbase/test_psrl.py +++ b/test/modelbase/test_psrl.py @@ -7,7 +7,7 @@ from tianshou.policy import PSRLPolicy from tianshou.trainer import onpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.env import DummyVectorEnv, SubprocVectorEnv @@ -61,7 +61,9 @@ def test_psrl(args=get_args()): 
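The prioritized branch above gets the same treatment through `PrioritizedVectorReplayBuffer`. A sketch of the full priority loop, with made-up sizes and random numbers standing in for the TD errors:

```python
import numpy as np
from tianshou.data import Batch, PrioritizedVectorReplayBuffer

buf = PrioritizedVectorReplayBuffer(
    total_size=2000, buffer_num=4, alpha=0.6, beta=0.4)
# push one fake transition per sub-buffer (first dimension == buffer_num)
for t in range(16):
    buf.add(Batch(obs=np.full((4, 3), t), act=np.zeros(4), rew=np.ones(4),
                  done=np.zeros(4, dtype=bool)))

batch, indices = buf.sample(8)
# batch.weight carries the importance-sampling weights; after computing the
# TD errors, write them back as the new priorities
buf.update_weight(indices, np.abs(np.random.randn(len(indices))))
```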
args.add_done_loop) # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # log writer = SummaryWriter(args.logdir + '/' + args.task) @@ -86,9 +88,9 @@ def stop_fn(mean_rewards): policy.eval() test_envs.seed(args.seed) test_collector.reset() - result = test_collector.collect(n_episode=[1] * args.test_num, - render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + result = test_collector.collect(n_episode=args.test_num, render=args.render) + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") elif env.spec.reward_threshold: assert result["best_reward"] >= env.spec.reward_threshold diff --git a/test/multiagent/Gomoku.py b/test/multiagent/Gomoku.py index 53652a2e4..0c0dcf58c 100644 --- a/test/multiagent/Gomoku.py +++ b/test/multiagent/Gomoku.py @@ -46,7 +46,7 @@ def env_func(): policy.replace_policy(opponent, 3 - args.agent_id) test_collector = Collector(policy, test_envs) results = test_collector.collect(n_episode=100) - rews.append(results['rew']) + rews.append(results['rews'].mean()) rews = np.array(rews) # weight opponent by their difficulty level rews = np.exp(-rews * 10.0) diff --git a/test/multiagent/test_tic_tac_toe.py b/test/multiagent/test_tic_tac_toe.py index 92ecb97c6..1cc06d374 100644 --- a/test/multiagent/test_tic_tac_toe.py +++ b/test/multiagent/test_tic_tac_toe.py @@ -1,10 +1,8 @@ import pprint -from tianshou.data import Collector from tic_tac_toe import get_args, train_agent, watch def test_tic_tac_toe(args=get_args()): - Collector._default_rew_metric = lambda x: x[args.agent_id - 1] if args.watch: watch(args) return diff --git a/test/multiagent/tic_tac_toe.py b/test/multiagent/tic_tac_toe.py index f9d6af104..b081a5055 100644 --- a/test/multiagent/tic_tac_toe.py +++ b/test/multiagent/tic_tac_toe.py @@ -9,7 +9,7 @@ from tianshou.env import DummyVectorEnv from tianshou.utils.net.common import Net from tianshou.trainer import offpolicy_trainer -from tianshou.data import Collector, ReplayBuffer +from tianshou.data import Collector, VectorReplayBuffer from tianshou.policy import BasePolicy, DQNPolicy, RandomPolicy, \ MultiAgentPolicyManager @@ -33,24 +33,24 @@ def get_parser() -> argparse.ArgumentParser: parser.add_argument('--batch-size', type=int, default=64) parser.add_argument('--hidden-sizes', type=int, nargs='*', default=[128, 128, 128, 128]) - parser.add_argument('--training-num', type=int, default=8) + parser.add_argument('--training-num', type=int, default=10) parser.add_argument('--test-num', type=int, default=100) parser.add_argument('--logdir', type=str, default='log') parser.add_argument('--render', type=float, default=0.1) - parser.add_argument('--board_size', type=int, default=6) - parser.add_argument('--win_size', type=int, default=4) - parser.add_argument('--win_rate', type=float, default=0.9, + parser.add_argument('--board-size', type=int, default=6) + parser.add_argument('--win-size', type=int, default=4) + parser.add_argument('--win-rate', type=float, default=0.9, help='the expected winning rate') parser.add_argument('--watch', default=False, action='store_true', help='no training, ' 'watch the play of pre-trained models') - parser.add_argument('--agent_id', type=int, default=2, + parser.add_argument('--agent-id', type=int, default=2, help='the learned agent plays as the' - ' agent_id-th 
player. choices are 1 and 2.') - parser.add_argument('--resume_path', type=str, default='', + ' agent_id-th player. Choices are 1 and 2.') + parser.add_argument('--resume-path', type=str, default='', help='the path of agent pth file ' 'for resuming from a pre-trained agent') - parser.add_argument('--opponent_path', type=str, default='', + parser.add_argument('--opponent-path', type=str, default='', help='the path of opponent agent pth file ' 'for resuming from a pre-trained agent') parser.add_argument( @@ -61,8 +61,7 @@ def get_parser() -> argparse.ArgumentParser: def get_args() -> argparse.Namespace: parser = get_parser() - args = parser.parse_known_args()[0] - return args + return parser.parse_known_args()[0] def get_agents( @@ -124,10 +123,12 @@ def env_func(): # collector train_collector = Collector( - policy, train_envs, ReplayBuffer(args.buffer_size)) + policy, train_envs, + VectorReplayBuffer(args.buffer_size, len(train_envs)), + exploration_noise=True) test_collector = Collector(policy, test_envs) # policy.set_eps(1) - train_collector.collect(n_step=args.batch_size) + train_collector.collect(n_step=args.batch_size * args.training_num) # log if not hasattr(args, 'writer'): log_path = os.path.join(args.logdir, 'tic_tac_toe', 'dqn') @@ -155,13 +156,16 @@ def train_fn(epoch, env_step): def test_fn(epoch, env_step): policy.policies[args.agent_id - 1].set_eps(args.eps_test) + def reward_metric(rews): + return rews[:, args.agent_id - 1] + # trainer result = offpolicy_trainer( policy, train_collector, test_collector, args.epoch, args.step_per_epoch, args.collect_per_step, args.test_num, args.batch_size, train_fn=train_fn, test_fn=test_fn, - stop_fn=stop_fn, save_fn=save_fn, writer=writer, - test_in_train=False) + stop_fn=stop_fn, save_fn=save_fn, reward_metric=reward_metric, + writer=writer, test_in_train=False) return result, policy.policies[args.agent_id - 1] @@ -178,4 +182,5 @@ def watch( policy.policies[args.agent_id - 1].set_eps(args.eps_test) collector = Collector(policy, env) result = collector.collect(n_episode=1, render=args.render) - print(f'Final reward: {result["rew"]}, length: {result["len"]}') + rews, lens = result["rews"], result["lens"] + print(f"Final reward: {rews.mean()}, length: {lens.mean()}") diff --git a/test/throughput/env.py b/test/throughput/env.py new file mode 120000 index 000000000..9a57534db --- /dev/null +++ b/test/throughput/env.py @@ -0,0 +1 @@ +../base/env.py \ No newline at end of file diff --git a/test/throughput/test_buffer_profile.py b/test/throughput/test_buffer_profile.py index 3134004f1..40ce68889 100644 --- a/test/throughput/test_buffer_profile.py +++ b/test/throughput/test_buffer_profile.py @@ -1,89 +1,61 @@ -import pytest +import sys +import gym +import time +import tqdm import numpy as np - -from tianshou.data import (ListReplayBuffer, PrioritizedReplayBuffer, - ReplayBuffer, SegmentTree) - - -@pytest.fixture(scope="module") -def data(): - np.random.seed(0) - obs = {'observable': np.random.rand(100, 100), - 'hidden': np.random.randint(1000, size=200)} - info = {'policy': "dqn", 'base': np.arange(10)} - add_data = {'obs': obs, 'rew': 1., 'act': np.random.rand(30), - 'done': False, 'obs_next': obs, 'info': info} - buffer = ReplayBuffer(int(1e3), stack_num=100) - buffer2 = ReplayBuffer(int(1e4), stack_num=100) - indexes = np.random.choice(int(1e3), size=3, replace=False) - return { - 'add_data': add_data, - 'buffer': buffer, - 'buffer2': buffer2, - 'slice': slice(-3000, -1000, 2), - 'indexes': indexes, - } - - -def test_init(): - for _ in 
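In the multi-agent example, selecting which agent's reward to monitor is no longer patched onto the collector via `Collector._default_rew_metric`; it is passed to the trainer as `reward_metric`, which receives the per-episode reward array with one column per agent. A sketch, with the collectors, writer and hyper-parameters assumed to exist:

```python
def reward_metric(rews):
    # rews has shape (num_episodes, agent_num); keep the learned agent's column
    return rews[:, agent_id - 1]

result = offpolicy_trainer(
    policy, train_collector, test_collector, max_epoch, step_per_epoch,
    collect_per_step, episode_per_test, batch_size,
    reward_metric=reward_metric, writer=writer, test_in_train=False)
```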
np.arange(1e5): - _ = ReplayBuffer(1e5) - _ = PrioritizedReplayBuffer(size=int(1e5), alpha=0.5, beta=0.5) - _ = ListReplayBuffer() - - -def test_add(data): - buffer = data['buffer'] - for _ in np.arange(1e5): - buffer.add(**data['add_data']) - - -def test_update(data): - buffer = data['buffer'] - buffer2 = data['buffer2'] - for _ in np.arange(1e2): - buffer2.update(buffer) - - -def test_getitem_slice(data): - Slice = data['slice'] - buffer = data['buffer'] - for _ in np.arange(1e3): - _ = buffer[Slice] - - -def test_getitem_indexes(data): - indexes = data['indexes'] - buffer = data['buffer'] - for _ in np.arange(1e2): - _ = buffer[indexes] - - -def test_get(data): - indexes = data['indexes'] - buffer = data['buffer'] - for _ in np.arange(3e2): - buffer.get(indexes, 'obs') - buffer.get(indexes, 'rew') - buffer.get(indexes, 'done') - buffer.get(indexes, 'info') - - -def test_sample(data): - buffer = data['buffer'] - for _ in np.arange(1e1): - buffer.sample(int(1e2)) - - -def test_segtree(data): - size = 100000 - tree = SegmentTree(size) - tree[np.arange(size)] = np.random.rand(size) - - for i in np.arange(1e5): - scalar = np.random.rand(64) * tree.reduce() - tree.get_prefix_sum_idx(scalar) +from tianshou.data import Batch, ReplayBuffer, VectorReplayBuffer + + +def test_replaybuffer(task="Pendulum-v0"): + total_count = 5 + for _ in tqdm.trange(total_count, desc="ReplayBuffer"): + env = gym.make(task) + buf = ReplayBuffer(10000) + obs = env.reset() + for i in range(100000): + act = env.action_space.sample() + obs_next, rew, done, info = env.step(act) + batch = Batch( + obs=np.array([obs]), + act=np.array([act]), + rew=np.array([rew]), + done=np.array([done]), + obs_next=np.array([obs_next]), + info=np.array([info]), + ) + buf.add(batch, buffer_ids=[0]) + obs = obs_next + if done: + obs = env.reset() + + +def test_vectorbuffer(task="Pendulum-v0"): + total_count = 5 + for _ in tqdm.trange(total_count, desc="VectorReplayBuffer"): + env = gym.make(task) + buf = VectorReplayBuffer(total_size=10000, buffer_num=1) + obs = env.reset() + for i in range(100000): + act = env.action_space.sample() + obs_next, rew, done, info = env.step(act) + batch = Batch( + obs=np.array([obs]), + act=np.array([act]), + rew=np.array([rew]), + done=np.array([done]), + obs_next=np.array([obs_next]), + info=np.array([info]), + ) + buf.add(batch) + obs = obs_next + if done: + obs = env.reset() if __name__ == '__main__': - pytest.main(["-s", "-k buffer_profile", "--durations=0", "-v"]) + t0 = time.time() + test_replaybuffer(sys.argv[-1]) + print("test replaybuffer: ", time.time() - t0) + t0 = time.time() + test_vectorbuffer(sys.argv[-1]) + print("test vectorbuffer: ", time.time() - t0) diff --git a/test/throughput/test_collector_profile.py b/test/throughput/test_collector_profile.py index 4036472f7..6242e694b 100644 --- a/test/throughput/test_collector_profile.py +++ b/test/throughput/test_collector_profile.py @@ -1,144 +1,109 @@ -import gym +import tqdm import numpy as np -import pytest -from gym.spaces.discrete import Discrete -from gym.utils import seeding -from tianshou.data import Batch, Collector, ReplayBuffer -from tianshou.env import DummyVectorEnv, SubprocVectorEnv from tianshou.policy import BasePolicy +from tianshou.env import DummyVectorEnv, SubprocVectorEnv +from tianshou.data import Batch, Collector, AsyncCollector, VectorReplayBuffer - -class SimpleEnv(gym.Env): - """A simplest example of self-defined env, used to minimize - data collect time and profile collector.""" - - def __init__(self): - self.action_space 
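The rewritten profiling test above uses the new `add` signature: a single `Batch` (plus optional `buffer_ids`) instead of per-field keyword arguments. For a plain `ReplayBuffer` both forms below should be accepted; the shapes are illustrative:

```python
import numpy as np
from tianshou.data import Batch, ReplayBuffer

buf = ReplayBuffer(size=10)

# unbatched form: scalar-like values, no buffer_ids
ptr, ep_rew, ep_len, ep_idx = buf.add(
    Batch(obs=np.zeros(4), act=0, rew=0.0, done=False))

# batched form, as in the profiling test: first dimension must be 1
buf.add(Batch(obs=np.zeros((1, 4)), act=np.array([0]), rew=np.array([0.0]),
              done=np.array([False])), buffer_ids=[0])
```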
= Discrete(200) - self._fake_data = np.ones((10, 10, 1)) - self.seed(0) - self.reset() - - def reset(self): - self._index = 0 - self.done = np.random.randint(3, high=200) - return {'observable': np.zeros((10, 10, 1)), 'hidden': self._index} - - def step(self, action): - if self._index == self.done: - raise ValueError('step after done !!!') - self._index += 1 - return {'observable': self._fake_data, 'hidden': self._index}, -1, \ - self._index == self.done, {} - - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - - -class SimplePolicy(BasePolicy): - """A simplest example of self-defined policy, used - to minimize data collect time.""" - - def __init__(self, **kwargs): - super().__init__(**kwargs) - - def learn(self, batch, **kwargs): - return super().learn(batch, **kwargs) - - def forward(self, batch, state=None, **kwargs): - return Batch(act=np.array([30] * len(batch)), state=None, logits=None) - - -@pytest.fixture(scope="module") -def data(): - np.random.seed(0) - env = SimpleEnv() - env.seed(0) - env_vec = DummyVectorEnv([lambda: SimpleEnv() for _ in range(100)]) - env_vec.seed(np.random.randint(1000, size=100).tolist()) - env_subproc = SubprocVectorEnv([lambda: SimpleEnv() for _ in range(8)]) - env_subproc.seed(np.random.randint(1000, size=100).tolist()) - env_subproc_init = SubprocVectorEnv( - [lambda: SimpleEnv() for _ in range(8)]) - env_subproc_init.seed(np.random.randint(1000, size=100).tolist()) - buffer = ReplayBuffer(50000) - policy = SimplePolicy() - collector = Collector(policy, env, ReplayBuffer(50000)) - collector_vec = Collector(policy, env_vec, ReplayBuffer(50000)) - collector_subproc = Collector(policy, env_subproc, ReplayBuffer(50000)) - return { - "env": env, - "env_vec": env_vec, - "env_subproc": env_subproc, - "env_subproc_init": env_subproc_init, - "policy": policy, - "buffer": buffer, - "collector": collector, - "collector_vec": collector_vec, - "collector_subproc": collector_subproc, - } - - -def test_init(data): - for _ in range(5000): - Collector(data["policy"], data["env"], data["buffer"]) - - -def test_reset(data): - for _ in range(5000): - data["collector"].reset() - - -def test_collect_st(data): - for _ in range(50): - data["collector"].collect(n_step=1000) - - -def test_collect_ep(data): - for _ in range(50): - data["collector"].collect(n_episode=10) - - -def test_init_vec_env(data): - for _ in range(5000): - Collector(data["policy"], data["env_vec"], data["buffer"]) - - -def test_reset_vec_env(data): - for _ in range(5000): - data["collector_vec"].reset() - - -def test_collect_vec_env_st(data): - for _ in range(50): - data["collector_vec"].collect(n_step=1000) - - -def test_collect_vec_env_ep(data): - for _ in range(50): - data["collector_vec"].collect(n_episode=10) - - -def test_init_subproc_env(data): - for _ in range(5000): - Collector(data["policy"], data["env_subproc_init"], data["buffer"]) - - -def test_reset_subproc_env(data): - for _ in range(5000): - data["collector_subproc"].reset() - - -def test_collect_subproc_env_st(data): - for _ in range(50): - data["collector_subproc"].collect(n_step=1000) - - -def test_collect_subproc_env_ep(data): - for _ in range(50): - data["collector_subproc"].collect(n_episode=10) +if __name__ == '__main__': + from env import MyTestEnv +else: # pytest + from test.base.env import MyTestEnv + + +class MyPolicy(BasePolicy): + def __init__(self, dict_state=False, need_state=True): + """ + :param bool dict_state: if the observation of the environment is a dict + :param bool 
need_state: if the policy needs the hidden state (for RNN) + """ + super().__init__() + self.dict_state = dict_state + self.need_state = need_state + + def forward(self, batch, state=None): + if self.need_state: + if state is None: + state = np.zeros((len(batch.obs), 2)) + else: + state += 1 + if self.dict_state: + return Batch(act=np.ones(len(batch.obs['index'])), state=state) + return Batch(act=np.ones(len(batch.obs)), state=state) + + def learn(self): + pass + + +def test_collector_nstep(): + policy = MyPolicy() + env_fns = [lambda x=i: MyTestEnv(size=x) for i in np.arange(2, 11)] + dum = DummyVectorEnv(env_fns) + num = len(env_fns) + c3 = Collector(policy, dum, + VectorReplayBuffer(total_size=40000, buffer_num=num)) + for i in tqdm.trange(1, 400, desc="test step collector n_step"): + c3.reset() + result = c3.collect(n_step=i * len(env_fns)) + assert result['n/st'] >= i + + +def test_collector_nepisode(): + policy = MyPolicy() + env_fns = [lambda x=i: MyTestEnv(size=x) for i in np.arange(2, 11)] + dum = DummyVectorEnv(env_fns) + num = len(env_fns) + c3 = Collector(policy, dum, + VectorReplayBuffer(total_size=40000, buffer_num=num)) + for i in tqdm.trange(1, 400, desc="test step collector n_episode"): + c3.reset() + result = c3.collect(n_episode=i) + assert result['n/ep'] == i + assert result['n/st'] == len(c3.buffer) + + +def test_asynccollector(): + env_lens = [2, 3, 4, 5] + env_fns = [lambda x=i: MyTestEnv(size=x, sleep=0.001, random_sleep=True) + for i in env_lens] + + venv = SubprocVectorEnv(env_fns, wait_num=len(env_fns) - 1) + policy = MyPolicy() + bufsize = 300 + c1 = AsyncCollector( + policy, venv, + VectorReplayBuffer(total_size=bufsize * 4, buffer_num=4)) + ptr = [0, 0, 0, 0] + for n_episode in tqdm.trange(1, 100, desc="test async n_episode"): + result = c1.collect(n_episode=n_episode) + assert result["n/ep"] >= n_episode + # check buffer data, obs and obs_next, env_id + for i, count in enumerate( + np.bincount(result["lens"], minlength=6)[2:]): + env_len = i + 2 + total = env_len * count + indices = np.arange(ptr[i], ptr[i] + total) % bufsize + ptr[i] = (ptr[i] + total) % bufsize + seq = np.arange(env_len) + buf = c1.buffer.buffers[i] + assert np.all(buf.info.env_id[indices] == i) + assert np.all(buf.obs[indices].reshape(count, env_len) == seq) + assert np.all(buf.obs_next[indices].reshape( + count, env_len) == seq + 1) + # test async n_step, for now the buffer should be full of data + for n_step in tqdm.trange(1, 150, desc="test async n_step"): + result = c1.collect(n_step=n_step) + assert result["n/st"] >= n_step + for i in range(4): + env_len = i + 2 + seq = np.arange(env_len) + buf = c1.buffer.buffers[i] + assert np.all(buf.info.env_id == i) + assert np.all(buf.obs.reshape(-1, env_len) == seq) + assert np.all(buf.obs_next.reshape(-1, env_len) == seq + 1) if __name__ == '__main__': - pytest.main(["-s", "-k collector_profile", "--durations=0", "-v"]) + test_collector_nstep() + test_collector_nepisode() + test_asynccollector() diff --git a/tianshou/data/__init__.py b/tianshou/data/__init__.py index 368427a19..137cf171f 100644 --- a/tianshou/data/__init__.py +++ b/tianshou/data/__init__.py @@ -1,9 +1,16 @@ from tianshou.data.batch import Batch from tianshou.data.utils.converter import to_numpy, to_torch, to_torch_as from tianshou.data.utils.segtree import SegmentTree -from tianshou.data.buffer import ReplayBuffer, ListReplayBuffer, \ - PrioritizedReplayBuffer, ReplayBufferManager, CachedReplayBuffer -from tianshou.data.collector import Collector +from tianshou.data.buffer 
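The `AsyncCollector` exercised above pairs a vectorized environment created with `wait_num` and a `VectorReplayBuffer`; the requested step/episode counts are treated as lower bounds because only a subset of workers returns on each step. A sketch, assuming `policy` is any `BasePolicy` and using Pendulum only as an example task:

```python
import gym
from tianshou.env import SubprocVectorEnv
from tianshou.data import AsyncCollector, VectorReplayBuffer

env_fns = [lambda: gym.make("Pendulum-v0") for _ in range(4)]
venv = SubprocVectorEnv(env_fns, wait_num=3)  # return once 3 workers have stepped
buf = VectorReplayBuffer(total_size=4000, buffer_num=4)
collector = AsyncCollector(policy, venv, buf)

result = collector.collect(n_step=100)  # may overshoot slightly
assert result["n/st"] >= 100
```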
import ( + ReplayBuffer, + PrioritizedReplayBuffer, + ReplayBufferManager, + PrioritizedReplayBufferManager, + VectorReplayBuffer, + PrioritizedVectorReplayBuffer, + CachedReplayBuffer, +) +from tianshou.data.collector import Collector, AsyncCollector __all__ = [ "Batch", @@ -12,9 +19,12 @@ "to_torch_as", "SegmentTree", "ReplayBuffer", - "ListReplayBuffer", "PrioritizedReplayBuffer", "ReplayBufferManager", + "PrioritizedReplayBufferManager", + "VectorReplayBuffer", + "PrioritizedVectorReplayBuffer", "CachedReplayBuffer", "Collector", + "AsyncCollector", ] diff --git a/tianshou/data/batch.py b/tianshou/data/batch.py index 4f15622ab..fd12e0ec4 100644 --- a/tianshou/data/batch.py +++ b/tianshou/data/batch.py @@ -5,8 +5,7 @@ from copy import deepcopy from numbers import Number from collections.abc import Collection -from typing import Any, List, Dict, Union, Iterator, Optional, Iterable, \ - Sequence +from typing import Any, List, Dict, Union, Iterator, Optional, Iterable, Sequence def _is_batch_set(data: Any) -> bool: @@ -35,8 +34,8 @@ def _is_scalar(value: Any) -> bool: if isinstance(value, torch.Tensor): return value.numel() == 1 and not value.shape else: - value = np.asanyarray(value) - return value.size == 1 and not value.shape + # np.asanyarray will cause dead loop in some cases + return np.isscalar(value) def _is_number(value: Any) -> bool: @@ -48,10 +47,8 @@ def _is_number(value: Any) -> bool: def _to_array_with_correct_type(v: Any) -> np.ndarray: - if isinstance(v, np.ndarray) and issubclass( - v.dtype.type, (np.bool_, np.number) - ): # most often case - return v + if isinstance(v, np.ndarray) and issubclass(v.dtype.type, (np.bool_, np.number)): + return v # most often case # convert the value to np.ndarray # convert to np.object data type if neither bool nor number # raises an exception if array's elements are tensors themself @@ -66,9 +63,7 @@ def _to_array_with_correct_type(v: Any) -> np.ndarray: # array([{}, array({}, dtype=object)], dtype=object) if not v.shape: v = v.item(0) - elif any( - isinstance(e, (np.ndarray, torch.Tensor)) for e in v.reshape(-1) - ): + elif any(isinstance(e, (np.ndarray, torch.Tensor)) for e in v.reshape(-1)): raise ValueError("Numpy arrays of tensors are not supported yet.") return v @@ -78,20 +73,16 @@ def _create_value( ) -> Union["Batch", np.ndarray, torch.Tensor]: """Create empty place-holders accroding to inst's shape. - :param bool stack: whether to stack or to concatenate. E.g. if inst has - shape of (3, 5), size = 10, stack=True returns an np.ndarry with shape - of (10, 3, 5), otherwise (10, 5) + :param bool stack: whether to stack or to concatenate. E.g. 
if inst has shape of + (3, 5), size = 10, stack=True returns an np.ndarry with shape of (10, 3, 5), + otherwise (10, 5) """ has_shape = isinstance(inst, (np.ndarray, torch.Tensor)) is_scalar = _is_scalar(inst) if not stack and is_scalar: - # _create_value(Batch(a={}, b=[1, 2, 3]), 10, False) will fail here - if isinstance(inst, Batch) and inst.is_empty(recurse=True): - return inst - # should never hit since it has already checked in Batch.cat_ - # here we do not consider scalar types, following the behavior of numpy - # which does not support concatenation of zero-dimensional arrays - # (scalars) + # should never hit since it has already checked in Batch.cat_ , here we do not + # consider scalar types, following the behavior of numpy which does not support + # concatenation of zero-dimensional arrays (scalars) raise TypeError(f"cannot concatenate with {inst} which is scalar") if has_shape: shape = (size, *inst.shape) if stack else (size, *inst.shape[1:]) @@ -106,9 +97,7 @@ def _create_value( dtype=target_type ) elif isinstance(inst, torch.Tensor): - return torch.full( - shape, fill_value=0, device=inst.device, dtype=inst.dtype - ) + return torch.full(shape, fill_value=0, device=inst.device, dtype=inst.dtype) elif isinstance(inst, (dict, Batch)): zero_batch = Batch() for key, val in inst.items(): @@ -121,9 +110,8 @@ def _create_value( def _assert_type_keys(keys: Iterable[str]) -> None: - assert all( - isinstance(e, str) for e in keys - ), f"keys should all be string, but got {keys}" + assert all(isinstance(e, str) for e in keys), \ + f"keys should all be string, but got {keys}" def _parse_value(v: Any) -> Optional[Union["Batch", np.ndarray, torch.Tensor]]: @@ -440,9 +428,7 @@ def __cat( val, sum_lens[-1], stack=False) self.__dict__[k][sum_lens[i]:sum_lens[i + 1]] = val - def cat_( - self, batches: Union["Batch", Sequence[Union[dict, "Batch"]]] - ) -> None: + def cat_(self, batches: Union["Batch", Sequence[Union[dict, "Batch"]]]) -> None: """Concatenate a list of (or one) Batch objects into current batch.""" if isinstance(batches, Batch): batches = [batches] @@ -498,9 +484,7 @@ def cat(batches: Sequence[Union[dict, "Batch"]]) -> "Batch": batch.cat_(batches) return batch - def stack_( - self, batches: Sequence[Union[dict, "Batch"]], axis: int = 0 - ) -> None: + def stack_(self, batches: Sequence[Union[dict, "Batch"]], axis: int = 0) -> None: """Stack a list of Batch object into current batch.""" # check input format batch_list = [] @@ -564,9 +548,7 @@ def stack_( self.__dict__[k][i] = val @staticmethod - def stack( - batches: Sequence[Union[dict, "Batch"]], axis: int = 0 - ) -> "Batch": + def stack(batches: Sequence[Union[dict, "Batch"]], axis: int = 0) -> "Batch": """Stack a list of Batch object into a single new batch. For keys that are not shared across all batches, batches that do not @@ -593,10 +575,7 @@ def stack( return batch def empty_( - self, - index: Union[ - str, slice, int, np.integer, np.ndarray, List[int] - ] = None, + self, index: Union[str, slice, int, np.integer, np.ndarray, List[int]] = None ) -> "Batch": """Return an empty Batch object with 0 or None filled. @@ -646,9 +625,7 @@ def empty_( @staticmethod def empty( batch: "Batch", - index: Union[ - str, slice, int, np.integer, np.ndarray, List[int] - ] = None, + index: Union[str, slice, int, np.integer, np.ndarray, List[int]] = None, ) -> "Batch": """Return an empty Batch object with 0 or None filled. 
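The `cat_`/`stack_` hunks here are line-length reformatting only; for reference, a tiny sketch of the two operations with illustrative shapes:

```python
import numpy as np
from tianshou.data import Batch

b1 = Batch(obs=np.zeros((2, 4)), rew=np.array([0.0, 1.0]))
b2 = Batch(obs=np.ones((2, 4)), rew=np.array([2.0, 3.0]))
assert len(Batch.cat([b1, b2])) == 4                 # concatenate along axis 0
assert Batch.stack([b1, b2]).obs.shape == (2, 2, 4)  # stack along a new axis
```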
@@ -674,9 +651,7 @@ def __len__(self) -> int: for v in self.__dict__.values(): if isinstance(v, Batch) and v.is_empty(recurse=True): continue - elif hasattr(v, "__len__") and ( - isinstance(v, Batch) or v.ndim > 0 - ): + elif hasattr(v, "__len__") and (isinstance(v, Batch) or v.ndim > 0): r.append(len(v)) else: raise TypeError(f"Object {v} in {self} has no len()") diff --git a/tianshou/data/buffer.py b/tianshou/data/buffer.py index 57b50017e..b24e8c6b2 100644 --- a/tianshou/data/buffer.py +++ b/tianshou/data/buffer.py @@ -1,38 +1,47 @@ import h5py import torch -import warnings import numpy as np -from numbers import Number -from typing import Any, Dict, List, Tuple, Union, Optional +from typing import Any, Dict, List, Tuple, Union, Sequence, Optional from tianshou.data.batch import _create_value from tianshou.data import Batch, SegmentTree, to_numpy from tianshou.data.utils.converter import to_hdf5, from_hdf5 +def _alloc_by_keys_diff( + meta: Batch, batch: Batch, size: int, stack: bool = True +) -> None: + for key in batch.keys(): + if key in meta.keys(): + if isinstance(meta[key], Batch) and isinstance(batch[key], Batch): + _alloc_by_keys_diff(meta[key], batch[key], size, stack) + elif isinstance(meta[key], Batch) and meta[key].is_empty(): + meta[key] = _create_value(batch[key], size, stack) + else: + meta[key] = _create_value(batch[key], size, stack) + + class ReplayBuffer: - """:class:`~tianshou.data.ReplayBuffer` stores data generated from \ - interaction between the policy and environment. + """:class:`~tianshou.data.ReplayBuffer` stores data generated from interaction \ + between the policy and environment. - ReplayBuffer can be considered as a specialized form (or management) of - Batch. It stores all the data in a batch with circular-queue style. + ReplayBuffer can be considered as a specialized form (or management) of Batch. It + stores all the data in a batch with circular-queue style. For the example usage of ReplayBuffer, please check out Section Buffer in :doc:`/tutorials/concepts`. :param int size: the maximum size of replay buffer. - :param int stack_num: the frame-stack sampling argument, should be greater - than or equal to 1, defaults to 1 (no stacking). - :param bool ignore_obs_next: whether to store obs_next, defaults to False. - :param bool save_only_last_obs: only save the last obs/obs_next when it has - a shape of (timestep, ...) because of temporal stacking, defaults to - False. - :param bool sample_avail: the parameter indicating sampling only available - index when using frame-stack sampling method, defaults to False. + :param int stack_num: the frame-stack sampling argument, should be greater than or + equal to 1. Default to 1 (no stacking). + :param bool ignore_obs_next: whether to store obs_next. Default to False. + :param bool save_only_last_obs: only save the last obs/obs_next when it has a shape + of (timestep, ...) because of temporal stacking. Default to False. + :param bool sample_avail: the parameter indicating sampling only available index + when using frame-stack sampling method. Default to False. 
""" - _reserved_keys = ("obs", "act", "rew", "done", - "obs_next", "info", "policy") + _reserved_keys = ("obs", "act", "rew", "done", "obs_next", "info", "policy") def __init__( self, @@ -41,6 +50,7 @@ def __init__( ignore_obs_next: bool = False, save_only_last_obs: bool = False, sample_avail: bool = False, + **kwargs: Any, # otherwise PrioritizedVectorReplayBuffer will cause TypeError ) -> None: self.options: Dict[str, Any] = { "stack_num": stack_num, @@ -86,8 +96,9 @@ def __setstate__(self, state: Dict[str, Any]) -> None: def __setattr__(self, key: str, value: Any) -> None: """Set self.key = value.""" - assert key not in self._reserved_keys, ( - "key '{}' is reserved and cannot be assigned".format(key)) + assert ( + key not in self._reserved_keys + ), "key '{}' is reserved and cannot be assigned".format(key) super().__setattr__(key, value) def save_hdf5(self, path: str) -> None: @@ -96,9 +107,7 @@ def save_hdf5(self, path: str) -> None: to_hdf5(self.__dict__, f) @classmethod - def load_hdf5( - cls, path: str, device: Optional[str] = None - ) -> "ReplayBuffer": + def load_hdf5(cls, path: str, device: Optional[str] = None) -> "ReplayBuffer": """Load replay buffer from HDF5 file.""" with h5py.File(path, "r") as f: buf = cls.__new__(cls) @@ -108,20 +117,19 @@ def load_hdf5( def reset(self) -> None: """Clear all the data in replay buffer and episode statistics.""" self._index = self._size = 0 - self._episode_length, self._episode_reward = 0, 0.0 + self._ep_rew, self._ep_len, self._ep_idx = 0.0, 0, 0 def set_batch(self, batch: Batch) -> None: """Manually choose the batch you want the ReplayBuffer to manage.""" - assert len(batch) == self.maxsize and \ - set(batch.keys()).issubset(self._reserved_keys), \ - "Input batch doesn't meet ReplayBuffer's data form requirement." + assert len(batch) == self.maxsize and set(batch.keys()).issubset( + self._reserved_keys + ), "Input batch doesn't meet ReplayBuffer's data form requirement." self._meta = batch def unfinished_index(self) -> np.ndarray: """Return the index of unfinished episode.""" last = (self._index - 1) % self._size if self._size else 0 - return np.array( - [last] if not self.done[last] and self._size else [], np.int) + return np.array([last] if not self.done[last] and self._size else [], np.int) def prev(self, index: Union[int, np.integer, np.ndarray]) -> np.ndarray: """Return the index of previous transition. @@ -140,119 +148,126 @@ def next(self, index: Union[int, np.integer, np.ndarray]) -> np.ndarray: end_flag = self.done[index] | np.isin(index, self.unfinished_index()) return (index + (1 - end_flag)) % self._size - def update(self, buffer: "ReplayBuffer") -> None: - """Move the data from the given buffer to current buffer.""" + def update(self, buffer: "ReplayBuffer") -> np.ndarray: + """Move the data from the given buffer to current buffer. + + Return the updated indices. If update fails, return an empty array. 
+ """ if len(buffer) == 0 or self.maxsize == 0: - return + return np.array([], np.int) stack_num, buffer.stack_num = buffer.stack_num, 1 - save_only_last_obs = self._save_only_last_obs - self._save_only_last_obs = False - indices = buffer.sample_index(0) # get all available indices - for i in indices: - self.add(**buffer[i]) # type: ignore + from_indices = buffer.sample_index(0) # get all available indices buffer.stack_num = stack_num - self._save_only_last_obs = save_only_last_obs + if len(from_indices) == 0: + return np.array([], np.int) + to_indices = [] + for _ in range(len(from_indices)): + to_indices.append(self._index) + self._index = (self._index + 1) % self.maxsize + self._size = min(self._size + 1, self.maxsize) + to_indices = np.array(to_indices) + if self._meta.is_empty(): + self._meta = _create_value( # type: ignore + buffer._meta, self.maxsize, stack=False) + self._meta[to_indices] = buffer._meta[from_indices] + return to_indices + + def _add_index( + self, rew: Union[float, np.ndarray], done: bool + ) -> Tuple[int, Union[float, np.ndarray], int, int]: + """Maintain the buffer's state after adding one data batch. + + Return (index_to_be_modified, episode_reward, episode_length, + episode_start_index). + """ + ptr = self._index + self._size = min(self._size + 1, self.maxsize) + self._index = (self._index + 1) % self.maxsize - def _buffer_allocator(self, key: List[str], value: Any) -> None: - """Allocate memory on buffer._meta for new (key, value) pair.""" - data = self._meta - for k in key[:-1]: - data = data[k] - data[key[-1]] = _create_value(value, self.maxsize) + self._ep_rew += rew + self._ep_len += 1 - def _add_to_buffer(self, name: str, inst: Any) -> None: - try: - value = self._meta.__dict__[name] - except KeyError: - self._buffer_allocator([name], inst) - value = self._meta[name] - if isinstance(inst, (torch.Tensor, np.ndarray)): - if inst.shape != value.shape[1:]: - raise ValueError( - "Cannot add data to a buffer with different shape with key" - f" {name}, expect {value.shape[1:]}, given {inst.shape}." - ) - try: - value[self._index] = inst - except ValueError: # inst is a dict/Batch - for key in set(inst.keys()).difference(value.keys()): - self._buffer_allocator([name, key], inst[key]) - self._meta[name][self._index] = inst + if done: + result = ptr, self._ep_rew, self._ep_len, self._ep_idx + self._ep_rew, self._ep_len, self._ep_idx = 0.0, 0, self._index + return result + else: + return ptr, self._ep_rew * 0.0, 0, self._ep_idx def add( - self, - obs: Any, - act: Any, - rew: Union[Number, np.number, np.ndarray], - done: Union[Number, np.number, np.bool_], - obs_next: Any = None, - info: Optional[Union[dict, Batch]] = {}, - policy: Optional[Union[dict, Batch]] = {}, - **kwargs: Any, - ) -> Tuple[int, Union[float, np.ndarray]]: + self, batch: Batch, buffer_ids: Optional[Union[np.ndarray, List[int]]] = None + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Add a batch of data into replay buffer. - Return (episode_length, episode_reward) if one episode is terminated, - otherwise return (0, 0.0). + :param Batch batch: the input data batch. Its keys must belong to the 7 + reserved keys, and "obs", "act", "rew", "done" is required. + :param buffer_ids: to make consistent with other buffer's add function; if it + is not None, we assume the input batch's first dimension is always 1. + + Return (current_index, episode_reward, episode_length, episode_start_index). If + the episode is not finished, the return value of episode_length and + episode_reward is 0. 
""" - assert isinstance( - info, (dict, Batch) - ), "You should return a dict in the last argument of env.step()." + # preprocess batch + b = Batch() + for key in set(self._reserved_keys).intersection(batch.keys()): + b.__dict__[key] = batch[key] + batch = b + assert set(["obs", "act", "rew", "done"]).issubset(batch.keys()) + stacked_batch = buffer_ids is not None + if stacked_batch: + assert len(batch) == 1 if self._save_only_last_obs: - obs = obs[-1] - self._add_to_buffer("obs", obs) - self._add_to_buffer("act", act) - # make sure the data type of reward is float instead of int - # but rew may be np.ndarray, so that we cannot use float(rew) - rew = rew * 1.0 # type: ignore - self._add_to_buffer("rew", rew) - self._add_to_buffer("done", bool(done)) # done should be a bool scalar - if self._save_obs_next: - if obs_next is None: - obs_next = Batch() - elif self._save_only_last_obs: - obs_next = obs_next[-1] - self._add_to_buffer("obs_next", obs_next) - self._add_to_buffer("info", info) - self._add_to_buffer("policy", policy) - - if self.maxsize > 0: - self._size = min(self._size + 1, self.maxsize) - self._index = (self._index + 1) % self.maxsize - else: # TODO: remove this after deleting ListReplayBuffer - self._size = self._index = self._size + 1 - - self._episode_reward += rew - self._episode_length += 1 - - if done: - result = self._episode_length, self._episode_reward - self._episode_length, self._episode_reward = 0, 0.0 - return result + batch.obs = batch.obs[:, -1] if stacked_batch else batch.obs[-1] + if not self._save_obs_next: + batch.pop("obs_next", None) + elif self._save_only_last_obs: + batch.obs_next = ( + batch.obs_next[:, -1] if stacked_batch else batch.obs_next[-1] + ) + # get ptr + if stacked_batch: + rew, done = batch.rew[0], batch.done[0] else: - return 0, self._episode_reward * 0.0 + rew, done = batch.rew, batch.done + ptr, ep_rew, ep_len, ep_idx = list( + map(lambda x: np.array([x]), self._add_index(rew, done)) + ) + try: + self._meta[ptr] = batch + except ValueError: + stack = not stacked_batch + batch.rew = batch.rew.astype(np.float) + batch.done = batch.done.astype(np.bool_) + if self._meta.is_empty(): + self._meta = _create_value( # type: ignore + batch, self.maxsize, stack) + else: # dynamic key pops up in batch + _alloc_by_keys_diff(self._meta, batch, self.maxsize, stack) + self._meta[ptr] = batch + return ptr, ep_rew, ep_len, ep_idx def sample_index(self, batch_size: int) -> np.ndarray: """Get a random sample of index with size = batch_size. - Return all available indices in the buffer if batch_size is 0; return - an empty numpy array if batch_size < 0 or no available index can be - sampled. + Return all available indices in the buffer if batch_size is 0; return an empty + numpy array if batch_size < 0 or no available index can be sampled. 
""" if self.stack_num == 1 or not self._sample_avail: # most often case if batch_size > 0: return np.random.choice(self._size, batch_size) elif batch_size == 0: # construct current available indices - return np.concatenate([ - np.arange(self._index, self._size), - np.arange(self._index)]) + return np.concatenate( + [np.arange(self._index, self._size), np.arange(self._index)] + ) else: return np.array([], np.int) else: if batch_size < 0: return np.array([], np.int) - all_indices = prev_indices = np.concatenate([ - np.arange(self._index, self._size), np.arange(self._index)]) + all_indices = prev_indices = np.concatenate( + [np.arange(self._index, self._size), np.arange(self._index)] + ) for _ in range(self.stack_num - 2): prev_indices = self.prev(prev_indices) all_indices = all_indices[prev_indices != self.prev(prev_indices)] @@ -275,16 +290,18 @@ def get( self, index: Union[int, np.integer, np.ndarray], key: str, + default_value: Optional[Any] = None, stack_num: Optional[int] = None, ) -> Union[Batch, np.ndarray]: """Return the stacked result. - E.g. [s_{t-3}, s_{t-2}, s_{t-1}, s_t], where s is self.key, t is the - index. + E.g. [s_{t-3}, s_{t-2}, s_{t-1}, s_t], where s is self.key, t is the index. """ + if key not in self._meta and default_value is not None: + return default_value + val = self._meta[key] if stack_num is None: stack_num = self.stack_num - val = self._meta[key] try: if stack_num == 1: # the most often case return val[index] @@ -302,72 +319,35 @@ def get( raise e # val != Batch() return Batch() - def __getitem__( - self, index: Union[slice, int, np.integer, np.ndarray] - ) -> Batch: + def __getitem__(self, index: Union[slice, int, np.integer, np.ndarray]) -> Batch: """Return a data batch: self[index]. - If stack_num is larger than 1, return the stacked obs and obs_next with - shape (batch, len, ...). + If stack_num is larger than 1, return the stacked obs and obs_next with shape + (batch, len, ...). """ if isinstance(index, slice): # change slice to np array - index = self._indices[:len(self)][index] - # raise KeyError first instead of AttributeError, to support np.array + if index == slice(None): # buffer[:] will get all available data + index = self.sample_index(0) + else: + index = self._indices[:len(self)][index] + # raise KeyError first instead of AttributeError, + # to support np.array([ReplayBuffer()]) obs = self.get(index, "obs") if self._save_obs_next: - obs_next = self.get(index, "obs_next") + obs_next = self.get(index, "obs_next", Batch()) else: - obs_next = self.get(self.next(index), "obs") + obs_next = self.get(self.next(index), "obs", Batch()) return Batch( obs=obs, act=self.act[index], rew=self.rew[index], done=self.done[index], obs_next=obs_next, - info=self.get(index, "info"), - policy=self.get(index, "policy"), + info=self.get(index, "info", Batch()), + policy=self.get(index, "policy", Batch()), ) -class ListReplayBuffer(ReplayBuffer): - """List-based replay buffer. - - The function of :class:`~tianshou.data.ListReplayBuffer` is almost the same - as :class:`~tianshou.data.ReplayBuffer`. The only difference is that - :class:`~tianshou.data.ListReplayBuffer` is based on list. Therefore, - it does not support advanced indexing, which means you cannot sample a - batch of data out of it. It is typically used for storing data. - - .. seealso:: - - Please refer to :class:`~tianshou.data.ReplayBuffer` for more detailed - explanation. 
- """ - - def __init__(self, **kwargs: Any) -> None: - warnings.warn("ListReplayBuffer will be replaced in version 0.4.0.") - super().__init__(size=0, ignore_obs_next=False, **kwargs) - warnings.warn("ListReplayBuffer will be removed in version 0.4.0.") - - def sample(self, batch_size: int) -> Tuple[Batch, np.ndarray]: - raise NotImplementedError("ListReplayBuffer cannot be sampled!") - - def _add_to_buffer(self, name: str, inst: Any) -> None: - if self._meta.get(name) is None: - self._meta.__dict__[name] = [] - self._meta[name].append(inst) - - def reset(self) -> None: - super().reset() - for k in self._meta.keys(): - if isinstance(self._meta[k], list): - self._meta.__dict__[k] = [] - - def update(self, buffer: ReplayBuffer) -> None: - """The ListReplayBuffer cannot be updated by any buffer.""" - raise NotImplementedError - - class PrioritizedReplayBuffer(ReplayBuffer): """Implementation of Prioritized Experience Replay. arXiv:1511.05952. @@ -380,41 +360,34 @@ class PrioritizedReplayBuffer(ReplayBuffer): explanation. """ - def __init__( - self, size: int, alpha: float, beta: float, **kwargs: Any - ) -> None: - super().__init__(size, **kwargs) + def __init__(self, size: int, alpha: float, beta: float, **kwargs: Any) -> None: + # will raise KeyError in PrioritizedVectorReplayBuffer + # super().__init__(size, **kwargs) + ReplayBuffer.__init__(self, size, **kwargs) assert alpha > 0.0 and beta >= 0.0 self._alpha, self._beta = alpha, beta self._max_prio = self._min_prio = 1.0 # save weight directly in this class instead of self._meta self.weight = SegmentTree(size) self.__eps = np.finfo(np.float32).eps.item() + self.options.update(alpha=alpha, beta=beta) + + def init_weight(self, index: Union[int, np.ndarray]) -> None: + self.weight[index] = self._max_prio ** self._alpha + + def update(self, buffer: ReplayBuffer) -> np.ndarray: + indices = super().update(buffer) + self.init_weight(indices) def add( - self, - obs: Any, - act: Any, - rew: Union[Number, np.number, np.ndarray], - done: Union[Number, np.number, np.bool_], - obs_next: Any = None, - info: Optional[Union[dict, Batch]] = {}, - policy: Optional[Union[dict, Batch]] = {}, - weight: Optional[Union[Number, np.number]] = None, - **kwargs: Any, - ) -> Tuple[int, Union[float, np.ndarray]]: - if weight is None: - weight = self._max_prio - else: - weight = np.abs(weight) - self._max_prio = max(self._max_prio, weight) - self._min_prio = min(self._min_prio, weight) - self.weight[self._index] = weight ** self._alpha - return super().add(obs, act, rew, done, obs_next, - info, policy, **kwargs) + self, batch: Batch, buffer_ids: Optional[Union[np.ndarray, List[int]]] = None + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + ptr, ep_rew, ep_len, ep_idx = super().add(batch, buffer_ids) + self.init_weight(ptr) + return ptr, ep_rew, ep_len, ep_idx def sample_index(self, batch_size: int) -> np.ndarray: - if batch_size > 0 and self._size > 0: + if batch_size > 0 and len(self) > 0: scalar = np.random.rand(batch_size) * self.weight.reduce() return self.weight.get_prefix_sum_idx(scalar) else: @@ -425,9 +398,9 @@ def get_weight( ) -> np.ndarray: """Get the importance sampling weight. - The "weight" in the returned Batch is the weight on loss function - to de-bias the sampling process (some transition tuples are sampled - more often so their losses are weighted less). 
+ The "weight" in the returned Batch is the weight on loss function to de-bias + the sampling process (some transition tuples are sampled more often so their + losses are weighted less). """ # important sampling weight calculation # original formula: ((p_j/p_sum*N)**(-beta))/((p_min/p_sum*N)**(-beta)) @@ -435,9 +408,7 @@ def get_weight( return (self.weight[index] / self._min_prio) ** (-self._beta) def update_weight( - self, - index: np.ndarray, - new_weight: Union[np.ndarray, torch.Tensor], + self, index: np.ndarray, new_weight: Union[np.ndarray, torch.Tensor] ) -> None: """Update priority weight by index in this buffer. @@ -449,21 +420,20 @@ def update_weight( self._max_prio = max(self._max_prio, weight.max()) self._min_prio = min(self._min_prio, weight.min()) - def __getitem__( - self, index: Union[slice, int, np.integer, np.ndarray] - ) -> Batch: + def __getitem__(self, index: Union[slice, int, np.integer, np.ndarray]) -> Batch: batch = super().__getitem__(index) batch.weight = self.get_weight(index) return batch class ReplayBufferManager(ReplayBuffer): - """ReplayBufferManager contains a list of ReplayBuffer. + """ReplayBufferManager contains a list of ReplayBuffer with exactly the same \ + configuration. - These replay buffers have contiguous memory layout, and the storage space - each buffer has is a shallow copy of the topmost memory. + These replay buffers have contiguous memory layout, and the storage space each + buffer has is a shallow copy of the topmost memory. - :param int buffer_list: a list of ReplayBuffers needed to be handled. + :param buffer_list: a list of ReplayBuffer needed to be handled. .. seealso:: @@ -471,19 +441,19 @@ class ReplayBufferManager(ReplayBuffer): explanation. """ - def __init__(self, buffer_list: List[ReplayBuffer], **kwargs: Any) -> None: + def __init__(self, buffer_list: List[ReplayBuffer]) -> None: self.buffer_num = len(buffer_list) - self.buffers = buffer_list - self._offset = [] - offset = 0 + self.buffers = np.array(buffer_list) + offset, size = [], 0 + buffer_type = type(self.buffers[0]) + kwargs = self.buffers[0].options for buf in self.buffers: - # overwrite sub-buffers' _buffer_allocator so that - # the top buffer can allocate new memory for all sub-buffers - buf._buffer_allocator = self._buffer_allocator # type: ignore assert buf._meta.is_empty() - self._offset.append(offset) - offset += buf.maxsize - super().__init__(size=offset, **kwargs) + assert isinstance(buf, buffer_type) and buf.options == kwargs + offset.append(size) + size += buf.maxsize + self._offset = np.array(offset) + super().__init__(size=size, **kwargs) def __len__(self) -> int: return sum([len(buf) for buf in self.buffers]) @@ -503,7 +473,8 @@ def set_batch(self, batch: Batch) -> None: def unfinished_index(self) -> np.ndarray: return np.concatenate([ buf.unfinished_index() + offset - for offset, buf in zip(self._offset, self.buffers)]) + for offset, buf in zip(self._offset, self.buffers) + ]) def prev(self, index: Union[int, np.integer, np.ndarray]) -> np.ndarray: index = np.asarray(index) % self.maxsize @@ -523,49 +494,60 @@ def next(self, index: Union[int, np.integer, np.ndarray]) -> np.ndarray: next_indices[mask] = buf.next(index[mask] - offset) + offset return next_indices - def update(self, buffer: ReplayBuffer) -> None: + def update(self, buffer: ReplayBuffer) -> np.ndarray: """The ReplayBufferManager cannot be updated by any buffer.""" raise NotImplementedError - def _buffer_allocator(self, key: List[str], value: Any) -> None: - super()._buffer_allocator(key, value) 
- self._set_batch_for_children() - - def add( # type: ignore - self, - obs: Any, - act: Any, - rew: np.ndarray, - done: np.ndarray, - obs_next: Any = Batch(), - info: Optional[Batch] = Batch(), - policy: Optional[Batch] = Batch(), - buffer_ids: Optional[Union[np.ndarray, List[int]]] = None, - **kwargs: Any - ) -> Tuple[np.ndarray, np.ndarray]: + def add( + self, batch: Batch, buffer_ids: Optional[Union[np.ndarray, List[int]]] = None + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Add a batch of data into ReplayBufferManager. Each of the data's length (first dimension) must equal to the length of buffer_ids. By default buffer_ids is [0, 1, ..., buffer_num - 1]. - Return the array of episode_length and episode_reward with shape - (len(buffer_ids), ...), where (episode_length[i], episode_reward[i]) - refers to the buffer_ids[i]'s corresponding episode result. + Return (current_index, episode_reward, episode_length, episode_start_index). If + the episode is not finished, the return value of episode_length and + episode_reward is 0. """ + # preprocess batch + b = Batch() + for key in set(self._reserved_keys).intersection(batch.keys()): + b.__dict__[key] = batch[key] + batch = b + assert set(["obs", "act", "rew", "done"]).issubset(batch.keys()) + if self._save_only_last_obs: + batch.obs = batch.obs[:, -1] + if not self._save_obs_next: + batch.pop("obs_next", None) + elif self._save_only_last_obs: + batch.obs_next = batch.obs_next[:, -1] + # get index if buffer_ids is None: buffer_ids = np.arange(self.buffer_num) - # assume each element in buffer_ids is unique - assert np.bincount(buffer_ids).max() == 1 - batch = Batch(obs=obs, act=act, rew=rew, done=done, - obs_next=obs_next, info=info, policy=policy) - assert len(buffer_ids) == len(batch) - episode_lengths = [] # (len(buffer_ids),) - episode_rewards = [] # (len(buffer_ids), ...) 
+ ptrs, ep_lens, ep_rews, ep_idxs = [], [], [], [] for batch_idx, buffer_id in enumerate(buffer_ids): - length, reward = self.buffers[buffer_id].add(**batch[batch_idx]) - episode_lengths.append(length) - episode_rewards.append(reward) - return np.stack(episode_lengths), np.stack(episode_rewards) + ptr, ep_rew, ep_len, ep_idx = self.buffers[buffer_id]._add_index( + batch.rew[batch_idx], batch.done[batch_idx] + ) + ptrs.append(ptr + self._offset[buffer_id]) + ep_lens.append(ep_len) + ep_rews.append(ep_rew) + ep_idxs.append(ep_idx + self._offset[buffer_id]) + ptrs = np.array(ptrs) + try: + self._meta[ptrs] = batch + except ValueError: + batch.rew = batch.rew.astype(np.float) + batch.done = batch.done.astype(np.bool_) + if self._meta.is_empty(): + self._meta = _create_value( # type: ignore + batch, self.maxsize, stack=False) + else: # dynamic key pops up in batch + _alloc_by_keys_diff(self._meta, batch, self.maxsize, False) + self._set_batch_for_children() + self._meta[ptrs] = batch + return ptrs, np.array(ep_rews), np.array(ep_lens), np.array(ep_idxs) def sample_index(self, batch_size: int) -> np.ndarray: if batch_size < 0: @@ -573,7 +555,8 @@ def sample_index(self, batch_size: int) -> np.ndarray: if self._sample_avail and self.stack_num > 1: all_indices = np.concatenate([ buf.sample_index(0) + offset - for offset, buf in zip(self._offset, self.buffers)]) + for offset, buf in zip(self._offset, self.buffers) + ]) if batch_size == 0: return all_indices else: @@ -582,8 +565,9 @@ def sample_index(self, batch_size: int) -> np.ndarray: sample_num = np.zeros(self.buffer_num, np.int) else: buffer_lens = np.array([len(buf) for buf in self.buffers]) - buffer_idx = np.random.choice(self.buffer_num, batch_size, - p=buffer_lens / buffer_lens.sum()) + buffer_idx = np.random.choice( + self.buffer_num, batch_size, p=buffer_lens / buffer_lens.sum() + ) sample_num = np.bincount(buffer_idx, minlength=self.buffer_num) # avoid batch_size > 0 and sample_num == 0 -> get child's all data sample_num[sample_num == 0] = -1 @@ -594,29 +578,106 @@ def sample_index(self, batch_size: int) -> np.ndarray: ]) +class PrioritizedReplayBufferManager(PrioritizedReplayBuffer, ReplayBufferManager): + """PrioritizedReplayBufferManager contains a list of PrioritizedReplayBuffer with \ + exactly the same configuration. + + These replay buffers have contiguous memory layout, and the storage space each + buffer has is a shallow copy of the topmost memory. + + :param buffer_list: a list of PrioritizedReplayBuffer needed to be handled. + + .. seealso:: + + Please refer to :class:`~tianshou.data.ReplayBuffer`, + :class:`~tianshou.data.ReplayBufferManager`, and + :class:`~tianshou.data.PrioritizedReplayBuffer` for more detailed explanation. + """ + + def __init__(self, buffer_list: Sequence[PrioritizedReplayBuffer]) -> None: + ReplayBufferManager.__init__(self, buffer_list) # type: ignore + kwargs = buffer_list[0].options + for buf in buffer_list: + del buf.weight + PrioritizedReplayBuffer.__init__(self, self.maxsize, **kwargs) + + +class VectorReplayBuffer(ReplayBufferManager): + """VectorReplayBuffer contains n ReplayBuffer with the same size. + + It is used for storing data frame from different environments yet keeping the order + of time. + + :param int total_size: the total size of VectorReplayBuffer. + :param int buffer_num: the number of ReplayBuffer it uses, which are under the same + configuration. 
+ + Other input arguments (stack_num/ignore_obs_next/save_only_last_obs/sample_avail) + are the same as :class:`~tianshou.data.ReplayBuffer`. + + .. seealso:: + + Please refer to :class:`~tianshou.data.ReplayBuffer` and + :class:`~tianshou.data.ReplayBufferManager` for more detailed explanation. + """ + + def __init__(self, total_size: int, buffer_num: int, **kwargs: Any) -> None: + assert buffer_num > 0 + size = int(np.ceil(total_size / buffer_num)) + buffer_list = [ReplayBuffer(size, **kwargs) for _ in range(buffer_num)] + super().__init__(buffer_list) + + +class PrioritizedVectorReplayBuffer(PrioritizedReplayBufferManager): + """PrioritizedVectorReplayBuffer contains n PrioritizedReplayBuffer with same size. + + It is used for storing data frame from different environments yet keeping the order + of time. + + :param int total_size: the total size of PrioritizedVectorReplayBuffer. + :param int buffer_num: the number of PrioritizedReplayBuffer it uses, which are + under the same configuration. + + Other input arguments (alpha/beta/stack_num/ignore_obs_next/save_only_last_obs/ + sample_avail) are the same as :class:`~tianshou.data.PrioritizedReplayBuffer`. + + .. seealso:: + + Please refer to :class:`~tianshou.data.ReplayBuffer` and + :class:`~tianshou.data.PrioritizedReplayBufferManager` for more detailed + explanation. + """ + + def __init__(self, total_size: int, buffer_num: int, **kwargs: Any) -> None: + assert buffer_num > 0 + size = int(np.ceil(total_size / buffer_num)) + buffer_list = [ + PrioritizedReplayBuffer(size, **kwargs) for _ in range(buffer_num) + ] + super().__init__(buffer_list) + + class CachedReplayBuffer(ReplayBufferManager): """CachedReplayBuffer contains a given main buffer and n cached buffers, \ cached_buffer_num * ReplayBuffer(size=max_episode_length). - The memory layout is: ``| main_buffer | cached_buffers[0] | - cached_buffers[1] | ... | cached_buffers[cached_buffer_num - 1]``. + The memory layout is: ``| main_buffer | cached_buffers[0] | cached_buffers[1] | ... + | cached_buffers[cached_buffer_num - 1]``. - The data is first stored in cached buffers. When the episode is - terminated, the data will move to the main buffer and the corresponding - cached buffer will be reset. + The data is first stored in cached buffers. When an episode is terminated, the data + will move to the main buffer and the corresponding cached buffer will be reset. - :param ReplayBuffer main_buffer: the main buffer whose ``.update()`` - function behaves normally. - :param int cached_buffer_num: number of ReplayBuffer needs to be created - for cached buffer. - :param int max_episode_length: the maximum length of one episode, used in - each cached buffer's maxsize. + :param ReplayBuffer main_buffer: the main buffer whose ``.update()`` function + behaves normally. + :param int cached_buffer_num: number of ReplayBuffer needs to be created for cached + buffer. + :param int max_episode_length: the maximum length of one episode, used in each + cached buffer's maxsize. .. seealso:: Please refer to :class:`~tianshou.data.ReplayBuffer` or - :class:`~tianshou.data.ReplayBufferManager` for more detailed - explanation. + :class:`~tianshou.data.ReplayBufferManager` for more detailed explanation. 
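`VectorReplayBuffer` splits `total_size` evenly, so each sub-buffer gets `ceil(total_size / buffer_num)` slots, and `add` takes one transition per environment with the target sub-buffers named by `buffer_ids`. A sketch with illustrative shapes:

```python
import numpy as np
from tianshou.data import Batch, VectorReplayBuffer

buf = VectorReplayBuffer(total_size=1000, buffer_num=4)  # 4 x 250 slots

# one transition each from environments 0 and 2
buf.add(
    Batch(obs=np.zeros((2, 3)), act=np.zeros(2), rew=np.ones(2),
          done=np.zeros(2, dtype=bool)),
    buffer_ids=[0, 2])
```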
""" def __init__( @@ -626,73 +687,45 @@ def __init__( max_episode_length: int, ) -> None: assert cached_buffer_num > 0 and max_episode_length > 0 - self._is_prioritized = isinstance(main_buffer, PrioritizedReplayBuffer) + assert type(main_buffer) == ReplayBuffer kwargs = main_buffer.options - buffers = [main_buffer] + [ReplayBuffer(max_episode_length, **kwargs) - for _ in range(cached_buffer_num)] - super().__init__(buffer_list=buffers, **kwargs) + buffers = [main_buffer] + [ + ReplayBuffer(max_episode_length, **kwargs) + for _ in range(cached_buffer_num) + ] + super().__init__(buffer_list=buffers) self.main_buffer = self.buffers[0] self.cached_buffers = self.buffers[1:] self.cached_buffer_num = cached_buffer_num - def add( # type: ignore - self, - obs: Any, - act: Any, - rew: np.ndarray, - done: np.ndarray, - obs_next: Any = Batch(), - info: Optional[Batch] = Batch(), - policy: Optional[Batch] = Batch(), - cached_buffer_ids: Optional[Union[np.ndarray, List[int]]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, np.ndarray]: + def add( + self, batch: Batch, buffer_ids: Optional[Union[np.ndarray, List[int]]] = None + ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """Add a batch of data into CachedReplayBuffer. Each of the data's length (first dimension) must equal to the length of - cached_buffer_ids. By default the cached_buffer_ids is [0, 1, ..., - cached_buffer_num - 1]. + buffer_ids. By default the buffer_ids is [0, 1, ..., cached_buffer_num - 1]. - Return the array of episode_length and episode_reward with shape - (len(cached_buffer_ids), ...), where (episode_length[i], - episode_reward[i]) refers to the cached_buffer_ids[i]th cached buffer's - corresponding episode result. + Return (current_index, episode_reward, episode_length, episode_start_index) + with each of the shape (len(buffer_ids), ...), where (current_index[i], + episode_reward[i], episode_length[i], episode_start_index[i]) refers to the + cached_buffer_ids[i]th cached buffer's corresponding episode result. """ - if cached_buffer_ids is None: - cached_buffer_ids = np.arange(self.cached_buffer_num) + if buffer_ids is None: + buffer_ids = np.arange(1, 1 + self.cached_buffer_num) else: # make sure it is np.ndarray - cached_buffer_ids = np.asarray(cached_buffer_ids) - # in self.buffers, the first buffer is main_buffer - buffer_ids = cached_buffer_ids + 1 # type: ignore - result = super().add(obs, act, rew, done, obs_next, info, - policy, buffer_ids=buffer_ids, **kwargs) + buffer_ids = np.asarray(buffer_ids) + 1 + ptr, ep_rew, ep_len, ep_idx = super().add(batch, buffer_ids=buffer_ids) # find the terminated episode, move data from cached buf to main buf - for buffer_idx in cached_buffer_ids[np.asarray(done, np.bool_)]: - self.main_buffer.update(self.cached_buffers[buffer_idx]) - self.cached_buffers[buffer_idx].reset() - return result - - def __getitem__( - self, index: Union[slice, int, np.integer, np.ndarray] - ) -> Batch: - batch = super().__getitem__(index) - if self._is_prioritized: - indice = self._indices[index] - mask = indice < self.main_buffer.maxsize - batch.weight = np.ones(len(indice)) - batch.weight[mask] = self.main_buffer.get_weight(indice[mask]) - return batch - - def update_weight( - self, - index: np.ndarray, - new_weight: Union[np.ndarray, torch.Tensor], - ) -> None: - """Update priority weight by index in main buffer. - - :param np.ndarray index: index you want to update weight. - :param np.ndarray new_weight: new priority weight you want to update. 
- """ - if self._is_prioritized: - mask = index < self.main_buffer.maxsize - self.main_buffer.update_weight(index[mask], new_weight[mask]) + updated_ptr, updated_ep_idx = [], [] + done = batch.done.astype(np.bool_) + for buffer_idx in buffer_ids[done]: + index = self.main_buffer.update(self.buffers[buffer_idx]) + if len(index) == 0: # unsuccessful move, replace with -1 + index = [-1] + updated_ep_idx.append(index[0]) + updated_ptr.append(index[-1]) + self.buffers[buffer_idx].reset() + ptr[done] = updated_ptr + ep_idx[done] = updated_ep_idx + return ptr, ep_rew, ep_len, ep_idx diff --git a/tianshou/data/collector.py b/tianshou/data/collector.py index 25f268dc0..74ee72d11 100644 --- a/tianshou/data/collector.py +++ b/tianshou/data/collector.py @@ -3,77 +3,72 @@ import torch import warnings import numpy as np -from copy import deepcopy -from numbers import Number -from typing import Dict, List, Union, Optional, Callable +from typing import Any, Dict, List, Union, Optional, Callable from tianshou.policy import BasePolicy -from tianshou.exploration import BaseNoise -from tianshou.data.batch import _create_value +from tianshou.data.buffer import _alloc_by_keys_diff from tianshou.env import BaseVectorEnv, DummyVectorEnv -from tianshou.data import Batch, ReplayBuffer, ListReplayBuffer, to_numpy +from tianshou.data import ( + Batch, + ReplayBuffer, + ReplayBufferManager, + VectorReplayBuffer, + CachedReplayBuffer, + to_numpy, +) class Collector(object): - """Collector enables the policy to interact with different types of envs. + """Collector enables the policy to interact with different types of envs with \ + exact number of steps or episodes. - :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` - class. + :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` class. :param env: a ``gym.Env`` environment or an instance of the :class:`~tianshou.env.BaseVectorEnv` class. - :param buffer: an instance of the :class:`~tianshou.data.ReplayBuffer` - class. If set to ``None`` (testing phase), it will not store the data. - :param function preprocess_fn: a function called before the data has been - added to the buffer, see issue #42 and :ref:`preprocess_fn`, defaults - to None. - :param BaseNoise action_noise: add a noise to continuous action. Normally - a policy already has a noise param for exploration in training phase, - so this is recommended to use in test collector for some purpose. - :param function reward_metric: to be used in multi-agent RL. The reward to - report is of shape [agent_num], but we need to return a single scalar - to monitor training. This function specifies what is the desired - metric, e.g., the reward of agent 1 or the average reward over all - agents. By default, the behavior is to select the reward of agent 1. - - The ``preprocess_fn`` is a function called before the data has been added - to the buffer with batch format, which receives up to 7 keys as listed in - :class:`~tianshou.data.Batch`. It will receive with only ``obs`` when the - collector resets the environment. It returns either a dict or a - :class:`~tianshou.data.Batch` with the modified keys and values. Examples - are in "test/base/test_collector.py". - - Here is the example: + :param buffer: an instance of the :class:`~tianshou.data.ReplayBuffer` class. + If set to None, it will not store the data. Default to None. + :param function preprocess_fn: a function called before the data has been added to + the buffer, see issue #42 and :ref:`preprocess_fn`. Default to None. 
+ :param bool exploration_noise: determine whether the action needs to be modified + with corresponding policy's exploration noise. If so, "policy. + exploration_noise(act, batch)" will be called automatically to add the + exploration noise into action. Default to False. + + The "preprocess_fn" is a function called before the data has been added to the + buffer with batch format. It will receive with only "obs" when the collector resets + the environment, and will receive four keys "obs_next", "rew", "done", "info" in a + normal env step. It returns either a dict or a :class:`~tianshou.data.Batch` with + the modified keys and values. Examples are in "test/base/test_collector.py". + + Here are some example usages: :: policy = PGPolicy(...) # or other policies if you wish - env = gym.make('CartPole-v0') + env = gym.make("CartPole-v0") + replay_buffer = ReplayBuffer(size=10000) + # here we set up a collector with a single environment collector = Collector(policy, env, buffer=replay_buffer) # the collector supports vectorized environments as well - envs = DummyVectorEnv([lambda: gym.make('CartPole-v0') - for _ in range(3)]) - collector = Collector(policy, envs, buffer=replay_buffer) + vec_buffer = VectorReplayBuffer(total_size=10000, buffer_num=3) + # buffer_num should be equal to (suggested) or larger than #envs + envs = DummyVectorEnv([lambda: gym.make("CartPole-v0") for _ in range(3)]) + collector = Collector(policy, envs, buffer=vec_buffer) # collect 3 episodes collector.collect(n_episode=3) - # collect 1 episode for the first env, 3 for the third env - collector.collect(n_episode=[1, 0, 3]) # collect at least 2 steps collector.collect(n_step=2) - # collect episodes with visual rendering (the render argument is the - # sleep time between rendering consecutive frames) + # collect episodes with visual rendering ("render" is the sleep time between + # rendering consecutive frames) collector.collect(n_episode=1, render=0.03) - Collected data always consist of full episodes. So if only ``n_step`` - argument is give, the collector may return the data more than the - ``n_step`` limitation. Same as ``n_episode`` for the multiple environment - case. - .. note:: - Please make sure the given environment has a time limitation. + Please make sure the given environment has a time limitation if using n_episode + collect option. 
""" def __init__( @@ -82,322 +77,449 @@ def __init__( env: Union[gym.Env, BaseVectorEnv], buffer: Optional[ReplayBuffer] = None, preprocess_fn: Optional[Callable[..., Batch]] = None, - action_noise: Optional[BaseNoise] = None, - reward_metric: Optional[Callable[[np.ndarray], float]] = None, + exploration_noise: bool = False, ) -> None: super().__init__() if not isinstance(env, BaseVectorEnv): env = DummyVectorEnv([lambda: env]) self.env = env self.env_num = len(env) - # environments that are available in step() - # this means all environments in synchronous simulation - # but only a subset of environments in asynchronous simulation - self._ready_env_ids = np.arange(self.env_num) - # self.async is a flag to indicate whether this collector works - # with asynchronous simulation - self.is_async = env.is_async - # need cache buffers before storing in the main buffer - self._cached_buf = [ListReplayBuffer() for _ in range(self.env_num)] - self.buffer = buffer + self.exploration_noise = exploration_noise + self._assign_buffer(buffer) self.policy = policy self.preprocess_fn = preprocess_fn - self.process_fn = policy.process_fn self._action_space = env.action_space - self._action_noise = action_noise - self._rew_metric = reward_metric or Collector._default_rew_metric # avoid creating attribute outside __init__ self.reset() - @staticmethod - def _default_rew_metric( - x: Union[Number, np.number] - ) -> Union[Number, np.number]: - # this internal function is designed for single-agent RL - # for multi-agent RL, a reward_metric must be provided - assert np.asanyarray(x).size == 1, ( - "Please specify the reward_metric " - "since the reward is not a scalar." - ) - return x + def _assign_buffer(self, buffer: Optional[ReplayBuffer]) -> None: + """Check if the buffer matches the constraint.""" + if buffer is None: + buffer = VectorReplayBuffer(self.env_num, self.env_num) + elif isinstance(buffer, ReplayBufferManager): + assert buffer.buffer_num >= self.env_num + if isinstance(buffer, CachedReplayBuffer): + assert buffer.cached_buffer_num >= self.env_num + else: # ReplayBuffer or PrioritizedReplayBuffer + assert buffer.maxsize > 0 + if self.env_num > 1: + if type(buffer) == ReplayBuffer: + buffer_type = "ReplayBuffer" + vector_type = "VectorReplayBuffer" + else: + buffer_type = "PrioritizedReplayBuffer" + vector_type = "PrioritizedVectorReplayBuffer" + raise TypeError( + f"Cannot use {buffer_type}(size={buffer.maxsize}, ...) to collect " + f"{self.env_num} envs,\n\tplease use {vector_type}(total_size=" + f"{buffer.maxsize}, buffer_num={self.env_num}, ...) instead." 
+ ) + self.buffer = buffer def reset(self) -> None: """Reset all related variables in the collector.""" - # use empty Batch for ``state`` so that ``self.data`` supports slicing + # use empty Batch for "state" so that self.data supports slicing # convert empty Batch to None when passing data to policy - self.data = Batch(state={}, obs={}, act={}, rew={}, done={}, info={}, - obs_next={}, policy={}) + self.data = Batch(obs={}, act={}, rew={}, done={}, + obs_next={}, info={}, policy={}) self.reset_env() self.reset_buffer() self.reset_stat() - if self._action_noise is not None: - self._action_noise.reset() def reset_stat(self) -> None: """Reset the statistic variables.""" - self.collect_time, self.collect_step, self.collect_episode = 0.0, 0, 0 + self.collect_step, self.collect_episode, self.collect_time = 0, 0, 0.0 def reset_buffer(self) -> None: - """Reset the main data buffer.""" - if self.buffer is not None: - self.buffer.reset() - - def get_env_num(self) -> int: - """Return the number of environments the collector have.""" - return self.env_num + """Reset the data buffer.""" + self.buffer.reset() def reset_env(self) -> None: - """Reset all of the environment(s)' states and the cache buffers.""" - self._ready_env_ids = np.arange(self.env_num) + """Reset all of the environments.""" obs = self.env.reset() if self.preprocess_fn: obs = self.preprocess_fn(obs=obs).get("obs", obs) self.data.obs = obs - for b in self._cached_buf: - b.reset() def _reset_state(self, id: Union[int, List[int]]) -> None: """Reset the hidden state: self.data.state[id].""" - state = self.data.state # it is a reference - if isinstance(state, torch.Tensor): - state[id].zero_() - elif isinstance(state, np.ndarray): - state[id] = None if state.dtype == np.object else 0 - elif isinstance(state, Batch): - state.empty_(id) + if hasattr(self.data.policy, "hidden_state"): + state = self.data.policy.hidden_state # it is a reference + if isinstance(state, torch.Tensor): + state[id].zero_() + elif isinstance(state, np.ndarray): + state[id] = None if state.dtype == np.object else 0 + elif isinstance(state, Batch): + state.empty_(id) def collect( self, n_step: Optional[int] = None, - n_episode: Optional[Union[int, List[int]]] = None, + n_episode: Optional[int] = None, random: bool = False, render: Optional[float] = None, no_grad: bool = True, - ) -> Dict[str, float]: + ) -> Dict[str, Any]: """Collect a specified number of step or episode. + To ensure unbiased sampling result with n_episode option, this function will + first collect ``n_episode - env_num`` episodes, then for the last ``env_num`` + episodes, they will be collected evenly from each env. + :param int n_step: how many steps you want to collect. - :param n_episode: how many episodes you want to collect. If it is an - int, it means to collect at lease ``n_episode`` episodes; if it is - a list, it means to collect exactly ``n_episode[i]`` episodes in - the i-th environment - :param bool random: whether to use random policy for collecting data, - defaults to False. - :param float render: the sleep time between rendering consecutive - frames, defaults to None (no rendering). - :param bool no_grad: whether to retain gradient in policy.forward, - defaults to True (no gradient retaining). + :param int n_episode: how many episodes you want to collect. + :param bool random: whether to use random policy for collecting data. Default + to False. + :param float render: the sleep time between rendering consecutive frames. + Default to None (no rendering). 
+ :param bool no_grad: whether to retain gradient in policy.forward(). Default to + True (no gradient retaining). .. note:: - One and only one collection number specification is permitted, - either ``n_step`` or ``n_episode``. + One and only one collection number specification is permitted, either + ``n_step`` or ``n_episode``. :return: A dict including the following keys - * ``n/ep`` the collected number of episodes. - * ``n/st`` the collected number of steps. - * ``v/st`` the speed of steps per second. - * ``v/ep`` the speed of episode per second. - * ``rew`` the mean reward over collected episodes. - * ``len`` the mean length over collected episodes. + * ``n/ep`` collected number of episodes. + * ``n/st`` collected number of steps. + * ``rews`` list of episode reward over collected episodes. + * ``lens`` list of episode length over collected episodes. + * ``idxs`` list of episode start index in buffer over collected episodes. """ - assert (n_step is not None and n_episode is None and n_step > 0) or ( - n_step is None and n_episode is not None and np.sum(n_episode) > 0 - ), "Only one of n_step or n_episode is allowed in Collector.collect, " - f"got n_step = {n_step}, n_episode = {n_episode}." + assert not self.env.is_async, "Please use AsyncCollector if using async venv." + if n_step is not None: + assert n_episode is None, ( + f"Only one of n_step or n_episode is allowed in Collector." + f"collect, got n_step={n_step}, n_episode={n_episode}." + ) + assert n_step > 0 + if not n_step % self.env_num == 0: + warnings.warn( + f"n_step={n_step} is not a multiple of #env ({self.env_num}), " + "which may cause extra frame collected into the buffer." + ) + ready_env_ids = np.arange(self.env_num) + elif n_episode is not None: + assert n_episode > 0 + ready_env_ids = np.arange(min(self.env_num, n_episode)) + self.data = self.data[:min(self.env_num, n_episode)] + else: + raise TypeError("Please specify at least one (either n_step or n_episode) " + "in AsyncCollector.collect().") + start_time = time.time() + step_count = 0 - # episode of each environment - episode_count = np.zeros(self.env_num) - # If n_episode is a list, and some envs have collected the required - # number of episodes, these envs will be recorded in this list, and - # they will not be stepped. - finished_env_ids = [] - rewards = [] - whole_data = Batch() - if isinstance(n_episode, list): - assert len(n_episode) == self.get_env_num() - finished_env_ids = [ - i for i in self._ready_env_ids if n_episode[i] <= 0] - self._ready_env_ids = np.array( - [x for x in self._ready_env_ids if x not in finished_env_ids]) + episode_count = 0 + episode_rews = [] + episode_lens = [] + episode_start_indices = [] + while True: - if step_count >= 100000 and episode_count.sum() == 0: - warnings.warn( - "There are already many steps in an episode. 
" - "You should add a time limitation to your environment!", - Warning) - - is_async = self.is_async or len(finished_env_ids) > 0 - if is_async: - # self.data are the data for all environments in async - # simulation or some envs have finished, - # **only a subset of data are disposed**, - # so we store the whole data in ``whole_data``, let self.data - # to be the data available in ready environments, and finally - # set these back into all the data - whole_data = self.data - self.data = self.data[self._ready_env_ids] - - # restore the state and the input data - last_state = self.data.state - if isinstance(last_state, Batch) and last_state.is_empty(): - last_state = None - self.data.update(state=Batch(), obs_next=Batch(), policy=Batch()) - - # calculate the next action + assert len(self.data) == len(ready_env_ids) + # restore the state: if the last state is None, it won't store + last_state = self.data.policy.pop("hidden_state", None) + + # get the next action if random: - spaces = self._action_space - result = Batch( - act=[spaces[i].sample() for i in self._ready_env_ids]) + self.data.update( + act=[self._action_space[i].sample() for i in ready_env_ids]) else: if no_grad: with torch.no_grad(): # faster than retain_grad version + # self.data.obs will be used by agent to get result result = self.policy(self.data, last_state) else: result = self.policy(self.data, last_state) - - state = result.get("state", Batch()) - # convert None to Batch(), since None is reserved for 0-init - if state is None: - state = Batch() - self.data.update(state=state, policy=result.get("policy", Batch())) - # save hidden state to policy._state, in order to save into buffer - if not (isinstance(state, Batch) and state.is_empty()): - self.data.policy._state = self.data.state - - self.data.act = to_numpy(result.act) - if self._action_noise is not None: - assert isinstance(self.data.act, np.ndarray) - self.data.act += self._action_noise(self.data.act.shape) + # update state / act / policy into self.data + policy = result.get("policy", Batch()) + assert isinstance(policy, Batch) + state = result.get("state", None) + if state is not None: + policy.hidden_state = state # save state into buffer + act = to_numpy(result.act) + if self.exploration_noise: + act = self.policy.exploration_noise(act, self.data) + self.data.update(policy=policy, act=act) # step in env - if not is_async: - obs_next, rew, done, info = self.env.step(self.data.act) - else: - # store computed actions, states, etc - _batch_set_item( - whole_data, self._ready_env_ids, self.data, self.env_num) - # fetch finished data - obs_next, rew, done, info = self.env.step( - self.data.act, id=self._ready_env_ids) - self._ready_env_ids = np.array([i["env_id"] for i in info]) - # get the stepped data - self.data = whole_data[self._ready_env_ids] - # move data to self.data + obs_next, rew, done, info = self.env.step(self.data.act, id=ready_env_ids) + self.data.update(obs_next=obs_next, rew=rew, done=done, info=info) + if self.preprocess_fn: + self.data.update(self.preprocess_fn( + obs_next=self.data.obs_next, + rew=self.data.rew, + done=self.data.done, + info=self.data.info, + )) if render: self.env.render() - time.sleep(render) + if render > 0 and not np.isclose(render, 0): + time.sleep(render) # add data into the buffer - if self.preprocess_fn: - result = self.preprocess_fn(**self.data) # type: ignore - self.data.update(result) - - for j, i in enumerate(self._ready_env_ids): - # j is the index in current ready_env_ids - # i is the index in all environments - if 
self.buffer is None: - # users do not want to store data, so we store - # small fake data here to make the code clean - self._cached_buf[i].add(obs=0, act=0, rew=rew[j], done=0) - else: - self._cached_buf[i].add(**self.data[j]) - - if done[j]: - if not (isinstance(n_episode, list) - and episode_count[i] >= n_episode[i]): - episode_count[i] += 1 - rewards.append(self._rew_metric( - np.sum(self._cached_buf[i].rew, axis=0))) - step_count += len(self._cached_buf[i]) - if self.buffer is not None: - self.buffer.update(self._cached_buf[i]) - if isinstance(n_episode, list) and \ - episode_count[i] >= n_episode[i]: - # env i has collected enough data, it has finished - finished_env_ids.append(i) - self._cached_buf[i].reset() - self._reset_state(j) - obs_next = self.data.obs_next - if sum(done): + ptr, ep_rew, ep_len, ep_idx = self.buffer.add( + self.data, buffer_ids=ready_env_ids) + + # collect statistics + step_count += len(ready_env_ids) + + if np.any(done): env_ind_local = np.where(done)[0] - env_ind_global = self._ready_env_ids[env_ind_local] + env_ind_global = ready_env_ids[env_ind_local] + episode_count += len(env_ind_local) + episode_lens.append(ep_len[env_ind_local]) + episode_rews.append(ep_rew[env_ind_local]) + episode_start_indices.append(ep_idx[env_ind_local]) + # now we copy obs_next to obs, but since there might be + # finished episodes, we have to reset finished envs first. obs_reset = self.env.reset(env_ind_global) if self.preprocess_fn: - obs_reset = self.preprocess_fn( - obs=obs_reset).get("obs", obs_reset) - obs_next[env_ind_local] = obs_reset - self.data.obs = obs_next - if is_async: - # set data back - whole_data = deepcopy(whole_data) # avoid reference in ListBuf - _batch_set_item( - whole_data, self._ready_env_ids, self.data, self.env_num) - # let self.data be the data in all environments again - self.data = whole_data - self._ready_env_ids = np.array( - [x for x in self._ready_env_ids if x not in finished_env_ids]) - if n_step: - if step_count >= n_step: - break - else: - if isinstance(n_episode, int) and \ - episode_count.sum() >= n_episode: - break - if isinstance(n_episode, list) and \ - (episode_count >= n_episode).all(): - break - - # finished envs are ready, and can be used for the next collection - self._ready_env_ids = np.array( - self._ready_env_ids.tolist() + finished_env_ids) - - # generate the statistics - episode_count = sum(episode_count) - duration = max(time.time() - start_time, 1e-9) + obs_reset = self.preprocess_fn(obs=obs_reset).get("obs", obs_reset) + self.data.obs_next[env_ind_local] = obs_reset + for i in env_ind_local: + self._reset_state(i) + + # remove surplus env id from ready_env_ids + # to avoid bias in selecting environments + if n_episode: + surplus_env_num = len(ready_env_ids) - (n_episode - episode_count) + if surplus_env_num > 0: + mask = np.ones_like(ready_env_ids, np.bool) + mask[env_ind_local[:surplus_env_num]] = False + ready_env_ids = ready_env_ids[mask] + self.data = self.data[mask] + + self.data.obs = self.data.obs_next + + if (n_step and step_count >= n_step) or \ + (n_episode and episode_count >= n_episode): + break + + # generate statistics self.collect_step += step_count self.collect_episode += episode_count - self.collect_time += duration + self.collect_time += max(time.time() - start_time, 1e-9) + + if n_episode: + self.data = Batch(obs={}, act={}, rew={}, done={}, + obs_next={}, info={}, policy={}) + self.reset_env() + + if episode_count > 0: + rews, lens, idxs = list(map( + np.concatenate, [episode_rews, episode_lens, 
episode_start_indices])) + else: + rews, lens, idxs = np.array([]), np.array([], np.int), np.array([], np.int) + return { "n/ep": episode_count, "n/st": step_count, - "v/st": step_count / duration, - "v/ep": episode_count / duration, - "rew": np.mean(rewards), - "rew_std": np.std(rewards), - "len": step_count / episode_count, + "rews": rews, + "lens": lens, + "idxs": idxs, } -def _batch_set_item( - source: Batch, indices: np.ndarray, target: Batch, size: int -) -> None: - # for any key chain k, there are four cases - # 1. source[k] is non-reserved, but target[k] does not exist or is reserved - # 2. source[k] does not exist or is reserved, but target[k] is non-reserved - # 3. both source[k] and target[k] are non-reserved - # 4. both source[k] and target[k] do not exist or are reserved, do nothing. - # A special case in case 4, if target[k] is reserved but source[k] does - # not exist, make source[k] reserved, too. - for k, vt in target.items(): - if not isinstance(vt, Batch) or not vt.is_empty(): - # target[k] is non-reserved - vs = source.get(k, Batch()) - if isinstance(vs, Batch): - if vs.is_empty(): - # case 2, use __dict__ to avoid many type checks - source.__dict__[k] = _create_value(vt[0], size) +class AsyncCollector(Collector): + """Async Collector handles async vector environment. + + The arguments are exactly the same as :class:`~tianshou.data.Collector`, please + refer to :class:`~tianshou.data.Collector` for more detailed explanation. + """ + + def __init__( + self, + policy: BasePolicy, + env: BaseVectorEnv, + buffer: Optional[ReplayBuffer] = None, + preprocess_fn: Optional[Callable[..., Batch]] = None, + exploration_noise: bool = False, + ) -> None: + assert env.is_async + super().__init__(policy, env, buffer, preprocess_fn, exploration_noise) + + def reset_env(self) -> None: + super().reset_env() + self._ready_env_ids = np.arange(self.env_num) + + def collect( + self, + n_step: Optional[int] = None, + n_episode: Optional[int] = None, + random: bool = False, + render: Optional[float] = None, + no_grad: bool = True, + ) -> Dict[str, Any]: + """Collect a specified number of step or episode with async env setting. + + This function doesn't collect exactly n_step or n_episode number of frames. + Instead, in order to support async setting, it may collect more than given + n_step or n_episode frames and save into buffer. + + :param int n_step: how many steps you want to collect. + :param int n_episode: how many episodes you want to collect. + :param bool random: whether to use random policy for collecting data. Default + to False. + :param float render: the sleep time between rendering consecutive frames. + Default to None (no rendering). + :param bool no_grad: whether to retain gradient in policy.forward(). Default to + True (no gradient retaining). + + .. note:: + + One and only one collection number specification is permitted, either + ``n_step`` or ``n_episode``. + + :return: A dict including the following keys + + * ``n/ep`` collected number of episodes. + * ``n/st`` collected number of steps. + * ``rews`` list of episode reward over collected episodes. + * ``lens`` list of episode length over collected episodes. + * ``idxs`` list of episode start index in buffer over collected episodes. + """ + # collect at least n_step or n_episode + if n_step is not None: + assert n_episode is None, ( + "Only one of n_step or n_episode is allowed in Collector." + f"collect, got n_step={n_step}, n_episode={n_episode}." 
+ ) + assert n_step > 0 + elif n_episode is not None: + assert n_episode > 0 + else: + raise TypeError("Please specify at least one (either n_step or n_episode) " + "in AsyncCollector.collect().") + warnings.warn("Using async setting may collect extra frames into buffer.") + + ready_env_ids = self._ready_env_ids + + start_time = time.time() + + step_count = 0 + episode_count = 0 + episode_rews = [] + episode_lens = [] + episode_start_indices = [] + + while True: + whole_data = self.data + self.data = self.data[ready_env_ids] + assert len(whole_data) == self.env_num # major difference + # restore the state: if the last state is None, it won't store + last_state = self.data.policy.pop("hidden_state", None) + + # get the next action + if random: + self.data.update( + act=[self._action_space[i].sample() for i in ready_env_ids]) + else: + if no_grad: + with torch.no_grad(): # faster than retain_grad version + # self.data.obs will be used by agent to get result + result = self.policy(self.data, last_state) else: - assert isinstance(vt, Batch) - _batch_set_item(source.__dict__[k], indices, vt, size) + result = self.policy(self.data, last_state) + # update state / act / policy into self.data + policy = result.get("policy", Batch()) + assert isinstance(policy, Batch) + state = result.get("state", None) + if state is not None: + policy.hidden_state = state # save state into buffer + act = to_numpy(result.act) + if self.exploration_noise: + act = self.policy.exploration_noise(act, self.data) + self.data.update(policy=policy, act=act) + + # save act/policy before env.step + try: + whole_data.act[ready_env_ids] = self.data.act + whole_data.policy[ready_env_ids] = self.data.policy + except ValueError: + _alloc_by_keys_diff(whole_data, self.data, self.env_num, False) + whole_data[ready_env_ids] = self.data # lots of overhead + + # step in env + obs_next, rew, done, info = self.env.step(self.data.act, id=ready_env_ids) + + # change self.data here because ready_env_ids has changed + ready_env_ids = np.array([i["env_id"] for i in info]) + self.data = whole_data[ready_env_ids] + + self.data.update(obs_next=obs_next, rew=rew, done=done, info=info) + if self.preprocess_fn: + self.data.update(self.preprocess_fn( + obs_next=self.data.obs_next, + rew=self.data.rew, + done=self.data.done, + info=self.data.info, + )) + + if render: + self.env.render() + if render > 0 and not np.isclose(render, 0): + time.sleep(render) + + # add data into the buffer + ptr, ep_rew, ep_len, ep_idx = self.buffer.add( + self.data, buffer_ids=ready_env_ids) + + # collect statistics + step_count += len(ready_env_ids) + + if np.any(done): + env_ind_local = np.where(done)[0] + env_ind_global = ready_env_ids[env_ind_local] + episode_count += len(env_ind_local) + episode_lens.append(ep_len[env_ind_local]) + episode_rews.append(ep_rew[env_ind_local]) + episode_start_indices.append(ep_idx[env_ind_local]) + # now we copy obs_next to obs, but since there might be + # finished episodes, we have to reset finished envs first. 
+ obs_reset = self.env.reset(env_ind_global) + if self.preprocess_fn: + obs_reset = self.preprocess_fn(obs=obs_reset).get("obs", obs_reset) + self.data.obs_next[env_ind_local] = obs_reset + for i in env_ind_local: + self._reset_state(i) + + try: + whole_data.obs[ready_env_ids] = self.data.obs_next + whole_data.rew[ready_env_ids] = self.data.rew + whole_data.done[ready_env_ids] = self.data.done + whole_data.info[ready_env_ids] = self.data.info + except ValueError: + _alloc_by_keys_diff(whole_data, self.data, self.env_num, False) + self.data.obs = self.data.obs_next + whole_data[ready_env_ids] = self.data # lots of overhead + self.data = whole_data + + if (n_step and step_count >= n_step) or \ + (n_episode and episode_count >= n_episode): + break + + self._ready_env_ids = ready_env_ids + + # generate statistics + self.collect_step += step_count + self.collect_episode += episode_count + self.collect_time += max(time.time() - start_time, 1e-9) + + if episode_count > 0: + rews, lens, idxs = list(map( + np.concatenate, [episode_rews, episode_lens, episode_start_indices])) else: - # target[k] is reserved - # case 1 or special case of case 4 - if k not in source.__dict__: - source.__dict__[k] = Batch() - continue - source.__dict__[k][indices] = vt + rews, lens, idxs = np.array([]), np.array([], np.int), np.array([], np.int) + + return { + "n/ep": episode_count, + "n/st": step_count, + "rews": rews, + "lens": lens, + "idxs": idxs, + } diff --git a/tianshou/policy/__init__.py b/tianshou/policy/__init__.py index 01b7019af..a3625ca3f 100644 --- a/tianshou/policy/__init__.py +++ b/tianshou/policy/__init__.py @@ -1,6 +1,5 @@ from tianshou.policy.base import BasePolicy from tianshou.policy.random import RandomPolicy -from tianshou.policy.imitation.base import ImitationPolicy from tianshou.policy.modelfree.dqn import DQNPolicy from tianshou.policy.modelfree.c51 import C51Policy from tianshou.policy.modelfree.qrdqn import QRDQNPolicy @@ -11,6 +10,7 @@ from tianshou.policy.modelfree.td3 import TD3Policy from tianshou.policy.modelfree.sac import SACPolicy from tianshou.policy.modelfree.discrete_sac import DiscreteSACPolicy +from tianshou.policy.imitation.base import ImitationPolicy from tianshou.policy.imitation.discrete_bcq import DiscreteBCQPolicy from tianshou.policy.modelbase.psrl import PSRLPolicy from tianshou.policy.multiagent.mapolicy import MultiAgentPolicyManager @@ -19,7 +19,6 @@ __all__ = [ "BasePolicy", "RandomPolicy", - "ImitationPolicy", "DQNPolicy", "C51Policy", "QRDQNPolicy", @@ -30,6 +29,7 @@ "TD3Policy", "SACPolicy", "DiscreteSACPolicy", + "ImitationPolicy", "DiscreteBCQPolicy", "PSRLPolicy", "MultiAgentPolicyManager", diff --git a/tianshou/policy/base.py b/tianshou/policy/base.py index 99e16544b..be6d8216b 100644 --- a/tianshou/policy/base.py +++ b/tianshou/policy/base.py @@ -67,6 +67,20 @@ def set_agent_id(self, agent_id: int) -> None: """Set self.agent_id = agent_id, for MARL.""" self.agent_id = agent_id + def exploration_noise( + self, act: Union[np.ndarray, Batch], batch: Batch + ) -> Union[np.ndarray, Batch]: + """Modify the action from policy.forward with exploration noise. + + :param act: a data batch or numpy.ndarray which is the action taken by + policy.forward. + :param batch: the input batch for policy.forward, kept for advanced usage. + + :return: action in the same form of input "act" but with added exploration + noise. + """ + return act + @abstractmethod def forward( self, @@ -76,8 +90,7 @@ def forward( ) -> Batch: """Compute action over the given batch data. 
- :return: A :class:`~tianshou.data.Batch` which MUST have the following\ - keys: + :return: A :class:`~tianshou.data.Batch` which MUST have the following keys: * ``act`` an numpy.ndarray or a torch.Tensor, the action over \ given batch data. @@ -106,8 +119,7 @@ def process_fn( ) -> Batch: """Pre-process the data from the provided replay buffer. - Used in :meth:`update`. Check out :ref:`process_fn` for more - information. + Used in :meth:`update`. Check out :ref:`process_fn` for more information. """ return batch @@ -173,21 +185,48 @@ def update( self.updating = False return result + @staticmethod + def value_mask(buffer: ReplayBuffer, indice: np.ndarray) -> np.ndarray: + """Value mask determines whether the obs_next of buffer[indice] is valid. + + For instance, usually "obs_next" after "done" flag is considered to be invalid, + and its q/advantage value can provide meaningless (even misleading) + information, and should be set to 0 by hand. But if "done" flag is generated + because timelimit of game length (info["TimeLimit.truncated"] is set to True in + gym's settings), "obs_next" will instead be valid. Value mask is typically used + for assisting in calculating the correct q/advantage value. + + :param ReplayBuffer buffer: the corresponding replay buffer. + :param numpy.ndarray indice: indices of replay buffer whose "obs_next" will be + judged. + + :return: A bool type numpy.ndarray in the same shape with indice. "True" means + "obs_next" of that buffer[indice] is valid. + """ + return ~buffer.done[indice].astype(np.bool) + @staticmethod def compute_episodic_return( batch: Batch, + buffer: ReplayBuffer, + indice: np.ndarray, v_s_: Optional[Union[np.ndarray, torch.Tensor]] = None, gamma: float = 0.99, gae_lambda: float = 0.95, rew_norm: bool = False, ) -> Batch: - """Compute returns over given full-length episodes. + """Compute returns over given batch. - Implementation of Generalized Advantage Estimator (arXiv:1506.02438). + Use Implementation of Generalized Advantage Estimator (arXiv:1506.02438) + to calculate q function/reward to go of given batch. - :param batch: a data batch which contains several full-episode data - chronologically. + :param batch: a data batch which contains several episodes of data + in sequential order. Mind that the end of each finished episode of batch + should be marked by done flag, unfinished (or collecting) episodes will be + recongized by buffer.unfinished_index(). :type batch: :class:`~tianshou.data.Batch` + :param numpy.ndarray indice: tell batch's location in buffer, batch is + equal to buffer[indice]. :param v_s_: the value function of all next states :math:`V(s')`. :type v_s_: numpy.ndarray :param float gamma: the discount factor, should be in [0, 1], defaults @@ -201,8 +240,15 @@ def compute_episodic_return( array with shape (bsz, ). 
""" rew = batch.rew - v_s_ = np.zeros_like(rew) if v_s_ is None else to_numpy(v_s_.flatten()) - returns = _episodic_return(v_s_, rew, batch.done, gamma, gae_lambda) + if v_s_ is None: + assert np.isclose(gae_lambda, 1.0) + v_s_ = np.zeros_like(rew) + else: + v_s_ = to_numpy(v_s_.flatten()) * BasePolicy.value_mask(buffer, indice) + + end_flag = batch.done.copy() + end_flag[np.isin(indice, buffer.unfinished_index())] = True + returns = _episodic_return(v_s_, rew, end_flag, gamma, gae_lambda) if rew_norm and not np.isclose(returns.std(), 0.0, 1e-2): returns = (returns - returns.mean()) / returns.std() batch.returns = returns @@ -224,19 +270,15 @@ def compute_nstep_return( G_t = \sum_{i = t}^{t + n - 1} \gamma^{i - t}(1 - d_i)r_i + \gamma^n (1 - d_{t + n}) Q_{\mathrm{target}}(s_{t + n}) - where :math:`\gamma` is the discount factor, - :math:`\gamma \in [0, 1]`, :math:`d_t` is the done flag of step - :math:`t`. + where :math:`\gamma` is the discount factor, :math:`\gamma \in [0, 1]`, + :math:`d_t` is the done flag of step :math:`t`. :param batch: a data batch, which is equal to buffer[indice]. :type batch: :class:`~tianshou.data.Batch` - :param buffer: a data buffer which contains several full-episode data - chronologically. + :param buffer: the data buffer. :type buffer: :class:`~tianshou.data.ReplayBuffer` - :param indice: sampled timestep. - :type indice: numpy.ndarray - :param function target_q_fn: a function receives :math:`t+n-1` step's - data and compute target Q value. + :param function target_q_fn: a function which compute target Q value + of "obs_next" given data buffer and wanted indices. :param float gamma: the discount factor, should be in [0, 1], defaults to 0.99. :param int n_step: the number of estimation step, should be an int @@ -248,21 +290,30 @@ def compute_nstep_return( torch.Tensor with the same shape as target_q_fn's return tensor. """ rew = buffer.rew - if rew_norm: + bsz = len(indice) + if rew_norm: # TODO: remove it or fix this bug bfr = rew[:min(len(buffer), 1000)] # avoid large buffer mean, std = bfr.mean(), bfr.std() if np.isclose(std, 0, 1e-2): mean, std = 0.0, 1.0 else: mean, std = 0.0, 1.0 - buf_len = len(buffer) - terminal = (indice + n_step - 1) % buf_len + indices = [indice] + for _ in range(n_step - 1): + indices.append(buffer.next(indices[-1])) + indices = np.stack(indices) + # terminal indicates buffer indexes nstep after 'indice', + # and are truncated at the end of each episode + terminal = indices[-1] with torch.no_grad(): target_q_torch = target_q_fn(buffer, terminal) # (bsz, ?) 
- target_q = to_numpy(target_q_torch) + target_q = to_numpy(target_q_torch.reshape(bsz, -1)) + target_q = target_q * BasePolicy.value_mask(buffer, terminal).reshape(-1, 1) + end_flag = buffer.done.copy() + end_flag[buffer.unfinished_index()] = True + target_q = _nstep_return(rew, end_flag, target_q, + indices, gamma, n_step, mean, std) - target_q = _nstep_return(rew, buffer.done, target_q, indice, - gamma, n_step, len(buffer), mean, std) batch.returns = to_torch_as(target_q, target_q_torch) if hasattr(batch, "weight"): # prio buffer update batch.weight = to_torch_as(batch.weight, target_q_torch) @@ -272,57 +323,70 @@ def _compile(self) -> None: f64 = np.array([0, 1], dtype=np.float64) f32 = np.array([0, 1], dtype=np.float32) b = np.array([False, True], dtype=np.bool_) - i64 = np.array([0, 1], dtype=np.int64) + i64 = np.array([[0, 1]], dtype=np.int64) + _gae_return(f64, f64, f64, b, 0.1, 0.1) + _gae_return(f32, f32, f64, b, 0.1, 0.1) _episodic_return(f64, f64, b, 0.1, 0.1) _episodic_return(f32, f64, b, 0.1, 0.1) - _nstep_return(f64, b, f32, i64, 0.1, 1, 4, 0.0, 1.0) + _nstep_return(f64, b, f32.reshape(-1, 1), i64, 0.1, 1, 0.0, 1.0) @njit -def _episodic_return( +def _gae_return( + v_s: np.ndarray, v_s_: np.ndarray, rew: np.ndarray, - done: np.ndarray, + end_flag: np.ndarray, gamma: float, gae_lambda: float, ) -> np.ndarray: - """Numba speedup: 4.1s -> 0.057s.""" - returns = np.roll(v_s_, 1) - m = (1.0 - done) * gamma - delta = rew + v_s_ * m - returns - m *= gae_lambda + returns = np.zeros(rew.shape) + delta = rew + v_s_ * gamma - v_s + m = (1.0 - end_flag) * (gamma * gae_lambda) gae = 0.0 for i in range(len(rew) - 1, -1, -1): gae = delta[i] + m[i] * gae - returns[i] += gae + returns[i] = gae return returns +@njit +def _episodic_return( + v_s_: np.ndarray, + rew: np.ndarray, + end_flag: np.ndarray, + gamma: float, + gae_lambda: float, +) -> np.ndarray: + """Numba speedup: 4.1s -> 0.057s.""" + v_s = np.roll(v_s_, 1) + return _gae_return(v_s, v_s_, rew, end_flag, gamma, gae_lambda) + v_s + + @njit def _nstep_return( rew: np.ndarray, - done: np.ndarray, + end_flag: np.ndarray, target_q: np.ndarray, - indice: np.ndarray, + indices: np.ndarray, gamma: float, n_step: int, - buf_len: int, mean: float, std: float, ) -> np.ndarray: - """Numba speedup: 0.3s -> 0.15s.""" + gamma_buffer = np.ones(n_step + 1) + for i in range(1, n_step + 1): + gamma_buffer[i] = gamma_buffer[i - 1] * gamma target_shape = target_q.shape bsz = target_shape[0] # change target_q to 2d array target_q = target_q.reshape(bsz, -1) returns = np.zeros(target_q.shape) - gammas = np.full(indice.shape, n_step) + gammas = np.full(indices[0].shape, n_step) for n in range(n_step - 1, -1, -1): - now = (indice + n) % buf_len - gammas[done[now] > 0] = n - returns[done[now] > 0] = 0.0 - returns = (rew[now].reshape(-1, 1) - mean) / std + gamma * returns - target_q[gammas != n_step] = 0.0 - gammas = gammas.reshape(-1, 1) - target_q = target_q * (gamma ** gammas) + returns + now = indices[n] + gammas[end_flag[now] > 0] = n + returns[end_flag[now] > 0] = 0.0 + returns = (rew[now].reshape(bsz, 1) - mean) / std + gamma * returns + target_q = target_q * gamma_buffer[gammas].reshape(bsz, 1) + returns return target_q.reshape(target_shape) diff --git a/tianshou/policy/imitation/discrete_bcq.py b/tianshou/policy/imitation/discrete_bcq.py index 688a9901d..0061ea20f 100644 --- a/tianshou/policy/imitation/discrete_bcq.py +++ b/tianshou/policy/imitation/discrete_bcq.py @@ -74,7 +74,7 @@ def _target_q( ) -> torch.Tensor: batch = buffer[indice] # 
batch.obs_next: s_{t+n} # target_Q = Q_old(s_, argmax(Q_new(s_, *))) - act = self(batch, input="obs_next", eps=0.0).act + act = self(batch, input="obs_next").act target_q, _ = self.model_old(batch.obs_next) target_q = target_q[np.arange(len(act)), act] return target_q @@ -84,13 +84,12 @@ def forward( # type: ignore batch: Batch, state: Optional[Union[dict, Batch, np.ndarray]] = None, input: str = "obs", - eps: Optional[float] = None, **kwargs: Any, ) -> Batch: - if eps is None: - eps = self._eps obs = batch[input] q_value, state = self.model(obs, state=state, info=batch.info) + if not hasattr(self, "max_action_num"): + self.max_action_num = q_value.shape[1] imitation_logits, _ = self.imitator(obs, state=state, info=batch.info) # mask actions for argmax @@ -99,24 +98,25 @@ def forward( # type: ignore mask = (ratio < self._log_tau).float() action = (q_value - np.inf * mask).argmax(dim=-1) - # add eps to act - if not np.isclose(eps, 0.0): - bsz, action_num = q_value.shape - mask = np.random.rand(bsz) < eps - action_rand = torch.randint( - action_num, size=[bsz], device=action.device) - action[mask] = action_rand[mask] - return Batch(act=action, state=state, q_value=q_value, imitation_logits=imitation_logits) + def exploration_noise(self, act: np.ndarray, batch: Batch) -> np.ndarray: + # add eps to act + if not np.isclose(self._eps, 0.0): + bsz = len(act) + mask = np.random.rand(bsz) < self._eps + act_rand = np.random.randint(self.max_action_num, size=[bsz]) + act[mask] = act_rand[mask] + return act + def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: if self._iter % self._freq == 0: self.sync_weight() self._iter += 1 target_q = batch.returns.flatten() - result = self(batch, eps=0.0) + result = self(batch) imitation_logits = result.imitation_logits current_q = result.q_value[np.arange(len(target_q)), batch.act] act = to_torch(batch.act, dtype=torch.long, device=target_q.device) diff --git a/tianshou/policy/modelfree/a2c.py b/tianshou/policy/modelfree/a2c.py index e215c6bbd..1c8edb300 100644 --- a/tianshou/policy/modelfree/a2c.py +++ b/tianshou/policy/modelfree/a2c.py @@ -69,15 +69,17 @@ def process_fn( ) -> Batch: if self._lambda in [0.0, 1.0]: return self.compute_episodic_return( - batch, None, gamma=self._gamma, gae_lambda=self._lambda) + batch, buffer, indice, + None, gamma=self._gamma, gae_lambda=self._lambda) v_ = [] with torch.no_grad(): for b in batch.split(self._batch, shuffle=False, merge_last=True): v_.append(to_numpy(self.critic(b.obs_next))) v_ = np.concatenate(v_, axis=0) return self.compute_episodic_return( - batch, v_, gamma=self._gamma, gae_lambda=self._lambda, - rew_norm=self._rew_norm) + batch, buffer, indice, + v_, gamma=self._gamma, + gae_lambda=self._lambda, rew_norm=self._rew_norm) def forward( self, diff --git a/tianshou/policy/modelfree/c51.py b/tianshou/policy/modelfree/c51.py index b0e94c616..dce2112a8 100644 --- a/tianshou/policy/modelfree/c51.py +++ b/tianshou/policy/modelfree/c51.py @@ -70,9 +70,7 @@ def compute_q_value(self, logits: torch.Tensor) -> torch.Tensor: def _target_dist(self, batch: Batch) -> torch.Tensor: if self._target: a = self(batch, input="obs_next").act - next_dist = self( - batch, model="model_old", input="obs_next" - ).logits + next_dist = self(batch, model="model_old", input="obs_next").logits else: next_b = self(batch, input="obs_next") a = next_b.act diff --git a/tianshou/policy/modelfree/ddpg.py b/tianshou/policy/modelfree/ddpg.py index 1ac293c88..efa9fb7f9 100644 --- a/tianshou/policy/modelfree/ddpg.py +++ 
b/tianshou/policy/modelfree/ddpg.py @@ -5,7 +5,7 @@ from tianshou.policy import BasePolicy from tianshou.exploration import BaseNoise, GaussianNoise -from tianshou.data import Batch, ReplayBuffer, to_torch_as +from tianshou.data import Batch, ReplayBuffer class DDPGPolicy(BasePolicy): @@ -141,9 +141,6 @@ def forward( obs = batch[input] actions, h = model(obs, state=state, info=batch.info) actions += self._action_bias - if self._noise and not self.updating: - actions += to_torch_as(self._noise(actions.shape), actions) - actions = actions.clamp(self._range[0], self._range[1]) return Batch(act=actions, state=h) def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: @@ -166,3 +163,9 @@ def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: "loss/actor": actor_loss.item(), "loss/critic": critic_loss.item(), } + + def exploration_noise(self, act: np.ndarray, batch: Batch) -> np.ndarray: + if self._noise: + act = act + self._noise(act.shape) + act = act.clip(self._range[0], self._range[1]) + return act diff --git a/tianshou/policy/modelfree/discrete_sac.py b/tianshou/policy/modelfree/discrete_sac.py index 4db2dc9dc..9c46fc4a3 100644 --- a/tianshou/policy/modelfree/discrete_sac.py +++ b/tianshou/policy/modelfree/discrete_sac.py @@ -119,8 +119,7 @@ def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: current_q1a = self.critic1(batch.obs) current_q2a = self.critic2(batch.obs) q = torch.min(current_q1a, current_q2a) - actor_loss = -(self._alpha * entropy - + (dist.probs * q).sum(dim=-1)).mean() + actor_loss = -(self._alpha * entropy + (dist.probs * q).sum(dim=-1)).mean() self.actor_optim.zero_grad() actor_loss.backward() self.actor_optim.step() @@ -145,3 +144,8 @@ def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: result["alpha"] = self._alpha.item() # type: ignore return result + + def exploration_noise( + self, act: Union[np.ndarray, Batch], batch: Batch + ) -> Union[np.ndarray, Batch]: + return act diff --git a/tianshou/policy/modelfree/dqn.py b/tianshou/policy/modelfree/dqn.py index 54397915c..e79ff3206 100644 --- a/tianshou/policy/modelfree/dqn.py +++ b/tianshou/policy/modelfree/dqn.py @@ -46,9 +46,7 @@ def __init__( self.model = model self.optim = optim self.eps = 0.0 - assert ( - 0.0 <= discount_factor <= 1.0 - ), "discount factor should be in [0, 1]" + assert 0.0 <= discount_factor <= 1.0, "discount factor should be in [0, 1]" self._gamma = discount_factor assert estimation_step > 0, "estimation_step should be greater than 0" self._n_step = estimation_step @@ -81,9 +79,7 @@ def _target_q( # target_Q = Q_old(s_, argmax(Q_new(s_, *))) if self._target: a = self(batch, input="obs_next").act - target_q = self( - batch, model="model_old", input="obs_next" - ).logits + target_q = self(batch, model="model_old", input="obs_next").logits target_q = target_q[np.arange(len(a)), a] else: target_q = self(batch, input="obs_next").logits.max(dim=1)[0] @@ -148,20 +144,14 @@ def forward( obs_ = obs.obs if hasattr(obs, "obs") else obs logits, h = model(obs_, state=state, info=batch.info) q = self.compute_q_value(logits) + if not hasattr(self, "max_action_num"): + self.max_action_num = q.shape[1] act: np.ndarray = to_numpy(q.max(dim=1)[1]) if hasattr(obs, "mask"): # some of actions are masked, they cannot be selected q_: np.ndarray = to_numpy(q) q_[~obs.mask] = -np.inf act = q_.argmax(axis=1) - # add eps to act in training or testing phase - if not self.updating and not np.isclose(self.eps, 0.0): - for i in range(len(q)): - if np.random.rand() < self.eps: - q_ = 
np.random.rand(*q[i].shape) - if hasattr(obs, "mask"): - q_[~obs.mask[i]] = -np.inf - act[i] = q_.argmax() return Batch(logits=logits, act=act, state=h) def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: @@ -179,3 +169,13 @@ def learn(self, batch: Batch, **kwargs: Any) -> Dict[str, float]: self.optim.step() self._iter += 1 return {"loss": loss.item()} + + def exploration_noise(self, act: np.ndarray, batch: Batch) -> np.ndarray: + if not np.isclose(self.eps, 0.0): + for i in range(len(act)): + if np.random.rand() < self.eps: + q_ = np.random.rand(self.max_action_num) + if hasattr(batch["obs"], "mask"): + q_[~batch["obs"].mask[i]] = -np.inf + act[i] = q_.argmax() + return act diff --git a/tianshou/policy/modelfree/pg.py b/tianshou/policy/modelfree/pg.py index 2f3304658..92b7f5d89 100644 --- a/tianshou/policy/modelfree/pg.py +++ b/tianshou/policy/modelfree/pg.py @@ -56,7 +56,8 @@ def process_fn( # batch.returns = self._vanilla_returns(batch) # batch.returns = self._vectorized_returns(batch) return self.compute_episodic_return( - batch, gamma=self._gamma, gae_lambda=1.0, rew_norm=self._rew_norm) + batch, buffer, indice, gamma=self._gamma, + gae_lambda=1.0, rew_norm=self._rew_norm) def forward( self, diff --git a/tianshou/policy/modelfree/ppo.py b/tianshou/policy/modelfree/ppo.py index 4cf9f9054..7fd6f1f26 100644 --- a/tianshou/policy/modelfree/ppo.py +++ b/tianshou/policy/modelfree/ppo.py @@ -100,8 +100,8 @@ def process_fn( ) v_ = to_numpy(torch.cat(v_, dim=0)) batch = self.compute_episodic_return( - batch, v_, gamma=self._gamma, gae_lambda=self._lambda, - rew_norm=self._rew_norm) + batch, buffer, indice, v_, gamma=self._gamma, + gae_lambda=self._lambda, rew_norm=self._rew_norm) batch.v = torch.cat(v, dim=0).flatten() # old value batch.act = to_torch_as(batch.act, v[0]) batch.logp_old = torch.cat(old_log_prob, dim=0) diff --git a/tianshou/policy/modelfree/qrdqn.py b/tianshou/policy/modelfree/qrdqn.py index 754d9acce..8816b6b1a 100644 --- a/tianshou/policy/modelfree/qrdqn.py +++ b/tianshou/policy/modelfree/qrdqn.py @@ -56,9 +56,7 @@ def _target_q( batch = buffer[indice] # batch.obs_next: s_{t+n} if self._target: a = self(batch, input="obs_next").act - next_dist = self( - batch, model="model_old", input="obs_next" - ).logits + next_dist = self(batch, model="model_old", input="obs_next").logits else: next_b = self(batch, input="obs_next") a = next_b.act diff --git a/tianshou/policy/modelfree/sac.py b/tianshou/policy/modelfree/sac.py index fbdd12297..091d1243c 100644 --- a/tianshou/policy/modelfree/sac.py +++ b/tianshou/policy/modelfree/sac.py @@ -6,7 +6,7 @@ from tianshou.policy import DDPGPolicy from tianshou.exploration import BaseNoise -from tianshou.data import Batch, ReplayBuffer, to_torch_as +from tianshou.data import Batch, ReplayBuffer class SACPolicy(DDPGPolicy): @@ -130,9 +130,7 @@ def forward( # type: ignore y = self._action_scale * (1 - y.pow(2)) + self.__eps log_prob = dist.log_prob(x).unsqueeze(-1) log_prob = log_prob - torch.log(y).sum(-1, keepdim=True) - if self._noise is not None and self.training and not self.updating: - act += to_torch_as(self._noise(act.shape), act) - act = act.clamp(self._range[0], self._range[1]) + return Batch( logits=logits, act=act, state=h, dist=dist, log_prob=log_prob) diff --git a/tianshou/policy/multiagent/mapolicy.py b/tianshou/policy/multiagent/mapolicy.py index 5bfd5e9b2..7aa1f661c 100644 --- a/tianshou/policy/multiagent/mapolicy.py +++ b/tianshou/policy/multiagent/mapolicy.py @@ -59,6 +59,18 @@ def process_fn( buffer._meta.rew = 
save_rew return Batch(results) + def exploration_noise( + self, act: Union[np.ndarray, Batch], batch: Batch + ) -> Union[np.ndarray, Batch]: + """Add exploration noise from sub-policy onto act.""" + for policy in self.policies: + agent_index = np.nonzero(batch.obs.agent_id == policy.agent_id)[0] + if len(agent_index) == 0: + continue + act[agent_index] = policy.exploration_noise( + act[agent_index], batch[agent_index]) + return act + def forward( self, batch: Batch, diff --git a/tianshou/trainer/offline.py b/tianshou/trainer/offline.py index e69364135..7eb6ec5b5 100644 --- a/tianshou/trainer/offline.py +++ b/tianshou/trainer/offline.py @@ -1,8 +1,9 @@ import time import tqdm +import numpy as np from collections import defaultdict from torch.utils.tensorboard import SummaryWriter -from typing import Dict, List, Union, Callable, Optional +from typing import Dict, Union, Callable, Optional from tianshou.policy import BasePolicy from tianshou.utils import tqdm_config, MovAvg @@ -16,11 +17,12 @@ def offline_trainer( test_collector: Collector, max_epoch: int, step_per_epoch: int, - episode_per_test: Union[int, List[int]], + episode_per_test: int, batch_size: int, test_fn: Optional[Callable[[int, Optional[int]], None]] = None, stop_fn: Optional[Callable[[float], bool]] = None, save_fn: Optional[Callable[[BasePolicy], None]] = None, + reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None, writer: Optional[SummaryWriter] = None, log_interval: int = 1, verbose: bool = True, @@ -29,8 +31,7 @@ def offline_trainer( The "step" in trainer means a policy network update. - :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` - class. + :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` class. :param test_collector: the collector used for testing. :type test_collector: :class:`~tianshou.data.Collector` :param int max_epoch: the maximum number of epochs for training. The @@ -49,6 +50,12 @@ def offline_trainer( :param function stop_fn: a function with signature ``f(mean_rewards: float) -> bool``, receives the average undiscounted returns of the testing result, returns a boolean which indicates whether reaching the goal. + :param function reward_metric: a function with signature ``f(rewards: np.ndarray + with shape (num_episode, agent_num)) -> np.ndarray with shape (num_episode,)``, + used in multi-agent RL. We need to return a single scalar for each episode's + result to monitor training in the multi-agent RL setting. This function + specifies what is the desired metric, e.g., the reward of agent 1 or the + average reward over all agents. :param torch.utils.tensorboard.SummaryWriter writer: a TensorBoard SummaryWriter; if None is given, it will not write logs to TensorBoard. :param int log_interval: the log interval of the writer. 
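
For reference, the `reward_metric` hook added to the trainer signatures above is documented as mapping per-agent episode rewards of shape `(num_episode, agent_num)` down to one scalar per episode. A minimal sketch of two such metrics follows; the function names and the agent index are illustrative, not part of this diff:

```python
import numpy as np


def first_agent_reward(rewards: np.ndarray) -> np.ndarray:
    # keep agent 0's reward: (num_episode, agent_num) -> (num_episode,)
    return rewards[:, 0]


def mean_agent_reward(rewards: np.ndarray) -> np.ndarray:
    # alternative: average the reward over all agents
    return rewards.mean(axis=1)


# hypothetical call, all other trainer arguments omitted:
# offline_trainer(..., reward_metric=first_agent_reward)
```

In the single-agent case the collector already reports one scalar per episode, so leaving `reward_metric` as None keeps the previous behaviour.
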
@@ -81,15 +88,15 @@ def offline_trainer(
                 t.set_postfix(**data)
         # test
         result = test_episode(policy, test_collector, test_fn, epoch,
-                              episode_per_test, writer, gradient_step)
-        if best_epoch == -1 or best_reward < result["rew"]:
-            best_reward, best_reward_std = result["rew"], result["rew_std"]
+                              episode_per_test, writer, gradient_step, reward_metric)
+        if best_epoch == -1 or best_reward < result["rews"].mean():
+            best_reward, best_reward_std = result["rews"].mean(), result['rews'].std()
             best_epoch = epoch
             if save_fn:
                 save_fn(policy)
         if verbose:
-            print(f"Epoch #{epoch}: test_reward: {result['rew']:.6f} ± "
-                  f"{result['rew_std']:.6f}, best_reward: {best_reward:.6f} ± "
+            print(f"Epoch #{epoch}: test_reward: {result['rews'].mean():.6f} ± "
+                  f"{result['rews'].std():.6f}, best_reward: {best_reward:.6f} ± "
                   f"{best_reward_std:.6f} in #{best_epoch}")
         if stop_fn and stop_fn(best_reward):
             break
diff --git a/tianshou/trainer/offpolicy.py b/tianshou/trainer/offpolicy.py
index f34f5b281..ba08c2e0b 100644
--- a/tianshou/trainer/offpolicy.py
+++ b/tianshou/trainer/offpolicy.py
@@ -1,8 +1,9 @@
 import time
 import tqdm
+import numpy as np
 from collections import defaultdict
 from torch.utils.tensorboard import SummaryWriter
-from typing import Dict, List, Union, Callable, Optional
+from typing import Dict, Union, Callable, Optional
 
 from tianshou.data import Collector
 from tianshou.policy import BasePolicy
@@ -17,13 +18,14 @@ def offpolicy_trainer(
     max_epoch: int,
     step_per_epoch: int,
     collect_per_step: int,
-    episode_per_test: Union[int, List[int]],
+    episode_per_test: int,
     batch_size: int,
     update_per_step: int = 1,
     train_fn: Optional[Callable[[int, int], None]] = None,
     test_fn: Optional[Callable[[int, Optional[int]], None]] = None,
     stop_fn: Optional[Callable[[float], bool]] = None,
     save_fn: Optional[Callable[[BasePolicy], None]] = None,
+    reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None,
     writer: Optional[SummaryWriter] = None,
     log_interval: int = 1,
     verbose: bool = True,
@@ -33,8 +35,7 @@ def offpolicy_trainer(
 
     The "step" in trainer means a policy network update.
 
-    :param policy: an instance of the :class:`~tianshou.policy.BasePolicy`
-        class.
+    :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` class.
     :param train_collector: the collector used for training.
     :type train_collector: :class:`~tianshou.data.Collector`
     :param test_collector: the collector used for testing.
@@ -65,6 +66,12 @@ def offpolicy_trainer(
     :param function stop_fn: a function with signature ``f(mean_rewards: float) ->
         bool``, receives the average undiscounted returns of the testing result,
         returns a boolean which indicates whether reaching the goal.
+    :param function reward_metric: a function with signature ``f(rewards: np.ndarray
+        with shape (num_episode, agent_num)) -> np.ndarray with shape (num_episode,)``,
+        used in multi-agent RL. We need to return a single scalar for each episode's
+        result to monitor training in the multi-agent RL setting. This function
+        specifies what is the desired metric, e.g., the reward of agent 1 or the
+        average reward over all agents.
     :param torch.utils.tensorboard.SummaryWriter writer: a TensorBoard SummaryWriter;
         if None is given, it will not write logs to TensorBoard.
     :param int log_interval: the log interval of the writer.
@@ -90,35 +97,35 @@ def offpolicy_trainer(
                 if train_fn:
                     train_fn(epoch, env_step)
                 result = train_collector.collect(n_step=collect_per_step)
+                if len(result["rews"]) > 0 and reward_metric:
+                    result["rews"] = reward_metric(result["rews"])
                 env_step += int(result["n/st"])
                 data = {
                     "env_step": str(env_step),
-                    "rew": f"{result['rew']:.2f}",
-                    "len": str(int(result["len"])),
+                    "rew": f"{result['rews'].mean():.2f}",
+                    "len": str(result["lens"].mean()),
                     "n/ep": str(int(result["n/ep"])),
                     "n/st": str(int(result["n/st"])),
-                    "v/ep": f"{result['v/ep']:.2f}",
-                    "v/st": f"{result['v/st']:.2f}",
                 }
-                if writer and env_step % log_interval == 0:
-                    for k in result.keys():
+                if result["n/ep"] > 0:
+                    if writer and env_step % log_interval == 0:
                         writer.add_scalar(
-                            "train/" + k, result[k], global_step=env_step)
-                if test_in_train and stop_fn and stop_fn(result["rew"]):
-                    test_result = test_episode(
-                        policy, test_collector, test_fn,
-                        epoch, episode_per_test, writer, env_step)
-                    if stop_fn(test_result["rew"]):
-                        if save_fn:
-                            save_fn(policy)
-                        for k in result.keys():
-                            data[k] = f"{result[k]:.2f}"
-                        t.set_postfix(**data)
-                        return gather_info(
-                            start_time, train_collector, test_collector,
-                            test_result["rew"], test_result["rew_std"])
-                    else:
-                        policy.train()
+                            "train/rew", result['rews'].mean(), global_step=env_step)
+                        writer.add_scalar(
+                            "train/len", result['lens'].mean(), global_step=env_step)
+                    if test_in_train and stop_fn and stop_fn(result["rews"].mean()):
+                        test_result = test_episode(
+                            policy, test_collector, test_fn,
+                            epoch, episode_per_test, writer, env_step)
+                        if stop_fn(test_result["rews"].mean()):
+                            if save_fn:
+                                save_fn(policy)
+                            t.set_postfix(**data)
+                            return gather_info(
+                                start_time, train_collector, test_collector,
+                                test_result["rews"].mean(), test_result["rews"].std())
+                        else:
+                            policy.train()
                 for i in range(update_per_step * min(
                         result["n/st"] // collect_per_step, t.total - t.n)):
                     gradient_step += 1
@@ -135,15 +142,15 @@ def offpolicy_trainer(
                 t.update()
         # test
         result = test_episode(policy, test_collector, test_fn, epoch,
-                              episode_per_test, writer, env_step)
-        if best_epoch == -1 or best_reward < result["rew"]:
-            best_reward, best_reward_std = result["rew"], result["rew_std"]
+                              episode_per_test, writer, env_step, reward_metric)
+        if best_epoch == -1 or best_reward < result["rews"].mean():
+            best_reward, best_reward_std = result["rews"].mean(), result["rews"].std()
             best_epoch = epoch
             if save_fn:
                 save_fn(policy)
         if verbose:
-            print(f"Epoch #{epoch}: test_reward: {result['rew']:.6f} ± "
-                  f"{result['rew_std']:.6f}, best_reward: {best_reward:.6f} ± "
+            print(f"Epoch #{epoch}: test_reward: {result['rews'].mean():.6f} ± "
+                  f"{result['rews'].std():.6f}, best_reward: {best_reward:.6f} ± "
                   f"{best_reward_std:.6f} in #{best_epoch}")
         if stop_fn and stop_fn(best_reward):
             break
diff --git a/tianshou/trainer/onpolicy.py b/tianshou/trainer/onpolicy.py
index f094ddd7d..b951f9a9e 100644
--- a/tianshou/trainer/onpolicy.py
+++ b/tianshou/trainer/onpolicy.py
@@ -1,8 +1,9 @@
 import time
 import tqdm
+import numpy as np
 from collections import defaultdict
 from torch.utils.tensorboard import SummaryWriter
-from typing import Dict, List, Union, Callable, Optional
+from typing import Dict, Union, Callable, Optional
 
 from tianshou.data import Collector
 from tianshou.policy import BasePolicy
@@ -18,12 +19,13 @@ def onpolicy_trainer(
     step_per_epoch: int,
     collect_per_step: int,
     repeat_per_collect: int,
-    episode_per_test: Union[int, List[int]],
+    episode_per_test: int,
     batch_size: int,
     train_fn: Optional[Callable[[int, int], None]] = None,
     test_fn: Optional[Callable[[int, Optional[int]], None]] = None,
     stop_fn: Optional[Callable[[float], bool]] = None,
     save_fn: Optional[Callable[[BasePolicy], None]] = None,
+    reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None,
     writer: Optional[SummaryWriter] = None,
     log_interval: int = 1,
     verbose: bool = True,
@@ -33,8 +35,7 @@ def onpolicy_trainer(
 
     The "step" in trainer means a policy network update.
 
-    :param policy: an instance of the :class:`~tianshou.policy.BasePolicy`
-        class.
+    :param policy: an instance of the :class:`~tianshou.policy.BasePolicy` class.
     :param train_collector: the collector used for training.
     :type train_collector: :class:`~tianshou.data.Collector`
    :param test_collector: the collector used for testing.
@@ -65,6 +66,12 @@ def onpolicy_trainer(
     :param function stop_fn: a function with signature ``f(mean_rewards: float) ->
         bool``, receives the average undiscounted returns of the testing result,
         returns a boolean which indicates whether reaching the goal.
+    :param function reward_metric: a function with signature ``f(rewards: np.ndarray
+        with shape (num_episode, agent_num)) -> np.ndarray with shape (num_episode,)``,
+        used in multi-agent RL. We need to return a single scalar for each episode's
+        result to monitor training in the multi-agent RL setting. This function
+        specifies what is the desired metric, e.g., the reward of agent 1 or the
+        average reward over all agents.
     :param torch.utils.tensorboard.SummaryWriter writer: a TensorBoard SummaryWriter;
         if None is given, it will not write logs to TensorBoard.
     :param int log_interval: the log interval of the writer.
@@ -90,33 +97,32 @@ def onpolicy_trainer(
                 if train_fn:
                     train_fn(epoch, env_step)
                 result = train_collector.collect(n_episode=collect_per_step)
+                if reward_metric:
+                    result["rews"] = reward_metric(result["rews"])
                 env_step += int(result["n/st"])
                 data = {
                     "env_step": str(env_step),
-                    "rew": f"{result['rew']:.2f}",
-                    "len": str(int(result["len"])),
+                    "rew": f"{result['rews'].mean():.2f}",
+                    "len": str(int(result["lens"].mean())),
                     "n/ep": str(int(result["n/ep"])),
                     "n/st": str(int(result["n/st"])),
-                    "v/ep": f"{result['v/ep']:.2f}",
-                    "v/st": f"{result['v/st']:.2f}",
                 }
                 if writer and env_step % log_interval == 0:
-                    for k in result.keys():
-                        writer.add_scalar(
-                            "train/" + k, result[k], global_step=env_step)
-                if test_in_train and stop_fn and stop_fn(result["rew"]):
+                    writer.add_scalar(
+                        "train/rew", result['rews'].mean(), global_step=env_step)
+                    writer.add_scalar(
+                        "train/len", result['lens'].mean(), global_step=env_step)
+                if test_in_train and stop_fn and stop_fn(result["rews"].mean()):
                     test_result = test_episode(
                         policy, test_collector, test_fn,
                         epoch, episode_per_test, writer, env_step)
-                    if stop_fn(test_result["rew"]):
+                    if stop_fn(test_result["rews"].mean()):
                         if save_fn:
                             save_fn(policy)
-                        for k in result.keys():
-                            data[k] = f"{result[k]:.2f}"
                         t.set_postfix(**data)
                         return gather_info(
                             start_time, train_collector, test_collector,
-                            test_result["rew"], test_result["rew_std"])
+                            test_result["rews"].mean(), test_result["rews"].std())
                     else:
                         policy.train()
                 losses = policy.update(
@@ -139,14 +145,14 @@ def onpolicy_trainer(
         # test
         result = test_episode(policy, test_collector, test_fn, epoch,
                               episode_per_test, writer, env_step)
-        if best_epoch == -1 or best_reward < result["rew"]:
-            best_reward, best_reward_std = result["rew"], result["rew_std"]
+        if best_epoch == -1 or best_reward < result["rews"].mean():
+            best_reward, best_reward_std = result["rews"].mean(), result["rews"].std()
             best_epoch = epoch
             if save_fn:
                 save_fn(policy)
         if verbose:
-            print(f"Epoch #{epoch}: test_reward: {result['rew']:.6f} ± "
-                  f"{result['rew_std']:.6f}, best_reward: {best_reward:.6f} ± "
+            print(f"Epoch #{epoch}: test_reward: {result['rews'].mean():.6f} ± "
+                  f"{result['rews'].std():.6f}, best_reward: {best_reward:.6f} ± "
                   f"{best_reward_std:.6f} in #{best_epoch}")
         if stop_fn and stop_fn(best_reward):
             break
diff --git a/tianshou/trainer/utils.py b/tianshou/trainer/utils.py
index dfffd71a4..2cdeb15fe 100644
--- a/tianshou/trainer/utils.py
+++ b/tianshou/trainer/utils.py
@@ -1,7 +1,7 @@
 import time
 import numpy as np
 from torch.utils.tensorboard import SummaryWriter
-from typing import Dict, List, Union, Callable, Optional
+from typing import Any, Dict, Union, Callable, Optional
 
 from tianshou.data import Collector
 from tianshou.policy import BasePolicy
@@ -12,25 +12,26 @@ def test_episode(
     collector: Collector,
     test_fn: Optional[Callable[[int, Optional[int]], None]],
     epoch: int,
-    n_episode: Union[int, List[int]],
+    n_episode: int,
     writer: Optional[SummaryWriter] = None,
     global_step: Optional[int] = None,
-) -> Dict[str, float]:
+    reward_metric: Optional[Callable[[np.ndarray], np.ndarray]] = None,
+) -> Dict[str, Any]:
     """A simple wrapper of testing policy in collector."""
     collector.reset_env()
     collector.reset_buffer()
     policy.eval()
     if test_fn:
         test_fn(epoch, global_step)
-    if collector.get_env_num() > 1 and isinstance(n_episode, int):
-        n = collector.get_env_num()
-        n_ = np.zeros(n) + n_episode // n
-        n_[:n_episode % n] += 1
-        n_episode = list(n_)
     result = collector.collect(n_episode=n_episode)
+    if reward_metric:
+        result["rews"] = reward_metric(result["rews"])
     if writer is not None and global_step is not None:
-        for k in result.keys():
-            writer.add_scalar("test/" + k, result[k], global_step=global_step)
+        rews, lens = result["rews"], result["lens"]
+        writer.add_scalar("test/rew", rews.mean(), global_step=global_step)
+        writer.add_scalar("test/rew_std", rews.std(), global_step=global_step)
+        writer.add_scalar("test/len", lens.mean(), global_step=global_step)
+        writer.add_scalar("test/len_std", lens.std(), global_step=global_step)
     return result
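The ``reward_metric`` docstrings added in the trainer hunks above describe a reduction from per-agent rewards to one scalar per episode. A minimal, self-contained sketch of that contract (the reward values below are invented for illustration; only the shapes follow the docstring):

```python
import numpy as np

# In a multi-agent run, result["rews"] has shape (num_episode, agent_num).
rews = np.array([[1.0, -1.0],
                 [0.5, -0.5],
                 [2.0, -2.0]])

def reward_metric(rewards: np.ndarray) -> np.ndarray:
    """Return one scalar per episode, here the first agent's reward."""
    return rewards[:, 0]

print(reward_metric(rews))         # [1.  0.5 2. ]
print(reward_metric(rews).mean())  # 1.1666..., what the trainers report as test_reward
```

Such a function can be passed as ``reward_metric=...`` to ``offline_trainer``, ``offpolicy_trainer``, or ``onpolicy_trainer``; single-agent training can leave it as ``None``.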