diff --git a/speed_benchmark/install.sh b/speed_benchmark/install.sh new file mode 100755 index 000000000..0b99c2cc0 --- /dev/null +++ b/speed_benchmark/install.sh @@ -0,0 +1 @@ +python3 -m pip install pettingzoo open_spiel tianshou pygame cloudpickle chess pgx diff --git a/workspace/compariosn/vector_env.py b/speed_benchmark/run_open_spiel_forloop.py similarity index 51% rename from workspace/compariosn/vector_env.py rename to speed_benchmark/run_open_spiel_forloop.py index df6eb4291..d85461dcf 100644 --- a/workspace/compariosn/vector_env.py +++ b/speed_benchmark/run_open_spiel_forloop.py @@ -1,6 +1,15 @@ +import argparse +import json +import collections +import time +import numpy as np +import pyspiel +from open_spiel.python.rl_environment import Environment, ChanceEventSampler -# Copied from https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/vector_env.py +# SyncVectorEnv is copied from +# https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/vector_env.py +# # Copyright 2022 DeepMind Technologies Limited # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -14,9 +23,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""A vectorized RL Environment.""" - - class SyncVectorEnv(object): """A vectorized RL Environment. This environment is synchronized - games do not execute in parallel. 
Speedups @@ -75,4 +81,47 @@ def reset(self, envs_to_reset=None): if envs_to_reset[i] else self.envs[i].get_time_step() for i in range(len(self.envs)) ] - return time_steps \ No newline at end of file + return time_steps + + +def make_single_env(env_name: str, seed: int): + def gen_env(): + game = pyspiel.load_game(env_name) + return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) + return gen_env() + + +def make_env(env_name: str, n_envs: int, seed: int) -> SyncVectorEnv: + return SyncVectorEnv([make_single_env(env_name, seed + i) for i in range(n_envs)]) + + +def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int): + # random play for open spiel + StepOutput = collections.namedtuple("step_output", ["action"]) + time_step = env.reset() + assert len(env.envs) == len(time_step) # ensure parallelization + rng = np.random.default_rng() + step_num = 0 + while step_num < n_steps_lim: + # See https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/examples/rl_example.py + actions = [rng.choice(ts.observations["legal_actions"][ts.observations["current_player"]]) for ts in time_step] + step_outputs = [StepOutput(action=action) for action in actions] + time_step, reward, done, unreset_time_steps = env.step(step_outputs, reset_if_done=True) + step_num += batch_size + return step_num + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("env_name") # go, chess, backgammon, tic_tac_toe + parser.add_argument("batch_size", type=int) + parser.add_argument("n_steps_lim", nargs="?", default=2 ** 10 * 10, type=int) # nargs="?" makes the default effective for a positional + parser.add_argument("--seed", default=0, type=int) + args = parser.parse_args() + assert args.n_steps_lim % args.batch_size == 0 + env = make_env(args.env_name, args.batch_size, args.seed) + time_sta = time.time() + steps_num = random_play(env, args.n_steps_lim, args.batch_size) + time_end = time.time() + sec = time_end-time_sta + print(json.dumps({"game": args.env_name, "venv": "for-loop", "library": 
"open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size})) diff --git a/workspace/compariosn/tianshou_env/pettingzoo_env.py b/speed_benchmark/run_open_spiel_subproc.py similarity index 53% rename from workspace/compariosn/tianshou_env/pettingzoo_env.py rename to speed_benchmark/run_open_spiel_subproc.py index aee47f1bc..fe0630ec4 100644 --- a/workspace/compariosn/tianshou_env/pettingzoo_env.py +++ b/speed_benchmark/run_open_spiel_subproc.py @@ -1,37 +1,26 @@ -""" -Copied from TienShou repository: - -https://github.com/thu-ml/tianshou/blob/master/tianshou/env/pettingzoo_env.py - -Distributed under MIT LICENSE: - -https://github.com/thu-ml/tianshou/blob/master/LICENSE - -Modified to use OpenSpiel in SubprocVecEnv (see #384 for changes) -""" - - - +import json +from tianshou.env import SubprocVectorEnv +import numpy as np +import time +import argparse import warnings from abc import ABC from typing import Any, Dict, List, Tuple -import pettingzoo -from gymnasium import spaces -from packaging import version +import pyspiel from pettingzoo.utils.env import AECEnv -from pettingzoo.utils.wrappers import BaseWrapper from open_spiel.python.rl_environment import Environment, ChanceEventSampler -if version.parse(pettingzoo.__version__) < version.parse("1.21.0"): - warnings.warn( - f"You are using PettingZoo {pettingzoo.__version__}. " - f"Future tianshou versions may not support PettingZoo<1.21.0. " - f"Consider upgrading your PettingZoo version.", DeprecationWarning - ) - +# OpenSpielEnv is modified from TianShou repository (see #384 for changes): +# This wrapper enables to use TianShou's SubprocVectorEnv for OpenSpiel +# +# https://github.com/thu-ml/tianshou/blob/master/tianshou/env/pettingzoo_env.py +# +# Distributed under MIT LICENSE: +# +# https://github.com/thu-ml/tianshou/blob/master/LICENSE class OpenSpielEnv(AECEnv, ABC): """The interface for petting zoo environments. 
@@ -100,3 +89,41 @@ def seed(self, seed: Any = None) -> None: def render(self) -> Any: return self.env.render() + + +def make_single_env(env_name: str, seed: int): + def gen_env(): + game = pyspiel.load_game(env_name) + return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) + return gen_env() + +def make_env(env_name: str, n_envs: int, seed: int): + return SubprocVectorEnv([lambda i=i: OpenSpielEnv(make_single_env(env_name, seed + i)) for i in range(n_envs)]) # bind i per env so each worker gets its own seed + + +def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int): + step_num = 0 + rng = np.random.default_rng() + observation, info = env.reset() + while step_num < n_steps_lim: + legal_action_mask = [observation[i]["mask"] for i in range(len(observation))] + action = [rng.choice(legal_action_mask[i]) for i in range(len(legal_action_mask))] # choose action randomly + observation, reward, terminated, _, info = env.step(action) + step_num += batch_size + return step_num + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("env_name") + parser.add_argument("batch_size", type=int) + parser.add_argument("n_steps_lim", nargs="?", default=2 ** 10 * 10, type=int) # nargs="?" makes the default effective for a positional + parser.add_argument("--seed", default=100, type=int) # type=bool would turn any non-empty value into True + args = parser.parse_args() + assert args.n_steps_lim % args.batch_size == 0 + env = make_env(args.env_name, args.batch_size, args.seed) + time_sta = time.time() + steps_num = random_play(env, args.n_steps_lim, args.batch_size) + time_end = time.time() + sec = time_end - time_sta + print(json.dumps({"game": args.env_name, "venv": "subproc", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size})) diff --git a/speed_benchmark/run_petting_zoo.py b/speed_benchmark/run_petting_zoo.py new file mode 100644 index 000000000..75023d52a --- /dev/null +++ b/speed_benchmark/run_petting_zoo.py @@ -0,0 +1,77 @@ +import argparse +import time +import json +import numpy as np +import 
collections +from tianshou.env.pettingzoo_env import PettingZooEnv + + +class AutoResetPettingZooEnv(PettingZooEnv): + def __init__(self, env): + super().__init__(env) + + def step(self, action): + obs, reward, term, trunc, info = super().step(action) + if term: + obs = super().reset() + return obs, reward, term, trunc, info + + +def make_env(env_name, n_envs, vec_env): + + def get_go_env(): + from pettingzoo.classic.go import go + return AutoResetPettingZooEnv(go.env()) + + def get_tictactoe_env(): + from pettingzoo.classic.tictactoe import tictactoe + return AutoResetPettingZooEnv(tictactoe.env()) + + def get_chess_env(): + from pettingzoo.classic.chess import chess + return AutoResetPettingZooEnv(chess.env()) + + if vec_env == "for-loop": + from tianshou.env import DummyVectorEnv as VecEnv + elif vec_env == "subproc": + from tianshou.env import SubprocVectorEnv as VecEnv + + if env_name == "go": + env_fn = get_go_env + elif env_name == "tic_tac_toe": + env_fn = get_tictactoe_env + elif env_name == "chess": + env_fn = get_chess_env + + return VecEnv([env_fn for _ in range(n_envs)]) + + +def random_play(env, n_steps_lim: int, batch_size: int) -> int: + # Step all envs with random legal actions until n_steps_lim total steps have been counted. 
+ step_num = 0 + rng = np.random.default_rng() + observation = env.reset() + assert len(env._env_fns) == len(observation) # ensure parallelization + while step_num < n_steps_lim: + legal_action_mask = [observation[i]["mask"] for i in range(batch_size)] + action = [rng.choice(np.where(legal_action_mask[i])[0]) for i in range(batch_size)] # choose action randomly + observation, reward, terminated, _, _ = env.step(action) + step_num += batch_size + return step_num + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("env_name") # go, chess, tic_tac_toe + parser.add_argument("venv") # for-loop, subproc + parser.add_argument("batch_size", type=int) + parser.add_argument("n_steps_lim", nargs="?", default=2 ** 10 * 10, type=int) # nargs="?" makes the default effective for a positional + parser.add_argument("--seed", default=0, type=int) + args = parser.parse_args() + assert args.n_steps_lim % args.batch_size == 0 + env = make_env(args.env_name, args.batch_size, args.venv) + time_sta = time.time() + steps_num = random_play(env, args.n_steps_lim, args.batch_size) + time_end = time.time() + sec = time_end - time_sta + print(json.dumps({"game": args.env_name, "venv": args.venv, "library": "petting_zoo", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size})) \ No newline at end of file diff --git a/workspace/speed_benchmark.py b/speed_benchmark/run_pgx.py similarity index 50% rename from workspace/speed_benchmark.py rename to speed_benchmark/run_pgx.py index c10f287ed..6e0bc66d2 100644 --- a/workspace/speed_benchmark.py +++ b/speed_benchmark/run_pgx.py @@ -1,4 +1,5 @@ import time +import json import jax import pgx from pgx.utils import act_randomly @@ -37,28 +38,22 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): state = step(state, action) te = time.time() - return f"{num_steps / (te - ts):.05f}" + return num_steps, te - ts -N = (2 ** 12) * 100 -print(f"Total # of steps: {N}") -bs_list = [2 ** i for i in range(5, 13)] -print("| 
env_id |" + "|".join([str(bs) for bs in bs_list]) + "|") -print("|:---:|" + "|".join([":---:" for bs in bs_list]) + "|") -for env_id in get_args(pgx.EnvId): - s = f"|{env_id}|" - for bs in tqdm(bs_list, leave=False): - s += benchmark(env_id, bs, N) - s += "|" - print(s) +games = { + "tic_tac_toe": "tic_tac_toe/v0", + "backgammon": "backgammon/v0", + "shogi": "shogi/v0", + "go": "go-19x19/v0", +} -""" -Total # of steps: 409600 -| env_id |32|64|128|256|512|1024|2048|4096| -|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:| -|tic_tac_toe/v0|25193.54921|51203.80125|99197.43688|206175.78196|413948.81221|723250.23824|1664977.36893|3265886.46947| -|go-19x19/v0|15146.45769|29891.62027|58064.94882|108400.19704|173638.40814|286740.95368|379331.88909|449555.32632| -|shogi/v0|21047.48879|42130.05279|82988.81210|175415.01266|259940.79393|290410.20642|299800.69880|308552.26434| -|minatar/asterix/v0|13066.68075|25836.00751|52134.46018|102929.03752|205880.59846|384825.85566|806843.53100|1553951.76960| -""" \ No newline at end of file +N = 2 ** 10 * 10 +bs_list = [2 ** i for i in range(1, 11)] +d = {} +for game, env_id in games.items(): + for bs in bs_list: + num_steps, sec = benchmark(env_id, bs, N) + print(json.dumps({"game": game, "library": "pgx", + "total_steps": num_steps, "total_sec": sec, "steps/sec": num_steps / sec, "batch_size": bs})) \ No newline at end of file diff --git a/workspace/compariosn/compare.sh b/workspace/compariosn/compare.sh deleted file mode 100755 index 22f662d7e..000000000 --- a/workspace/compariosn/compare.sh +++ /dev/null @@ -1,6 +0,0 @@ -echo "| library | game | type | n_envs | steps | time | time per step |" -echo "| :--- | ---: | ---: | ---: | ---: | ---: | ---: |" -python compare_subproc.py open_spiel go 10 1000 -python compare_subproc.py petting_zoo go 10 1000 -python compare_for_loop.py open_spiel go 10 1000 -python compare_for_loop.py petting_zoo go 10 1000 \ No newline at end of file diff --git a/workspace/compariosn/compare_for_loop.py 
b/workspace/compariosn/compare_for_loop.py deleted file mode 100644 index bd816009b..000000000 --- a/workspace/compariosn/compare_for_loop.py +++ /dev/null @@ -1,104 +0,0 @@ -from vector_env import SyncVectorEnv -import argparse -import time -import numpy as np -import collections -from tianshou.env import DummyVectorEnv -from tianshou.env.pettingzoo_env import PettingZooEnv - - -class AutoResetPettingZooEnv(PettingZooEnv): - def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - -def petting_zoo_make_env(env_name, n_envs): - from pettingzoo.classic.go import go - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - if env_name == "go": - return DummyVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def petting_zoo_random_play(env: DummyVectorEnv, n_steps_lim: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. 
- step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += 1 - return step_num - - -def open_spile_make_single_env(env_name: str, seed: int): - import pyspiel - from open_spiel.python.rl_environment import Environment, ChanceEventSampler - def gen_env(): - game = pyspiel.load_game(env_name) - return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) - return gen_env() - - -def open_spiel_make_env(env_name: str, n_envs: int, seed: int) -> SyncVectorEnv: - return SyncVectorEnv([open_spile_make_single_env(env_name, seed) for i in range(n_envs)]) - - -def open_spile_random_play(env: SyncVectorEnv, n_steps_lim: int): - # random play for open spiel - StepOutput = collections.namedtuple("step_output", ["action"]) - time_step = env.reset() - rng = np.random.default_rng() - step_num = 0 - while step_num < n_steps_lim: - legal_actions = np.array([ts.observations["legal_actions"][ts.observations["current_player"]] for ts in time_step]) - assert len(env.envs) == len(legal_actions) # ensure parallerization - action = rng.choice(legal_actions, axis=1) # same actions so far - step_outputs = [StepOutput(action=a) for a in action] - time_step, reward, done, unreset_time_steps = env.step(step_outputs, reset_if_done=True) - step_num += 1 - return step_num - - -def measure_time(args): - if args.library == "open_spiel": - env = open_spiel_make_env(args.env_name, args.n_envs, args.seed) - random_play_fn = open_spile_random_play - elif args.library == "petting_zoo": - env = petting_zoo_make_env(args.env_name, args.n_envs) - 
random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - time_sta = time.time() - step_num = random_play_fn(env, args.n_steps_lim) - time_end = time.time() - tim = time_end- time_sta - tim_per_step = tim / (step_num * args.n_envs) - print(f"| `{args.library}` | {args.env_name} | forloop | {args.n_envs} | {step_num} | {round(tim, 2)} | {round(tim_per_step, 6)}s |") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_envs", type=int) - parser.add_argument("n_steps_lim", type=int) - parser.add_argument("--seed", default=100, type=bool) - args = parser.parse_args() - measure_time(args) \ No newline at end of file diff --git a/workspace/compariosn/compare_subproc.py b/workspace/compariosn/compare_subproc.py deleted file mode 100644 index c99347786..000000000 --- a/workspace/compariosn/compare_subproc.py +++ /dev/null @@ -1,260 +0,0 @@ -""" -Modified from: - -https://github.com/DLR-RM/stable-baselines3/blob/master/stable_baselines3/common/vec_env/base_vec_env.py - -Distributed under MIT License: - -https://github.com/DLR-RM/stable-baselines3/blob/master/LICENSE -""" - -import multiprocessing as mp -from comparison import open_spile_make_env, petting_zoo_make_env -from collections import OrderedDict -from typing import Any, Callable, List, Optional, Sequence, Tuple, Type, Union, Dict, Iterable -import numpy as np -import time -import cloudpickle -import argparse - - -def petting_zoo_make_env(env_name, n_envs): - from tianshou.env import SubprocVectorEnv - from pettingzoo.classic.go import go - from tianshou.env.pettingzoo_env import PettingZooEnv - - class AutoResetPettingZooEnv(PettingZooEnv): # 全体でpetting_zooの関数, classをimportするとopen_spielの速度が落ちる. 
- def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - if env_name == "go": - return SubprocVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def petting_zoo_random_play(env, n_steps_lim: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. - step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += 1 - return step_num - - -class CloudpickleWrapper: - """ - Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle) - - :param var: the variable you wish to wrap for pickling with cloudpickle - """ - - def __init__(self, var: Any): - self.var = var - - def __getstate__(self) -> Any: - return cloudpickle.dumps(self.var) - - def __setstate__(self, var: Any) -> None: - self.var = cloudpickle.loads(var) - - -def _open_spiel_worker( - remote, parent_remote, state_wrapper: CloudpickleWrapper, env_name: str, -) -> None: - parent_remote.close() - state = state_wrapper.var() - while True: - try: - cmd, data = remote.recv() - if cmd == "step": - action = data - state.apply_action(action) - legal_actions = state.legal_actions() - terminated = state.is_terminal() - if terminated: # 
auto_reset - state = open_spile_make_env(env_name).new_initial_state() - legal_actions = state.legal_actions() - remote.send((legal_actions, terminated)) - - elif cmd == "reset": - legal_actions = state.legal_actions() - terminated = state.is_terminal() - remote.send((legal_actions, terminated)) - elif cmd == "render": - pass - elif cmd == "close": - remote.close() - break - elif cmd == "get_spaces": - pass - elif cmd == "env_method": - pass - elif cmd == "get_attr": - pass - elif cmd == "set_attr": - pass - elif cmd == "is_wrapped": - pass - else: - raise NotImplementedError(f"`{cmd}` is not implemented in the worker") - except EOFError: - break - - -class SubprocVecEnv(object): - """ - Creates a multiprocess vectorized wrapper for multiple environments, distributing each environment to its own - process, allowing significant speed up when the environment is computationally complex. - - For performance reasons, if your environment is not IO bound, the number of environments should not exceed the - number of logical cores on your CPU. - - .. warning:: - - Only 'forkserver' and 'spawn' start methods are thread-safe, - which is important when TensorFlow sessions or other non thread-safe - libraries are used in the parent (see issue #217). However, compared to - 'fork' they incur a small start-up cost and have restrictions on - global variables. With those methods, users must wrap the code in an - ``if __name__ == "__main__":`` block. - For more information, see the multiprocessing documentation. - - :param env_fns: Environments to run in subprocesses - :param start_method: method used to start the subprocesses. - Must be one of the methods returned by multiprocessing.get_all_start_methods(). - Defaults to 'forkserver' on available platforms, and 'spawn' otherwise. 
- """ - - def __init__(self, states, env_name, start_method: Optional[str] = None): - self.waiting = False - self.closed = False - self.n_envs = len(states) - env_names = [env_name] * self.n_envs - if start_method is None: - # Fork is not a thread safe method (see issue #217) - # but is more user friendly (does not require to wrap the code in - # a `if __name__ == "__main__":`) - forkserver_available = "forkserver" in mp.get_all_start_methods() - start_method = "forkserver" if forkserver_available else "spawn" - ctx = mp.get_context(start_method) - _worker = _open_spiel_worker - - self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.n_envs)]) - self.processes = [] - for work_remote, remote, state, env_name in zip(self.work_remotes, self.remotes, states, env_names): - args = (work_remote, remote, CloudpickleWrapper(state), env_name) - # daemon=True: if the main process crashes, we should not cause things to hang - process = ctx.Process(target=_worker, args=args, daemon=True) # pytype:disable=attribute-error - process.start() - self.processes.append(process) - work_remote.close() - - - def step_async(self, actions: np.ndarray) -> None: - for remote, action in zip(self.remotes, actions): - remote.send(("step", (action))) - self.waiting = True - - - def step_wait(self) -> Tuple: - results = [remote.recv() for remote in self.remotes] - self.waiting = False - legal_actions, terminated = zip(*results) - return np.stack(legal_actions), np.stack(terminated) - - - def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: - if seed is None: - seed = np.random.randint(0, 2**32 - 1) - for idx, remote in enumerate(self.remotes): - remote.send(("seed", seed + idx)) - return [remote.recv() for remote in self.remotes] - - - def reset(self) -> Tuple: - for remote in self.remotes: - remote.send(("reset", None)) - results = [remote.recv() for remote in self.remotes] - legal_actions, terminated = zip(*results) - return np.stack(legal_actions), 
np.stack(terminated) - - - def step(self, actions) -> Tuple: - self.step_async(actions) - return self.step_wait() - - - def close(self) -> None: - if self.closed: - return - if self.waiting: - for remote in self.remotes: - remote.recv() - for remote in self.remotes: - remote.send(("close", None)) - for process in self.processes: - process.join() - self.closed = True - -def open_spiel_make_env(env_name: str, n_envs: int): - states = [lambda: open_spile_make_env(env_name).new_initial_state() for i in range(n_envs)] - return SubprocVecEnv(states, env_name) - - -def open_spiel_random_play(env, n_steps_lim): - legal_actions, terminated = env.reset() - n_steps = 0 - rng = np.random.default_rng() - while n_steps < n_steps_lim: - action = rng.choice(legal_actions, axis=1) # n_envs - legal_actions, terminated = env.step(action) - assert env.n_envs == legal_actions.shape[0] # 並列化されていることを確認. - n_steps += 1 - return n_steps - - -def measure_time(args): - if args.library == "open_spiel": - env = open_spiel_make_env(args.env_name, args.n_envs) - random_play_fn = open_spiel_random_play - elif args.library == "petting_zoo": - env = petting_zoo_make_env(args.env_name, args.n_envs) - random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - time_sta = time.time() - step_num = random_play_fn(env, args.n_steps_lim) - time_end = time.time() - tim = time_end- time_sta - env.close() - tim_per_step = tim / (step_num * args.n_envs) - print(f"| `{args.library}` | {args.env_name} | subprocess | {args.n_envs} | {step_num} | {round(tim, 2)} | {round(tim_per_step, 6)}s |") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_envs", type=int) - parser.add_argument("n_steps_lim", type=int) - args = parser.parse_args() - measure_time(args) diff --git a/workspace/compariosn/comparison.py b/workspace/compariosn/comparison.py deleted file mode 100644 index 
23aa7f37f..000000000 --- a/workspace/compariosn/comparison.py +++ /dev/null @@ -1,108 +0,0 @@ -""" -Petting zooとopen spileの囲碁環境とpgxの囲碁環境の速度比較を行う -""" -import random -import numpy as np -import time -import argparse -from multiprocessing import Pool -from multiprocessing import Queue -import multiprocessing -from pgx.go import init, step, observe -import jax -from jax import vmap, jit -import jax.numpy as jnp -import numpy as np - -def petting_zoo_make_env(env_name): - from pettingzoo.classic.go import go - #from pettingzoo.classic import chess_v5 - if env_name == "go": - return go.env() - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - -def petting_zoo_random_play(tup): - # petting zooのgo環境でrandom gaentを終局まで動かす. - id, do_print, env_name = tup - env = petting_zoo_make_env(env_name) - step_nums = 0 - env.reset() - for agent in env.agent_iter(): - observation, reward, termination, truncation, info = env.last() - action = None if termination or truncation else np.random.choice(np.where(observation["action_mask"]==1)[0]) # this is where you would insert your policy - env.step(action) - step_nums += 1 - if do_print: - print(id, step_nums) - return id, step_nums - - -def open_spile_make_env(env_name): - import pyspiel - if env_name == "go": - return pyspiel.load_game("go") - elif env_name == "backgammon": - return pyspiel.load_game("backgammon") - elif env_name == "bridge": - return pyspiel.load_game("bridge") - elif env_name == "chess": - return pyspiel.load_game("chess") - else: - raise ValueError("no such environment in open spile") - - - -def open_spile_random_play(tup): - import pyspiel - # open spileのgo環境でrandom gaentを終局まで動かす. 
- id, do_print, env_name = tup - game = open_spile_make_env(env_name) - state = game.new_initial_state() - step_nums = 0 - while not state.is_terminal(): - legal_actions = state.legal_actions() - # Sample a chance event outcome. - action = np.random.choice(legal_actions) - state.apply_action(action) - step_nums += 1 - if do_print: - print(id, step_nums) - return id, step_nums - - -def measure_time(args): - if args.library == "open_spiel": - random_play_fn = open_spile_random_play - elif args.library == "petting_zoo": - random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - - p = Pool(args.n_processes) - process_list = [] - time_sta = time.time() - ex = p.map_async(random_play_fn, iterable=[(i, args.print_per_game, args.env_name) for i in range(args.n_games)]) - result = ex.get() - time_end = time.time() - tim = time_end- time_sta - p.close() - avarage_steps = sum(list(map(lambda x: x[1], ex.get())))//args.n_games - print("library: {} env: {} n_games: {} n_processes: {} execution time is {}, avarage number of steps is {}".format(args.library, args.env_name, args.n_games, args.n_processes, tim, avarage_steps)) - - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_processes", type=int) - parser.add_argument("n_games", type=int) - parser.add_argument("--print_per_game", default=False, type=bool) - args = parser.parse_args() - measure_time(args) - - \ No newline at end of file diff --git a/workspace/compariosn/install.sh b/workspace/compariosn/install.sh deleted file mode 100755 index dd0b16f55..000000000 --- a/workspace/compariosn/install.sh +++ /dev/null @@ -1,3 +0,0 @@ -pip install pettingzoo tianshou -python3 -m pip install open_spiel -pip install pygame cloudpickle \ No newline at end of file diff --git a/workspace/compariosn/open_spiel_forloop.py b/workspace/compariosn/open_spiel_forloop.py deleted file mode 100644 
index 143f1b9e2..000000000 --- a/workspace/compariosn/open_spiel_forloop.py +++ /dev/null @@ -1,49 +0,0 @@ -from vector_env import SyncVectorEnv -import argparse -import time -import numpy as np -import collections - - -def make_single_env(env_name: str, seed: int): - import pyspiel - from open_spiel.python.rl_environment import Environment, ChanceEventSampler - def gen_env(): - game = pyspiel.load_game(env_name) - return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) - return gen_env() - - -def make_env(env_name: str, n_envs: int, seed: int) -> SyncVectorEnv: - return SyncVectorEnv([make_single_env(env_name, seed) for i in range(n_envs)]) - - -def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int): - # random play for open spiel - StepOutput = collections.namedtuple("step_output", ["action"]) - time_step = env.reset() - rng = np.random.default_rng() - step_num = 0 - while step_num < n_steps_lim: - legal_actions = np.array([ts.observations["legal_actions"][ts.observations["current_player"]] for ts in time_step]) - assert len(env.envs) == len(legal_actions) # ensure parallerization - action = [rng.choice(legal_actions[i]) for i in range(len(legal_actions))] - step_outputs = [StepOutput(action=a) for a in action] - time_step, reward, done, unreset_time_steps = env.step(step_outputs, reset_if_done=True) - step_num += batch_size - return step_num - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("env_name") - parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) - parser.add_argument("--seed", default=100, type=bool) - args = parser.parse_args() - assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size, args.seed) - time_sta = time.time() - step_num = random_play(env, args.n_steps_lim, args.batch_size) - time_end = time.time() - print((step_num)/(time_end-time_sta), time_end-time_sta) diff --git 
a/workspace/compariosn/open_spiel_subproc.py b/workspace/compariosn/open_spiel_subproc.py deleted file mode 100644 index 668f1cbfc..000000000 --- a/workspace/compariosn/open_spiel_subproc.py +++ /dev/null @@ -1,45 +0,0 @@ -from tianshou_env.pettingzoo_env import OpenSpielEnv -from tianshou_env.venvs import SubprocVectorEnv -import numpy as np -import time -import argparse - - -def make_single_env(env_name: str, seed: int): - import pyspiel - from open_spiel.python.rl_environment import Environment, ChanceEventSampler - def gen_env(): - game = pyspiel.load_game(env_name) - return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) - return gen_env() - -def make_env(env_name: str, n_envs: int, seed: int): - return SubprocVectorEnv([lambda: OpenSpielEnv(make_single_env(env_name, seed)) for _ in range(n_envs)]) - - -def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int): - step_num = 0 - rng = np.random.default_rng() - observation, info = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - legal_action_mask = [observation[i]["mask"] for i in range(len(observation))] - action = [rng.choice(legal_action_mask[i]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, info = env.step(action) - step_num += batch_size - return step_num - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("env_name") - parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) - parser.add_argument("--seed", default=100, type=bool) - args = parser.parse_args() - assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size, args.seed) - time_sta = time.time() - step_num = random_play(env, args.n_steps_lim, args.batch_size) - time_end = time.time() - print((step_num)/(time_end-time_sta)) diff --git a/workspace/compariosn/petting_zoo_forloop.py 
b/workspace/compariosn/petting_zoo_forloop.py deleted file mode 100644 index 6b97b4a26..000000000 --- a/workspace/compariosn/petting_zoo_forloop.py +++ /dev/null @@ -1,64 +0,0 @@ -import argparse -import time -import numpy as np -import collections -from tianshou.env import DummyVectorEnv -from tianshou.env.pettingzoo_env import PettingZooEnv -from pettingzoo.classic.tictactoe import tictactoe - -class AutoResetPettingZooEnv(PettingZooEnv): - def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - -def make_env(env_name, n_envs): - from pettingzoo.classic.go import go - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - def get_tictactoe_env(): - return AutoResetPettingZooEnv(tictactoe.env()) - if env_name == "go": - return DummyVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "tictactoe": - return DummyVectorEnv([get_tictactoe_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. 
- step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += batch_size - return step_num - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("env_name") - parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) - args = parser.parse_args() - assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size) - time_sta = time.time() - step_num = random_play(env, args.n_steps_lim, args.batch_size) - time_end = time.time() - print((step_num)/(time_end-time_sta)) \ No newline at end of file diff --git a/workspace/compariosn/petting_zoo_subproc.py b/workspace/compariosn/petting_zoo_subproc.py deleted file mode 100644 index ed34cfdd5..000000000 --- a/workspace/compariosn/petting_zoo_subproc.py +++ /dev/null @@ -1,64 +0,0 @@ -from tianshou.env import SubprocVectorEnv -from pettingzoo.classic.go import go -from pettingzoo.classic.tictactoe import tictactoe -from tianshou.env.pettingzoo_env import PettingZooEnv -import argparse -import numpy as np -import time - -class AutoResetPettingZooEnv(PettingZooEnv): # 全体でpetting_zooの関数, classをimportするとopen_spielの速度が落ちる. 
- def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - -def make_env(env_name, n_envs): - - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - def get_tictactoe_env(): - return AutoResetPettingZooEnv(tictactoe.env()) - if env_name == "go": - return SubprocVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "tictactoe": - return SubprocVectorEnv([get_tictactoe_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def random_play(env, n_steps_lim: int, batch_size: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. - step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += batch_size - return step_num - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("env_name") - parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) - args = parser.parse_args() - assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size) - time_sta = time.time() - step_num = random_play(env, args.n_steps_lim, args.batch_size) - time_end = time.time() - print((step_num)/(time_end-time_sta)) \ No newline at end of file diff --git a/workspace/compariosn/tianshou_env/__init__.py 
b/workspace/compariosn/tianshou_env/__init__.py deleted file mode 100644 index a00c3cd38..000000000 --- a/workspace/compariosn/tianshou_env/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Env package.""" - -from tianshou.env.gym_wrappers import ( - ContinuousToDiscrete, - MultiDiscreteToDiscrete, - TruncatedAsTerminated, -) -from tianshou.env.venv_wrappers import VectorEnvNormObs, VectorEnvWrapper -from tianshou.env.venvs import ( - BaseVectorEnv, - DummyVectorEnv, - RayVectorEnv, - ShmemVectorEnv, - SubprocVectorEnv, -) - -try: - from tianshou.env.pettingzoo_env import PettingZooEnv -except ImportError: - pass - -__all__ = [ - "BaseVectorEnv", - "DummyVectorEnv", - "SubprocVectorEnv", - "ShmemVectorEnv", - "RayVectorEnv", - "VectorEnvWrapper", - "VectorEnvNormObs", - "PettingZooEnv", - "ContinuousToDiscrete", - "MultiDiscreteToDiscrete", - "TruncatedAsTerminated", -] diff --git a/workspace/compariosn/tianshou_env/gym_wrappers.py b/workspace/compariosn/tianshou_env/gym_wrappers.py deleted file mode 100644 index c9ce66acb..000000000 --- a/workspace/compariosn/tianshou_env/gym_wrappers.py +++ /dev/null @@ -1,81 +0,0 @@ -from typing import Any, Dict, List, SupportsFloat, Tuple, Union - -import gymnasium as gym -import numpy as np -from packaging import version - - -class ContinuousToDiscrete(gym.ActionWrapper): - """Gym environment wrapper to take discrete action in a continuous environment. - - :param gym.Env env: gym environment with continuous action space. - :param int action_per_dim: number of discrete actions in each dimension - of the action space. 
- """ - - def __init__(self, env: gym.Env, action_per_dim: Union[int, List[int]]) -> None: - super().__init__(env) - assert isinstance(env.action_space, gym.spaces.Box) - low, high = env.action_space.low, env.action_space.high - if isinstance(action_per_dim, int): - action_per_dim = [action_per_dim] * env.action_space.shape[0] - assert len(action_per_dim) == env.action_space.shape[0] - self.action_space = gym.spaces.MultiDiscrete(action_per_dim) - self.mesh = np.array( - [np.linspace(lo, hi, a) for lo, hi, a in zip(low, high, action_per_dim)], - dtype=object - ) - - def action(self, act: np.ndarray) -> np.ndarray: # type: ignore - # modify act - assert len(act.shape) <= 2, f"Unknown action format with shape {act.shape}." - if len(act.shape) == 1: - return np.array([self.mesh[i][a] for i, a in enumerate(act)]) - return np.array([[self.mesh[i][a] for i, a in enumerate(a_)] for a_ in act]) - - -class MultiDiscreteToDiscrete(gym.ActionWrapper): - """Gym environment wrapper to take discrete action in multidiscrete environment. - - :param gym.Env env: gym environment with multidiscrete action space. - """ - - def __init__(self, env: gym.Env) -> None: - super().__init__(env) - assert isinstance(env.action_space, gym.spaces.MultiDiscrete) - nvec = env.action_space.nvec - assert nvec.ndim == 1 - self.bases = np.ones_like(nvec) - for i in range(1, len(self.bases)): - self.bases[i] = self.bases[i - 1] * nvec[-i] - self.action_space = gym.spaces.Discrete(np.prod(nvec)) - - def action(self, act: np.ndarray) -> np.ndarray: # type: ignore - converted_act = [] - for b in np.flip(self.bases): - converted_act.append(act // b) - act = act % b - return np.array(converted_act).transpose() - - -class TruncatedAsTerminated(gym.Wrapper): - """A wrapper that set ``terminated = terminated or truncated`` for ``step()``. - - It's intended to use with ``gym.wrappers.TimeLimit``. - - :param gym.Env env: gym environment. 
- """ - - def __init__(self, env: gym.Env): - super().__init__(env) - if not version.parse(gym.__version__) >= version.parse('0.26.0'): - raise EnvironmentError( - f"TruncatedAsTerminated is not applicable with gym version \ - {gym.__version__}" - ) - - def step(self, - act: np.ndarray) -> Tuple[Any, SupportsFloat, bool, bool, Dict[str, Any]]: - observation, reward, terminated, truncated, info = super().step(act) - terminated = (terminated or truncated) - return observation, reward, terminated, truncated, info diff --git a/workspace/compariosn/tianshou_env/test_tianshou_subproc.py b/workspace/compariosn/tianshou_env/test_tianshou_subproc.py deleted file mode 100644 index 39cd1172a..000000000 --- a/workspace/compariosn/tianshou_env/test_tianshou_subproc.py +++ /dev/null @@ -1,46 +0,0 @@ -from pettingzoo_env import OpenSpielEnv -from venvs import SubprocVectorEnv -import numpy as np -import time -import pyspiel - - -if __name__ == "__main__": - env_name = "go" - n_envs = 10 - seed = 100 - n_steps_lim = 1000 - - def open_spile_make_single_env(env_name: str, seed: int): - import pyspiel - from open_spiel.python.rl_environment import Environment, ChanceEventSampler - def gen_env(): - game = pyspiel.load_game(env_name) - return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) - return gen_env() - def get_go_env(): - return OpenSpielEnv(open_spile_make_single_env(env_name, seed)) - - env = SubprocVectorEnv([lambda: OpenSpielEnv(open_spile_make_single_env(env_name, seed)) for _ in range(n_envs)]) - - # petting zooのgo環境でrandom gaentを終局まで動かす. 
- step_num = 0 - rng = np.random.default_rng() - observation, info = env.reset() - terminated = np.zeros(len(env._env_fns)) - time_sta = time.time() - while step_num < n_steps_lim: - legal_action_mask = [observation[i]["mask"] for i in range(len(observation))] - #old_board_black = [info[i]["info"][0][:361] if observation[i]["agent_id"] == 0 else info[i]["info"][0][361: 722] for i in range(n_envs)] # 手番のplayerから見たboard - action = [rng.choice(legal_action_mask[i]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, info = env.step(action) - #new_legal_action_mask = [observation[i]["mask"] for i in range(len(observation))] - #assert sum([((not action[i] in new_legal_action_mask[i]) & (not terminated[i])) | (action[i]==361)| terminated[i] for i in range(n_envs)]) == n_envs # 実行済みのactionが消えていることを確認. 361はパス. - #new_board_black = [info[i]["info"][0][361: 722] if observation[i]["agent_id"] == 0 else info[i]["info"][0][:361] for i in range(n_envs)] # 直前の手番のplayerから見たboard - #assert sum([(sum(new_board_black[i]) > sum(old_board_black[i])) | (action[i]==361) | terminated[i] for i in range(n_envs)]) # 石を置いた場合は増えているかどうか 361はパス. 
- #step_num += 1 - time_end = time.time() - tim = time_end - time_sta - print(tim) - - diff --git a/workspace/compariosn/tianshou_env/utils.py b/workspace/compariosn/tianshou_env/utils.py deleted file mode 100644 index cbd36d998..000000000 --- a/workspace/compariosn/tianshou_env/utils.py +++ /dev/null @@ -1,31 +0,0 @@ -from typing import TYPE_CHECKING, Any, Tuple, Union - -import cloudpickle -import gymnasium -import numpy as np - -try: - from tianshou.env.pettingzoo_env import PettingZooEnv -except ImportError: - PettingZooEnv = None # type: ignore - -if TYPE_CHECKING: - import gym - -ENV_TYPE = Union[gymnasium.Env, "gym.Env", PettingZooEnv] - -gym_new_venv_step_type = Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, - np.ndarray] - - -class CloudpickleWrapper(object): - """A cloudpickle wrapper used in SubprocVectorEnv.""" - - def __init__(self, data: Any) -> None: - self.data = data - - def __getstate__(self) -> str: - return cloudpickle.dumps(self.data) - - def __setstate__(self, data: str) -> None: - self.data = cloudpickle.loads(data) diff --git a/workspace/compariosn/tianshou_env/venv_wrappers.py b/workspace/compariosn/tianshou_env/venv_wrappers.py deleted file mode 100644 index 66470289c..000000000 --- a/workspace/compariosn/tianshou_env/venv_wrappers.py +++ /dev/null @@ -1,123 +0,0 @@ -from typing import Any, List, Optional, Tuple, Union - -import numpy as np - -from tianshou.env.utils import gym_new_venv_step_type -from tianshou.env.venvs import GYM_RESERVED_KEYS, BaseVectorEnv -from tianshou.utils import RunningMeanStd - - -class VectorEnvWrapper(BaseVectorEnv): - """Base class for vectorized environments wrapper.""" - - def __init__(self, venv: BaseVectorEnv) -> None: - self.venv = venv - self.is_async = venv.is_async - - def __len__(self) -> int: - return len(self.venv) - - def __getattribute__(self, key: str) -> Any: - if key in GYM_RESERVED_KEYS: # reserved keys in gym.Env - return getattr(self.venv, key) - else: - return 
super().__getattribute__(key) - - def get_env_attr( - self, - key: str, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> List[Any]: - return self.venv.get_env_attr(key, id) - - def set_env_attr( - self, - key: str, - value: Any, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> None: - return self.venv.set_env_attr(key, value, id) - - def reset( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - return self.venv.reset(id, **kwargs) - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - return self.venv.step(action, id) - - def seed( - self, - seed: Optional[Union[int, List[int]]] = None, - ) -> List[Optional[List[int]]]: - return self.venv.seed(seed) - - def render(self, **kwargs: Any) -> List[Any]: - return self.venv.render(**kwargs) - - def close(self) -> None: - self.venv.close() - - -class VectorEnvNormObs(VectorEnvWrapper): - """An observation normalization wrapper for vectorized environments. - - :param bool update_obs_rms: whether to update obs_rms. Default to True. - """ - - def __init__( - self, - venv: BaseVectorEnv, - update_obs_rms: bool = True, - ) -> None: - super().__init__(venv) - # initialize observation running mean/std - self.update_obs_rms = update_obs_rms - self.obs_rms = RunningMeanStd() - - def reset( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - obs, info = self.venv.reset(id, **kwargs) - - if isinstance(obs, tuple): # type: ignore - raise TypeError( - "Tuple observation space is not supported. 
", - "Please change it to array or dict space", - ) - - if self.obs_rms and self.update_obs_rms: - self.obs_rms.update(obs) - obs = self._norm_obs(obs) - return obs, info - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - step_results = self.venv.step(action, id) - if self.obs_rms and self.update_obs_rms: - self.obs_rms.update(step_results[0]) - return (self._norm_obs(step_results[0]), *step_results[1:]) - - def _norm_obs(self, obs: np.ndarray) -> np.ndarray: - if self.obs_rms: - return self.obs_rms.norm(obs) # type: ignore - return obs - - def set_obs_rms(self, obs_rms: RunningMeanStd) -> None: - """Set with given observation running mean/std.""" - self.obs_rms = obs_rms - - def get_obs_rms(self) -> RunningMeanStd: - """Return observation running mean/std.""" - return self.obs_rms diff --git a/workspace/compariosn/tianshou_env/venvs.py b/workspace/compariosn/tianshou_env/venvs.py deleted file mode 100644 index 09379e2a8..000000000 --- a/workspace/compariosn/tianshou_env/venvs.py +++ /dev/null @@ -1,483 +0,0 @@ -import warnings -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np -import packaging - -from .pettingzoo_env import OpenSpielEnv -from .utils import ENV_TYPE, gym_new_venv_step_type -from .worker import ( - DummyEnvWorker, - EnvWorker, - RayEnvWorker, - SubprocEnvWorker, -) - -try: - import gym as old_gym - has_old_gym = True -except ImportError: - has_old_gym = False - -GYM_RESERVED_KEYS = [ - "metadata", "reward_range", "spec", "action_space", "observation_space" -] - - -def _patch_env_generator(fn: Callable[[], ENV_TYPE]) -> Callable[[], gym.Env]: - """Takes an environment generator and patches it to return Gymnasium envs. - - This function takes the environment generator `fn` and returns a patched - generator, without invoking `fn`. 
The original generator may return - Gymnasium or OpenAI Gym environments, but the patched generator wraps - the result of `fn` in a shimmy wrapper to convert it to Gymnasium, - if necessary. - """ - - def patched() -> gym.Env: - assert callable( - fn - ), "Env generators that are provided to vector environemnts must be callable." - - env = fn() - if isinstance(env, (gym.Env, OpenSpielEnv)): - return env - - if not has_old_gym or not isinstance(env, old_gym.Env): - raise ValueError( - f"Environment generator returned a {type(env)}, not a Gymnasium " - f"environment. In this case, we expect OpenAI Gym to be " - f"installed and the environment to be an OpenAI Gym environment." - ) - - try: - import shimmy - except ImportError as e: - raise ImportError( - "Missing shimmy installation. You provided an environment generator " - "that returned an OpenAI Gym environment. " - "Tianshou has transitioned to using Gymnasium internally. " - "In order to use OpenAI Gym environments with tianshou, you need to " - "install shimmy (`pip install shimmy`)." - ) from e - - warnings.warn( - "You provided an environment generator that returned an OpenAI Gym " - "environment. We strongly recommend transitioning to Gymnasium " - "environments. " - "Tianshou is automatically wrapping your environments in a compatibility " - "layer, which could potentially cause issues." - ) - - gym_version = packaging.version.parse(old_gym.__version__) - if gym_version >= packaging.version.parse("0.26.0"): - return shimmy.GymV26CompatibilityV0(env=env) - elif gym_version >= packaging.version.parse("0.22.0"): - return shimmy.GymV22CompatibilityV0(env=env) - else: - raise Exception( - f"Found OpenAI Gym version {gym.__version__}. " - f"Tianshou only supports OpenAI Gym environments of version>=0.22.0" - ) - - return patched - - -class BaseVectorEnv(object): - """Base class for vectorized environments. 
- - Usage: - :: - - env_num = 8 - envs = DummyVectorEnv([lambda: gym.make(task) for _ in range(env_num)]) - assert len(envs) == env_num - - It accepts a list of environment generators. In other words, an environment - generator ``efn`` of a specific task means that ``efn()`` returns the - environment of the given task, for example, ``gym.make(task)``. - - All of the VectorEnv must inherit :class:`~tianshou.env.BaseVectorEnv`. - Here are some other usages: - :: - - envs.seed(2) # which is equal to the next line - envs.seed([2, 3, 4, 5, 6, 7, 8, 9]) # set specific seed for each env - obs = envs.reset() # reset all environments - obs = envs.reset([0, 5, 7]) # reset 3 specific environments - obs, rew, done, info = envs.step([1] * 8) # step synchronously - envs.render() # render all environments - envs.close() # close all environments - - .. warning:: - - If you use your own environment, please make sure the ``seed`` method - is set up properly, e.g., - :: - - def seed(self, seed): - np.random.seed(seed) - - Otherwise, the outputs of these envs may be the same with each other. - - :param env_fns: a list of callable envs, ``env_fns[i]()`` generates the i-th env. - :param worker_fn: a callable worker, ``worker_fn(env_fns[i])`` generates a - worker which contains the i-th env. - :param int wait_num: use in asynchronous simulation if the time cost of - ``env.step`` varies with time and synchronously waiting for all - environments to finish a step is time-wasting. In that case, we can - return when ``wait_num`` environments finish a step and keep on - simulation in these environments. If ``None``, asynchronous simulation - is disabled; else, ``1 <= wait_num <= env_num``. - :param float timeout: use in asynchronous simulation same as above, in each - vectorized step it only deal with those environments spending time - within ``timeout`` seconds. 
- """ - - def __init__( - self, - env_fns: List[Callable[[], ENV_TYPE]], - worker_fn: Callable[[Callable[[], gym.Env]], EnvWorker], - wait_num: Optional[int] = None, - timeout: Optional[float] = None, - ) -> None: - self._env_fns = env_fns - # A VectorEnv contains a pool of EnvWorkers, which corresponds to - # interact with the given envs (one worker <-> one env). - self.workers = [worker_fn(_patch_env_generator(fn)) for fn in env_fns] - self.worker_class = type(self.workers[0]) - assert issubclass(self.worker_class, EnvWorker) - assert all([isinstance(w, self.worker_class) for w in self.workers]) - - self.env_num = len(env_fns) - self.wait_num = wait_num or len(env_fns) - assert 1 <= self.wait_num <= len(env_fns), \ - f"wait_num should be in [1, {len(env_fns)}], but got {wait_num}" - self.timeout = timeout - assert self.timeout is None or self.timeout > 0, \ - f"timeout is {timeout}, it should be positive if provided!" - self.is_async = self.wait_num != len(env_fns) or timeout is not None - self.waiting_conn: List[EnvWorker] = [] - # environments in self.ready_id is actually ready - # but environments in self.waiting_id are just waiting when checked, - # and they may be ready now, but this is not known until we check it - # in the step() function - self.waiting_id: List[int] = [] - # all environments are ready in the beginning - self.ready_id = list(range(self.env_num)) - self.is_closed = False - - def _assert_is_not_closed(self) -> None: - assert not self.is_closed, \ - f"Methods of {self.__class__.__name__} cannot be called after close." - - def __len__(self) -> int: - """Return len(self), which is the number of environments.""" - return self.env_num - - def __getattribute__(self, key: str) -> Any: - """Switch the attribute getter depending on the key. - - Any class who inherits ``gym.Env`` will inherit some attributes, like - ``action_space``. 
However, we would like the attribute lookup to go straight - into the worker (in fact, this vector env's action_space is always None). - """ - if key in GYM_RESERVED_KEYS: # reserved keys in gym.Env - return self.get_env_attr(key) - else: - return super().__getattribute__(key) - - def get_env_attr( - self, - key: str, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> List[Any]: - """Get an attribute from the underlying environments. - - If id is an int, retrieve the attribute denoted by key from the environment - underlying the worker at index id. The result is returned as a list with one - element. Otherwise, retrieve the attribute for all workers at indices id and - return a list that is ordered correspondingly to id. - - :param str key: The key of the desired attribute. - :param id: Indice(s) of the desired worker(s). Default to None for all env_id. - - :return list: The list of environment attributes. - """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - - return [self.workers[j].get_env_attr(key) for j in id] - - def set_env_attr( - self, - key: str, - value: Any, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> None: - """Set an attribute in the underlying environments. - - If id is an int, set the attribute denoted by key from the environment - underlying the worker at index id to value. - Otherwise, set the attribute for all workers at indices id. - - :param str key: The key of the desired attribute. - :param Any value: The new value of the attribute. - :param id: Indice(s) of the desired worker(s). Default to None for all env_id. 
- """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - for j in id: - self.workers[j].set_env_attr(key, value) - - def _wrap_id( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> Union[List[int], np.ndarray]: - if id is None: - return list(range(self.env_num)) - return [id] if np.isscalar(id) else id # type: ignore - - def _assert_id(self, id: Union[List[int], np.ndarray]) -> None: - for i in id: - assert i not in self.waiting_id, \ - f"Cannot interact with environment {i} which is stepping now." - assert i in self.ready_id, \ - f"Can only interact with ready environments {self.ready_id}." - - def reset( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - """Reset the state of some envs and return initial observations. - - If id is None, reset the state of all the environments and return - initial observations, otherwise reset the specific environments with - the given id, either an int or a list. - """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - - # send(None) == reset() in worker - for i in id: - self.workers[i].send(None, **kwargs) - ret_list = [self.workers[i].recv() for i in id] - #assert isinstance(ret_list[0], (tuple, list)) and len( - # ret_list[0] - #) == 2 and isinstance(ret_list[0][1], dict) - - obs_list = [r[0] for r in ret_list] - - if isinstance(obs_list[0], tuple): # type: ignore - raise TypeError( - "Tuple observation space is not supported. 
", - "Please change it to array or dict space", - ) - try: - obs = np.stack(obs_list) - except ValueError: # different len(obs) - obs = np.array(obs_list, dtype=object) - - infos = [r[1] for r in ret_list] - return obs, infos # type: ignore - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - """Run one timestep of some environments' dynamics. - - If id is None, run one timestep of all the environments’ dynamics; - otherwise run one timestep for some environments with given id, either - an int or a list. When the end of episode is reached, you are - responsible for calling reset(id) to reset this environment’s state. - - Accept a batch of action and return a tuple (batch_obs, batch_rew, - batch_done, batch_info) in numpy format. - - :param numpy.ndarray action: a batch of action provided by the agent. - - :return: A tuple consisting of either: - - * ``obs`` a numpy.ndarray, the agent's observation of current environments - * ``rew`` a numpy.ndarray, the amount of rewards returned after \ - previous actions - * ``terminated`` a numpy.ndarray, whether these episodes have been \ - terminated - * ``truncated`` a numpy.ndarray, whether these episodes have been truncated - * ``info`` a numpy.ndarray, contains auxiliary diagnostic \ - information (helpful for debugging, and sometimes learning) - - For the async simulation: - - Provide the given action to the environments. The action sequence - should correspond to the ``id`` argument, and the ``id`` argument - should be a subset of the ``env_id`` in the last returned ``info`` - (initially they are env_ids of all the environments). If action is - None, fetch unfinished step() calls instead. 
- """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if not self.is_async: - assert len(action) == len(id) - for i, j in enumerate(id): - self.workers[j].send(action[i]) - result = [] - for j in id: - env_return = self.workers[j].recv() - env_return[-1]["env_id"] = j - result.append(env_return) - else: - if action is not None: - self._assert_id(id) - assert len(action) == len(id) - for act, env_id in zip(action, id): - self.workers[env_id].send(act) - self.waiting_conn.append(self.workers[env_id]) - self.waiting_id.append(env_id) - self.ready_id = [x for x in self.ready_id if x not in id] - ready_conns: List[EnvWorker] = [] - while not ready_conns: - ready_conns = self.worker_class.wait( - self.waiting_conn, self.wait_num, self.timeout - ) - result = [] - for conn in ready_conns: - waiting_index = self.waiting_conn.index(conn) - self.waiting_conn.pop(waiting_index) - env_id = self.waiting_id.pop(waiting_index) - # env_return can be (obs, reward, done, info) or - # (obs, reward, terminated, truncated, info) - env_return = conn.recv() - env_return[-1]["env_id"] = env_id # Add `env_id` to info - result.append(env_return) - self.ready_id.append(env_id) - obs_list, rew_list, term_list, trunc_list, info_list = tuple(zip(*result)) - try: - obs_stack = np.stack(obs_list) - except ValueError: # different len(obs) - obs_stack = np.array(obs_list, dtype=object) - return obs_stack, np.stack(rew_list), np.stack(term_list), np.stack( - trunc_list - ), np.stack(info_list) - - def seed( - self, - seed: Optional[Union[int, List[int]]] = None, - ) -> List[Optional[List[int]]]: - """Set the seed for all environments. - - Accept ``None``, an int (which will extend ``i`` to - ``[i, i + 1, i + 2, ...]``) or a list. - - :return: The list of seeds used in this env's random number generators. - The first value in the list should be the "main" seed, or the value - which a reproducer pass to "seed". 
- """ - self._assert_is_not_closed() - seed_list: Union[List[None], List[int]] - if seed is None: - seed_list = [seed] * self.env_num - elif isinstance(seed, int): - seed_list = [seed + i for i in range(self.env_num)] - else: - seed_list = seed - return [w.seed(s) for w, s in zip(self.workers, seed_list)] - - def render(self, **kwargs: Any) -> List[Any]: - """Render all of the environments.""" - self._assert_is_not_closed() - if self.is_async and len(self.waiting_id) > 0: - raise RuntimeError( - f"Environments {self.waiting_id} are still stepping, cannot " - "render them now." - ) - return [w.render(**kwargs) for w in self.workers] - - def close(self) -> None: - """Close all of the environments. - - This function will be called only once (if not, it will be called during - garbage collected). This way, ``close`` of all workers can be assured. - """ - self._assert_is_not_closed() - for w in self.workers: - w.close() - self.is_closed = True - - -class DummyVectorEnv(BaseVectorEnv): - """Dummy vectorized environment wrapper, implemented in for-loop. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - super().__init__(env_fns, DummyEnvWorker, **kwargs) - - -class SubprocVectorEnv(BaseVectorEnv): - """Vectorized environment wrapper based on subprocess. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - - def worker_fn(fn: Callable[[], gym.Env]) -> SubprocEnvWorker: - return SubprocEnvWorker(fn, share_memory=False) - - super().__init__(env_fns, worker_fn, **kwargs) - - -class ShmemVectorEnv(BaseVectorEnv): - """Optimized SubprocVectorEnv with shared buffers to exchange observations. - - ShmemVectorEnv has exactly the same API as SubprocVectorEnv. - - .. 
seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - - def worker_fn(fn: Callable[[], gym.Env]) -> SubprocEnvWorker: - return SubprocEnvWorker(fn, share_memory=True) - - super().__init__(env_fns, worker_fn, **kwargs) - - -class RayVectorEnv(BaseVectorEnv): - """Vectorized environment wrapper based on ray. - - This is a choice to run distributed environments in a cluster. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - try: - import ray - except ImportError as exception: - raise ImportError( - "Please install ray to support RayVectorEnv: pip install ray" - ) from exception - if not ray.is_initialized(): - ray.init() - super().__init__(env_fns, RayEnvWorker, **kwargs) diff --git a/workspace/compariosn/tianshou_env/worker/__init__.py b/workspace/compariosn/tianshou_env/worker/__init__.py deleted file mode 100644 index 1b1f37510..000000000 --- a/workspace/compariosn/tianshou_env/worker/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from tianshou.env.worker.base import EnvWorker -from tianshou.env.worker.dummy import DummyEnvWorker -from tianshou.env.worker.ray import RayEnvWorker -from tianshou.env.worker.subproc import SubprocEnvWorker - -__all__ = [ - "EnvWorker", - "DummyEnvWorker", - "SubprocEnvWorker", - "RayEnvWorker", -] diff --git a/workspace/compariosn/tianshou_env/worker/base.py b/workspace/compariosn/tianshou_env/worker/base.py deleted file mode 100644 index 773d56bce..000000000 --- a/workspace/compariosn/tianshou_env/worker/base.py +++ /dev/null @@ -1,106 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import gym_new_venv_step_type -from tianshou.utils import 
deprecation - - -class EnvWorker(ABC): - """An abstract worker for an environment.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self._env_fn = env_fn - self.is_closed = False - self.result: Union[gym_new_venv_step_type, Tuple[np.ndarray, dict]] - self.action_space = self.get_env_attr("action_space") # noqa: B009 - self.is_reset = False - - @abstractmethod - def get_env_attr(self, key: str) -> Any: - pass - - @abstractmethod - def set_env_attr(self, key: str, value: Any) -> None: - pass - - def send(self, action: Optional[np.ndarray]) -> None: - """Send action signal to low-level worker. - - When action is None, it indicates sending "reset" signal; otherwise - it indicates "step" signal. The paired return value from "recv" - function is determined by such kind of different signal. - """ - if hasattr(self, "send_action"): - deprecation( - "send_action will soon be deprecated. " - "Please use send and recv for your own EnvWorker." - ) - if action is None: - self.is_reset = True - self.result = self.reset() - else: - self.is_reset = False - self.send_action(action) - - def recv( - self - ) -> Union[gym_new_venv_step_type, Tuple[np.ndarray, dict], ]: # noqa:E125 - """Receive result from low-level worker. - - If the last "send" function sends a NULL action, it only returns a - single observation; otherwise it returns a tuple of (obs, rew, done, - info) or (obs, rew, terminated, truncated, info), based on whether - the environment is using the old step API or the new one. - """ - if hasattr(self, "get_result"): - deprecation( - "get_result will soon be deprecated. " - "Please use send and recv for your own EnvWorker." - ) - if not self.is_reset: - self.result = self.get_result() - return self.result - - @abstractmethod - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - pass - - def step(self, action: np.ndarray) -> gym_new_venv_step_type: - """Perform one timestep of the environment's dynamic. 
- - "send" and "recv" are coupled in sync simulation, so users only call - "step" function. But they can be called separately in async - simulation, i.e. someone calls "send" first, and calls "recv" later. - """ - self.send(action) - return self.recv() # type: ignore - - @staticmethod - def wait( - workers: List["EnvWorker"], - wait_num: int, - timeout: Optional[float] = None - ) -> List["EnvWorker"]: - """Given a list of workers, return those ready ones.""" - raise NotImplementedError - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - return self.action_space.seed(seed) # issue 299 - - @abstractmethod - def render(self, **kwargs: Any) -> Any: - """Render the environment.""" - pass - - @abstractmethod - def close_env(self) -> None: - pass - - def close(self) -> None: - if self.is_closed: - return None - self.is_closed = True - self.close_env() diff --git a/workspace/compariosn/tianshou_env/worker/dummy.py b/workspace/compariosn/tianshou_env/worker/dummy.py deleted file mode 100644 index 4eec4e0fa..000000000 --- a/workspace/compariosn/tianshou_env/worker/dummy.py +++ /dev/null @@ -1,52 +0,0 @@ -from typing import Any, Callable, List, Optional, Tuple - -import gymnasium as gym -import numpy as np - -from tianshou.env.worker import EnvWorker - - -class DummyEnvWorker(EnvWorker): - """Dummy worker used in sequential vector environments.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self.env = env_fn() - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - return getattr(self.env, key) - - def set_env_attr(self, key: str, value: Any) -> None: - setattr(self.env.unwrapped, key, value) - - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - return self.env.reset(**kwargs) - - @staticmethod - def wait( # type: ignore - workers: List["DummyEnvWorker"], wait_num: int, timeout: Optional[float] = None - ) -> List["DummyEnvWorker"]: - # Sequential 
EnvWorker objects are always ready - return workers - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - if action is None: - self.result = self.env.reset(**kwargs) - else: - self.result = self.env.step(action) # type: ignore - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - try: - return self.env.seed(seed) # type: ignore - except (AttributeError, NotImplementedError): - self.env.reset(seed=seed) - return [seed] # type: ignore - - def render(self, **kwargs: Any) -> Any: - return self.env.render(**kwargs) - - def close_env(self) -> None: - self.env.close() diff --git a/workspace/compariosn/tianshou_env/worker/ray.py b/workspace/compariosn/tianshou_env/worker/ray.py deleted file mode 100644 index fe2b8fe8d..000000000 --- a/workspace/compariosn/tianshou_env/worker/ray.py +++ /dev/null @@ -1,74 +0,0 @@ -from typing import Any, Callable, List, Optional - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import gym_new_venv_step_type -from tianshou.env.worker import EnvWorker - -try: - import ray -except ImportError: - pass - - -class _SetAttrWrapper(gym.Wrapper): - - def set_env_attr(self, key: str, value: Any) -> None: - setattr(self.env.unwrapped, key, value) - - def get_env_attr(self, key: str) -> Any: - return getattr(self.env, key) - - -class RayEnvWorker(EnvWorker): - """Ray worker used in RayVectorEnv.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self.env = ray.remote(_SetAttrWrapper).options( # type: ignore - num_cpus=0 - ).remote(env_fn()) - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - return ray.get(self.env.get_env_attr.remote(key)) - - def set_env_attr(self, key: str, value: Any) -> None: - ray.get(self.env.set_env_attr.remote(key, value)) - - def reset(self, **kwargs: Any) -> Any: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - return ray.get(self.env.reset.remote(**kwargs)) - - @staticmethod - def wait( # type: 
ignore - workers: List["RayEnvWorker"], wait_num: int, timeout: Optional[float] = None - ) -> List["RayEnvWorker"]: - results = [x.result for x in workers] - ready_results, _ = ray.wait(results, num_returns=wait_num, timeout=timeout) - return [workers[results.index(result)] for result in ready_results] - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - # self.result is actually a handle - if action is None: - self.result = self.env.reset.remote(**kwargs) - else: - self.result = self.env.step.remote(action) - - def recv(self) -> gym_new_venv_step_type: - return ray.get(self.result) # type: ignore - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - try: - return ray.get(self.env.seed.remote(seed)) - except (AttributeError, NotImplementedError): - self.env.reset.remote(seed=seed) - return None - - def render(self, **kwargs: Any) -> Any: - return ray.get(self.env.render.remote(**kwargs)) - - def close_env(self) -> None: - ray.get(self.env.close.remote()) diff --git a/workspace/compariosn/tianshou_env/worker/subproc.py b/workspace/compariosn/tianshou_env/worker/subproc.py deleted file mode 100644 index 68f34e687..000000000 --- a/workspace/compariosn/tianshou_env/worker/subproc.py +++ /dev/null @@ -1,256 +0,0 @@ -import ctypes -import time -from collections import OrderedDict -from multiprocessing import Array, Pipe, connection -from multiprocessing.context import Process -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import CloudpickleWrapper, gym_new_venv_step_type -from tianshou.env.worker import EnvWorker - -_NP_TO_CT = { - np.bool_: ctypes.c_bool, - np.uint8: ctypes.c_uint8, - np.uint16: ctypes.c_uint16, - np.uint32: ctypes.c_uint32, - np.uint64: ctypes.c_uint64, - np.int8: ctypes.c_int8, - np.int16: ctypes.c_int16, - np.int32: ctypes.c_int32, - np.int64: ctypes.c_int64, - np.float32: ctypes.c_float, - 
np.float64: ctypes.c_double, -} - - -class ShArray: - """Wrapper of multiprocessing Array.""" - - def __init__(self, dtype: np.generic, shape: Tuple[int]) -> None: - self.arr = Array(_NP_TO_CT[dtype.type], int(np.prod(shape))) # type: ignore - self.dtype = dtype - self.shape = shape - - def save(self, ndarray: np.ndarray) -> None: - assert isinstance(ndarray, np.ndarray) - dst = self.arr.get_obj() - dst_np = np.frombuffer(dst, - dtype=self.dtype).reshape(self.shape) # type: ignore - np.copyto(dst_np, ndarray) - - def get(self) -> np.ndarray: - obj = self.arr.get_obj() - return np.frombuffer(obj, dtype=self.dtype).reshape(self.shape) # type: ignore - - -def _setup_buf(space: gym.Space) -> Union[dict, tuple, ShArray]: - if isinstance(space, gym.spaces.Dict): - assert isinstance(space.spaces, OrderedDict) - return {k: _setup_buf(v) for k, v in space.spaces.items()} - elif isinstance(space, gym.spaces.Tuple): - assert isinstance(space.spaces, tuple) - return tuple([_setup_buf(t) for t in space.spaces]) - else: - return ShArray(space.dtype, space.shape) # type: ignore - - -def _worker( - parent: connection.Connection, - p: connection.Connection, - env_fn_wrapper: CloudpickleWrapper, - obs_bufs: Optional[Union[dict, tuple, ShArray]] = None, -) -> None: - - def _encode_obs( - obs: Union[dict, tuple, np.ndarray], buffer: Union[dict, tuple, ShArray] - ) -> None: - if isinstance(obs, np.ndarray) and isinstance(buffer, ShArray): - buffer.save(obs) - elif isinstance(obs, tuple) and isinstance(buffer, tuple): - for o, b in zip(obs, buffer): - _encode_obs(o, b) - elif isinstance(obs, dict) and isinstance(buffer, dict): - for k in obs.keys(): - _encode_obs(obs[k], buffer[k]) - return None - - parent.close() - env = env_fn_wrapper.data() - try: - while True: - try: - cmd, data = p.recv() - except EOFError: # the pipe has been closed - p.close() - break - if cmd == "step": - env_return = env.step(data) - if obs_bufs is not None: - _encode_obs(env_return[0], obs_bufs) - env_return = 
(None, *env_return[1:]) - p.send(env_return) - elif cmd == "reset": - obs, info = env.reset(**data) - if obs_bufs is not None: - _encode_obs(obs, obs_bufs) - obs = None - p.send((obs, info)) - elif cmd == "close": - p.send(env.close()) - p.close() - break - elif cmd == "render": - p.send(env.render(**data) if hasattr(env, "render") else None) - elif cmd == "seed": - if hasattr(env, "seed"): - p.send(env.seed(data)) - else: - env.reset(seed=data) - p.send(None) - elif cmd == "getattr": - p.send(getattr(env, data) if hasattr(env, data) else None) - elif cmd == "setattr": - setattr(env.unwrapped, data["key"], data["value"]) - else: - p.close() - raise NotImplementedError - except KeyboardInterrupt: - p.close() - - -class SubprocEnvWorker(EnvWorker): - """Subprocess worker used in SubprocVectorEnv and ShmemVectorEnv.""" - - def __init__( - self, env_fn: Callable[[], gym.Env], share_memory: bool = False - ) -> None: - self.parent_remote, self.child_remote = Pipe() - self.share_memory = share_memory - self.buffer: Optional[Union[dict, tuple, ShArray]] = None - if self.share_memory: - dummy = env_fn() - obs_space = dummy.observation_space - dummy.close() - del dummy - self.buffer = _setup_buf(obs_space) - args = ( - self.parent_remote, - self.child_remote, - CloudpickleWrapper(env_fn), - self.buffer, - ) - self.process = Process(target=_worker, args=args, daemon=True) - self.process.start() - self.child_remote.close() - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - self.parent_remote.send(["getattr", key]) - return self.parent_remote.recv() - - def set_env_attr(self, key: str, value: Any) -> None: - self.parent_remote.send(["setattr", {"key": key, "value": value}]) - - def _decode_obs(self) -> Union[dict, tuple, np.ndarray]: - - def decode_obs( - buffer: Optional[Union[dict, tuple, ShArray]] - ) -> Union[dict, tuple, np.ndarray]: - if isinstance(buffer, ShArray): - return buffer.get() - elif isinstance(buffer, tuple): - return 
tuple([decode_obs(b) for b in buffer]) - elif isinstance(buffer, dict): - return {k: decode_obs(v) for k, v in buffer.items()} - else: - raise NotImplementedError - - return decode_obs(self.buffer) - - @staticmethod - def wait( # type: ignore - workers: List["SubprocEnvWorker"], - wait_num: int, - timeout: Optional[float] = None, - ) -> List["SubprocEnvWorker"]: - remain_conns = conns = [x.parent_remote for x in workers] - ready_conns: List[connection.Connection] = [] - remain_time, t1 = timeout, time.time() - while len(remain_conns) > 0 and len(ready_conns) < wait_num: - if timeout: - remain_time = timeout - (time.time() - t1) - if remain_time <= 0: - break - # connection.wait hangs if the list is empty - new_ready_conns = connection.wait(remain_conns, timeout=remain_time) - ready_conns.extend(new_ready_conns) # type: ignore - remain_conns = [conn for conn in remain_conns if conn not in ready_conns] - return [workers[conns.index(con)] for con in ready_conns] - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - if action is None: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - self.parent_remote.send(["reset", kwargs]) - else: - self.parent_remote.send(["step", action]) - - def recv( - self - ) -> Union[gym_new_venv_step_type, Tuple[np.ndarray, dict]]: # noqa:E125 - result = self.parent_remote.recv() - if isinstance(result, tuple): - if len(result) == 2: - obs, info = result - if self.share_memory: - obs = self._decode_obs() - return obs, info - obs = result[0] - if self.share_memory: - obs = self._decode_obs() - return (obs, *result[1:]) # type: ignore - else: - obs = result - if self.share_memory: - obs = self._decode_obs() - return obs - - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - self.parent_remote.send(["reset", kwargs]) - - result = self.parent_remote.recv() - if isinstance(result, tuple): - obs, info = result - if self.share_memory: - obs = 
self._decode_obs() - return obs, info - else: - obs = result - if self.share_memory: - obs = self._decode_obs() - return obs - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - self.parent_remote.send(["seed", seed]) - return self.parent_remote.recv() - - def render(self, **kwargs: Any) -> Any: - self.parent_remote.send(["render", kwargs]) - return self.parent_remote.recv() - - def close_env(self) -> None: - try: - self.parent_remote.send(["close", None]) - # mp may be deleted so it may raise AttributeError - self.parent_remote.recv() - self.process.join() - except (BrokenPipeError, EOFError, AttributeError): - pass - # ensure the subproc is terminated - self.process.terminate()