Skip to content

Commit

Permalink
Clean up some code in games and powersystem (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrapin authored Dec 30, 2019
1 parent beab7e9 commit db6878e
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 184 deletions.
265 changes: 131 additions & 134 deletions nevergrad/functions/games/game.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
# Discussions with Tristan Cazenave, Bruno Bouzy, have been helpful.
# Dagstuhl's 2019 seminar on games has been helpful (seminar 19511).

import functools
import numpy as np
from nevergrad import instrumentation as inst
from nevergrad.instrumentation.multivariables import Instrumentation


class _Game:
Expand All @@ -18,11 +18,11 @@ def __init__(self):
self.history2 = []
self.batawaf = False
self.converter = {
"flip": lambda p1, p2: self.flip_play_game(p1, p2),
"batawaf": lambda p1, p2: self.war_play_game(p1, p2, batawaf=True),
"war": lambda p1, p2: self.war_play_game(p1, p2),
"guesswho": lambda p1, p2: self.guesswho_play_game(p1, p2),
"bigguesswho": lambda p1, p2: self.guesswho_play_game(p1, p2, init=96),
"flip": self.flip_play_game,
"batawaf": functools.partial(self.war_play_game, batawaf=True),
"war": self.war_play_game,
"guesswho": self.guesswho_play_game,
"bigguesswho": functools.partial(self.guesswho_play_game, init=96),
}

def get_list_of_games(self):
Expand All @@ -31,11 +31,10 @@ def get_list_of_games(self):
# If both policies are None, then we return the length of a policy (which is a list of float).
# Otherwise we return 1 if policy1 wins, 2 if policy2 wins, and 0 in case of draw.
def play_game(self, game, policy1=None, policy2=None):
# pylint: disable=too-many-return-statements
self.history1 = []
self.history2 = []
if game not in self.converter.keys():
raise NotImplementedError(game)
raise NotImplementedError(f"{game} is not implemented, choose among: {list(self.converter.keys())}")
return self.converter[game](policy1, policy2)

def guesswho_play_noturn(self, decks, policy):
Expand All @@ -54,7 +53,7 @@ def guesswho_play_noturn(self, decks, policy):
+ policy[2] * late * difference / (1 + decks[0])
+ policy[3] * late * difference / (1 + decks[1])
)
except:
except Exception: # pylint: disable=broad-except
return baseline

def guesswho_play(self, policy, decks, turn):
Expand Down Expand Up @@ -170,11 +169,11 @@ def flip_play_game_nosym(self, policy1, policy2):
del cards1[0]
del cards2[0]
something_moves = True
# print "=========="
# print visible1 + [nan] + [len(visible1) + len(cards1)]
# print stack
# print visible2 + [nan] + [len(visible2) + len(cards2)]
# print "=========="
# print "=========="
# print visible1 + [nan] + [len(visible1) + len(cards1)]
# print stack
# print visible2 + [nan] + [len(visible2) + len(cards2)]
# print "=========="
return 1 if len(visible1) < len(visible2) else 2 if len(visible2) < len(visible1) else 0

def flip_value(self, visible1, visible2, l1, l2, stack, policy1):
Expand All @@ -195,116 +194,117 @@ def flip_value(self, visible1, visible2, l1, l2, stack, policy1):
value += policy1[i * 57 + j] * state[i] * state[j]
return value

def phantomgo_choose(self, policy, history):
if policy is not None and history < len(policy):
state = np.random.RandomState(hash(policy[history]))
result = state.randint(board.NN)
return result
else:
result = np.random.randint(board.NN)
return result

def phantomgo_play_game(self, policy1, policy2, size=7):
if policy1 is None and policy2 is None:
return 20000
if np.random.uniform(0., 1.) < .5:
result = self.internal_phantomgo_play_game(policy2, policy1, size)
return 1 if result == 2 else 2 if result == 1 else 0

return self.internal_phantomgo_play_game(policy1, policy2, size)

def internal_phantomgo_play_game(self, policy1, policy2, size):
# pylint: disable=too-many-locals
# Empty board.
if size == 7:
EMPTY_BOARD = EMPTY_BOARD7
Position = Position7
if size == 9:
EMPTY_BOARD = EMPTY_BOARD9
Position = Position9
if size == 19:
EMPTY_BOARD = EMPTY_BOARD19
Position = Position19

p = Position(board=EMPTY_BOARD, ko=None)

