diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml
index fe496572..8ac6f373 100644
--- a/.github/workflows/pre-commit.yml
+++ b/.github/workflows/pre-commit.yml
@@ -14,8 +14,11 @@ jobs:
   pre-commit:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
-      - uses: actions/setup-python@v3
+      - uses: actions/checkout@v4
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.11'
+      - run: pip install .
       - run: pip install pre-commit
       - run: pre-commit --version
      - run: pre-commit install
diff --git a/minari/dataset/minari_dataset.py b/minari/dataset/minari_dataset.py
index 6feb0571..01ba6e02 100644
--- a/minari/dataset/minari_dataset.py
+++ b/minari/dataset/minari_dataset.py
@@ -49,7 +49,7 @@ def parse_dataset_id(dataset_id: str) -> tuple[str | None, str, int]:
 
 @dataclass
 class MinariDatasetSpec:
-    env_spec: EnvSpec
+    env_spec: Optional[EnvSpec]
     total_episodes: int
     total_steps: np.int64
     dataset_id: str
@@ -99,9 +99,11 @@ def __init__(
 
         metadata = self._data.metadata
 
-        env_spec = metadata["env_spec"]
-        assert isinstance(env_spec, str)
-        self._env_spec = EnvSpec.from_json(env_spec)
+        env_spec = metadata.get("env_spec")
+        if env_spec is not None:
+            assert isinstance(env_spec, str)
+            env_spec = EnvSpec.from_json(env_spec)
+        self._env_spec = env_spec
 
         eval_env_spec = metadata.get("eval_env_spec")
         if eval_env_spec is not None:
@@ -150,7 +152,11 @@ def recover_environment(self, eval_env: bool = False, **kwargs) -> gym.Env:
             logger.info(
                 f"`eval_env` has been set to True but the dataset {self._dataset_id} doesn't provide an evaluation environment. Instead, the environment used for collecting the data will be returned: {self._env_spec}"
             )
-        return gym.make(self._env_spec, **kwargs)
+
+        if self.env_spec is None:
+            raise ValueError("Environment cannot be recovered when env_spec is None")
+
+        return gym.make(self.env_spec, **kwargs)
 
     def set_seed(self, seed: int):
         """Set seed for random episode sampling generator."""
diff --git a/minari/dataset/minari_storage.py b/minari/dataset/minari_storage.py
index a7a82be8..02493bae 100644
--- a/minari/dataset/minari_storage.py
+++ b/minari/dataset/minari_storage.py
@@ -254,7 +254,7 @@ def update_episodes(self, episodes: Iterable[dict]):
                 _add_episode_to_group(eps_buff, episode_group)
 
             current_steps = file.attrs["total_steps"]
-            assert isinstance(current_steps, np.int64)
+            assert isinstance(current_steps, np.integer)
             total_steps = current_steps + additional_steps
             total_episodes = len(file.keys())
 
@@ -273,7 +273,7 @@ def update_from_storage(self, storage: MinariStorage):
 
         with h5py.File(self._file_path, "a", track_order=True) as file:
             last_episode_id = file.attrs["total_episodes"]
-            assert isinstance(last_episode_id, np.int64)
+            assert isinstance(last_episode_id, np.integer)
             storage_total_episodes = storage.total_episodes
 
             for id in range(storage.total_episodes):
@@ -294,7 +294,7 @@
                 "total_episodes", last_episode_id + storage_total_episodes
             )
             total_steps = file.attrs["total_steps"]
-            assert isinstance(total_steps, np.int64)
+            assert isinstance(total_steps, np.integer)
             file.attrs.modify("total_steps", total_steps + storage.total_steps)
 
             storage_metadata = storage.metadata
@@ -316,19 +316,19 @@ def data_path(self) -> PathLike:
         return os.path.dirname(self._file_path)
 
     @property
-    def total_episodes(self) -> np.int64:
+    def total_episodes(self) -> np.integer:
         """Total episodes in the dataset."""
         with h5py.File(self._file_path, "r") as file:
             total_episodes = file.attrs["total_episodes"]
-            assert isinstance(total_episodes, np.int64)
+            assert isinstance(total_episodes, np.integer)
             return total_episodes
 
     @property
-    def total_steps(self) -> np.int64:
+    def total_steps(self) -> np.integer:
         """Total steps in the dataset."""
         with h5py.File(self._file_path, "r") as file:
             total_steps = file.attrs["total_steps"]
-            assert isinstance(total_steps, np.int64)
+            assert isinstance(total_steps, np.integer)
             return total_steps
 
     @property
diff --git a/minari/utils.py b/minari/utils.py
index ed451831..7d89cdf1 100644
--- a/minari/utils.py
+++ b/minari/utils.py
@@ -150,7 +150,9 @@ def combine_minari_version_specifiers(specifier_set: SpecifierSet) -> SpecifierS
     return final_version_specifier
 
 
-def validate_datasets_to_combine(datasets_to_combine: List[MinariDataset]) -> EnvSpec:
+def validate_datasets_to_combine(
+    datasets_to_combine: List[MinariDataset],
+) -> EnvSpec | None:
     """Check if the given datasets can be combined.
 
     Tests if the datasets were created with the same environment (`env_spec`) and re-calculates the
@@ -163,36 +165,42 @@ def validate_datasets_to_combine(datasets_to_combine: List[MinariDataset]) -> En
     Returns:
         combined_dataset_env_spec (EnvSpec): the resulting EnvSpec of combining the MinariDatasets
+
     """
-    assert all(isinstance(dataset, MinariDataset) for dataset in datasets_to_combine), f"Some of the datasets to combine are not of type {MinariDataset}"
-
-    # Check if there are any `None` max_episode_steps
-    if any(
-        (dataset.spec.env_spec.max_episode_steps is None)
-        for dataset in datasets_to_combine
-    ):
-        max_episode_steps = None
-    else:
-        max_episode_steps = max(
-            dataset.spec.env_spec.max_episode_steps for dataset in datasets_to_combine
-        )
+    # get the first non-None env_spec among the datasets
+    first_not_none_env_spec = next((dataset.spec.env_spec for dataset in datasets_to_combine if dataset.spec.env_spec is not None), None)
 
-    combine_env_spec = []
-    for dataset in datasets_to_combine:
-        dataset_env_spec = copy.deepcopy(dataset.spec.env_spec)
-        dataset_env_spec.max_episode_steps = max_episode_steps
-        combine_env_spec.append(dataset_env_spec)
+    # early return if none of the datasets has an env_spec
+    if first_not_none_env_spec is None:
+        return None
+
+    common_env_spec = copy.deepcopy(first_not_none_env_spec)
 
-    assert all(
-        env_spec == combine_env_spec[0] for env_spec in combine_env_spec
-    ), "The datasets to be combined have different values for `env_spec` attribute."
+    # update common_env_spec's max_episode_steps and check that all env specs are equivalent
+    for dataset in datasets_to_combine:
+        assert isinstance(dataset, MinariDataset)
+        env_spec = dataset.spec.env_spec
+        if env_spec is not None:
+            if (
+                common_env_spec.max_episode_steps is None
+                or env_spec.max_episode_steps is None
+            ):
+                common_env_spec.max_episode_steps = None
+            else:
+                common_env_spec.max_episode_steps = max(
+                    common_env_spec.max_episode_steps, env_spec.max_episode_steps
+                )
+            # set max_episode_steps on a copy to the common value so the equality check ignores it
+            env_spec_copy = copy.deepcopy(env_spec)
+            env_spec_copy.max_episode_steps = common_env_spec.max_episode_steps
+            if env_spec_copy != common_env_spec:
+                raise ValueError(
+                    "The datasets to be combined have different values for `env_spec` attribute."
+                )
+        else:
+            raise ValueError("Cannot combine datasets having env_spec with those having no env_spec.")
 
-    # Check that all datasets have the same action/observation space
-    if any(dataset.action_space != datasets_to_combine[0].action_space for dataset in datasets_to_combine):
-        raise ValueError("The datasets to combine must have the same action space.")
-    if any(dataset.observation_space != datasets_to_combine[0].observation_space for dataset in datasets_to_combine):
-        raise ValueError("The datasets to combine must have the same observation space.")
 
-    return combine_env_spec[0]
+    return common_env_spec
 
 
 class RandomPolicy:
@@ -301,7 +309,7 @@ def get_average_reference_score(
     for _ in range(num_episodes):
         while True:
             action = policy(obs)
-            obs, _, terminated, truncated, info = env.step(action)
+            obs, _, terminated, truncated, info = env.step(action)  # pyright: ignore[reportGeneralTypeIssues]
             if terminated or truncated:
                 episode_returns.append(info["episode"]["r"])
                 obs, _ = env.reset()
@@ -326,8 +334,8 @@ def _generate_dataset_path(dataset_id: str) -> str | os.PathLike:
 
 
 def _generate_dataset_metadata(
-    env_spec: EnvSpec,
     dataset_id: str,
+    env_spec: Optional[EnvSpec],
     eval_env: Optional[str | gym.Env | EnvSpec],
     algorithm_name: Optional[str],
     author: Optional[str],
@@ -403,7 +411,7 @@ def _generate_dataset_metadata(
     if eval_env is None:
         warnings.warn(
             f"`eval_env` is set to None. If another environment is intended to be used for evaluation please specify corresponding Gymnasium environment (gym.Env | gym.envs.registration.EnvSpec).\
-                If None the environment used to collect the data (`env={env_spec}`) will be used for this purpose.",
+            If None, the environment used to collect the data (`env={env_spec}`) will be used for this purpose.",
             UserWarning,
         )
         eval_env_spec = env_spec
@@ -421,7 +429,13 @@ def _generate_dataset_metadata(
         assert eval_env_spec is not None
         dataset_metadata["eval_env_spec"] = eval_env_spec.to_json()
 
-    if expert_policy is not None or ref_max_score is not None:
+    if env_spec is None:
+        warnings.warn(
+            "env_spec is None; no environment spec was provided for this dataset during collection.",
+            UserWarning,
+        )
+
+    if eval_env_spec is not None and (expert_policy is not None or ref_max_score is not None):
         env_ref_score = gym.make(eval_env_spec)
         if ref_min_score is None:
             ref_min_score = get_average_reference_score(
@@ -441,8 +455,8 @@ def create_dataset_from_buffers(
     dataset_id: str,
-    env: str | gym.Env | EnvSpec,
     buffer: List[Dict[str, Union[list, Dict]]],
+    env: Optional[str | gym.Env | EnvSpec] = None,
     eval_env: Optional[str | gym.Env | EnvSpec] = None,
     algorithm_name: Optional[str] = None,
     author: Optional[str] = None,
@@ -473,10 +487,10 @@ def create_dataset_from_buffers(
 
     Args:
         dataset_id (str): name id to identify Minari dataset.
-        env (str|gym.Env|EnvSpec): Gymnasium environment(gym.Env)/environment id(str)/environment spec(EnvSpec) used to collect the buffer data.
         buffer (list[Dict[str, Union[list, Dict]]]): list of episode dictionaries with data.
+        env (Optional[str|gym.Env|EnvSpec]): Gymnasium environment(gym.Env)/environment id(str)/environment spec(EnvSpec) used to collect the buffer data. Defaults to None.
         eval_env (Optional[str|gym.Env|EnvSpec]): Gymnasium environment(gym.Env)/environment id(str)/environment spec(EnvSpec) to use for evaluation with the dataset. After loading the dataset, the environment can be recovered as follows: `MinariDataset.recover_environment(eval_env=True).
-            If None the `env` used to collect the buffer data should be used for evaluation.
+            If None, and if the `env` used to collect the buffer data is available, the latter will be used for evaluation.
         algorithm_name (Optional[str], optional): name of the algorithm used to collect the data. Defaults to None.
         author (Optional[str], optional): author that generated the dataset. Defaults to None.
         author_email (Optional[str], optional): email of the author that generated the dataset. Defaults to None.
@@ -503,11 +517,25 @@ def create_dataset_from_buffers(
         env_spec = env
     elif isinstance(env, gym.Env):
         env_spec = env.spec
+    elif env is None:
+        if observation_space is None or action_space is None:
+            raise ValueError("Both observation space and action space must be provided if env is None")
+        env_spec = None
     else:
-        raise ValueError("The `env` argument must be of types str|EnvSpec|gym.Env")
+        raise ValueError("The `env` argument must be of types str|EnvSpec|gym.Env|None")
+
+    if isinstance(env, (str, EnvSpec)):
+        env = gym.make(env)
+    if observation_space is None:
+        assert isinstance(env, gym.Env)
+        observation_space = env.observation_space
+    if action_space is None:
+        assert isinstance(env, gym.Env)
+        action_space = env.action_space
+
     metadata = _generate_dataset_metadata(
-        env_spec,
         dataset_id,
+        env_spec,
         eval_env,
         algorithm_name,
         author,
@@ -575,8 +603,8 @@ def create_dataset_from_collector_env(
     assert collector_env.datasets_path is not None
     dataset_path = _generate_dataset_path(dataset_id)
     metadata: Dict[str, Any] = _generate_dataset_metadata(
-        copy.deepcopy(collector_env.env.spec),
         dataset_id,
+        copy.deepcopy(collector_env.env.spec),
         eval_env,
         algorithm_name,
         author,
diff --git a/tests/common.py b/tests/common.py
index bee6fa68..4f8ee6a5 100644
--- a/tests/common.py
+++ b/tests/common.py
@@ -1,6 +1,8 @@
+import copy
 import sys
 import unicodedata
-from typing import Any, Iterable, List, Optional, Union
+from collections import OrderedDict
+from typing import Any, Dict, Iterable, List, Optional, Union
 
 import gymnasium as gym
 import numpy as np
@@ -627,3 +629,63 @@ def check_episode_data_integrity(
         assert episode.total_timesteps == len(episode.rewards)
         assert episode.total_timesteps == len(episode.terminations)
         assert episode.total_timesteps == len(episode.truncations)
+
+
+def _space_subset_helper(entry: Dict):
+
+    return OrderedDict(
+        {
+            "component_2": OrderedDict(
+                {"subcomponent_2": entry["component_2"]["subcomponent_2"]}
+            )
+        }
+    )
+
+
+def get_sample_buffer_for_dataset_from_env(env, num_episodes=10):
+
+    buffer = []
+    observations = []
+    actions = []
+    rewards = []
+    terminations = []
+    truncations = []
+
+    observation, info = env.reset(seed=42)
+
+    # Step the environment manually and record each transition (no DataCollectorV0 wrapper is involved)
+    observation, _ = env.reset()
+    observations.append(_space_subset_helper(observation))
+    for episode in range(num_episodes):
+        terminated = False
+        truncated = False
+
+        while not terminated and not truncated:
+            action = env.action_space.sample()  # User-defined policy function
+            observation, reward, terminated, truncated, _ = env.step(action)
+            observations.append(_space_subset_helper(observation))
+            actions.append(_space_subset_helper(action))
+            rewards.append(reward)
+            terminations.append(terminated)
+            truncations.append(truncated)
+
+        episode_buffer = {
+            "observations": copy.deepcopy(observations),
+            "actions": copy.deepcopy(actions),
+            "rewards": np.asarray(rewards),
+            "terminations": np.asarray(terminations),
+            "truncations": np.asarray(truncations),
+        }
+
+        buffer.append(episode_buffer)
+
+        observations.clear()
+        actions.clear()
+        rewards.clear()
+        terminations.clear()
+        truncations.clear()
+
+        observation, _ = env.reset()
+        observations.append(_space_subset_helper(observation))
+
+    return buffer
diff --git a/tests/utils/test_dataset_combine.py b/tests/utils/test_dataset_combine.py
index 0e9045f2..e14ac045 100644
--- a/tests/utils/test_dataset_combine.py
+++ b/tests/utils/test_dataset_combine.py
@@ -1,13 +1,16 @@
 from typing import Optional
 
 import gymnasium as gym
+import numpy as np
 import pytest
+from gymnasium import spaces
 from gymnasium.utils.env_checker import data_equivalence
 from packaging.specifiers import SpecifierSet
 
 import minari
 from minari import DataCollectorV0, MinariDataset
 from minari.utils import combine_datasets, combine_minari_version_specifiers
+from tests.common import get_sample_buffer_for_dataset_from_env
 
 
 def _check_env_recovery(gymnasium_environment: gym.Env, dataset: MinariDataset):
@@ -103,6 +106,52 @@ def _generate_dataset_with_collector_env(
     env.close()
 
 
+def _generate_dataset_without_env(dataset_id: str, num_episodes: int = 10):
+    """Helper function to create a temporary dataset without an env, for testing dataset combination.
+
+    Args:
+        dataset_id (str): name of the generated Minari dataset
+        num_episodes (int): number of episodes in the generated dataset
+    """
+    buffer = []
+    action_space_subset = spaces.Dict(
+        {
+            "component_2": spaces.Dict(
+                {
+                    "subcomponent_2": spaces.Box(low=4, high=5, dtype=np.float32),
+                }
+            ),
+        }
+    )
+    observation_space_subset = spaces.Dict(
+        {
+            "component_2": spaces.Dict(
+                {
+                    "subcomponent_2": spaces.Box(low=4, high=5, dtype=np.float32),
+                }
+            ),
+        }
+    )
+
+    env = gym.make("DummyDictEnv-v0")
+    buffer = get_sample_buffer_for_dataset_from_env(env, num_episodes)
+
+    # Create Minari dataset and store locally
+    dataset = minari.create_dataset_from_buffers(
+        dataset_id=dataset_id,
+        buffer=buffer,
+        env=None,
+        algorithm_name="random_policy",
+        code_permalink="https://github.com/Farama-Foundation/Minari/blob/main/tests/utils/test_dataset_combine.py",
+        author="WillDudley",
+        author_email="wdudley@farama.org",
+        action_space=action_space_subset,
+        observation_space=observation_space_subset,
+    )
+    assert isinstance(dataset, MinariDataset)
+    env.close()
+
+
 def test_combine_datasets():
     num_datasets, num_episodes = 5, 10
     test_datasets_ids = [f"cartpole-test-{i}-v0" for i in range(num_datasets)]
@@ -128,7 +177,7 @@ def test_combine_datasets():
     assert list(combined_dataset.spec.combined_datasets) == test_datasets_ids
     assert combined_dataset.spec.total_episodes == num_datasets * num_episodes
     assert combined_dataset.spec.total_steps == sum(
-        d.spec.total_steps for d in test_datasets
+        int(d.spec.total_steps) for d in test_datasets
     )
 
     _check_env_recovery(gym.make("CartPole-v1"), combined_dataset)
@@ -163,6 +212,7 @@ def test_combine_datasets():
     combined_dataset = combine_datasets(
         test_datasets, new_dataset_id="cartpole-combined-test-v0"
     )
+    assert combined_dataset.spec.env_spec is not None
     assert combined_dataset.spec.env_spec.max_episode_steps is None
     _check_load_and_delete_dataset("cartpole-combined-test-v0")
 
@@ -172,6 +222,7 @@ def test_combine_datasets():
     combined_dataset = combine_datasets(
         test_datasets, new_dataset_id="cartpole-combined-test-v0"
    )
+    assert combined_dataset.spec.env_spec is not None
     assert combined_dataset.spec.env_spec.max_episode_steps == 10
     _check_load_and_delete_dataset("cartpole-combined-test-v0")
 
diff --git a/tests/utils/test_dataset_creation.py b/tests/utils/test_dataset_creation.py
index 8e234ee8..8f0f21f3 100644
--- a/tests/utils/test_dataset_creation.py
+++ b/tests/utils/test_dataset_creation.py
@@ -1,6 +1,4 @@
 import copy
-from collections import OrderedDict
-from typing import Dict
 
 import gymnasium as gym
 import numpy as np
@@ -14,6 +12,7 @@
     check_env_recovery,
     check_env_recovery_with_subset_spaces,
     check_load_and_delete_dataset,
+    get_sample_buffer_for_dataset_from_env,
     register_dummy_envs,
 )
 
@@ -173,9 +172,9 @@ def test_generate_dataset_with_external_buffer(dataset_id, env_id):
     # Create Minari dataset and store locally
     dataset = minari.create_dataset_from_buffers(
         dataset_id=dataset_id,
+        buffer=buffer,
         env=env_dataset_id,
         eval_env=eval_env_dataset_id,
-        buffer=buffer,
         algorithm_name="random_policy",
         code_permalink=CODELINK,
         author="WillDudley",
@@ -196,20 +195,9 @@ def test_generate_dataset_with_external_buffer(dataset_id, env_id):
     eval_env.close()
 
 
-def _space_subset_helper(entry: Dict):
-
-    return OrderedDict(
-        {
-            "component_2": OrderedDict(
-                {"subcomponent_2": entry["component_2"]["subcomponent_2"]}
-            )
-        }
-    )
-
-
-def test_generate_dataset_with_space_subset_external_buffer():
-    """Test create dataset from external buffers without using DataCollectorV0."""
-    buffer = []
+@pytest.mark.parametrize("is_env_needed", [True, False])
+def test_generate_dataset_with_space_subset_external_buffer(is_env_needed):
+    """Test creating a dataset from external buffers without DataCollectorV0, with and without an env."""
 
     dataset_id = "dummy-dict-test-v0"
 
     # delete the test dataset if it already exists
@@ -238,57 +226,15 @@ def test_generate_dataset_with_space_subset_external_buffer():
         minari.delete_dataset(dataset_id)
 
     env = gym.make("DummyDictEnv-v0")
-
-    observations = []
-    actions = []
-    rewards = []
-    terminations = []
-    truncations = []
-
     num_episodes = 10
-
-    observation, info = env.reset(seed=42)
-
-    # Step the environment, DataCollectorV0 wrapper will do the data collection job
-    observation, _ = env.reset()
-    observations.append(_space_subset_helper(observation))
-    for episode in range(num_episodes):
-        terminated = False
-        truncated = False
-
-        while not terminated and not truncated:
-            action = env.action_space.sample()  # User-defined policy function
-            observation, reward, terminated, truncated, _ = env.step(action)
-            observations.append(_space_subset_helper(observation))
-            actions.append(_space_subset_helper(action))
-            rewards.append(reward)
-            terminations.append(terminated)
-            truncations.append(truncated)
-
-        episode_buffer = {
-            "observations": copy.deepcopy(observations),
-            "actions": copy.deepcopy(actions),
-            "rewards": np.asarray(rewards),
-            "terminations": np.asarray(terminations),
-            "truncations": np.asarray(truncations),
-        }
-
-        buffer.append(episode_buffer)
-
-        observations.clear()
-        actions.clear()
-        rewards.clear()
-        terminations.clear()
-        truncations.clear()
-
-        observation, _ = env.reset()
-        observations.append(_space_subset_helper(observation))
+    buffer = get_sample_buffer_for_dataset_from_env(env, num_episodes)
 
     # Create Minari dataset and store locally
+    env_to_pass = env if is_env_needed else None
     dataset = minari.create_dataset_from_buffers(
         dataset_id=dataset_id,
-        env=env,
         buffer=buffer,
+        env=env_to_pass,
         algorithm_name="random_policy",
         code_permalink=CODELINK,
         author="WillDudley",
@@ -309,9 +255,10 @@ def test_generate_dataset_with_space_subset_external_buffer():
     assert len(dataset.episode_indices) == num_episodes
 
     check_data_integrity(dataset.storage, dataset.episode_indices)
-    check_env_recovery_with_subset_spaces(
-        env, dataset, action_space_subset, observation_space_subset
-    )
+    if is_env_needed:
+        check_env_recovery_with_subset_spaces(
+            env, dataset, action_space_subset, observation_space_subset
+        )
 
     env.close()
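
For reference, a minimal usage sketch of the new `env=None` path in `create_dataset_from_buffers` introduced by this patch (not part of the diff above; the dataset id, spaces, and the single hand-written episode are illustrative assumptions, not taken from the repository):

# Sketch: creating a Minari dataset from an external buffer with env=None.
import gymnasium as gym
import numpy as np

import minari

# With env=None, both spaces must be supplied explicitly.
obs_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
act_space = gym.spaces.Discrete(2)

# One hand-made episode: n steps produce n + 1 observations.
episode = {
    "observations": np.zeros((3, 3), dtype=np.float32),
    "actions": np.array([0, 1]),
    "rewards": np.array([1.0, 1.0]),
    "terminations": np.array([False, True]),
    "truncations": np.array([False, False]),
}

# env_spec is stored as None and a UserWarning about the missing env_spec is emitted.
dataset = minari.create_dataset_from_buffers(
    dataset_id="no-env-example-v0",
    buffer=[episode],
    env=None,
    observation_space=obs_space,
    action_space=act_space,
)

# recover_environment() on such a dataset raises ValueError,
# since there is no env_spec to rebuild the environment from.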