From 06cc6c89a092a09149441c0f358fa92ee7961478 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 09:19:59 +0900 Subject: [PATCH 01/19] . --- workspace/speed_benchmark.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/workspace/speed_benchmark.py b/workspace/speed_benchmark.py index c10f287ed..ceafa79d2 100644 --- a/workspace/speed_benchmark.py +++ b/workspace/speed_benchmark.py @@ -1,4 +1,5 @@ import time +import json import jax import pgx from pgx.utils import act_randomly @@ -37,20 +38,23 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): state = step(state, action) te = time.time() - return f"{num_steps / (te - ts):.05f}" + return num_steps / (te- ts) -N = (2 ** 12) * 100 -print(f"Total # of steps: {N}") +N = (2 ** 12) * 1 +# print(f"Total # of steps: {N}") bs_list = [2 ** i for i in range(5, 13)] -print("| env_id |" + "|".join([str(bs) for bs in bs_list]) + "|") -print("|:---:|" + "|".join([":---:" for bs in bs_list]) + "|") +# print("| env_id |" + "|".join([str(bs) for bs in bs_list]) + "|") +# print("|:---:|" + "|".join([":---:" for bs in bs_list]) + "|") +d = {} for env_id in get_args(pgx.EnvId): s = f"|{env_id}|" - for bs in tqdm(bs_list, leave=False): - s += benchmark(env_id, bs, N) + for bs in bs_list: + n_per_sec = benchmark(env_id, bs, N) + s += f"{n_per_sec:.05f}" s += "|" - print(s) + print(json.dumps({"game": "/".join(env_id.split("/")[:-1]) ,"library": "pgx", "total_steps": N, "steps/sec": n_per_sec, "batch_size": bs})) + # print(s) """ @@ -61,4 +65,4 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): |go-19x19/v0|15146.45769|29891.62027|58064.94882|108400.19704|173638.40814|286740.95368|379331.88909|449555.32632| |shogi/v0|21047.48879|42130.05279|82988.81210|175415.01266|259940.79393|290410.20642|299800.69880|308552.26434| |minatar/asterix/v0|13066.68075|25836.00751|52134.46018|102929.03752|205880.59846|384825.85566|806843.53100|1553951.76960| -""" \ No newline at end of file +""" From 576ab3a0ca5c832c83285017aa77e70e6cc83f79 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 09:21:13 +0900 Subject: [PATCH 02/19] mv --- workspace/speed_benchmark.py => speed_benchmark/pgx.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename workspace/speed_benchmark.py => speed_benchmark/pgx.py (100%) diff --git a/workspace/speed_benchmark.py b/speed_benchmark/pgx.py similarity index 100% rename from workspace/speed_benchmark.py rename to speed_benchmark/pgx.py From 0e54f0615da305aedd7460ff9e7cf2731df514c8 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 09:23:56 +0900 Subject: [PATCH 03/19] . 
--- speed_benchmark/{pgx.py => run_pgx.py} | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) rename speed_benchmark/{pgx.py => run_pgx.py} (87%) diff --git a/speed_benchmark/pgx.py b/speed_benchmark/run_pgx.py similarity index 87% rename from speed_benchmark/pgx.py rename to speed_benchmark/run_pgx.py index ceafa79d2..96ce502a2 100644 --- a/speed_benchmark/pgx.py +++ b/speed_benchmark/run_pgx.py @@ -38,7 +38,7 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): state = step(state, action) te = time.time() - return num_steps / (te- ts) + return num_steps, te - ts N = (2 ** 12) * 1 @@ -48,12 +48,13 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): # print("|:---:|" + "|".join([":---:" for bs in bs_list]) + "|") d = {} for env_id in get_args(pgx.EnvId): - s = f"|{env_id}|" + # s = f"|{env_id}|" for bs in bs_list: - n_per_sec = benchmark(env_id, bs, N) - s += f"{n_per_sec:.05f}" - s += "|" - print(json.dumps({"game": "/".join(env_id.split("/")[:-1]) ,"library": "pgx", "total_steps": N, "steps/sec": n_per_sec, "batch_size": bs})) + num_steps, sec = benchmark(env_id, bs, N) + # s += f"{n_per_sec:.05f}" + # s += "|" + print(json.dumps({"game": "/".join(env_id.split("/")[:-1]), "library": "pgx", + "total_steps": num_steps, "total_sec": sec, "steps/sec": num_steps / sec, "batch_size": bs})) # print(s) From b11870579c5adb67be916f786dd2998dcfdfbde1 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 20:43:31 +0900 Subject: [PATCH 04/19] mv files --- {workspace/compariosn => speed_benchmark}/compare.sh | 0 {workspace/compariosn => speed_benchmark}/compare_for_loop.py | 0 {workspace/compariosn => speed_benchmark}/compare_subproc.py | 0 {workspace/compariosn => speed_benchmark}/comparison.py | 0 {workspace/compariosn => speed_benchmark}/install.sh | 0 {workspace/compariosn => speed_benchmark}/open_spiel_forloop.py | 0 {workspace/compariosn => speed_benchmark}/open_spiel_subproc.py | 0 {workspace/compariosn => speed_benchmark}/petting_zoo_forloop.py | 0 {workspace/compariosn => speed_benchmark}/petting_zoo_subproc.py | 0 .../compariosn => speed_benchmark}/tianshou_env/__init__.py | 0 .../compariosn => speed_benchmark}/tianshou_env/gym_wrappers.py | 0 .../compariosn => speed_benchmark}/tianshou_env/pettingzoo_env.py | 0 .../tianshou_env/test_tianshou_subproc.py | 0 {workspace/compariosn => speed_benchmark}/tianshou_env/utils.py | 0 .../compariosn => speed_benchmark}/tianshou_env/venv_wrappers.py | 0 {workspace/compariosn => speed_benchmark}/tianshou_env/venvs.py | 0 .../tianshou_env/worker/__init__.py | 0 .../compariosn => speed_benchmark}/tianshou_env/worker/base.py | 0 .../compariosn => speed_benchmark}/tianshou_env/worker/dummy.py | 0 .../compariosn => speed_benchmark}/tianshou_env/worker/ray.py | 0 .../compariosn => speed_benchmark}/tianshou_env/worker/subproc.py | 0 {workspace/compariosn => speed_benchmark}/vector_env.py | 0 22 files changed, 0 insertions(+), 0 deletions(-) rename {workspace/compariosn => speed_benchmark}/compare.sh (100%) rename {workspace/compariosn => speed_benchmark}/compare_for_loop.py (100%) rename {workspace/compariosn => speed_benchmark}/compare_subproc.py (100%) rename {workspace/compariosn => speed_benchmark}/comparison.py (100%) rename {workspace/compariosn => speed_benchmark}/install.sh (100%) rename {workspace/compariosn => speed_benchmark}/open_spiel_forloop.py (100%) rename {workspace/compariosn => speed_benchmark}/open_spiel_subproc.py (100%) rename {workspace/compariosn => 
speed_benchmark}/petting_zoo_forloop.py (100%) rename {workspace/compariosn => speed_benchmark}/petting_zoo_subproc.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/__init__.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/gym_wrappers.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/pettingzoo_env.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/test_tianshou_subproc.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/utils.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/venv_wrappers.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/venvs.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/worker/__init__.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/worker/base.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/worker/dummy.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/worker/ray.py (100%) rename {workspace/compariosn => speed_benchmark}/tianshou_env/worker/subproc.py (100%) rename {workspace/compariosn => speed_benchmark}/vector_env.py (100%) diff --git a/workspace/compariosn/compare.sh b/speed_benchmark/compare.sh similarity index 100% rename from workspace/compariosn/compare.sh rename to speed_benchmark/compare.sh diff --git a/workspace/compariosn/compare_for_loop.py b/speed_benchmark/compare_for_loop.py similarity index 100% rename from workspace/compariosn/compare_for_loop.py rename to speed_benchmark/compare_for_loop.py diff --git a/workspace/compariosn/compare_subproc.py b/speed_benchmark/compare_subproc.py similarity index 100% rename from workspace/compariosn/compare_subproc.py rename to speed_benchmark/compare_subproc.py diff --git a/workspace/compariosn/comparison.py b/speed_benchmark/comparison.py similarity index 100% rename from workspace/compariosn/comparison.py rename to speed_benchmark/comparison.py diff --git a/workspace/compariosn/install.sh b/speed_benchmark/install.sh similarity index 100% rename from workspace/compariosn/install.sh rename to speed_benchmark/install.sh diff --git a/workspace/compariosn/open_spiel_forloop.py b/speed_benchmark/open_spiel_forloop.py similarity index 100% rename from workspace/compariosn/open_spiel_forloop.py rename to speed_benchmark/open_spiel_forloop.py diff --git a/workspace/compariosn/open_spiel_subproc.py b/speed_benchmark/open_spiel_subproc.py similarity index 100% rename from workspace/compariosn/open_spiel_subproc.py rename to speed_benchmark/open_spiel_subproc.py diff --git a/workspace/compariosn/petting_zoo_forloop.py b/speed_benchmark/petting_zoo_forloop.py similarity index 100% rename from workspace/compariosn/petting_zoo_forloop.py rename to speed_benchmark/petting_zoo_forloop.py diff --git a/workspace/compariosn/petting_zoo_subproc.py b/speed_benchmark/petting_zoo_subproc.py similarity index 100% rename from workspace/compariosn/petting_zoo_subproc.py rename to speed_benchmark/petting_zoo_subproc.py diff --git a/workspace/compariosn/tianshou_env/__init__.py b/speed_benchmark/tianshou_env/__init__.py similarity index 100% rename from workspace/compariosn/tianshou_env/__init__.py rename to speed_benchmark/tianshou_env/__init__.py diff --git a/workspace/compariosn/tianshou_env/gym_wrappers.py b/speed_benchmark/tianshou_env/gym_wrappers.py similarity index 100% rename from workspace/compariosn/tianshou_env/gym_wrappers.py rename to speed_benchmark/tianshou_env/gym_wrappers.py 
diff --git a/workspace/compariosn/tianshou_env/pettingzoo_env.py b/speed_benchmark/tianshou_env/pettingzoo_env.py similarity index 100% rename from workspace/compariosn/tianshou_env/pettingzoo_env.py rename to speed_benchmark/tianshou_env/pettingzoo_env.py diff --git a/workspace/compariosn/tianshou_env/test_tianshou_subproc.py b/speed_benchmark/tianshou_env/test_tianshou_subproc.py similarity index 100% rename from workspace/compariosn/tianshou_env/test_tianshou_subproc.py rename to speed_benchmark/tianshou_env/test_tianshou_subproc.py diff --git a/workspace/compariosn/tianshou_env/utils.py b/speed_benchmark/tianshou_env/utils.py similarity index 100% rename from workspace/compariosn/tianshou_env/utils.py rename to speed_benchmark/tianshou_env/utils.py diff --git a/workspace/compariosn/tianshou_env/venv_wrappers.py b/speed_benchmark/tianshou_env/venv_wrappers.py similarity index 100% rename from workspace/compariosn/tianshou_env/venv_wrappers.py rename to speed_benchmark/tianshou_env/venv_wrappers.py diff --git a/workspace/compariosn/tianshou_env/venvs.py b/speed_benchmark/tianshou_env/venvs.py similarity index 100% rename from workspace/compariosn/tianshou_env/venvs.py rename to speed_benchmark/tianshou_env/venvs.py diff --git a/workspace/compariosn/tianshou_env/worker/__init__.py b/speed_benchmark/tianshou_env/worker/__init__.py similarity index 100% rename from workspace/compariosn/tianshou_env/worker/__init__.py rename to speed_benchmark/tianshou_env/worker/__init__.py diff --git a/workspace/compariosn/tianshou_env/worker/base.py b/speed_benchmark/tianshou_env/worker/base.py similarity index 100% rename from workspace/compariosn/tianshou_env/worker/base.py rename to speed_benchmark/tianshou_env/worker/base.py diff --git a/workspace/compariosn/tianshou_env/worker/dummy.py b/speed_benchmark/tianshou_env/worker/dummy.py similarity index 100% rename from workspace/compariosn/tianshou_env/worker/dummy.py rename to speed_benchmark/tianshou_env/worker/dummy.py diff --git a/workspace/compariosn/tianshou_env/worker/ray.py b/speed_benchmark/tianshou_env/worker/ray.py similarity index 100% rename from workspace/compariosn/tianshou_env/worker/ray.py rename to speed_benchmark/tianshou_env/worker/ray.py diff --git a/workspace/compariosn/tianshou_env/worker/subproc.py b/speed_benchmark/tianshou_env/worker/subproc.py similarity index 100% rename from workspace/compariosn/tianshou_env/worker/subproc.py rename to speed_benchmark/tianshou_env/worker/subproc.py diff --git a/workspace/compariosn/vector_env.py b/speed_benchmark/vector_env.py similarity index 100% rename from workspace/compariosn/vector_env.py rename to speed_benchmark/vector_env.py From e45a780110e3bfd171f69341052eaa952d43200f Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 20:44:31 +0900 Subject: [PATCH 05/19] fix --- speed_benchmark/install.sh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/speed_benchmark/install.sh b/speed_benchmark/install.sh index dd0b16f55..d2fdf5b8a 100755 --- a/speed_benchmark/install.sh +++ b/speed_benchmark/install.sh @@ -1,3 +1 @@ -pip install pettingzoo tianshou -python3 -m pip install open_spiel -pip install pygame cloudpickle \ No newline at end of file +python3 -m pip install pettingzoo open_spiel tianshou pygame cloudpickle chess From 89aa8d7bd3a011c7a3892bf077bcc3946cd5fadf Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 21:10:14 +0900 Subject: [PATCH 06/19] refactor --- speed_benchmark/open_spiel_forloop.py | 102 +++++++++++++++++++++++--- 
1 file changed, 90 insertions(+), 12 deletions(-) diff --git a/speed_benchmark/open_spiel_forloop.py b/speed_benchmark/open_spiel_forloop.py index 143f1b9e2..2ccad4d56 100644 --- a/speed_benchmark/open_spiel_forloop.py +++ b/speed_benchmark/open_spiel_forloop.py @@ -1,13 +1,90 @@ -from vector_env import SyncVectorEnv import argparse +import json +import collections import time import numpy as np -import collections +import pyspiel +from open_spiel.python.rl_environment import Environment, ChanceEventSampler + + +# SyncVectorEnv is copied from +# https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/vector_env.py +# +# Copyright 2022 DeepMind Technologies Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +class SyncVectorEnv(object): + """A vectorized RL Environment. + This environment is synchronized - games do not execute in parallel. Speedups + are realized by calling models on many game states simultaneously. + """ + + def __init__(self, envs): + if not isinstance(envs, list): + raise ValueError( + "Need to call this with a list of rl_environment.Environment objects") + self.envs = envs + + def __len__(self): + return len(self.envs) + + def observation_spec(self): + return self.envs[0].observation_spec() + + @property + def num_players(self): + return self.envs[0].num_players + + def step(self, step_outputs, reset_if_done=False): + """Apply one step. 
+        Args:
+          step_outputs: the step outputs
+          reset_if_done: if True, automatically reset the environment
+            when the episode ends
+        Returns:
+          time_steps: the time steps,
+          reward: the reward
+          done: done flag
+          unreset_time_steps: unreset time steps
+        """
+        time_steps = [
+            self.envs[i].step([step_outputs[i].action])
+            for i in range(len(self.envs))
+        ]
+        reward = [step.rewards for step in time_steps]
+        done = [step.last() for step in time_steps]
+        unreset_time_steps = time_steps  # Copy these because you may want to look
+                                         # at the unreset versions to extract
+                                         # information from them
+
+        if reset_if_done:
+            time_steps = self.reset(envs_to_reset=done)
+
+        return time_steps, reward, done, unreset_time_steps
+
+    def reset(self, envs_to_reset=None):
+        if envs_to_reset is None:
+            envs_to_reset = [True for _ in range(len(self.envs))]
+
+        time_steps = [
+            self.envs[i].reset()
+            if envs_to_reset[i] else self.envs[i].get_time_step()
+            for i in range(len(self.envs))
+        ]
+        return time_steps
 
 
 def make_single_env(env_name: str, seed: int):
-    import pyspiel
-    from open_spiel.python.rl_environment import Environment, ChanceEventSampler
     def gen_env():
         game = pyspiel.load_game(env_name)
         return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed))
@@ -15,20 +92,20 @@ def gen_env():
 
 
 def make_env(env_name: str, n_envs: int, seed: int) -> SyncVectorEnv:
-    return SyncVectorEnv([make_single_env(env_name, seed) for i in range(n_envs)])
+    return SyncVectorEnv([make_single_env(env_name, seed + i) for i in range(n_envs)])
 
 
 def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int):
     # random play for open spiel
     StepOutput = collections.namedtuple("step_output", ["action"])
     time_step = env.reset()
+    assert len(env.envs) == len(time_step)  # ensure parallelization
     rng = np.random.default_rng()
     step_num = 0
     while step_num < n_steps_lim:
-        legal_actions = np.array([ts.observations["legal_actions"][ts.observations["current_player"]] for ts in time_step])
-        assert len(env.envs) == len(legal_actions)  # ensure parallerization
-        action = [rng.choice(legal_actions[i]) for i in range(len(legal_actions))]
-        step_outputs = [StepOutput(action=a) for a in action]
+        # See https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/examples/rl_example.py
+        actions = [rng.choice(ts.observations["legal_actions"][ts.observations["current_player"]]) for ts in time_step]
+        step_outputs = [StepOutput(action=action) for action in actions]
         time_step, reward, done, unreset_time_steps = env.step(step_outputs, reset_if_done=True)
         step_num += batch_size
     return step_num
@@ -39,11 +116,12 @@ def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int):
     parser.add_argument("env_name")
     parser.add_argument("batch_size", type=int)
     parser.add_argument("n_steps_lim", type=int)
-    parser.add_argument("--seed", default=100, type=bool)
+    parser.add_argument("--seed", default=0, type=int)
     args = parser.parse_args()
     assert args.n_steps_lim % args.batch_size == 0
     env = make_env(args.env_name, args.batch_size, args.seed)
     time_sta = time.time()
-    step_num = random_play(env, args.n_steps_lim, args.batch_size)
+    steps_num = random_play(env, args.n_steps_lim, args.batch_size)
     time_end = time.time()
-    print((step_num)/(time_end-time_sta), time_end-time_sta)
+    sec = time_end - time_sta
+    print(json.dumps({"game": args.env_name, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}))
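After this patch the benchmark scripts print one JSON record per run instead of the markdown table that the pgx script used to build. A small post-processing step can rebuild such a table from the collected records; the following is only a rough sketch, not part of the series, and the file name "results.jsonl" and the use of "batch_size" as the column key are assumptions (the pgx script emits "batch_size", the wrapper scripts may not).

# Rough aggregation sketch (not part of this patch series). Assumes the JSON
# lines printed by the benchmark scripts were redirected into a hypothetical
# file "results.jsonl", one record per line.
import json
from collections import defaultdict

rows = defaultdict(dict)  # (library, game) -> {batch_size: steps/sec}
with open("results.jsonl") as f:
    for line in f:
        r = json.loads(line)
        rows[(r["library"], r["game"])][r.get("batch_size")] = r["steps/sec"]

bs_list = sorted({bs for record in rows.values() for bs in record if bs is not None})
print("| library | game | " + " | ".join(str(bs) for bs in bs_list) + " |")
print("|:---:|:---:|" + "|".join(":---:" for _ in bs_list) + "|")
for (lib, game), record in sorted(rows.items()):
    cells = [f"{record[bs]:.1f}" if bs in record else "-" for bs in bs_list]
    print(f"| {lib} | {game} | " + " | ".join(cells) + " |")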
From 8d988fbe9613e3766617c75250731aa274e69044 Mon Sep 17 00:00:00 2001
From: Sotetsu KOYAMADA
Date: Fri, 24 Feb 2023 22:31:48 +0900
Subject: [PATCH 07/19] tidy

---
 speed_benchmark/petting_zoo_forloop.py | 32 ++++++++++++++++----------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/speed_benchmark/petting_zoo_forloop.py b/speed_benchmark/petting_zoo_forloop.py
index 6b97b4a26..7e46ff29e 100644
--- a/speed_benchmark/petting_zoo_forloop.py
+++ b/speed_benchmark/petting_zoo_forloop.py
@@ -1,10 +1,11 @@
 import argparse
 import time
+import json
 import numpy as np
 import collections
 from tianshou.env import DummyVectorEnv
 from tianshou.env.pettingzoo_env import PettingZooEnv
-from pettingzoo.classic.tictactoe import tictactoe
+
 
 class AutoResetPettingZooEnv(PettingZooEnv):
     def __init__(self, env):
@@ -18,21 +19,27 @@ def step(self, action):
 
 
 def make_env(env_name, n_envs):
-    from pettingzoo.classic.go import go
-    #from pettingzoo.classic import chess_v5
+    def get_go_env():
+        from pettingzoo.classic.go import go
         return AutoResetPettingZooEnv(go.env())
+    def get_tictactoe_env():
+        from pettingzoo.classic.tictactoe import tictactoe
         return AutoResetPettingZooEnv(tictactoe.env())
+
+    def get_chess_env():
+        from pettingzoo.classic.chess import chess
+        return AutoResetPettingZooEnv(chess.env())
+
     if env_name == "go":
         return DummyVectorEnv([get_go_env for _ in range(n_envs)])
-    elif env_name == "tictactoe":
+    elif env_name == "tic_tac_toe":
         return DummyVectorEnv([get_tictactoe_env for _ in range(n_envs)])
     elif env_name == "chess":
-        #return chess_v5.env()
-        raise ValueError("Chess will be added later")
+        return DummyVectorEnv([get_chess_env for _ in range(n_envs)])
     else:
-        raise ValueError("no such environment in petting zoo")
+        raise ValueError(f"wrong argument: {env_name}")
 
 
 def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int:
@@ -40,11 +47,10 @@ def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int:
     step_num = 0
     rng = np.random.default_rng()
     observation = env.reset()
-    terminated = np.zeros(len(env._env_fns))
     while step_num < n_steps_lim:
         assert len(env._env_fns) == len(observation)  # ensure parallelization
-        legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))])
-        action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly
+        legal_action_mask = np.array([observation[i]["mask"] for i in range(batch_size)])
+        action = [rng.choice(np.where(legal_action_mask[i])[0]) for i in range(batch_size)]  # choose an action randomly
         observation, reward, terminated, _, _ = env.step(action)
         step_num += batch_size
     return step_num
@@ -55,10 +61,12 @@ def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int:
     parser.add_argument("env_name")
     parser.add_argument("batch_size", type=int)
     parser.add_argument("n_steps_lim", type=int)
+    parser.add_argument("--seed", default=0, type=int)
     args = parser.parse_args()
     assert args.n_steps_lim % args.batch_size == 0
     env = make_env(args.env_name, args.batch_size)
     time_sta = time.time()
-    step_num = random_play(env, args.n_steps_lim, args.batch_size)
+    steps_num = random_play(env, args.n_steps_lim, args.batch_size)
     time_end = time.time()
-    print((step_num)/(time_end-time_sta))
\ No newline at end of file
+    sec = time_end - time_sta
+    print(json.dumps({"game": args.env_name, "library": "petting_zoo", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}))
\ No newline at end of file
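One detail worth keeping in mind when reading the OpenSpiel and PettingZoo scripts side by side: the OpenSpiel observations expose legal moves as a list of action ids, while the PettingZoo observation carries a 0/1 mask over the full action space, so the two random policies sample differently. A standalone illustration with made-up values (not taken from either library):

import numpy as np

rng = np.random.default_rng(0)

# OpenSpiel style (see open_spiel_forloop.py above): legal actions are ids.
open_spiel_legal_actions = [1, 3, 4]            # made-up values
a = rng.choice(open_spiel_legal_actions)        # sample an id directly

# PettingZoo style (see petting_zoo_forloop.py above): "mask" is a 0/1 vector.
petting_zoo_mask = np.array([0, 1, 0, 1, 1])    # made-up values
a = rng.choice(np.where(petting_zoo_mask)[0])   # convert mask to ids, then sample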
From 20a26f73ae99e52b65d3ee94b8b82ca58fcf92ae Mon Sep 17 00:00:00 2001
From: Sotetsu KOYAMADA
Date: Fri, 24 Feb 2023 22:43:05 +0900
Subject: [PATCH 08/19] .

---
 speed_benchmark/open_spiel_forloop.py | 2 +-
 speed_benchmark/petting_zoo_forloop.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/speed_benchmark/open_spiel_forloop.py b/speed_benchmark/open_spiel_forloop.py
index 2ccad4d56..d51e7e5d8 100644
--- a/speed_benchmark/open_spiel_forloop.py
+++ b/speed_benchmark/open_spiel_forloop.py
@@ -113,7 +113,7 @@ def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int):
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("env_name")
+    parser.add_argument("env_name")  # go, chess, backgammon, tic_tac_toe
     parser.add_argument("batch_size", type=int)
     parser.add_argument("n_steps_lim", type=int)
     parser.add_argument("--seed", default=0, type=int)
diff --git a/speed_benchmark/petting_zoo_forloop.py b/speed_benchmark/petting_zoo_forloop.py
index 7e46ff29e..c8363f397 100644
--- a/speed_benchmark/petting_zoo_forloop.py
+++ b/speed_benchmark/petting_zoo_forloop.py
@@ -58,7 +58,7 @@ def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int:
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
-    parser.add_argument("env_name")
+    parser.add_argument("env_name")  # go, chess, tic_tac_toe
     parser.add_argument("batch_size", type=int)
     parser.add_argument("n_steps_lim", type=int)
     parser.add_argument("--seed", default=0, type=int)

From fc6d841642fc57ed73b1c76c078a7722707dd845 Mon Sep 17 00:00:00 2001
From: Sotetsu KOYAMADA
Date: Fri, 24 Feb 2023 22:51:42 +0900
Subject: [PATCH 09/19] .

---
 speed_benchmark/petting_zoo_forloop.py | 22 +++++++++++++---------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/speed_benchmark/petting_zoo_forloop.py b/speed_benchmark/petting_zoo_forloop.py
index c8363f397..c188df366 100644
--- a/speed_benchmark/petting_zoo_forloop.py
+++ b/speed_benchmark/petting_zoo_forloop.py
@@ -3,7 +3,6 @@
 import json
 import numpy as np
 import collections
-from tianshou.env import DummyVectorEnv
 from tianshou.env.pettingzoo_env import PettingZooEnv
 
 
@@ -18,7 +17,7 @@ def step(self, action):
         return obs, reward, term, trunc, info
 
 
-def make_env(env_name, n_envs):
+def make_env(env_name, n_envs, vec_env):
 
     def get_go_env():
         from pettingzoo.classic.go import go
@@ -31,24 +30,29 @@ def get_tictactoe_env():
 
     def get_chess_env():
         from pettingzoo.classic.chess import chess
         return AutoResetPettingZooEnv(chess.env())
+
+    if vec_env == "for-loop":
+        from tianshou.env import DummyVectorEnv as VecEnv
+    elif vec_env == "subproc":
+        from tianshou.env import SubprocVectorEnv as VecEnv
 
     if env_name == "go":
-        return DummyVectorEnv([get_go_env for _ in range(n_envs)])
+        env_fn = get_go_env
     elif env_name == "tic_tac_toe":
-        return DummyVectorEnv([get_tictactoe_env for _ in range(n_envs)])
+        env_fn = get_tictactoe_env
     elif env_name == "chess":
-        return DummyVectorEnv([get_chess_env for _ in range(n_envs)])
+        env_fn = get_chess_env
+
+    return VecEnv([env_fn for _ in range(n_envs)])
 
 
-def random_play(env: DummyVectorEnv, n_steps_lim: int, batch_size: int) -> int:
+def random_play(env, n_steps_lim: int, batch_size: int) -> int:
     # Run a random agent in PettingZoo's Go environment until the game ends.
step_num = 0 rng = np.random.default_rng() observation = env.reset() + assert len(env._env_fns) == len(observation) # ensure parallerization while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization legal_action_mask = np.array([observation[i]["mask"] for i in range(batch_size)]) action = [rng.choice(np.where(legal_action_mask[i])[0]) for i in range(batch_size)] # chose action randomly observation, reward, terminated, _, _ = env.step(action) From a627628016c0c98c7a22d967b00721a0599ff73c Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 22:51:50 +0900 Subject: [PATCH 10/19] . --- speed_benchmark/{petting_zoo_forloop.py => petting_zoo.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename speed_benchmark/{petting_zoo_forloop.py => petting_zoo.py} (100%) diff --git a/speed_benchmark/petting_zoo_forloop.py b/speed_benchmark/petting_zoo.py similarity index 100% rename from speed_benchmark/petting_zoo_forloop.py rename to speed_benchmark/petting_zoo.py From 59df4ac535c9684a6fdc267263d04f2678c073d7 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Fri, 24 Feb 2023 22:51:56 +0900 Subject: [PATCH 11/19] rm --- speed_benchmark/petting_zoo_subproc.py | 64 -------------------------- 1 file changed, 64 deletions(-) delete mode 100644 speed_benchmark/petting_zoo_subproc.py diff --git a/speed_benchmark/petting_zoo_subproc.py b/speed_benchmark/petting_zoo_subproc.py deleted file mode 100644 index ed34cfdd5..000000000 --- a/speed_benchmark/petting_zoo_subproc.py +++ /dev/null @@ -1,64 +0,0 @@ -from tianshou.env import SubprocVectorEnv -from pettingzoo.classic.go import go -from pettingzoo.classic.tictactoe import tictactoe -from tianshou.env.pettingzoo_env import PettingZooEnv -import argparse -import numpy as np -import time - -class AutoResetPettingZooEnv(PettingZooEnv): # 全体でpetting_zooの関数, classをimportするとopen_spielの速度が落ちる. - def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - -def make_env(env_name, n_envs): - - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - def get_tictactoe_env(): - return AutoResetPettingZooEnv(tictactoe.env()) - if env_name == "go": - return SubprocVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "tictactoe": - return SubprocVectorEnv([get_tictactoe_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def random_play(env, n_steps_lim: int, batch_size: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. 
- step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += batch_size - return step_num - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("env_name") - parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) - args = parser.parse_args() - assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size) - time_sta = time.time() - step_num = random_play(env, args.n_steps_lim, args.batch_size) - time_end = time.time() - print((step_num)/(time_end-time_sta)) \ No newline at end of file From ee799f6c461a1743eec8f268a43d6f517af22494 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:17:21 +0900 Subject: [PATCH 12/19] . --- speed_benchmark/petting_zoo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/speed_benchmark/petting_zoo.py b/speed_benchmark/petting_zoo.py index c188df366..0b187209e 100644 --- a/speed_benchmark/petting_zoo.py +++ b/speed_benchmark/petting_zoo.py @@ -53,7 +53,7 @@ def random_play(env, n_steps_lim: int, batch_size: int) -> int: observation = env.reset() assert len(env._env_fns) == len(observation) # ensure parallerization while step_num < n_steps_lim: - legal_action_mask = np.array([observation[i]["mask"] for i in range(batch_size)]) + legal_action_mask = [observation[i]["mask"] for i in range(batch_size)] action = [rng.choice(np.where(legal_action_mask[i])[0]) for i in range(batch_size)] # chose action randomly observation, reward, terminated, _, _ = env.step(action) step_num += batch_size From c2c95d90f405d0062d4d0a013c620c4af773b184 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:17:32 +0900 Subject: [PATCH 13/19] rename --- speed_benchmark/{petting_zoo.py => run_petting_zoo.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename speed_benchmark/{petting_zoo.py => run_petting_zoo.py} (100%) diff --git a/speed_benchmark/petting_zoo.py b/speed_benchmark/run_petting_zoo.py similarity index 100% rename from speed_benchmark/petting_zoo.py rename to speed_benchmark/run_petting_zoo.py From cc22a803866aeceefdb5c6731f9ddde9f464a687 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:31:37 +0900 Subject: [PATCH 14/19] . 
---
 speed_benchmark/open_spiel_subproc.py | 98 +++++++++++++++++++++++--
 1 file changed, 91 insertions(+), 7 deletions(-)

diff --git a/speed_benchmark/open_spiel_subproc.py b/speed_benchmark/open_spiel_subproc.py
index 668f1cbfc..0e2efb710 100644
--- a/speed_benchmark/open_spiel_subproc.py
+++ b/speed_benchmark/open_spiel_subproc.py
@@ -1,13 +1,97 @@
-from tianshou_env.pettingzoo_env import OpenSpielEnv
-from tianshou_env.venvs import SubprocVectorEnv
+import json
+from tianshou.env import SubprocVectorEnv
 import numpy as np
 import time
 import argparse
+import warnings
+from abc import ABC
+from typing import Any, Dict, List, Tuple
+
+import pyspiel
+from pettingzoo.utils.env import AECEnv
+from open_spiel.python.rl_environment import Environment, ChanceEventSampler
+
+
+
+# OpenSpielEnv is modified from the Tianshou repository (see #384 for the changes):
+# this wrapper makes it possible to use Tianshou's SubprocVectorEnv with OpenSpiel.
+#
+# https://github.com/thu-ml/tianshou/blob/master/tianshou/env/pettingzoo_env.py
+#
+# Distributed under MIT LICENSE:
+#
+# https://github.com/thu-ml/tianshou/blob/master/LICENSE
+class OpenSpielEnv(AECEnv, ABC):
+    """A tianshou-compatible interface around an OpenSpiel environment.
+
+    It plays the same role that :class:`~tianshou.env.PettingZooEnv`
+    plays for multi-agent PettingZoo environments. Here is the usage:
+    ::
+
+        env = OpenSpielEnv(...)
+        # obs is a dict containing obs, agent_id, and mask
+        obs = env.reset()
+        action = policy(obs)
+        obs, rew, term, trunc, info = env.step(action)
+        env.close()
+
+    Here "mask" holds the list of legal action ids for the current player.
+    Further usage can be found at :ref:`marl_example`.
+    """
+
+    def __init__(self, env: Environment):
+        super().__init__()
+        self.env = env
+        self.reset()
+
+
+    def reset(self, *args: Any, **kwargs: Any) -> Tuple[dict, dict]:
+        time_step = self.env.reset()
+
+        obs = time_step.observations  # output of open_spiel's Environment
+        observation_dict = {
+            "agent_id": obs["current_player"],
+            "obs": obs["serialized_state"],
+            "mask": obs["legal_actions"][obs["current_player"]]
+        }  # convert to the observation format of tianshou's PettingZooEnv
+
+        return observation_dict, {"info": obs["info_state"]}
+
+
+    def step(self, action: Any, reset_if_done=True) -> Tuple[Dict, List[int], bool, bool, Dict]:
+
+        time_step = self.env.step([action])
+        term = time_step.last()
+        if term:  # AutoReset
+            time_step = self.env.reset()
+
+        reward = time_step.rewards if not term else [0., 0.]  # output of open_spiel's Environment
+        spiel_observation = time_step.observations
+
+
+        obs = {
+            'agent_id': spiel_observation['current_player'],
+            'obs': spiel_observation['serialized_state'],
+            'mask': spiel_observation["legal_actions"][spiel_observation["current_player"]]
+        }
+
+        return obs, reward, term, False, {"info": spiel_observation["info_state"]}
+
+
+    def close(self) -> None:
+        self.env.close()
+
+    def seed(self, seed: Any = None) -> None:
+        try:
+            self.env.seed(seed)
+        except (NotImplementedError, AttributeError):
+            self.env.reset(seed=seed)
+
+    def render(self) -> Any:
+        return self.env.render()
 
 def make_single_env(env_name: str, seed: int):
-    import pyspiel
-    from open_spiel.python.rl_environment import Environment, ChanceEventSampler
     def gen_env():
         game = pyspiel.load_game(env_name)
         return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed))
@@ -21,7 +105,6 @@ def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int):
     step_num = 0
     rng = np.random.default_rng()
     observation, info = env.reset()
-    terminated = np.zeros(len(env._env_fns))
     while step_num < n_steps_lim:
         legal_action_mask = [observation[i]["mask"] for i in range(len(observation))]
         action = [rng.choice(legal_action_mask[i]) for i in range(len(legal_action_mask))]  # choose an action randomly
@@ -40,6 +123,7 @@ def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int):
     assert args.n_steps_lim % args.batch_size == 0
     env = make_env(args.env_name, args.batch_size, args.seed)
     time_sta = time.time()
-    step_num = random_play(env, args.n_steps_lim, args.batch_size)
+    steps_num = random_play(env, args.n_steps_lim, args.batch_size)
     time_end = time.time()
-    print((step_num)/(time_end-time_sta))
+    sec = time_end - time_sta
+    print(json.dumps({"game": args.env_name, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}))
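The wrapper above can also be exercised without subprocesses, which is convenient when debugging the observation format. A minimal single-process sketch, not part of the patch, assuming the make_single_env and random_play helpers defined in speed_benchmark/open_spiel_subproc.py above and tianshou's DummyVectorEnv; it mirrors the script's __main__ but keeps everything in one process:

# Minimal single-process sketch (assumptions: OpenSpielEnv, make_single_env and
# random_play are imported from the script above).
from tianshou.env import DummyVectorEnv

# Two in-process copies of the wrapped OpenSpiel environment.
env = DummyVectorEnv(
    [lambda: OpenSpielEnv(make_single_env("tic_tac_toe", seed=0)) for _ in range(2)]
)
# random_play() resets the vector env, samples legal actions from
# obs[i]["mask"], and steps until n_steps_lim is reached.
print(random_play(env, n_steps_lim=64, batch_size=2))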
From 1ae70ffbdbaabfc0582cad79f747aee6d5157a7f Mon Sep 17 00:00:00 2001
From: Sotetsu KOYAMADA
Date: Sat, 25 Feb 2023 00:32:26 +0900
Subject: [PATCH 15/19] rm unused files

---
 speed_benchmark/compare.sh | 6 -
 speed_benchmark/compare_for_loop.py | 104 ----
 speed_benchmark/compare_subproc.py | 260 ----------
 speed_benchmark/comparison.py | 108 ----
 speed_benchmark/tianshou_env/__init__.py | 34 --
 speed_benchmark/tianshou_env/gym_wrappers.py | 81 ---
 .../tianshou_env/pettingzoo_env.py | 102 ----
 .../tianshou_env/test_tianshou_subproc.py | 46 --
 speed_benchmark/tianshou_env/utils.py | 31 --
 speed_benchmark/tianshou_env/venv_wrappers.py | 123 -----
 speed_benchmark/tianshou_env/venvs.py | 483 ------
 .../tianshou_env/worker/__init__.py | 11 -
 speed_benchmark/tianshou_env/worker/base.py | 106 ----
 speed_benchmark/tianshou_env/worker/dummy.py | 52 --
 speed_benchmark/tianshou_env/worker/ray.py | 74 ---
 .../tianshou_env/worker/subproc.py | 256 ----------
 speed_benchmark/vector_env.py | 78 ---
 17 files changed, 1955 deletions(-)
 delete mode 100755 speed_benchmark/compare.sh
 delete mode 100644 speed_benchmark/compare_for_loop.py
 delete mode 100644 speed_benchmark/compare_subproc.py
 delete mode 100644 speed_benchmark/comparison.py
 delete mode 100644 speed_benchmark/tianshou_env/__init__.py
 delete mode 100644 speed_benchmark/tianshou_env/gym_wrappers.py
 delete mode 100644 speed_benchmark/tianshou_env/pettingzoo_env.py
 delete mode 100644 speed_benchmark/tianshou_env/test_tianshou_subproc.py
 delete mode 100644 speed_benchmark/tianshou_env/utils.py
 delete mode 100644 speed_benchmark/tianshou_env/venv_wrappers.py
 delete mode 100644
speed_benchmark/tianshou_env/venvs.py delete mode 100644 speed_benchmark/tianshou_env/worker/__init__.py delete mode 100644 speed_benchmark/tianshou_env/worker/base.py delete mode 100644 speed_benchmark/tianshou_env/worker/dummy.py delete mode 100644 speed_benchmark/tianshou_env/worker/ray.py delete mode 100644 speed_benchmark/tianshou_env/worker/subproc.py delete mode 100644 speed_benchmark/vector_env.py diff --git a/speed_benchmark/compare.sh b/speed_benchmark/compare.sh deleted file mode 100755 index 22f662d7e..000000000 --- a/speed_benchmark/compare.sh +++ /dev/null @@ -1,6 +0,0 @@ -echo "| library | game | type | n_envs | steps | time | time per step |" -echo "| :--- | ---: | ---: | ---: | ---: | ---: | ---: |" -python compare_subproc.py open_spiel go 10 1000 -python compare_subproc.py petting_zoo go 10 1000 -python compare_for_loop.py open_spiel go 10 1000 -python compare_for_loop.py petting_zoo go 10 1000 \ No newline at end of file diff --git a/speed_benchmark/compare_for_loop.py b/speed_benchmark/compare_for_loop.py deleted file mode 100644 index bd816009b..000000000 --- a/speed_benchmark/compare_for_loop.py +++ /dev/null @@ -1,104 +0,0 @@ -from vector_env import SyncVectorEnv -import argparse -import time -import numpy as np -import collections -from tianshou.env import DummyVectorEnv -from tianshou.env.pettingzoo_env import PettingZooEnv - - -class AutoResetPettingZooEnv(PettingZooEnv): - def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - -def petting_zoo_make_env(env_name, n_envs): - from pettingzoo.classic.go import go - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - if env_name == "go": - return DummyVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def petting_zoo_random_play(env: DummyVectorEnv, n_steps_lim: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. 
- step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += 1 - return step_num - - -def open_spile_make_single_env(env_name: str, seed: int): - import pyspiel - from open_spiel.python.rl_environment import Environment, ChanceEventSampler - def gen_env(): - game = pyspiel.load_game(env_name) - return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed)) - return gen_env() - - -def open_spiel_make_env(env_name: str, n_envs: int, seed: int) -> SyncVectorEnv: - return SyncVectorEnv([open_spile_make_single_env(env_name, seed) for i in range(n_envs)]) - - -def open_spile_random_play(env: SyncVectorEnv, n_steps_lim: int): - # random play for open spiel - StepOutput = collections.namedtuple("step_output", ["action"]) - time_step = env.reset() - rng = np.random.default_rng() - step_num = 0 - while step_num < n_steps_lim: - legal_actions = np.array([ts.observations["legal_actions"][ts.observations["current_player"]] for ts in time_step]) - assert len(env.envs) == len(legal_actions) # ensure parallerization - action = rng.choice(legal_actions, axis=1) # same actions so far - step_outputs = [StepOutput(action=a) for a in action] - time_step, reward, done, unreset_time_steps = env.step(step_outputs, reset_if_done=True) - step_num += 1 - return step_num - - -def measure_time(args): - if args.library == "open_spiel": - env = open_spiel_make_env(args.env_name, args.n_envs, args.seed) - random_play_fn = open_spile_random_play - elif args.library == "petting_zoo": - env = petting_zoo_make_env(args.env_name, args.n_envs) - random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - time_sta = time.time() - step_num = random_play_fn(env, args.n_steps_lim) - time_end = time.time() - tim = time_end- time_sta - tim_per_step = tim / (step_num * args.n_envs) - print(f"| `{args.library}` | {args.env_name} | forloop | {args.n_envs} | {step_num} | {round(tim, 2)} | {round(tim_per_step, 6)}s |") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_envs", type=int) - parser.add_argument("n_steps_lim", type=int) - parser.add_argument("--seed", default=100, type=bool) - args = parser.parse_args() - measure_time(args) \ No newline at end of file diff --git a/speed_benchmark/compare_subproc.py b/speed_benchmark/compare_subproc.py deleted file mode 100644 index c99347786..000000000 --- a/speed_benchmark/compare_subproc.py +++ /dev/null @@ -1,260 +0,0 @@ -""" -Modified from: - -https://github.com/DLR-RM/stable-baselines3/blob/master/stable_baselines3/common/vec_env/base_vec_env.py - -Distributed under MIT License: - -https://github.com/DLR-RM/stable-baselines3/blob/master/LICENSE -""" - -import multiprocessing as mp -from comparison import open_spile_make_env, petting_zoo_make_env -from collections import OrderedDict -from typing import Any, Callable, List, Optional, Sequence, Tuple, Type, Union, Dict, Iterable -import numpy as np -import time -import cloudpickle -import argparse - - -def petting_zoo_make_env(env_name, 
n_envs): - from tianshou.env import SubprocVectorEnv - from pettingzoo.classic.go import go - from tianshou.env.pettingzoo_env import PettingZooEnv - - class AutoResetPettingZooEnv(PettingZooEnv): # 全体でpetting_zooの関数, classをimportするとopen_spielの速度が落ちる. - def __init__(self, env): - super().__init__(env) - - def step(self, action): - obs, reward, term, trunc, info = super().step(action) - if term: - obs = super().reset() - return obs, reward, term, trunc, info - - #from pettingzoo.classic import chess_v5 - def get_go_env(): - return AutoResetPettingZooEnv(go.env()) - if env_name == "go": - return SubprocVectorEnv([get_go_env for _ in range(n_envs)]) - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - - -def petting_zoo_random_play(env, n_steps_lim: int) -> int: - # petting zooのgo環境でrandom gaentを終局まで動かす. - step_num = 0 - rng = np.random.default_rng() - observation = env.reset() - terminated = np.zeros(len(env._env_fns)) - while step_num < n_steps_lim: - assert len(env._env_fns) == len(observation) # ensure parallerization - legal_action_mask = np.array([observation[i]["mask"] for i in range(len(observation))]) - action = [rng.choice(np.where(legal_action_mask[i]==1)[0]) for i in range(len(legal_action_mask))] # chose action randomly - observation, reward, terminated, _, _ = env.step(action) - step_num += 1 - return step_num - - -class CloudpickleWrapper: - """ - Uses cloudpickle to serialize contents (otherwise multiprocessing tries to use pickle) - - :param var: the variable you wish to wrap for pickling with cloudpickle - """ - - def __init__(self, var: Any): - self.var = var - - def __getstate__(self) -> Any: - return cloudpickle.dumps(self.var) - - def __setstate__(self, var: Any) -> None: - self.var = cloudpickle.loads(var) - - -def _open_spiel_worker( - remote, parent_remote, state_wrapper: CloudpickleWrapper, env_name: str, -) -> None: - parent_remote.close() - state = state_wrapper.var() - while True: - try: - cmd, data = remote.recv() - if cmd == "step": - action = data - state.apply_action(action) - legal_actions = state.legal_actions() - terminated = state.is_terminal() - if terminated: # auto_reset - state = open_spile_make_env(env_name).new_initial_state() - legal_actions = state.legal_actions() - remote.send((legal_actions, terminated)) - - elif cmd == "reset": - legal_actions = state.legal_actions() - terminated = state.is_terminal() - remote.send((legal_actions, terminated)) - elif cmd == "render": - pass - elif cmd == "close": - remote.close() - break - elif cmd == "get_spaces": - pass - elif cmd == "env_method": - pass - elif cmd == "get_attr": - pass - elif cmd == "set_attr": - pass - elif cmd == "is_wrapped": - pass - else: - raise NotImplementedError(f"`{cmd}` is not implemented in the worker") - except EOFError: - break - - -class SubprocVecEnv(object): - """ - Creates a multiprocess vectorized wrapper for multiple environments, distributing each environment to its own - process, allowing significant speed up when the environment is computationally complex. - - For performance reasons, if your environment is not IO bound, the number of environments should not exceed the - number of logical cores on your CPU. - - .. warning:: - - Only 'forkserver' and 'spawn' start methods are thread-safe, - which is important when TensorFlow sessions or other non thread-safe - libraries are used in the parent (see issue #217). 
However, compared to - 'fork' they incur a small start-up cost and have restrictions on - global variables. With those methods, users must wrap the code in an - ``if __name__ == "__main__":`` block. - For more information, see the multiprocessing documentation. - - :param env_fns: Environments to run in subprocesses - :param start_method: method used to start the subprocesses. - Must be one of the methods returned by multiprocessing.get_all_start_methods(). - Defaults to 'forkserver' on available platforms, and 'spawn' otherwise. - """ - - def __init__(self, states, env_name, start_method: Optional[str] = None): - self.waiting = False - self.closed = False - self.n_envs = len(states) - env_names = [env_name] * self.n_envs - if start_method is None: - # Fork is not a thread safe method (see issue #217) - # but is more user friendly (does not require to wrap the code in - # a `if __name__ == "__main__":`) - forkserver_available = "forkserver" in mp.get_all_start_methods() - start_method = "forkserver" if forkserver_available else "spawn" - ctx = mp.get_context(start_method) - _worker = _open_spiel_worker - - self.remotes, self.work_remotes = zip(*[ctx.Pipe() for _ in range(self.n_envs)]) - self.processes = [] - for work_remote, remote, state, env_name in zip(self.work_remotes, self.remotes, states, env_names): - args = (work_remote, remote, CloudpickleWrapper(state), env_name) - # daemon=True: if the main process crashes, we should not cause things to hang - process = ctx.Process(target=_worker, args=args, daemon=True) # pytype:disable=attribute-error - process.start() - self.processes.append(process) - work_remote.close() - - - def step_async(self, actions: np.ndarray) -> None: - for remote, action in zip(self.remotes, actions): - remote.send(("step", (action))) - self.waiting = True - - - def step_wait(self) -> Tuple: - results = [remote.recv() for remote in self.remotes] - self.waiting = False - legal_actions, terminated = zip(*results) - return np.stack(legal_actions), np.stack(terminated) - - - def seed(self, seed: Optional[int] = None) -> List[Union[None, int]]: - if seed is None: - seed = np.random.randint(0, 2**32 - 1) - for idx, remote in enumerate(self.remotes): - remote.send(("seed", seed + idx)) - return [remote.recv() for remote in self.remotes] - - - def reset(self) -> Tuple: - for remote in self.remotes: - remote.send(("reset", None)) - results = [remote.recv() for remote in self.remotes] - legal_actions, terminated = zip(*results) - return np.stack(legal_actions), np.stack(terminated) - - - def step(self, actions) -> Tuple: - self.step_async(actions) - return self.step_wait() - - - def close(self) -> None: - if self.closed: - return - if self.waiting: - for remote in self.remotes: - remote.recv() - for remote in self.remotes: - remote.send(("close", None)) - for process in self.processes: - process.join() - self.closed = True - -def open_spiel_make_env(env_name: str, n_envs: int): - states = [lambda: open_spile_make_env(env_name).new_initial_state() for i in range(n_envs)] - return SubprocVecEnv(states, env_name) - - -def open_spiel_random_play(env, n_steps_lim): - legal_actions, terminated = env.reset() - n_steps = 0 - rng = np.random.default_rng() - while n_steps < n_steps_lim: - action = rng.choice(legal_actions, axis=1) # n_envs - legal_actions, terminated = env.step(action) - assert env.n_envs == legal_actions.shape[0] # 並列化されていることを確認. 
- n_steps += 1 - return n_steps - - -def measure_time(args): - if args.library == "open_spiel": - env = open_spiel_make_env(args.env_name, args.n_envs) - random_play_fn = open_spiel_random_play - elif args.library == "petting_zoo": - env = petting_zoo_make_env(args.env_name, args.n_envs) - random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - time_sta = time.time() - step_num = random_play_fn(env, args.n_steps_lim) - time_end = time.time() - tim = time_end- time_sta - env.close() - tim_per_step = tim / (step_num * args.n_envs) - print(f"| `{args.library}` | {args.env_name} | subprocess | {args.n_envs} | {step_num} | {round(tim, 2)} | {round(tim_per_step, 6)}s |") - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_envs", type=int) - parser.add_argument("n_steps_lim", type=int) - args = parser.parse_args() - measure_time(args) diff --git a/speed_benchmark/comparison.py b/speed_benchmark/comparison.py deleted file mode 100644 index 23aa7f37f..000000000 --- a/speed_benchmark/comparison.py +++ /dev/null @@ -1,108 +0,0 @@ -""" -Petting zooとopen spileの囲碁環境とpgxの囲碁環境の速度比較を行う -""" -import random -import numpy as np -import time -import argparse -from multiprocessing import Pool -from multiprocessing import Queue -import multiprocessing -from pgx.go import init, step, observe -import jax -from jax import vmap, jit -import jax.numpy as jnp -import numpy as np - -def petting_zoo_make_env(env_name): - from pettingzoo.classic.go import go - #from pettingzoo.classic import chess_v5 - if env_name == "go": - return go.env() - elif env_name == "chess": - #return chess_v5.env() - raise ValueError("Chess will be added later") - else: - raise ValueError("no such environment in petting zoo") - -def petting_zoo_random_play(tup): - # petting zooのgo環境でrandom gaentを終局まで動かす. - id, do_print, env_name = tup - env = petting_zoo_make_env(env_name) - step_nums = 0 - env.reset() - for agent in env.agent_iter(): - observation, reward, termination, truncation, info = env.last() - action = None if termination or truncation else np.random.choice(np.where(observation["action_mask"]==1)[0]) # this is where you would insert your policy - env.step(action) - step_nums += 1 - if do_print: - print(id, step_nums) - return id, step_nums - - -def open_spile_make_env(env_name): - import pyspiel - if env_name == "go": - return pyspiel.load_game("go") - elif env_name == "backgammon": - return pyspiel.load_game("backgammon") - elif env_name == "bridge": - return pyspiel.load_game("bridge") - elif env_name == "chess": - return pyspiel.load_game("chess") - else: - raise ValueError("no such environment in open spile") - - - -def open_spile_random_play(tup): - import pyspiel - # open spileのgo環境でrandom gaentを終局まで動かす. - id, do_print, env_name = tup - game = open_spile_make_env(env_name) - state = game.new_initial_state() - step_nums = 0 - while not state.is_terminal(): - legal_actions = state.legal_actions() - # Sample a chance event outcome. 
- action = np.random.choice(legal_actions) - state.apply_action(action) - step_nums += 1 - if do_print: - print(id, step_nums) - return id, step_nums - - -def measure_time(args): - if args.library == "open_spiel": - random_play_fn = open_spile_random_play - elif args.library == "petting_zoo": - random_play_fn = petting_zoo_random_play - else: - raise ValueError("Incorrect library name") - - p = Pool(args.n_processes) - process_list = [] - time_sta = time.time() - ex = p.map_async(random_play_fn, iterable=[(i, args.print_per_game, args.env_name) for i in range(args.n_games)]) - result = ex.get() - time_end = time.time() - tim = time_end- time_sta - p.close() - avarage_steps = sum(list(map(lambda x: x[1], ex.get())))//args.n_games - print("library: {} env: {} n_games: {} n_processes: {} execution time is {}, avarage number of steps is {}".format(args.library, args.env_name, args.n_games, args.n_processes, tim, avarage_steps)) - - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.add_argument("library") - parser.add_argument("env_name") - parser.add_argument("n_processes", type=int) - parser.add_argument("n_games", type=int) - parser.add_argument("--print_per_game", default=False, type=bool) - args = parser.parse_args() - measure_time(args) - - \ No newline at end of file diff --git a/speed_benchmark/tianshou_env/__init__.py b/speed_benchmark/tianshou_env/__init__.py deleted file mode 100644 index a00c3cd38..000000000 --- a/speed_benchmark/tianshou_env/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Env package.""" - -from tianshou.env.gym_wrappers import ( - ContinuousToDiscrete, - MultiDiscreteToDiscrete, - TruncatedAsTerminated, -) -from tianshou.env.venv_wrappers import VectorEnvNormObs, VectorEnvWrapper -from tianshou.env.venvs import ( - BaseVectorEnv, - DummyVectorEnv, - RayVectorEnv, - ShmemVectorEnv, - SubprocVectorEnv, -) - -try: - from tianshou.env.pettingzoo_env import PettingZooEnv -except ImportError: - pass - -__all__ = [ - "BaseVectorEnv", - "DummyVectorEnv", - "SubprocVectorEnv", - "ShmemVectorEnv", - "RayVectorEnv", - "VectorEnvWrapper", - "VectorEnvNormObs", - "PettingZooEnv", - "ContinuousToDiscrete", - "MultiDiscreteToDiscrete", - "TruncatedAsTerminated", -] diff --git a/speed_benchmark/tianshou_env/gym_wrappers.py b/speed_benchmark/tianshou_env/gym_wrappers.py deleted file mode 100644 index c9ce66acb..000000000 --- a/speed_benchmark/tianshou_env/gym_wrappers.py +++ /dev/null @@ -1,81 +0,0 @@ -from typing import Any, Dict, List, SupportsFloat, Tuple, Union - -import gymnasium as gym -import numpy as np -from packaging import version - - -class ContinuousToDiscrete(gym.ActionWrapper): - """Gym environment wrapper to take discrete action in a continuous environment. - - :param gym.Env env: gym environment with continuous action space. - :param int action_per_dim: number of discrete actions in each dimension - of the action space. 
-    """
-
-    def __init__(self, env: gym.Env, action_per_dim: Union[int, List[int]]) -> None:
-        super().__init__(env)
-        assert isinstance(env.action_space, gym.spaces.Box)
-        low, high = env.action_space.low, env.action_space.high
-        if isinstance(action_per_dim, int):
-            action_per_dim = [action_per_dim] * env.action_space.shape[0]
-        assert len(action_per_dim) == env.action_space.shape[0]
-        self.action_space = gym.spaces.MultiDiscrete(action_per_dim)
-        self.mesh = np.array(
-            [np.linspace(lo, hi, a) for lo, hi, a in zip(low, high, action_per_dim)],
-            dtype=object
-        )
-
-    def action(self, act: np.ndarray) -> np.ndarray:  # type: ignore
-        # modify act
-        assert len(act.shape) <= 2, f"Unknown action format with shape {act.shape}."
-        if len(act.shape) == 1:
-            return np.array([self.mesh[i][a] for i, a in enumerate(act)])
-        return np.array([[self.mesh[i][a] for i, a in enumerate(a_)] for a_ in act])
-
-
-class MultiDiscreteToDiscrete(gym.ActionWrapper):
-    """Gym environment wrapper to take discrete action in multidiscrete environment.
-
-    :param gym.Env env: gym environment with multidiscrete action space.
-    """
-
-    def __init__(self, env: gym.Env) -> None:
-        super().__init__(env)
-        assert isinstance(env.action_space, gym.spaces.MultiDiscrete)
-        nvec = env.action_space.nvec
-        assert nvec.ndim == 1
-        self.bases = np.ones_like(nvec)
-        for i in range(1, len(self.bases)):
-            self.bases[i] = self.bases[i - 1] * nvec[-i]
-        self.action_space = gym.spaces.Discrete(np.prod(nvec))
-
-    def action(self, act: np.ndarray) -> np.ndarray:  # type: ignore
-        converted_act = []
-        for b in np.flip(self.bases):
-            converted_act.append(act // b)
-            act = act % b
-        return np.array(converted_act).transpose()
-
-
-class TruncatedAsTerminated(gym.Wrapper):
-    """A wrapper that set ``terminated = terminated or truncated`` for ``step()``.
-
-    It's intended to use with ``gym.wrappers.TimeLimit``.
-
-    :param gym.Env env: gym environment.
-    """
-
-    def __init__(self, env: gym.Env):
-        super().__init__(env)
-        if not version.parse(gym.__version__) >= version.parse('0.26.0'):
-            raise EnvironmentError(
-                f"TruncatedAsTerminated is not applicable with gym version \
-                    {gym.__version__}"
-            )
-
-    def step(self,
-             act: np.ndarray) -> Tuple[Any, SupportsFloat, bool, bool, Dict[str, Any]]:
-        observation, reward, terminated, truncated, info = super().step(act)
-        terminated = (terminated or truncated)
-        return observation, reward, terminated, truncated, info
diff --git a/speed_benchmark/tianshou_env/pettingzoo_env.py b/speed_benchmark/tianshou_env/pettingzoo_env.py
deleted file mode 100644
index aee47f1bc..000000000
--- a/speed_benchmark/tianshou_env/pettingzoo_env.py
+++ /dev/null
@@ -1,102 +0,0 @@
-"""
-Copied from the Tianshou repository:
-
-https://github.com/thu-ml/tianshou/blob/master/tianshou/env/pettingzoo_env.py
-
-Distributed under MIT LICENSE:
-
-https://github.com/thu-ml/tianshou/blob/master/LICENSE
-
-Modified to use OpenSpiel in SubprocVecEnv (see #384 for changes)
-"""
-
-
-import warnings
-from abc import ABC
-from typing import Any, Dict, List, Tuple
-
-import pettingzoo
-from gymnasium import spaces
-from packaging import version
-from pettingzoo.utils.env import AECEnv
-from pettingzoo.utils.wrappers import BaseWrapper
-from open_spiel.python.rl_environment import Environment, ChanceEventSampler
-
-
-if version.parse(pettingzoo.__version__) < version.parse("1.21.0"):
-    warnings.warn(
-        f"You are using PettingZoo {pettingzoo.__version__}. "
-        f"Future tianshou versions may not support PettingZoo<1.21.0. "
-        f"Consider upgrading your PettingZoo version.", DeprecationWarning
-    )
-
-
-class OpenSpielEnv(AECEnv, ABC):
-    """The interface for petting zoo environments.
-
-    Multi-agent environments must be wrapped as
-    :class:`~tianshou.env.PettingZooEnv`. Here is the usage:
-    ::
-
-        env = PettingZooEnv(...)
-        # obs is a dict containing obs, agent_id, and mask
-        obs = env.reset()
-        action = policy(obs)
-        obs, rew, trunc, term, info = env.step(action)
-        env.close()
-
-    The available action's mask is set to True, otherwise it is set to False.
-    Further usage can be found at :ref:`marl_example`.
-    """
-
-    def __init__(self, env: Environment):
-        super().__init__()
-        self.env = env
-        self.reset()
-
-
-    def reset(self, *args: Any, **kwargs: Any) -> Tuple[dict, dict]:
-        time_step = self.env.reset()
-
-        obs = time_step.observations  # output of open_spiel's Environment
-        observation_dict = {
-            "agent_id": obs["current_player"],
-            "obs": obs["serialized_state"],
-            "mask": obs["legal_actions"][obs["current_player"]]
-        }  # convert to the format of tianshou's PettingZooEnv.
-
-        return observation_dict, {"info": obs["info_state"]}
-
-
-    def step(self, action: Any, reset_if_done=True) -> Tuple[Dict, List[int], bool, bool, Dict]:
-
-        time_step = self.env.step([action])
-        term = time_step.last()
-        if term:  # AutoReset
-            time_step = self.env.reset()
-
-        reward = time_step.rewards if not term else [0., 0.]  # output of open_spiel's Environment
-        spiel_observation = time_step.observations
-
-
-        obs = {
-            'agent_id': spiel_observation['current_player'],
-            'obs': spiel_observation['serialized_state'],
-            'mask': spiel_observation["legal_actions"][spiel_observation["current_player"]]
-        }
-
-        return obs, reward, term, False, {"info": spiel_observation["info_state"]}
-
-
-    def close(self) -> None:
-        self.env.close()
-
-    def seed(self, seed: Any = None) -> None:
-        try:
-            self.env.seed(seed)
-        except (NotImplementedError, AttributeError):
-            self.env.reset(seed=seed)
-
-    def render(self) -> Any:
-        return self.env.render()
diff --git a/speed_benchmark/tianshou_env/test_tianshou_subproc.py b/speed_benchmark/tianshou_env/test_tianshou_subproc.py
deleted file mode 100644
index 39cd1172a..000000000
--- a/speed_benchmark/tianshou_env/test_tianshou_subproc.py
+++ /dev/null
@@ -1,46 +0,0 @@
-from pettingzoo_env import OpenSpielEnv
-from venvs import SubprocVectorEnv
-import numpy as np
-import time
-import pyspiel
-
-
-if __name__ == "__main__":
-    env_name = "go"
-    n_envs = 10
-    seed = 100
-    n_steps_lim = 1000
-
-    def open_spile_make_single_env(env_name: str, seed: int):
-        import pyspiel
-        from open_spiel.python.rl_environment import Environment, ChanceEventSampler
-        def gen_env():
-            game = pyspiel.load_game(env_name)
-            return Environment(game, chance_event_sampler=ChanceEventSampler(seed=seed))
-        return gen_env()
-    def get_go_env():
-        return OpenSpielEnv(open_spile_make_single_env(env_name, seed))
-
-    env = SubprocVectorEnv([lambda: OpenSpielEnv(open_spile_make_single_env(env_name, seed)) for _ in range(n_envs)])
-
-    # run a random agent to the end of the game in the petting zoo go environment.
-    step_num = 0
-    rng = np.random.default_rng()
-    observation, info = env.reset()
-    terminated = np.zeros(len(env._env_fns))
-    time_sta = time.time()
-    while step_num < n_steps_lim:
-        legal_action_mask = [observation[i]["mask"] for i in range(len(observation))]
-        #old_board_black = [info[i]["info"][0][:361] if observation[i]["agent_id"] == 0 else info[i]["info"][0][361: 722] for i in range(n_envs)]  # the board as seen from the player to move
-        action = [rng.choice(legal_action_mask[i]) for i in range(len(legal_action_mask))]  # choose an action randomly
-        observation, reward, terminated, _, info = env.step(action)
-        #new_legal_action_mask = [observation[i]["mask"] for i in range(len(observation))]
-        #assert sum([((not action[i] in new_legal_action_mask[i]) & (not terminated[i])) | (action[i]==361)| terminated[i] for i in range(n_envs)]) == n_envs  # check that the executed action has disappeared. 361 is pass.
-        #new_board_black = [info[i]["info"][0][361: 722] if observation[i]["agent_id"] == 0 else info[i]["info"][0][:361] for i in range(n_envs)]  # the board as seen from the player who moved last
-        #assert sum([(sum(new_board_black[i]) > sum(old_board_black[i])) | (action[i]==361) | terminated[i] for i in range(n_envs)])  # if a stone was placed, check that the stone count increased. 361 is pass.
-        #step_num += 1
-    time_end = time.time()
-    tim = time_end - time_sta
-    print(tim)
-
-
diff --git a/speed_benchmark/tianshou_env/utils.py b/speed_benchmark/tianshou_env/utils.py
deleted file mode 100644
index cbd36d998..000000000
--- a/speed_benchmark/tianshou_env/utils.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from typing import TYPE_CHECKING, Any, Tuple, Union
-
-import cloudpickle
-import gymnasium
-import numpy as np
-
-try:
-    from tianshou.env.pettingzoo_env import PettingZooEnv
-except ImportError:
-    PettingZooEnv = None  # type: ignore
-
-if TYPE_CHECKING:
-    import gym
-
-ENV_TYPE = Union[gymnasium.Env, "gym.Env", PettingZooEnv]
-
-gym_new_venv_step_type = Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray,
-                               np.ndarray]
-
-
-class CloudpickleWrapper(object):
-    """A cloudpickle wrapper used in SubprocVectorEnv."""
-
-    def __init__(self, data: Any) -> None:
-        self.data = data
-
-    def __getstate__(self) -> str:
-        return cloudpickle.dumps(self.data)
-
-    def __setstate__(self, data: str) -> None:
-        self.data = cloudpickle.loads(data)
diff --git a/speed_benchmark/tianshou_env/venv_wrappers.py b/speed_benchmark/tianshou_env/venv_wrappers.py
deleted file mode 100644
index 66470289c..000000000
--- a/speed_benchmark/tianshou_env/venv_wrappers.py
+++ /dev/null
@@ -1,123 +0,0 @@
-from typing import Any, List, Optional, Tuple, Union
-
-import numpy as np
-
-from tianshou.env.utils import gym_new_venv_step_type
-from tianshou.env.venvs import GYM_RESERVED_KEYS, BaseVectorEnv
-from tianshou.utils import RunningMeanStd
-
-
-class VectorEnvWrapper(BaseVectorEnv):
-    """Base class for vectorized environments wrapper."""
-
-    def __init__(self, venv: BaseVectorEnv) -> None:
-        self.venv = venv
-        self.is_async = venv.is_async
-
-    def __len__(self) -> int:
-        return len(self.venv)
-
-    def __getattribute__(self, key: str) -> Any:
-        if key in GYM_RESERVED_KEYS:  # reserved keys in gym.Env
-            return getattr(self.venv, key)
-        else:
-            return super().__getattribute__(key)
-
-    def get_env_attr(
-        self,
-        key: str,
-        id: Optional[Union[int, List[int], np.ndarray]] = None,
-    ) -> List[Any]:
-        return self.venv.get_env_attr(key, id)
-
-    def set_env_attr(
-        self,
-        key: str,
-        value: Any,
-        id: Optional[Union[int, List[int], np.ndarray]] = None,
-    ) -> None:
-        return self.venv.set_env_attr(key, value, id)
-
-    def reset(
-        self,
-        id: Optional[Union[int, List[int],
np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - return self.venv.reset(id, **kwargs) - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - return self.venv.step(action, id) - - def seed( - self, - seed: Optional[Union[int, List[int]]] = None, - ) -> List[Optional[List[int]]]: - return self.venv.seed(seed) - - def render(self, **kwargs: Any) -> List[Any]: - return self.venv.render(**kwargs) - - def close(self) -> None: - self.venv.close() - - -class VectorEnvNormObs(VectorEnvWrapper): - """An observation normalization wrapper for vectorized environments. - - :param bool update_obs_rms: whether to update obs_rms. Default to True. - """ - - def __init__( - self, - venv: BaseVectorEnv, - update_obs_rms: bool = True, - ) -> None: - super().__init__(venv) - # initialize observation running mean/std - self.update_obs_rms = update_obs_rms - self.obs_rms = RunningMeanStd() - - def reset( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - obs, info = self.venv.reset(id, **kwargs) - - if isinstance(obs, tuple): # type: ignore - raise TypeError( - "Tuple observation space is not supported. ", - "Please change it to array or dict space", - ) - - if self.obs_rms and self.update_obs_rms: - self.obs_rms.update(obs) - obs = self._norm_obs(obs) - return obs, info - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - step_results = self.venv.step(action, id) - if self.obs_rms and self.update_obs_rms: - self.obs_rms.update(step_results[0]) - return (self._norm_obs(step_results[0]), *step_results[1:]) - - def _norm_obs(self, obs: np.ndarray) -> np.ndarray: - if self.obs_rms: - return self.obs_rms.norm(obs) # type: ignore - return obs - - def set_obs_rms(self, obs_rms: RunningMeanStd) -> None: - """Set with given observation running mean/std.""" - self.obs_rms = obs_rms - - def get_obs_rms(self) -> RunningMeanStd: - """Return observation running mean/std.""" - return self.obs_rms diff --git a/speed_benchmark/tianshou_env/venvs.py b/speed_benchmark/tianshou_env/venvs.py deleted file mode 100644 index 09379e2a8..000000000 --- a/speed_benchmark/tianshou_env/venvs.py +++ /dev/null @@ -1,483 +0,0 @@ -import warnings -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np -import packaging - -from .pettingzoo_env import OpenSpielEnv -from .utils import ENV_TYPE, gym_new_venv_step_type -from .worker import ( - DummyEnvWorker, - EnvWorker, - RayEnvWorker, - SubprocEnvWorker, -) - -try: - import gym as old_gym - has_old_gym = True -except ImportError: - has_old_gym = False - -GYM_RESERVED_KEYS = [ - "metadata", "reward_range", "spec", "action_space", "observation_space" -] - - -def _patch_env_generator(fn: Callable[[], ENV_TYPE]) -> Callable[[], gym.Env]: - """Takes an environment generator and patches it to return Gymnasium envs. - - This function takes the environment generator `fn` and returns a patched - generator, without invoking `fn`. The original generator may return - Gymnasium or OpenAI Gym environments, but the patched generator wraps - the result of `fn` in a shimmy wrapper to convert it to Gymnasium, - if necessary. - """ - - def patched() -> gym.Env: - assert callable( - fn - ), "Env generators that are provided to vector environemnts must be callable." 
- - env = fn() - if isinstance(env, (gym.Env, OpenSpielEnv)): - return env - - if not has_old_gym or not isinstance(env, old_gym.Env): - raise ValueError( - f"Environment generator returned a {type(env)}, not a Gymnasium " - f"environment. In this case, we expect OpenAI Gym to be " - f"installed and the environment to be an OpenAI Gym environment." - ) - - try: - import shimmy - except ImportError as e: - raise ImportError( - "Missing shimmy installation. You provided an environment generator " - "that returned an OpenAI Gym environment. " - "Tianshou has transitioned to using Gymnasium internally. " - "In order to use OpenAI Gym environments with tianshou, you need to " - "install shimmy (`pip install shimmy`)." - ) from e - - warnings.warn( - "You provided an environment generator that returned an OpenAI Gym " - "environment. We strongly recommend transitioning to Gymnasium " - "environments. " - "Tianshou is automatically wrapping your environments in a compatibility " - "layer, which could potentially cause issues." - ) - - gym_version = packaging.version.parse(old_gym.__version__) - if gym_version >= packaging.version.parse("0.26.0"): - return shimmy.GymV26CompatibilityV0(env=env) - elif gym_version >= packaging.version.parse("0.22.0"): - return shimmy.GymV22CompatibilityV0(env=env) - else: - raise Exception( - f"Found OpenAI Gym version {gym.__version__}. " - f"Tianshou only supports OpenAI Gym environments of version>=0.22.0" - ) - - return patched - - -class BaseVectorEnv(object): - """Base class for vectorized environments. - - Usage: - :: - - env_num = 8 - envs = DummyVectorEnv([lambda: gym.make(task) for _ in range(env_num)]) - assert len(envs) == env_num - - It accepts a list of environment generators. In other words, an environment - generator ``efn`` of a specific task means that ``efn()`` returns the - environment of the given task, for example, ``gym.make(task)``. - - All of the VectorEnv must inherit :class:`~tianshou.env.BaseVectorEnv`. - Here are some other usages: - :: - - envs.seed(2) # which is equal to the next line - envs.seed([2, 3, 4, 5, 6, 7, 8, 9]) # set specific seed for each env - obs = envs.reset() # reset all environments - obs = envs.reset([0, 5, 7]) # reset 3 specific environments - obs, rew, done, info = envs.step([1] * 8) # step synchronously - envs.render() # render all environments - envs.close() # close all environments - - .. warning:: - - If you use your own environment, please make sure the ``seed`` method - is set up properly, e.g., - :: - - def seed(self, seed): - np.random.seed(seed) - - Otherwise, the outputs of these envs may be the same with each other. - - :param env_fns: a list of callable envs, ``env_fns[i]()`` generates the i-th env. - :param worker_fn: a callable worker, ``worker_fn(env_fns[i])`` generates a - worker which contains the i-th env. - :param int wait_num: use in asynchronous simulation if the time cost of - ``env.step`` varies with time and synchronously waiting for all - environments to finish a step is time-wasting. In that case, we can - return when ``wait_num`` environments finish a step and keep on - simulation in these environments. If ``None``, asynchronous simulation - is disabled; else, ``1 <= wait_num <= env_num``. - :param float timeout: use in asynchronous simulation same as above, in each - vectorized step it only deal with those environments spending time - within ``timeout`` seconds. 
- """ - - def __init__( - self, - env_fns: List[Callable[[], ENV_TYPE]], - worker_fn: Callable[[Callable[[], gym.Env]], EnvWorker], - wait_num: Optional[int] = None, - timeout: Optional[float] = None, - ) -> None: - self._env_fns = env_fns - # A VectorEnv contains a pool of EnvWorkers, which corresponds to - # interact with the given envs (one worker <-> one env). - self.workers = [worker_fn(_patch_env_generator(fn)) for fn in env_fns] - self.worker_class = type(self.workers[0]) - assert issubclass(self.worker_class, EnvWorker) - assert all([isinstance(w, self.worker_class) for w in self.workers]) - - self.env_num = len(env_fns) - self.wait_num = wait_num or len(env_fns) - assert 1 <= self.wait_num <= len(env_fns), \ - f"wait_num should be in [1, {len(env_fns)}], but got {wait_num}" - self.timeout = timeout - assert self.timeout is None or self.timeout > 0, \ - f"timeout is {timeout}, it should be positive if provided!" - self.is_async = self.wait_num != len(env_fns) or timeout is not None - self.waiting_conn: List[EnvWorker] = [] - # environments in self.ready_id is actually ready - # but environments in self.waiting_id are just waiting when checked, - # and they may be ready now, but this is not known until we check it - # in the step() function - self.waiting_id: List[int] = [] - # all environments are ready in the beginning - self.ready_id = list(range(self.env_num)) - self.is_closed = False - - def _assert_is_not_closed(self) -> None: - assert not self.is_closed, \ - f"Methods of {self.__class__.__name__} cannot be called after close." - - def __len__(self) -> int: - """Return len(self), which is the number of environments.""" - return self.env_num - - def __getattribute__(self, key: str) -> Any: - """Switch the attribute getter depending on the key. - - Any class who inherits ``gym.Env`` will inherit some attributes, like - ``action_space``. However, we would like the attribute lookup to go straight - into the worker (in fact, this vector env's action_space is always None). - """ - if key in GYM_RESERVED_KEYS: # reserved keys in gym.Env - return self.get_env_attr(key) - else: - return super().__getattribute__(key) - - def get_env_attr( - self, - key: str, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> List[Any]: - """Get an attribute from the underlying environments. - - If id is an int, retrieve the attribute denoted by key from the environment - underlying the worker at index id. The result is returned as a list with one - element. Otherwise, retrieve the attribute for all workers at indices id and - return a list that is ordered correspondingly to id. - - :param str key: The key of the desired attribute. - :param id: Indice(s) of the desired worker(s). Default to None for all env_id. - - :return list: The list of environment attributes. - """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - - return [self.workers[j].get_env_attr(key) for j in id] - - def set_env_attr( - self, - key: str, - value: Any, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> None: - """Set an attribute in the underlying environments. - - If id is an int, set the attribute denoted by key from the environment - underlying the worker at index id to value. - Otherwise, set the attribute for all workers at indices id. - - :param str key: The key of the desired attribute. - :param Any value: The new value of the attribute. - :param id: Indice(s) of the desired worker(s). Default to None for all env_id. 
- """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - for j in id: - self.workers[j].set_env_attr(key, value) - - def _wrap_id( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> Union[List[int], np.ndarray]: - if id is None: - return list(range(self.env_num)) - return [id] if np.isscalar(id) else id # type: ignore - - def _assert_id(self, id: Union[List[int], np.ndarray]) -> None: - for i in id: - assert i not in self.waiting_id, \ - f"Cannot interact with environment {i} which is stepping now." - assert i in self.ready_id, \ - f"Can only interact with ready environments {self.ready_id}." - - def reset( - self, - id: Optional[Union[int, List[int], np.ndarray]] = None, - **kwargs: Any, - ) -> Tuple[np.ndarray, Union[dict, List[dict]]]: - """Reset the state of some envs and return initial observations. - - If id is None, reset the state of all the environments and return - initial observations, otherwise reset the specific environments with - the given id, either an int or a list. - """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if self.is_async: - self._assert_id(id) - - # send(None) == reset() in worker - for i in id: - self.workers[i].send(None, **kwargs) - ret_list = [self.workers[i].recv() for i in id] - #assert isinstance(ret_list[0], (tuple, list)) and len( - # ret_list[0] - #) == 2 and isinstance(ret_list[0][1], dict) - - obs_list = [r[0] for r in ret_list] - - if isinstance(obs_list[0], tuple): # type: ignore - raise TypeError( - "Tuple observation space is not supported. ", - "Please change it to array or dict space", - ) - try: - obs = np.stack(obs_list) - except ValueError: # different len(obs) - obs = np.array(obs_list, dtype=object) - - infos = [r[1] for r in ret_list] - return obs, infos # type: ignore - - def step( - self, - action: np.ndarray, - id: Optional[Union[int, List[int], np.ndarray]] = None, - ) -> gym_new_venv_step_type: - """Run one timestep of some environments' dynamics. - - If id is None, run one timestep of all the environments’ dynamics; - otherwise run one timestep for some environments with given id, either - an int or a list. When the end of episode is reached, you are - responsible for calling reset(id) to reset this environment’s state. - - Accept a batch of action and return a tuple (batch_obs, batch_rew, - batch_done, batch_info) in numpy format. - - :param numpy.ndarray action: a batch of action provided by the agent. - - :return: A tuple consisting of either: - - * ``obs`` a numpy.ndarray, the agent's observation of current environments - * ``rew`` a numpy.ndarray, the amount of rewards returned after \ - previous actions - * ``terminated`` a numpy.ndarray, whether these episodes have been \ - terminated - * ``truncated`` a numpy.ndarray, whether these episodes have been truncated - * ``info`` a numpy.ndarray, contains auxiliary diagnostic \ - information (helpful for debugging, and sometimes learning) - - For the async simulation: - - Provide the given action to the environments. The action sequence - should correspond to the ``id`` argument, and the ``id`` argument - should be a subset of the ``env_id`` in the last returned ``info`` - (initially they are env_ids of all the environments). If action is - None, fetch unfinished step() calls instead. 
- """ - self._assert_is_not_closed() - id = self._wrap_id(id) - if not self.is_async: - assert len(action) == len(id) - for i, j in enumerate(id): - self.workers[j].send(action[i]) - result = [] - for j in id: - env_return = self.workers[j].recv() - env_return[-1]["env_id"] = j - result.append(env_return) - else: - if action is not None: - self._assert_id(id) - assert len(action) == len(id) - for act, env_id in zip(action, id): - self.workers[env_id].send(act) - self.waiting_conn.append(self.workers[env_id]) - self.waiting_id.append(env_id) - self.ready_id = [x for x in self.ready_id if x not in id] - ready_conns: List[EnvWorker] = [] - while not ready_conns: - ready_conns = self.worker_class.wait( - self.waiting_conn, self.wait_num, self.timeout - ) - result = [] - for conn in ready_conns: - waiting_index = self.waiting_conn.index(conn) - self.waiting_conn.pop(waiting_index) - env_id = self.waiting_id.pop(waiting_index) - # env_return can be (obs, reward, done, info) or - # (obs, reward, terminated, truncated, info) - env_return = conn.recv() - env_return[-1]["env_id"] = env_id # Add `env_id` to info - result.append(env_return) - self.ready_id.append(env_id) - obs_list, rew_list, term_list, trunc_list, info_list = tuple(zip(*result)) - try: - obs_stack = np.stack(obs_list) - except ValueError: # different len(obs) - obs_stack = np.array(obs_list, dtype=object) - return obs_stack, np.stack(rew_list), np.stack(term_list), np.stack( - trunc_list - ), np.stack(info_list) - - def seed( - self, - seed: Optional[Union[int, List[int]]] = None, - ) -> List[Optional[List[int]]]: - """Set the seed for all environments. - - Accept ``None``, an int (which will extend ``i`` to - ``[i, i + 1, i + 2, ...]``) or a list. - - :return: The list of seeds used in this env's random number generators. - The first value in the list should be the "main" seed, or the value - which a reproducer pass to "seed". - """ - self._assert_is_not_closed() - seed_list: Union[List[None], List[int]] - if seed is None: - seed_list = [seed] * self.env_num - elif isinstance(seed, int): - seed_list = [seed + i for i in range(self.env_num)] - else: - seed_list = seed - return [w.seed(s) for w, s in zip(self.workers, seed_list)] - - def render(self, **kwargs: Any) -> List[Any]: - """Render all of the environments.""" - self._assert_is_not_closed() - if self.is_async and len(self.waiting_id) > 0: - raise RuntimeError( - f"Environments {self.waiting_id} are still stepping, cannot " - "render them now." - ) - return [w.render(**kwargs) for w in self.workers] - - def close(self) -> None: - """Close all of the environments. - - This function will be called only once (if not, it will be called during - garbage collected). This way, ``close`` of all workers can be assured. - """ - self._assert_is_not_closed() - for w in self.workers: - w.close() - self.is_closed = True - - -class DummyVectorEnv(BaseVectorEnv): - """Dummy vectorized environment wrapper, implemented in for-loop. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - super().__init__(env_fns, DummyEnvWorker, **kwargs) - - -class SubprocVectorEnv(BaseVectorEnv): - """Vectorized environment wrapper based on subprocess. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. 
- """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - - def worker_fn(fn: Callable[[], gym.Env]) -> SubprocEnvWorker: - return SubprocEnvWorker(fn, share_memory=False) - - super().__init__(env_fns, worker_fn, **kwargs) - - -class ShmemVectorEnv(BaseVectorEnv): - """Optimized SubprocVectorEnv with shared buffers to exchange observations. - - ShmemVectorEnv has exactly the same API as SubprocVectorEnv. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - - def worker_fn(fn: Callable[[], gym.Env]) -> SubprocEnvWorker: - return SubprocEnvWorker(fn, share_memory=True) - - super().__init__(env_fns, worker_fn, **kwargs) - - -class RayVectorEnv(BaseVectorEnv): - """Vectorized environment wrapper based on ray. - - This is a choice to run distributed environments in a cluster. - - .. seealso:: - - Please refer to :class:`~tianshou.env.BaseVectorEnv` for other APIs' usage. - """ - - def __init__(self, env_fns: List[Callable[[], ENV_TYPE]], **kwargs: Any) -> None: - try: - import ray - except ImportError as exception: - raise ImportError( - "Please install ray to support RayVectorEnv: pip install ray" - ) from exception - if not ray.is_initialized(): - ray.init() - super().__init__(env_fns, RayEnvWorker, **kwargs) diff --git a/speed_benchmark/tianshou_env/worker/__init__.py b/speed_benchmark/tianshou_env/worker/__init__.py deleted file mode 100644 index 1b1f37510..000000000 --- a/speed_benchmark/tianshou_env/worker/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from tianshou.env.worker.base import EnvWorker -from tianshou.env.worker.dummy import DummyEnvWorker -from tianshou.env.worker.ray import RayEnvWorker -from tianshou.env.worker.subproc import SubprocEnvWorker - -__all__ = [ - "EnvWorker", - "DummyEnvWorker", - "SubprocEnvWorker", - "RayEnvWorker", -] diff --git a/speed_benchmark/tianshou_env/worker/base.py b/speed_benchmark/tianshou_env/worker/base.py deleted file mode 100644 index 773d56bce..000000000 --- a/speed_benchmark/tianshou_env/worker/base.py +++ /dev/null @@ -1,106 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import gym_new_venv_step_type -from tianshou.utils import deprecation - - -class EnvWorker(ABC): - """An abstract worker for an environment.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self._env_fn = env_fn - self.is_closed = False - self.result: Union[gym_new_venv_step_type, Tuple[np.ndarray, dict]] - self.action_space = self.get_env_attr("action_space") # noqa: B009 - self.is_reset = False - - @abstractmethod - def get_env_attr(self, key: str) -> Any: - pass - - @abstractmethod - def set_env_attr(self, key: str, value: Any) -> None: - pass - - def send(self, action: Optional[np.ndarray]) -> None: - """Send action signal to low-level worker. - - When action is None, it indicates sending "reset" signal; otherwise - it indicates "step" signal. The paired return value from "recv" - function is determined by such kind of different signal. - """ - if hasattr(self, "send_action"): - deprecation( - "send_action will soon be deprecated. " - "Please use send and recv for your own EnvWorker." 
- ) - if action is None: - self.is_reset = True - self.result = self.reset() - else: - self.is_reset = False - self.send_action(action) - - def recv( - self - ) -> Union[gym_new_venv_step_type, Tuple[np.ndarray, dict], ]: # noqa:E125 - """Receive result from low-level worker. - - If the last "send" function sends a NULL action, it only returns a - single observation; otherwise it returns a tuple of (obs, rew, done, - info) or (obs, rew, terminated, truncated, info), based on whether - the environment is using the old step API or the new one. - """ - if hasattr(self, "get_result"): - deprecation( - "get_result will soon be deprecated. " - "Please use send and recv for your own EnvWorker." - ) - if not self.is_reset: - self.result = self.get_result() - return self.result - - @abstractmethod - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - pass - - def step(self, action: np.ndarray) -> gym_new_venv_step_type: - """Perform one timestep of the environment's dynamic. - - "send" and "recv" are coupled in sync simulation, so users only call - "step" function. But they can be called separately in async - simulation, i.e. someone calls "send" first, and calls "recv" later. - """ - self.send(action) - return self.recv() # type: ignore - - @staticmethod - def wait( - workers: List["EnvWorker"], - wait_num: int, - timeout: Optional[float] = None - ) -> List["EnvWorker"]: - """Given a list of workers, return those ready ones.""" - raise NotImplementedError - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - return self.action_space.seed(seed) # issue 299 - - @abstractmethod - def render(self, **kwargs: Any) -> Any: - """Render the environment.""" - pass - - @abstractmethod - def close_env(self) -> None: - pass - - def close(self) -> None: - if self.is_closed: - return None - self.is_closed = True - self.close_env() diff --git a/speed_benchmark/tianshou_env/worker/dummy.py b/speed_benchmark/tianshou_env/worker/dummy.py deleted file mode 100644 index 4eec4e0fa..000000000 --- a/speed_benchmark/tianshou_env/worker/dummy.py +++ /dev/null @@ -1,52 +0,0 @@ -from typing import Any, Callable, List, Optional, Tuple - -import gymnasium as gym -import numpy as np - -from tianshou.env.worker import EnvWorker - - -class DummyEnvWorker(EnvWorker): - """Dummy worker used in sequential vector environments.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self.env = env_fn() - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - return getattr(self.env, key) - - def set_env_attr(self, key: str, value: Any) -> None: - setattr(self.env.unwrapped, key, value) - - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - return self.env.reset(**kwargs) - - @staticmethod - def wait( # type: ignore - workers: List["DummyEnvWorker"], wait_num: int, timeout: Optional[float] = None - ) -> List["DummyEnvWorker"]: - # Sequential EnvWorker objects are always ready - return workers - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - if action is None: - self.result = self.env.reset(**kwargs) - else: - self.result = self.env.step(action) # type: ignore - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - try: - return self.env.seed(seed) # type: ignore - except (AttributeError, NotImplementedError): - self.env.reset(seed=seed) - return [seed] # type: ignore - - def render(self, **kwargs: Any) -> Any: - return self.env.render(**kwargs) - - def 
close_env(self) -> None: - self.env.close() diff --git a/speed_benchmark/tianshou_env/worker/ray.py b/speed_benchmark/tianshou_env/worker/ray.py deleted file mode 100644 index fe2b8fe8d..000000000 --- a/speed_benchmark/tianshou_env/worker/ray.py +++ /dev/null @@ -1,74 +0,0 @@ -from typing import Any, Callable, List, Optional - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import gym_new_venv_step_type -from tianshou.env.worker import EnvWorker - -try: - import ray -except ImportError: - pass - - -class _SetAttrWrapper(gym.Wrapper): - - def set_env_attr(self, key: str, value: Any) -> None: - setattr(self.env.unwrapped, key, value) - - def get_env_attr(self, key: str) -> Any: - return getattr(self.env, key) - - -class RayEnvWorker(EnvWorker): - """Ray worker used in RayVectorEnv.""" - - def __init__(self, env_fn: Callable[[], gym.Env]) -> None: - self.env = ray.remote(_SetAttrWrapper).options( # type: ignore - num_cpus=0 - ).remote(env_fn()) - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - return ray.get(self.env.get_env_attr.remote(key)) - - def set_env_attr(self, key: str, value: Any) -> None: - ray.get(self.env.set_env_attr.remote(key, value)) - - def reset(self, **kwargs: Any) -> Any: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - return ray.get(self.env.reset.remote(**kwargs)) - - @staticmethod - def wait( # type: ignore - workers: List["RayEnvWorker"], wait_num: int, timeout: Optional[float] = None - ) -> List["RayEnvWorker"]: - results = [x.result for x in workers] - ready_results, _ = ray.wait(results, num_returns=wait_num, timeout=timeout) - return [workers[results.index(result)] for result in ready_results] - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - # self.result is actually a handle - if action is None: - self.result = self.env.reset.remote(**kwargs) - else: - self.result = self.env.step.remote(action) - - def recv(self) -> gym_new_venv_step_type: - return ray.get(self.result) # type: ignore - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - try: - return ray.get(self.env.seed.remote(seed)) - except (AttributeError, NotImplementedError): - self.env.reset.remote(seed=seed) - return None - - def render(self, **kwargs: Any) -> Any: - return ray.get(self.env.render.remote(**kwargs)) - - def close_env(self) -> None: - ray.get(self.env.close.remote()) diff --git a/speed_benchmark/tianshou_env/worker/subproc.py b/speed_benchmark/tianshou_env/worker/subproc.py deleted file mode 100644 index 68f34e687..000000000 --- a/speed_benchmark/tianshou_env/worker/subproc.py +++ /dev/null @@ -1,256 +0,0 @@ -import ctypes -import time -from collections import OrderedDict -from multiprocessing import Array, Pipe, connection -from multiprocessing.context import Process -from typing import Any, Callable, List, Optional, Tuple, Union - -import gymnasium as gym -import numpy as np - -from tianshou.env.utils import CloudpickleWrapper, gym_new_venv_step_type -from tianshou.env.worker import EnvWorker - -_NP_TO_CT = { - np.bool_: ctypes.c_bool, - np.uint8: ctypes.c_uint8, - np.uint16: ctypes.c_uint16, - np.uint32: ctypes.c_uint32, - np.uint64: ctypes.c_uint64, - np.int8: ctypes.c_int8, - np.int16: ctypes.c_int16, - np.int32: ctypes.c_int32, - np.int64: ctypes.c_int64, - np.float32: ctypes.c_float, - np.float64: ctypes.c_double, -} - - -class ShArray: - """Wrapper of multiprocessing Array.""" - - def __init__(self, dtype: np.generic, shape: Tuple[int]) -> None: - self.arr = 
Array(_NP_TO_CT[dtype.type], int(np.prod(shape))) # type: ignore - self.dtype = dtype - self.shape = shape - - def save(self, ndarray: np.ndarray) -> None: - assert isinstance(ndarray, np.ndarray) - dst = self.arr.get_obj() - dst_np = np.frombuffer(dst, - dtype=self.dtype).reshape(self.shape) # type: ignore - np.copyto(dst_np, ndarray) - - def get(self) -> np.ndarray: - obj = self.arr.get_obj() - return np.frombuffer(obj, dtype=self.dtype).reshape(self.shape) # type: ignore - - -def _setup_buf(space: gym.Space) -> Union[dict, tuple, ShArray]: - if isinstance(space, gym.spaces.Dict): - assert isinstance(space.spaces, OrderedDict) - return {k: _setup_buf(v) for k, v in space.spaces.items()} - elif isinstance(space, gym.spaces.Tuple): - assert isinstance(space.spaces, tuple) - return tuple([_setup_buf(t) for t in space.spaces]) - else: - return ShArray(space.dtype, space.shape) # type: ignore - - -def _worker( - parent: connection.Connection, - p: connection.Connection, - env_fn_wrapper: CloudpickleWrapper, - obs_bufs: Optional[Union[dict, tuple, ShArray]] = None, -) -> None: - - def _encode_obs( - obs: Union[dict, tuple, np.ndarray], buffer: Union[dict, tuple, ShArray] - ) -> None: - if isinstance(obs, np.ndarray) and isinstance(buffer, ShArray): - buffer.save(obs) - elif isinstance(obs, tuple) and isinstance(buffer, tuple): - for o, b in zip(obs, buffer): - _encode_obs(o, b) - elif isinstance(obs, dict) and isinstance(buffer, dict): - for k in obs.keys(): - _encode_obs(obs[k], buffer[k]) - return None - - parent.close() - env = env_fn_wrapper.data() - try: - while True: - try: - cmd, data = p.recv() - except EOFError: # the pipe has been closed - p.close() - break - if cmd == "step": - env_return = env.step(data) - if obs_bufs is not None: - _encode_obs(env_return[0], obs_bufs) - env_return = (None, *env_return[1:]) - p.send(env_return) - elif cmd == "reset": - obs, info = env.reset(**data) - if obs_bufs is not None: - _encode_obs(obs, obs_bufs) - obs = None - p.send((obs, info)) - elif cmd == "close": - p.send(env.close()) - p.close() - break - elif cmd == "render": - p.send(env.render(**data) if hasattr(env, "render") else None) - elif cmd == "seed": - if hasattr(env, "seed"): - p.send(env.seed(data)) - else: - env.reset(seed=data) - p.send(None) - elif cmd == "getattr": - p.send(getattr(env, data) if hasattr(env, data) else None) - elif cmd == "setattr": - setattr(env.unwrapped, data["key"], data["value"]) - else: - p.close() - raise NotImplementedError - except KeyboardInterrupt: - p.close() - - -class SubprocEnvWorker(EnvWorker): - """Subprocess worker used in SubprocVectorEnv and ShmemVectorEnv.""" - - def __init__( - self, env_fn: Callable[[], gym.Env], share_memory: bool = False - ) -> None: - self.parent_remote, self.child_remote = Pipe() - self.share_memory = share_memory - self.buffer: Optional[Union[dict, tuple, ShArray]] = None - if self.share_memory: - dummy = env_fn() - obs_space = dummy.observation_space - dummy.close() - del dummy - self.buffer = _setup_buf(obs_space) - args = ( - self.parent_remote, - self.child_remote, - CloudpickleWrapper(env_fn), - self.buffer, - ) - self.process = Process(target=_worker, args=args, daemon=True) - self.process.start() - self.child_remote.close() - super().__init__(env_fn) - - def get_env_attr(self, key: str) -> Any: - self.parent_remote.send(["getattr", key]) - return self.parent_remote.recv() - - def set_env_attr(self, key: str, value: Any) -> None: - self.parent_remote.send(["setattr", {"key": key, "value": value}]) - - def 
_decode_obs(self) -> Union[dict, tuple, np.ndarray]: - - def decode_obs( - buffer: Optional[Union[dict, tuple, ShArray]] - ) -> Union[dict, tuple, np.ndarray]: - if isinstance(buffer, ShArray): - return buffer.get() - elif isinstance(buffer, tuple): - return tuple([decode_obs(b) for b in buffer]) - elif isinstance(buffer, dict): - return {k: decode_obs(v) for k, v in buffer.items()} - else: - raise NotImplementedError - - return decode_obs(self.buffer) - - @staticmethod - def wait( # type: ignore - workers: List["SubprocEnvWorker"], - wait_num: int, - timeout: Optional[float] = None, - ) -> List["SubprocEnvWorker"]: - remain_conns = conns = [x.parent_remote for x in workers] - ready_conns: List[connection.Connection] = [] - remain_time, t1 = timeout, time.time() - while len(remain_conns) > 0 and len(ready_conns) < wait_num: - if timeout: - remain_time = timeout - (time.time() - t1) - if remain_time <= 0: - break - # connection.wait hangs if the list is empty - new_ready_conns = connection.wait(remain_conns, timeout=remain_time) - ready_conns.extend(new_ready_conns) # type: ignore - remain_conns = [conn for conn in remain_conns if conn not in ready_conns] - return [workers[conns.index(con)] for con in ready_conns] - - def send(self, action: Optional[np.ndarray], **kwargs: Any) -> None: - if action is None: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - self.parent_remote.send(["reset", kwargs]) - else: - self.parent_remote.send(["step", action]) - - def recv( - self - ) -> Union[gym_new_venv_step_type, Tuple[np.ndarray, dict]]: # noqa:E125 - result = self.parent_remote.recv() - if isinstance(result, tuple): - if len(result) == 2: - obs, info = result - if self.share_memory: - obs = self._decode_obs() - return obs, info - obs = result[0] - if self.share_memory: - obs = self._decode_obs() - return (obs, *result[1:]) # type: ignore - else: - obs = result - if self.share_memory: - obs = self._decode_obs() - return obs - - def reset(self, **kwargs: Any) -> Tuple[np.ndarray, dict]: - if "seed" in kwargs: - super().seed(kwargs["seed"]) - self.parent_remote.send(["reset", kwargs]) - - result = self.parent_remote.recv() - if isinstance(result, tuple): - obs, info = result - if self.share_memory: - obs = self._decode_obs() - return obs, info - else: - obs = result - if self.share_memory: - obs = self._decode_obs() - return obs - - def seed(self, seed: Optional[int] = None) -> Optional[List[int]]: - super().seed(seed) - self.parent_remote.send(["seed", seed]) - return self.parent_remote.recv() - - def render(self, **kwargs: Any) -> Any: - self.parent_remote.send(["render", kwargs]) - return self.parent_remote.recv() - - def close_env(self) -> None: - try: - self.parent_remote.send(["close", None]) - # mp may be deleted so it may raise AttributeError - self.parent_remote.recv() - self.process.join() - except (BrokenPipeError, EOFError, AttributeError): - pass - # ensure the subproc is terminated - self.process.terminate() diff --git a/speed_benchmark/vector_env.py b/speed_benchmark/vector_env.py deleted file mode 100644 index df6eb4291..000000000 --- a/speed_benchmark/vector_env.py +++ /dev/null @@ -1,78 +0,0 @@ - -# Copied from https://github.com/deepmind/open_spiel/blob/master/open_spiel/python/vector_env.py - -# Copyright 2022 DeepMind Technologies Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""A vectorized RL Environment.""" - - -class SyncVectorEnv(object): - """A vectorized RL Environment. - This environment is synchronized - games do not execute in parallel. Speedups - are realized by calling models on many game states simultaneously. - """ - - def __init__(self, envs): - if not isinstance(envs, list): - raise ValueError( - "Need to call this with a list of rl_environment.Environment objects") - self.envs = envs - - def __len__(self): - return len(self.envs) - - def observation_spec(self): - return self.envs[0].observation_spec() - - @property - def num_players(self): - return self.envs[0].num_players - - def step(self, step_outputs, reset_if_done=False): - """Apply one step. - Args: - step_outputs: the step outputs - reset_if_done: if True, automatically reset the environment - when the epsiode ends - Returns: - time_steps: the time steps, - reward: the reward - done: done flag - unreset_time_steps: unreset time steps - """ - time_steps = [ - self.envs[i].step([step_outputs[i].action]) - for i in range(len(self.envs)) - ] - reward = [step.rewards for step in time_steps] - done = [step.last() for step in time_steps] - unreset_time_steps = time_steps # Copy these because you may want to look - # at the unreset versions to extract - # information from them - - if reset_if_done: - time_steps = self.reset(envs_to_reset=done) - - return time_steps, reward, done, unreset_time_steps - - def reset(self, envs_to_reset=None): - if envs_to_reset is None: - envs_to_reset = [True for _ in range(len(self.envs))] - - time_steps = [ - self.envs[i].reset() - if envs_to_reset[i] else self.envs[i].get_time_step() - for i in range(len(self.envs)) - ] - return time_steps \ No newline at end of file From 3ce33838f0d197a3302a193169fbda553619be44 Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:36:41 +0900 Subject: [PATCH 16/19] . 
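
Tag each benchmark result line with the vectorization backend ("venv": "for-loop" or
"subproc") in addition to the game and library, and add a venv positional argument to
run_petting_zoo.py that is forwarded to make_env and recorded in the JSON output.

For reference, a minimal sketch (not part of this patch) of how the tagged JSON lines
could be grouped once the runners' stdout has been collected; the file name
results.jsonl and that collection step are assumptions, not something the scripts do
themselves:

    import json
    from collections import defaultdict

    best = defaultdict(float)  # (library, venv, game) -> best observed steps/sec
    with open("results.jsonl") as f:
        for line in f:
            r = json.loads(line)
            key = (r["library"], r.get("venv", "-"), r["game"])
            best[key] = max(best[key], r["steps/sec"])

    for (library, venv, game), sps in sorted(best.items()):
        print(f"{library:12s} {venv:10s} {game:14s} {sps:12.1f} steps/sec")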
--- speed_benchmark/open_spiel_forloop.py | 2 +- speed_benchmark/open_spiel_subproc.py | 2 +- speed_benchmark/run_petting_zoo.py | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/speed_benchmark/open_spiel_forloop.py b/speed_benchmark/open_spiel_forloop.py index d51e7e5d8..adb9e6e28 100644 --- a/speed_benchmark/open_spiel_forloop.py +++ b/speed_benchmark/open_spiel_forloop.py @@ -124,4 +124,4 @@ def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int): steps_num = random_play(env, args.n_steps_lim, args.batch_size) time_end = time.time() sec = time_end-time_sta - json.dumps({"game": args.env_name, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}) + json.dumps({"game": args.env_name, "venv": "for-loop", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}) diff --git a/speed_benchmark/open_spiel_subproc.py b/speed_benchmark/open_spiel_subproc.py index 0e2efb710..c2f8763ec 100644 --- a/speed_benchmark/open_spiel_subproc.py +++ b/speed_benchmark/open_spiel_subproc.py @@ -126,4 +126,4 @@ def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int): steps_num = random_play(env, args.n_steps_lim, args.batch_size) time_end = time.time() sec = time_end - time_sta - print(json.dumps({"game": args.env_name, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) + print(json.dumps({"game": args.env_name, "venv": "subproc", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) diff --git a/speed_benchmark/run_petting_zoo.py b/speed_benchmark/run_petting_zoo.py index 0b187209e..78cc76656 100644 --- a/speed_benchmark/run_petting_zoo.py +++ b/speed_benchmark/run_petting_zoo.py @@ -63,14 +63,15 @@ def random_play(env, n_steps_lim: int, batch_size: int) -> int: if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("env_name") # go, chess, tic_tac_toe + parser.add_argument("venv") # for-loop, subproc parser.add_argument("batch_size", type=int) parser.add_argument("n_steps_lim", type=int) parser.add_argument("--seed", default=0, type=int) args = parser.parse_args() assert args.n_steps_lim % args.batch_size == 0 - env = make_env(args.env_name, args.batch_size) + env = make_env(args.env_name, args.batch_size, args.venv) time_sta = time.time() steps_num = random_play(env, args.n_steps_lim, args.batch_size) time_end = time.time() sec = time_end - time_sta - print(json.dumps({"game": args.env_name, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) \ No newline at end of file + print(json.dumps({"game": args.env_name, "venv": args.venv, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) \ No newline at end of file From 634f040395d44f96742d572c6ac0d3050fc44d1e Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:37:34 +0900 Subject: [PATCH 17/19] . 
--- .../{open_spiel_forloop.py => run_open_spiel_forloop.py} | 0 .../{open_spiel_subproc.py => run_open_spiel_subproc.py} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename speed_benchmark/{open_spiel_forloop.py => run_open_spiel_forloop.py} (100%) rename speed_benchmark/{open_spiel_subproc.py => run_open_spiel_subproc.py} (100%) diff --git a/speed_benchmark/open_spiel_forloop.py b/speed_benchmark/run_open_spiel_forloop.py similarity index 100% rename from speed_benchmark/open_spiel_forloop.py rename to speed_benchmark/run_open_spiel_forloop.py diff --git a/speed_benchmark/open_spiel_subproc.py b/speed_benchmark/run_open_spiel_subproc.py similarity index 100% rename from speed_benchmark/open_spiel_subproc.py rename to speed_benchmark/run_open_spiel_subproc.py From a6e6a51983e3ed1c8aeb8a7e0a26c3c5f0edb5ae Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 00:42:22 +0900 Subject: [PATCH 18/19] . --- speed_benchmark/install.sh | 2 +- speed_benchmark/run_pgx.py | 31 ++++++++++--------------------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/speed_benchmark/install.sh b/speed_benchmark/install.sh index d2fdf5b8a..0b99c2cc0 100755 --- a/speed_benchmark/install.sh +++ b/speed_benchmark/install.sh @@ -1 +1 @@ -python3 -m pip install pettingzoo open_spiel tianshou pygame cloudpickle chess +python3 -m pip install pettingzoo open_spiel tianshou pygame cloudpickle chess pgx diff --git a/speed_benchmark/run_pgx.py b/speed_benchmark/run_pgx.py index 96ce502a2..8f9b6ea8b 100644 --- a/speed_benchmark/run_pgx.py +++ b/speed_benchmark/run_pgx.py @@ -41,29 +41,18 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): return num_steps, te - ts +games = { + "tic_tac_toe": "tic_tac_toe/v0", + "backgammon": "backgammon/v0", + "shogi": "shogi/v0", + "go": "go-19x19/v0", +} + N = (2 ** 12) * 1 -# print(f"Total # of steps: {N}") bs_list = [2 ** i for i in range(5, 13)] -# print("| env_id |" + "|".join([str(bs) for bs in bs_list]) + "|") -# print("|:---:|" + "|".join([":---:" for bs in bs_list]) + "|") d = {} -for env_id in get_args(pgx.EnvId): - # s = f"|{env_id}|" +for game, env_id in games.items(): for bs in bs_list: num_steps, sec = benchmark(env_id, bs, N) - # s += f"{n_per_sec:.05f}" - # s += "|" - print(json.dumps({"game": "/".join(env_id.split("/")[:-1]), "library": "pgx", - "total_steps": num_steps, "total_sec": sec, "steps/sec": num_steps / sec, "batch_size": bs})) - # print(s) - - -""" -Total # of steps: 409600 -| env_id |32|64|128|256|512|1024|2048|4096| -|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:| -|tic_tac_toe/v0|25193.54921|51203.80125|99197.43688|206175.78196|413948.81221|723250.23824|1664977.36893|3265886.46947| -|go-19x19/v0|15146.45769|29891.62027|58064.94882|108400.19704|173638.40814|286740.95368|379331.88909|449555.32632| -|shogi/v0|21047.48879|42130.05279|82988.81210|175415.01266|259940.79393|290410.20642|299800.69880|308552.26434| -|minatar/asterix/v0|13066.68075|25836.00751|52134.46018|102929.03752|205880.59846|384825.85566|806843.53100|1553951.76960| -""" + print(json.dumps({"game": game, "library": "pgx", + "total_steps": num_steps, "total_sec": sec, "steps/sec": num_steps / sec, "batch_size": bs})) \ No newline at end of file From 3810bb9fd0d036e14a28e3cddb365184307a516a Mon Sep 17 00:00:00 2001 From: Sotetsu KOYAMADA Date: Sat, 25 Feb 2023 01:38:44 +0900 Subject: [PATCH 19/19] . 
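
Record the batch size in every JSON result line, set a default of 2 ** 10 * 10 for the
n_steps_lim argument of the OpenSpiel and PettingZoo runners, and use the same budget
in run_pgx.py (N = 2 ** 10 * 10 with batch sizes 2 ** 1 .. 2 ** 10).

A standalone sanity-check sketch (not part of this patch) of the shared configuration;
it mirrors the `n_steps_lim % batch_size == 0` assertion in the runner scripts:

    # Shared step budget and batch sizes used across the benchmark scripts.
    N = 2 ** 10 * 10                           # 10240 steps per (game, batch_size) pair
    bs_list = [2 ** i for i in range(1, 11)]   # 2, 4, ..., 1024

    # Every batch size must divide the budget evenly, as the runners assert.
    assert all(N % bs == 0 for bs in bs_list)
    print(N, bs_list)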
--- speed_benchmark/run_open_spiel_forloop.py | 4 ++-- speed_benchmark/run_open_spiel_subproc.py | 4 ++-- speed_benchmark/run_petting_zoo.py | 4 ++-- speed_benchmark/run_pgx.py | 5 +++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/speed_benchmark/run_open_spiel_forloop.py b/speed_benchmark/run_open_spiel_forloop.py index adb9e6e28..d85461dcf 100644 --- a/speed_benchmark/run_open_spiel_forloop.py +++ b/speed_benchmark/run_open_spiel_forloop.py @@ -115,7 +115,7 @@ def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int): parser = argparse.ArgumentParser() parser.add_argument("env_name") # go, chess backgammon tic_tac_toe parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) + parser.add_argument("n_steps_lim", default=2 ** 10 * 10, type=int) parser.add_argument("--seed", default=0, type=int) args = parser.parse_args() assert args.n_steps_lim % args.batch_size == 0 @@ -124,4 +124,4 @@ def random_play(env: SyncVectorEnv, n_steps_lim: int, batch_size: int): steps_num = random_play(env, args.n_steps_lim, args.batch_size) time_end = time.time() sec = time_end-time_sta - json.dumps({"game": args.env_name, "venv": "for-loop", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec}) + json.dumps({"game": args.env_name, "venv": "for-loop", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size}) diff --git a/speed_benchmark/run_open_spiel_subproc.py b/speed_benchmark/run_open_spiel_subproc.py index c2f8763ec..fe0630ec4 100644 --- a/speed_benchmark/run_open_spiel_subproc.py +++ b/speed_benchmark/run_open_spiel_subproc.py @@ -117,7 +117,7 @@ def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int): parser = argparse.ArgumentParser() parser.add_argument("env_name") parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) + parser.add_argument("n_steps_lim", default=2 ** 10 * 10, type=int) parser.add_argument("--seed", default=100, type=bool) args = parser.parse_args() assert args.n_steps_lim % args.batch_size == 0 @@ -126,4 +126,4 @@ def random_play(env: SubprocVectorEnv, n_steps_lim: int, batch_size: int): steps_num = random_play(env, args.n_steps_lim, args.batch_size) time_end = time.time() sec = time_end - time_sta - print(json.dumps({"game": args.env_name, "venv": "subproc", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) + print(json.dumps({"game": args.env_name, "venv": "subproc", "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size})) diff --git a/speed_benchmark/run_petting_zoo.py b/speed_benchmark/run_petting_zoo.py index 78cc76656..75023d52a 100644 --- a/speed_benchmark/run_petting_zoo.py +++ b/speed_benchmark/run_petting_zoo.py @@ -65,7 +65,7 @@ def random_play(env, n_steps_lim: int, batch_size: int) -> int: parser.add_argument("env_name") # go, chess, tic_tac_toe parser.add_argument("venv") # for-loop, subproc parser.add_argument("batch_size", type=int) - parser.add_argument("n_steps_lim", type=int) + parser.add_argument("n_steps_lim", default=2 ** 10 * 10, type=int) parser.add_argument("--seed", default=0, type=int) args = parser.parse_args() assert args.n_steps_lim % args.batch_size == 0 @@ -74,4 +74,4 @@ def random_play(env, n_steps_lim: int, batch_size: int) -> int: steps_num = random_play(env, args.n_steps_lim, args.batch_size) 
time_end = time.time() sec = time_end - time_sta - print(json.dumps({"game": args.env_name, "venv": args.venv, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec})) \ No newline at end of file + print(json.dumps({"game": args.env_name, "venv": args.venv, "library": "open_spiel", "total_steps": steps_num, "total_sec": sec, "steps/sec": steps_num/sec, "batch_size": args.batch_size})) \ No newline at end of file diff --git a/speed_benchmark/run_pgx.py b/speed_benchmark/run_pgx.py index 8f9b6ea8b..6e0bc66d2 100644 --- a/speed_benchmark/run_pgx.py +++ b/speed_benchmark/run_pgx.py @@ -48,8 +48,9 @@ def benchmark(env_id: pgx.EnvId, batch_size, num_steps=(2 ** 12) * 1000): "go": "go-19x19/v0", } -N = (2 ** 12) * 1 -bs_list = [2 ** i for i in range(5, 13)] + +N = 2 ** 10 * 10 +bs_list = [2 ** i for i in range(1, 11)] d = {} for game, env_id in games.items(): for bs in bs_list: