Commit

Add my own lander abstract implementation for lab
And notes from the reinforcement learning lunar lab

Change-Id: Ie57bc976c4def6205e57039f74f28ba3ac46a425
emirkmo committed Jan 21, 2024
1 parent 9103b83 commit 7937bb8
Showing 2 changed files with 290 additions and 1 deletion.
11 changes: 10 additions & 1 deletion Course3/Notes/reinforcement_learning.md
@@ -127,6 +127,7 @@ you guess it, and then you update it based on the Bellman equation.
So, assuming the first y is $y_1$, corresponding to the result from $s_1$ and $a_1$:

$y_1 = R(s_1) + \gamma \max_{a'} Q(s_1', a')$
$y_2 = R(s_2) + \gamma \max_{a'} Q(s_2', a')$
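
As a quick sketch, the targets for a batch can be computed like this (here `q_target` is assumed to be a callable returning the vector of action values for a state, and `gamma=0.99` is just an illustrative discount value, not the lab's setting):

```python
import numpy as np

def bellman_targets(rewards, next_states, q_target, gamma=0.99, dones=None):
    """y_i = R(s_i) + gamma * max_a' Q(s_i', a'), with no bootstrap on terminal states."""
    q_next = np.array([q_target(s) for s in next_states])  # shape: (batch, n_actions)
    max_q_next = q_next.max(axis=1)
    if dones is not None:  # terminal transitions contribute only the reward
        max_q_next = max_q_next * (1.0 - np.asarray(dones, dtype=float))
    return np.asarray(rewards) + gamma * max_q_next
```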

To train the neural network, we take a training sample of x data, where the y are just
@@ -147,5 +148,13 @@ This iteratively improves the Q function, making the NN a good estimate of Q(s,a).

One could imagine creating agents that start at random and improve over time, where we keep only the ones that improve the most and add a few random mutations to the mix. This is called a genetic algorithm.
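
A toy sketch of that selection-plus-mutation loop (the `fitness` and `mutate` callables here are placeholders, not anything defined in the course):

```python
import random

def evolve(agents, fitness, mutate, n_keep=4, n_mutants=4):
    """Keep the best-performing agents, then add randomly mutated copies of them."""
    survivors = sorted(agents, key=fitness, reverse=True)[:n_keep]
    mutants = [mutate(random.choice(survivors)) for _ in range(n_mutants)]
    return survivors + mutants
```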

### Algorithm refinements
### Algorithmic Instability

Training a neural network to approximate Q(s, a) in this way is unstable and
prone to oscillations.

However, there are a few approaches to make it more stable. The lab starts with two:
**Target Network** and **Experience Replay**.
**Target Network** is a technique in which we use two networks instead of one:
the first is the network being trained, and the second is used only to compute the
target values. The loss is computed on the first network against targets from the
second, and the second network is updated to match the first every N steps.
Furthermore, the target network update is damped, blending the new weights into the
old ones rather than copying them outright; this is called a **soft update**.

**Experience Replay** is a technique in which we store training samples in a buffer
and sample mini-batches from that buffer to train the network. This helps avoid
overfitting to the most recent samples.
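
A minimal sketch of both ideas (the soft-update rate, buffer size, and batch size below are illustrative assumptions, not the lab's settings):

```python
import random
from collections import deque

TAU = 1e-3  # soft-update rate (illustrative)

def soft_update(target_weights, online_weights, tau=TAU):
    """Damped (soft) update: blend the online weights into the target weights."""
    return [tau * w + (1 - tau) * tw for w, tw in zip(online_weights, target_weights)]

# Experience replay: store transitions and train on random mini-batches
# instead of only on the most recent experience.
replay_buffer = deque(maxlen=100_000)

def store(state, action, reward, next_state, done):
    replay_buffer.append((state, action, reward, next_state, done))

def sample_minibatch(batch_size=64):
    return random.sample(replay_buffer, k=min(batch_size, len(replay_buffer)))
```
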
280 changes: 280 additions & 0 deletions Course3/lunar_lander_lab/lander.ipynb
@@ -0,0 +1,280 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"\"\"\"The lunar lander lab uses the Actor Environment formalism, where\n",
"the actor takes an action and the environment evaluates the action\n",
"to get a new observation state and reward. These files are my attempt\n",
"at constructing a readable and extensible codebase around the lander\n",
"and formalism. I have not actually implemented the lab, but the point\n",
"of this exercsie was to think of and explore ways to productionalize\n",
"code for data science.\n",
"\n",
"The code is a bit over-abstracted on purpose. To provide flexibility and\n",
"to test the abstractions which I may want to use. \n",
"\"\"\"\n",
"from enum import Enum\n",
"from dataclasses import dataclass\n",
"from typing import Collection, Callable, Protocol, TypeAlias\n",
"\n",
"class Action(Enum):\n",
" do_nothing = 0\n",
" fire_main_engine = 1\n",
" fire_left_engine = 2\n",
" fire_right_engine = 3\n",
"\n",
"\n",
"\n",
"@dataclass\n",
"class State:\n",
" \"\"\"Observation state of the lunar lander\"\"\"\n",
" x: float = 0\n",
" y: float = 0\n",
" x_velocity: float = 0\n",
" y_velocity: float = 0\n",
" angle: float = 0\n",
" angular_velocity: float = 0\n",
" left_leg_contact: bool = False\n",
" right_leg_contact: bool = False\n",
"\n",
" def step(self) -> None:\n",
" \"\"\"Step the state forward in time\"\"\"\n",
" self.y += self.y_velocity\n",
" self.x += self.x_velocity\n",
" self.angle += self.angular_velocity\n",
"\n",
"# This is the ideal state we want to reach\n",
"# However we could still be successful if we land but at an angle\n",
"# or have some residual velocity...\n",
"desired_state = State(\n",
" x=0,\n",
" y=0,\n",
" x_velocity=0,\n",
" y_velocity=0,\n",
" angle=0,\n",
" angular_velocity=0,\n",
" left_leg_contact=True,\n",
" right_leg_contact=True\n",
")\n",
"\n",
"SurfaceFunction: TypeAlias = Callable[[float], float]\n",
"\n",
"def flat_surface(x: float) -> float:\n",
" \"\"\"A flat surface function\"\"\"\n",
" return 0.2\n",
"\n",
"class BoundaryStates(Enum):\n",
" \"\"\"States that are considered boundary conditions or the default\"\"\"\n",
" flying = 0 # default state\n",
" landed = 0\n",
" crashed = 1\n",
" left_screen = 2\n",
"\n",
"class BoundsCheck(Protocol):\n",
" \"\"\"Protocol for determining failure or success states,\n",
" which can be thought of as boundary conditions on the state space.\"\"\"\n",
" def __call__(self, state: State) -> BoundaryStates:\n",
" ...\n",
"\n",
"@dataclass\n",
"class MoonBounds(BoundsCheck):\n",
" \"\"\"Bounds of the moon\"\"\"\n",
" surface_func: SurfaceFunction = flat_surface\n",
" desired_state: State = desired_state\n",
"\n",
" def __call__(self, state: State) -> BoundaryStates:\n",
" \"\"\"Get the boundary condition for the current state\"\"\"\n",
" if self.crashed(state):\n",
" return BoundaryStates.crashed\n",
" if self.left_screen(state):\n",
" return BoundaryStates.left_screen\n",
" if self.landed(state):\n",
" return BoundaryStates.landed\n",
" return BoundaryStates.flying\n",
" \n",
" def landed(self, state: State) -> bool:\n",
" \"\"\"Whether we (safely) landed. Unsafe landing is when we land\n",
" at too much of an angle and/or with too much velocity. Note:\n",
" Currently this is treated just like not landing at all.\"\"\"\n",
" desired_state = self.desired_state\n",
" current_state = state\n",
" return current_state.x == desired_state.x and \\\n",
" current_state.y == desired_state.y and \\\n",
" current_state.left_leg_contact == desired_state.left_leg_contact and \\\n",
" current_state.right_leg_contact == desired_state.right_leg_contact and \\\n",
" current_state.x_velocity <= desired_state.x_velocity and \\\n",
" current_state.y_velocity <= desired_state.y_velocity and \\\n",
" current_state.angle <= abs(desired_state.angle) and \\\n",
" current_state.angular_velocity <= desired_state.angular_velocity\n",
"\n",
" \n",
" def crashed(self, state: State) -> bool:\n",
" \"\"\"Whether we crashed. We crash if we hit the moon surface.\n",
" The surface is defined by a function that takes the x coordinate\n",
" and returns the y coordinate of the surface.\"\"\"\n",
" return state.y <= self.surface_func(state.x)\n",
" \n",
" def left_screen(self, state: State) -> bool:\n",
" \"\"\"Whether we are still in bounds. We are out of bounds if we\n",
" are outside of the x bounds of the screen.\"\"\"\n",
" return 0 <= state.x <= 1\n",
" \n",
"\n",
"\n",
"class RewardAssignment(Protocol):\n",
" \"\"\"Protocol for assigning rewards to states. Allowing for different\n",
" reward functions both for different states and for boundary conditions\n",
" (landed, crashed, left screen, etc.)\"\"\"\n",
" def __call__(self, state: State, boundary_state: BoundaryStates) -> float:\n",
" ...\n",
" \n",
"@dataclass\n",
"class Reward:\n",
" \"\"\"Since the reward function is coupled to the boundary conditions\n",
" and state, define a class that takes in state and bounds, and provides\n",
" a callable as the overall reward function, implementing specifics as\n",
" needed.\"\"\"\n",
" observation_state_reward: RewardAssignment\n",
" collision_penalty: float = -100\n",
" screen_penalty: float = -100\n",
" done_reward: float = 100\n",
"\n",
" def __call__(self, state: State, boundary_state: BoundaryStates = BoundaryStates.flying) -> float:\n",
" \"\"\"Get the reward for the current state\"\"\"\n",
"\n",
" # Assuming failure boundary condition rewards invalidate other\n",
" # state dependent rewards.\n",
" if boundary_state is BoundaryStates.crashed:\n",
" return self.collision_penalty\n",
" if boundary_state is BoundaryStates.left_screen:\n",
" return self.screen_penalty\n",
" reward: float = 0\n",
" if boundary_state is BoundaryStates.landed:\n",
" reward += self.done_reward\n",
"\n",
" # @TODO: implement flying observation state dependent rewards\n",
" reward += self.observation_state_reward(state, boundary_state)\n",
" return reward \n",
" \n",
"@dataclass\n",
"class EngineActions:\n",
" \"\"\"State of the actions being taken for the engines, (here\n",
" we make no assumption of one action at a time.)\"\"\"\n",
" main: bool = False\n",
" left: bool = False\n",
" right: bool = False\n",
" \n",
" def get_actions(self) -> set[Action]:\n",
" \"\"\"Get the actions that are currently being taken, we use\n",
" set since order must not matter.\"\"\"\n",
" actions: set[Action] = set()\n",
" if self.main:\n",
" actions.add(Action.fire_main_engine)\n",
" if self.left:\n",
" actions.add(Action.fire_left_engine)\n",
" if self.right:\n",
" actions.add(Action.fire_right_engine)\n",
" if not actions: # if we are not doing anything...\n",
" actions.add(Action.do_nothing)\n",
" return actions\n",
" \n",
"\n",
"class Policy(Protocol):\n",
" \"\"\"Protocol for defining policies\"\"\"\n",
" def __call__(self, state: State) -> Action:\n",
" ...\n",
"\n",
"class StateAction(Protocol):\n",
" \"\"\"Protocol for defining state modification based on action\"\"\"\n",
" def __call__(self, state: State, action: Action) -> State:\n",
" ...\n",
"\n",
"\n",
"\n",
"def modify_state_with_action(state: State, action: Action) -> State:\n",
" \"\"\"Modify the state with the given action (in place modification)\"\"\"\n",
" match action:\n",
" case Action.do_nothing:\n",
" pass\n",
" case Action.fire_main_engine:\n",
" state.y_velocity += 0.1\n",
" case Action.fire_left_engine:\n",
" state.x_velocity -= 0.05\n",
" state.angular_velocity -= 0.05\n",
" case Action.fire_right_engine:\n",
" state.x_velocity += 0.05\n",
" state.angular_velocity += 0.05\n",
" gravity = -0.00 # Assuming negligible gravity\n",
" state.y_velocity += gravity\n",
" state.step()\n",
" return state\n",
"\n",
"\n",
"\n",
"@dataclass\n",
"class Agent:\n",
" \"\"\"We've leaked the abstraction a bit, as we communicate over state\n",
" instead of actions.\"\"\"\n",
" previous_action: Action = Action.do_nothing\n",
" current_state: State = State()\n",
" policy: str = \"SimplePolicy\" # TODO: implement policy\n",
" state_action: StateAction = modify_state_with_action\n",
"\n",
" def take_action(self, action: Action) -> State:\n",
" \"\"\"Take an action in the environment\"\"\"\n",
" # Technically we modify in place but we are being explicit\n",
" # especially good if we change to copy on write\n",
" self.current_state = self.state_action(self.current_state, action)\n",
" self.previous_action = action\n",
" return self.current_state\n",
"\n",
"\n",
"@dataclass\n",
"class Environment:\n",
" agent: Agent\n",
" bounds: BoundsCheck\n",
" reward: RewardAssignment #= Reward()\n",
" surface: SurfaceFunction = flat_surface\n",
" current_boundary_state: BoundaryStates = BoundaryStates.flying\n",
" done_boundary_state: BoundaryStates = BoundaryStates.landed\n",
" \n",
" def step(self, action: Action) -> tuple[State, float, bool]:\n",
" \"\"\"Take a step in the environment\"\"\"\n",
" # This is a leaky abstraction; we are communicating over state.\n",
" # We should instead communicate over actions and determine the\n",
" # state based on the action reported by the agent and its previous\n",
" # state!\n",
" unresolved_state = self.agent.take_action(action)\n",
"\n",
" # Evaluate leg contact\n",
" if (unresolved_state.y - self.surface(unresolved_state.x)) <= 0.01:\n",
" unresolved_state.left_leg_contact = True\n",
" unresolved_state.right_leg_contact = True\n",
"\n",
" # Update state and boundary state.\n",
" current_state = unresolved_state\n",
" self.current_boundary_state = self.bounds(current_state)\n",
"\n",
" reward = self.reward(current_state, self.current_boundary_state)\n",
" done = self.bounds(current_state) is self.done_boundary_state\n",
" return current_state, reward, done\n",
" \n",
"\n",
"\n",
"\n",
"agent = \"lander\"\n"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