mixing = 3 # We mix 3 policies. Please note that we also apply a 8-fold random rotation/symmetry.
if policy1 is not None:
init = np.random.randint(mixing)
policy1 = [policy1[i] for i in range(init, len(policy1), mixing)]
if policy2 is not None:
init = np.random.randint(mixing)
policy2 = [policy2[i] for i in range(init, len(policy2), mixing)]

history1 = 1 # Player 1 starts.
history2 = 2

random_rot = np.random.randint(8)

def transformation(x):
a = x // size
b = x % size
if random_rot == 0:
a = size - 1 - a
b = size - 1 - b
elif random_rot == 1:
b = size - 1 - b
elif random_rot == 2:
a = size - 1 - a
b = size - 1 - b
elif random_rot == 3:
b = size - 1 - b
elif random_rot == 4:
c = b
b = a
a = c
a = size - 1 - a
elif random_rot == 5:
c = b
b = a
a = c
elif random_rot == 6:
c = b
b = a
a = c
a = size - 1 - a
elif random_rot == 7:
c = b
b = a
a = c
return a * size + b

# pylint: disable=broad-except
for _ in range(2 * size * size - size - 1):
# print("move " + str(idx))
# print("=================")
# print(str(p))
# print("=================")
for _ in range((size * size - 9) // 2):
try:
move1 = transformation(self.phantomgo_choose(policy1, history1))
# print("player1 trying ", move1)
p = p.play_move(move1, board.BLACK)
history1 = 2 * history1 + 1 # legal move.
break
except Exception:
# print("failed!" + str(e))
history1 = 2 * history1 + 2 # illegal move.
# print("=================")
# print(p)
# print("=================")
for _ in range((size * size - 9) // 2):
try:
move2 = self.phantomgo_choose(policy2, history2)
# print("player 2 trying ", move2)
p = p.play_move(move2, board.WHITE)
history2 = 2 * history2 + 1 # legal move.
break
except Exception:
# print("failed!" + str(e))
history2 = 2 * history2 + 2 # illegal move.

return 1 if p.score() > 0 else 2
# TODO remove if not planned to be used
# def phantomgo_choose(self, policy, history):
# if policy is not None and history < len(policy):
# state = np.random.RandomState(hash(policy[history]))
# result = state.randint(board.NN)
# return result
# else:
# result = np.random.randint(board.NN)
# return result

# def phantomgo_play_game(self, policy1, policy2, size=7):
# if policy1 is None and policy2 is None:
# return 20000
# if np.random.uniform(0., 1.) < .5:
# result = self.internal_phantomgo_play_game(policy2, policy1, size)
# return 1 if result == 2 else 2 if result == 1 else 0

# return self.internal_phantomgo_play_game(policy1, policy2, size)

# def internal_phantomgo_play_game(self, policy1, policy2, size):
# # pylint: disable=too-many-locals
# # Empty board.
# if size == 7:
# EMPTY_BOARD = EMPTY_BOARD7
# Position = Position7
# if size == 9:
# EMPTY_BOARD = EMPTY_BOARD9
# Position = Position9
# if size == 19:
# EMPTY_BOARD = EMPTY_BOARD19
# Position = Position19

# p = Position(board=EMPTY_BOARD, ko=None)

# mixing = 3 # We mix 3 policies. Please note that we also apply a 8-fold random rotation/symmetry.
# if policy1 is not None:
# init = np.random.randint(mixing)
# policy1 = [policy1[i] for i in range(init, len(policy1), mixing)]
# if policy2 is not None:
# init = np.random.randint(mixing)
# policy2 = [policy2[i] for i in range(init, len(policy2), mixing)]

# history1 = 1 # Player 1 starts.
# history2 = 2

# random_rot = np.random.randint(8)

# def transformation(x):
# a = x // size
# b = x % size
# if random_rot == 0:
# a = size - 1 - a
# b = size - 1 - b
# elif random_rot == 1:
# b = size - 1 - b
# elif random_rot == 2:
# a = size - 1 - a
# b = size - 1 - b
# elif random_rot == 3:
# b = size - 1 - b
# elif random_rot == 4:
# c = b
# b = a
# a = c
# a = size - 1 - a
# elif random_rot == 5:
# c = b
# b = a
# a = c
# elif random_rot == 6:
# c = b
# b = a
# a = c
# a = size - 1 - a
# elif random_rot == 7:
# c = b
# b = a
# a = c
# return a * size + b

# # pylint: disable=broad-except
# for _ in range(2 * size * size - size - 1):
# # print("move " + str(idx))
# # print("=================")
# # print(str(p))
# # print("=================")
# for _ in range((size * size - 9) // 2):
# try:
# move1 = transformation(self.phantomgo_choose(policy1, history1))
# # print("player1 trying ", move1)
# p = p.play_move(move1, board.BLACK)
# history1 = 2 * history1 + 1 # legal move.
# break
# except Exception:
# # print("failed!" + str(e))
# history1 = 2 * history1 + 2 # illegal move.
# # print("=================")
# # print(p)
# # print("=================")
# for _ in range((size * size - 9) // 2):
# try:
# move2 = self.phantomgo_choose(policy2, history2)
# # print("player 2 trying ", move2)
# p = p.play_move(move2, board.WHITE)
# history2 = 2 * history2 + 1 # legal move.
# break
# except Exception:
# # print("failed!" + str(e))
# history2 = 2 * history2 + 2 # illegal move.

# return 1 if p.score() > 0 else 2

def war_play_game(self, policy1, policy2, batawaf=False):
# pylint: disable=too-many-return-statements
Expand Down Expand Up @@ -380,8 +380,7 @@ def war_decide(self, policy, num_cards, list_of_cards):


# Real life is more complicated! This is a very simple model.
# pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-statements,too-many-locals
class Game(inst.InstrumentedFunction):
class Game(inst.InstrumentedFunction): # TODO: Improve seeding support (with ParametrizedFunction)
"""
Parameters
----------
Expand All @@ -393,25 +392,23 @@ class Game(inst.InstrumentedFunction):
def __init__(self, game: str = "war") -> None:
self.game = game
self.game_object = _Game()
the_dimension = self.game_object.play_game(self.game) * 2 # times 2 because we consider both players separately.
instrumentation = Instrumentation(inst.var.Array(the_dimension))
super().__init__(self._simulate_game, instrumentation)
dimension = self.game_object.play_game(self.game) * 2 # times 2 because we consider both players separately.
super().__init__(self._simulate_game, inst.var.Array(dimension))
self.instrumentation.probably_noisy = True
self.instrumentation.is_nonmetrizable = game in ["war", "batawaf"]
self._descriptors.update(game=game)

def _simulate_game(self, x: np.ndarray) -> float:
# FIXME: an adaptive opponent, e.g. bandit, would be better.
# We play a game as player 1.
np_state = np.random.get_state()
np.random.seed(self.instrumentation.random_state.randint(12560, dtype=np.uint32))
# np_state = np.random.get_state() # TODO TOO DANGEREOUS! this can make the game play the same move all over again
p1 = x[:(self.dimension // 2)]
p2 = self.instrumentation.random_state.normal(size=self.dimension // 2)
p2 = np.random.normal(size=self.dimension // 2)
r = self.game_object.play_game(self.game, p1, p2)
result = 0. if r == 1 else 0.5 if r == 0 else 1.
# We play a game as player 2.
p1 = self.instrumentation.random_state.normal(size=self.dimension // 2)
p1 = np.random.normal(size=self.dimension // 2)
p2 = x[(self.dimension // 2):]
r = self.game_object.play_game(self.game, p1, p2)
np.random.set_state(np_state)
# np.random.set_state(np_state)
return (result + (0. if r == 2 else 0.5 if r == 0 else 1.)) / 2
13 changes: 5 additions & 8 deletions nevergrad/functions/games/test_game.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
from ...common import testing
from typing import Any
from typing import List
import typing as tp
from nevergrad.common import testing
import numpy as np
from . import game

@testing.parametrized(**{name: (name,) for name in ["war", "flip", "batawaf", "guesswho", "bigguesswho"]})

@testing.parametrized(**{name: (name,) for name in game._Game().get_list_of_games()})
def test_games(name: str) -> None:
dimension = game._Game().play_game(name)
res: List[Any] = []
res: tp.List[tp.Any] = []
for _ in range(200):
res += [game._Game().play_game(name, np.random.uniform(0, 1, dimension), None)]
score = (float(sum(1 if r == 2 else 0 if r == 1 else 0.5 for r in res)) / len(res))
assert score >= 0.1
assert score <= 0.9
function = game.Game(name)
function(function.instrumentation.random_state.normal(size=function.dimension))



Loading

0 comments on commit db6878e

Please sign in to comment.