-
Notifications
You must be signed in to change notification settings - Fork 0
/
auto_assault.py
374 lines (337 loc) · 16.7 KB
/
auto_assault.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# Autonomous Squad Assault
# Copyright (C) 2019 Richard Scott McNew.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import argparse
import copy
import logging
import sys
import time
from multiprocessing import Process, Queue
import pygame
from pygame.locals import *
from agent.agent_messages import *
from graphics.pygame_constants import *
from opfor.opfor import Opfor
from simulation.direction import Direction
from simulation.drawable import Drawable
from simulation.missionmap import MissionMap
from simulation.point import Point
from warbot.warbot import Warbot
from warbot.warbot_radio_broker import WarbotRadioBroker
class AutoAssault:
"""Autonomous Assault Simulation"""
def __init__(self, args):
self.mission_complete = False
# generate map
self.mission_map = MissionMap(args)
# create IPC queues and warbot radio message broker
self.to_agent_queues = []
self.to_all_warbots_queue = Queue()
self.warbot_radio_broker = WarbotRadioBroker(self.to_all_warbots_queue)
self.warbot_radio_broker_process = Process(target=self.warbot_radio_broker.run)
# create containers for child processes and objects
self.processes = []
self.warbots = [] # locations for warbots and opfor are updated in the child processes and mission_map,
self.opfors = [] # but not in these objects
self.civilians = [] # civilians are not agents (no child processes)
self.active_agents = set()
# simulation setup methods
def create_warbots(self, to_sim_queue):
"""Create warbot objects and associated child processes"""
logging.debug("Creating {} warbots".format(len(self.mission_map.warbot_locations)))
for location in self.mission_map.warbot_locations.values():
to_queue = Queue()
self.to_agent_queues.append(to_queue)
visible_map = self.mission_map.get_visible_map_around_point(location, WARBOT_VISION_DISTANCE)
name = self.mission_map.get_named_drawable_at_location(location, WARBOT_PREFIX)
logging.info("Creating warbot {} at location {}".format(name, location))
to_this_warbot_queue = Queue()
warbot = Warbot(to_queue, to_sim_queue, location, visible_map,
name, to_this_warbot_queue, self.warbot_radio_broker,
self.mission_map.objective_location, self.mission_map.rally_point_location)
self.active_agents.add(warbot.name)
self.warbots.append(warbot)
process = Process(target=warbot.run)
self.processes.append(process)
def create_opfor(self, to_sim_queue):
"""Create OPFOR objects and associated child processes"""
logging.debug("Creating {} OPFOR".format(len(self.mission_map.opfor_locations)))
for location in self.mission_map.opfor_locations.values():
to_queue = Queue()
self.to_agent_queues.append(to_queue)
visible_map = self.mission_map.get_visible_map_around_point(location, OPFOR_VISION_DISTANCE)
name = self.mission_map.get_named_drawable_at_location(location, OPFOR_PREFIX)
logging.info("Creating OPFOR {} at location {}".format(name, location))
opfor = Opfor(to_queue, to_sim_queue, location, visible_map, name)
self.active_agents.add(opfor.name)
self.opfors.append(opfor)
process = Process(target=opfor.run)
self.processes.append(process)
def create_civilians(self):
"""Create civilian objects"""
None # stubbed out for now
# pygame drawing and event handling
def check_for_quit(self):
"""See if the user indicated quit ('q' key, ESC key, or clicked window close button"""
for event in pygame.event.get(): # event handling loop
if event.type == QUIT:
self.terminate()
elif event.type == KEYDOWN and (event.key == K_q or event.key == K_ESCAPE):
self.terminate()
def get_color(self, mission_map, x, y):
drawables = mission_map.grid[Point(x, y)]
if len(drawables) >= 1:
return drawables[0].color
else:
return Colors.BLACK, Colors.BLACK
def draw_grid(self, mission_map):
"""Draw the map grid via pygame"""
# draw gridlines
for x in range(0, WINDOW_WIDTH, CELL_SIZE): # draw vertical lines
pygame.draw.line(DISPLAY_SURF, Colors.DARK_GRAY.value, (x, TOP_BUFFER), (x, WINDOW_HEIGHT + TOP_BUFFER))
for y in range(TOP_BUFFER, WINDOW_HEIGHT, CELL_SIZE): # draw horizontal lines
pygame.draw.line(DISPLAY_SURF, Colors.DARK_GRAY.value, (0, y), (WINDOW_WIDTH, y))
# draw grid objects
for x in range(0, mission_map.grid.width):
for y in range(0, mission_map.grid.height):
if mission_map.grid.array[x][y] != 0: # use the internal array directly for speed
(lineColor, fillColor) = self.get_color(mission_map, x, y)
cell_x = x * CELL_SIZE
cell_y = y * CELL_SIZE + TOP_BUFFER
rect = pygame.Rect(cell_x, cell_y, CELL_SIZE, CELL_SIZE)
pygame.draw.rect(DISPLAY_SURF, lineColor.value, rect)
inner_rect = pygame.Rect(cell_x + 4, cell_y + 4, CELL_SIZE - 8, CELL_SIZE - 8)
pygame.draw.rect(DISPLAY_SURF, fillColor.value, inner_rect)
def check_for_key_press(self):
"""See if the user pressed any key to quit at simulation over"""
if len(pygame.event.get(QUIT)) > 0:
self.terminate()
key_up_events = pygame.event.get(KEYUP)
if len(key_up_events) == 0:
return None
if key_up_events[0].key == K_ESCAPE:
self.terminate()
return key_up_events[0].key
def draw_press_key_message(self):
"""Draw a message indicating how to end the simulation"""
press_key_surf = BASIC_FONT.render('Press a key to quit', True, Colors.DARK_GRAY.value)
press_key_rect = press_key_surf.get_rect()
press_key_rect.topleft = (WINDOW_WIDTH - 200, WINDOW_HEIGHT - 30)
DISPLAY_SURF.blit(press_key_surf, press_key_rect)
def draw_legend(self):
"""Draw a legend that shows what map colors represent"""
x = 7
for index in range(1, 10):
drawable = Drawable(index)
surf = BASIC_FONT.render(drawable.name, True, drawable.color[0].value)
rect = surf.get_rect()
rect.topleft = (x, TOP_BUFFER + WINDOW_HEIGHT + 15)
DISPLAY_SURF.blit(surf, rect)
x = x + LEGEND_SCALE * len(drawable.name) + 7
x = 7
for index in range(10, 18):
drawable = Drawable(index)
surf = BASIC_FONT.render(drawable.name, True, drawable.color[0].value)
rect = surf.get_rect()
rect.topleft = (x, TOP_BUFFER + WINDOW_HEIGHT + 60)
DISPLAY_SURF.blit(surf, rect)
x = x + LEGEND_SCALE * len(drawable.name) + 7
def show_game_over_screen(self):
"""Draw simulation ended message"""
game_over_font = pygame.font.Font(SANS_FONT, 150)
game_surf = game_over_font.render("Objective", True, Colors.WHITE.value)
over_surf = game_over_font.render("Secured", True, Colors.WHITE.value)
game_rect = game_surf.get_rect()
over_rect = over_surf.get_rect()
game_rect.midtop = (WINDOW_WIDTH / 2, 10)
over_rect.midtop = (WINDOW_WIDTH / 2, game_rect.height + 10 + 25)
DISPLAY_SURF.blit(game_surf, game_rect)
DISPLAY_SURF.blit(over_surf, over_rect)
self.draw_press_key_message()
pygame.display.update()
pygame.time.wait(500)
self.check_for_key_press() # clear out any key presses in the event queue
while True:
if self.check_for_key_press():
pygame.event.get() # clear event queue
return
def update_display(self):
"""Draw the updated game state (mission_map) to the screen"""
DISPLAY_SURF.fill(BG_COLOR.value)
self.draw_grid(self.mission_map)
# draw_legend()
pygame.display.update()
FPS_CLOCK.tick(FPS)
def update_warbots(self):
"""Draw the updated game state (mission_map) to the screen"""
for warbot in self.mission_map.warbot_locations.keys():
for location in [self.mission_map.warbot_locations[warbot],
self.mission_map.previous_warbot_locations[warbot]]:
if location is not None:
(lineColor, fillColor) = self.get_color(self.mission_map, location.x, location.y)
cell_x = location.x * CELL_SIZE
cell_y = location.y * CELL_SIZE + TOP_BUFFER
rect = pygame.Rect(cell_x, cell_y, CELL_SIZE, CELL_SIZE)
pygame.draw.rect(DISPLAY_SURF, lineColor.value, rect)
inner_rect = pygame.Rect(cell_x + 4, cell_y + 4, CELL_SIZE - 8, CELL_SIZE - 8)
pygame.draw.rect(DISPLAY_SURF, fillColor.value, inner_rect)
pygame.display.update()
FPS_CLOCK.tick(FPS)
def update_bullets(self):
"""Only animate bullets"""
for bullet in self.mission_map.bullets:
for location in [bullet.location, bullet.prev_location]:
if location is not None:
(lineColor, fillColor) = self.get_color(self.mission_map, location.x, location.y)
cell_x = location.x * CELL_SIZE
cell_y = location.y * CELL_SIZE + TOP_BUFFER
rect = pygame.Rect(cell_x, cell_y, CELL_SIZE, CELL_SIZE)
pygame.draw.rect(DISPLAY_SURF, lineColor.value, rect)
inner_rect = pygame.Rect(cell_x + 4, cell_y + 4, CELL_SIZE - 8, CELL_SIZE - 8)
pygame.draw.rect(DISPLAY_SURF, fillColor.value, inner_rect)
pygame.display.update()
FPS_CLOCK.tick(FPS)
# simulation main loop and helpers
def terminate(self):
"""Simulation shutdown and clean-up"""
logging.debug("Sending shutdown message to child processes . . .")
for to_agent_queue in self.to_agent_queues:
to_agent_queue.put(shutdown_message())
self.to_all_warbots_queue.put(shutdown_message())
logging.debug("Waiting for child processes to shutdown . . .")
for process in self.processes:
process.join()
self.warbot_radio_broker_process.join()
logging.debug("Quitting . . .")
logging.shutdown()
pygame.quit()
sys.exit()
def update_mission_map(self, messages_received):
"""Update game state (mission_map) from agent subprocess messages"""
bullets_live = False
for message in messages_received:
logging.debug("Simulation: Received message: {}".format(message))
if message[MESSAGE_TYPE] == TAKE_TURN:
agent = message[FROM]
action = message[ACTION]
if action == MOVE_TO:
location = Point.from_dict(message[LOCATION])
self.mission_map.move_agent(agent, location)
elif action == FIRE_AT:
location = Point.from_dict(message[LOCATION])
direction = Direction.from_str(message[DIRECTION])
self.mission_map.create_bullet(location, direction)
bullets_live = True
elif message[MESSAGE_TYPE] == MISSION_COMPLETE:
self.mission_complete = True
return
if bullets_live:
while len(self.mission_map.bullets) > 0:
self.update_bullets()
self.mission_map.move_bullets()
def start_child_processes(self):
"""Start child processes running for agents and warbot radio broker process"""
self.warbot_radio_broker_process.start()
# start warbot and opfor processes running
for process in self.processes:
process.start()
def run_simulation(self, to_sim_queue):
"""Main simulation loop"""
self.start_child_processes()
quit_wanted = False
# Draw the whole map the first time; afterwards only draw updates for better performance
self.update_display()
live_process_count = len(self.processes)
while not self.mission_complete and not quit_wanted: # main game loop
# check for q or Esc keypress or window close events to quit
self.check_for_quit()
# give agents updated simulation state and await their actions for this turn
agents_left = copy.deepcopy(self.active_agents)
messages_received = []
for warbot in self.warbots:
warbot_location = self.mission_map.warbot_locations[warbot.name]
visible_map = self.mission_map.get_visible_map_around_point(warbot_location, warbot.sight_radius)
warbot.to_me_queue.put(your_turn_message(visible_map))
for opfor in self.opfors:
opfor_location = self.mission_map.opfor_locations[opfor.name]
visible_map = self.mission_map.get_visible_map_around_point(opfor_location, opfor.sight_radius)
opfor.to_me_queue.put(your_turn_message(visible_map))
# await "take_turn" response messages
logging.debug("Waiting on turn responses . . .")
while len(messages_received) < live_process_count:
message = json.loads(to_sim_queue.get())
if message[MESSAGE_TYPE] == TAKE_TURN:
agents_left.remove(message[FROM])
messages_received.append(message)
logging.debug("Received {} turn responses out of {} expected. Still need response from: {}"
.format(len(messages_received), live_process_count, agents_left))
elif message[MESSAGE_TYPE] == MISSION_COMPLETE:
self.mission_complete = True
break
if self.mission_complete:
self.show_game_over_screen()
else:
logging.debug("Done waiting on turn responses. Updating mission_map")
# update mission_map
self.update_mission_map(messages_received)
# update warbot movement
self.update_warbots()
# after the mission is complete or quit is indicated, clean-up and shutdown
self.terminate()
def parse_arguments():
"""Parse the command line arguments"""
# setup command line argument parsing
parser = argparse.ArgumentParser()
parser.add_argument('-r', default=6, type=int, choices=range(2, 11),
help='number of rifle warbots (2 to 10)')
parser.add_argument('-e', default=6, type=int, choices=range(1, 11),
help='number of enemies (1 to 10)')
parser.add_argument('-c', default=0, type=int, choices=range(0, 21),
help='number of civilians (0 to 20)')
return parser.parse_args()
def setup_pygame():
"""Setup global PyGame state"""
global FPS_CLOCK, DISPLAY_SURF, BASIC_FONT, SCORE_FONT
pygame.init()
FPS_CLOCK = pygame.time.Clock()
DISPLAY_SURF = pygame.display.set_mode((WINDOW_WIDTH, WINDOW_HEIGHT + TOP_BUFFER + BOTTOM_BUFFER))
BASIC_FONT = pygame.font.Font(SANS_FONT, 24)
SCORE_FONT = pygame.font.Font(SANS_FONT, 36)
pygame.display.set_caption(AUTO_ASSAULT)
def main():
"""Main entry point"""
# start logging
log_file = "Auto_Assault.log"
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d : %(message)s',
filename=log_file,
level=logging.DEBUG)
# parse command line args
args = parse_arguments()
# setup simulation
auto_assault = AutoAssault(args)
to_sim_queue = Queue()
# create warbots
auto_assault.create_warbots(to_sim_queue)
# create opfor
auto_assault.create_opfor(to_sim_queue)
# create civilians
auto_assault.create_civilians()
# pygame setup
setup_pygame()
# run the simulation
time.sleep(1) # wait just a moment for everything to come up
auto_assault.run_simulation(to_sim_queue)
if __name__ == '__main__':
main()