-
Notifications
You must be signed in to change notification settings - Fork 0
/
envsim.py
122 lines (93 loc) · 3.58 KB
/
envsim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import pygame
import subprocess
import mmap
import ctypes
import os
from posix_ipc import Semaphore, O_CREAT
import random
import math
import time
MAX_ROBOTS = 100000
class RobotState(ctypes.Structure):
_fields_ = [("x", ctypes.c_double),
("y", ctypes.c_double),
("t", ctypes.c_double),
("dx", ctypes.c_double),
("dt", ctypes.c_double),
("ux", ctypes.c_double),
("ut", ctypes.c_double)]
class SharedMemory(ctypes.Structure):
_fields_ = [("robots", RobotState * MAX_ROBOTS),
("message", ctypes.c_int32)]
MSG_NONE = 0
MSG_STOP = 1
WIDTH, HEIGHT = 800, 800
ROBOT_SIZE = 5
ROBOT_COUNT = 5000
def main():
sem_start_name = f"gro300_sem_start_{random.randint(1000,9999)}"
sem_done_name = f"gro300_sem_done_{random.randint(1000,9999)}"
# Create and initialize semaphore
sem_start = Semaphore(sem_start_name, O_CREAT, initial_value=0)
sem_done = Semaphore(sem_done_name, O_CREAT, initial_value=0)
process = subprocess.Popen(["./robotsim",
sem_start_name,
sem_done_name,
str(ROBOT_COUNT)],
stdout=subprocess.PIPE)
#stderr=subprocess.PIPE)
# Read shared memory filename from robotsim's output
shared_mem_fname = process.stdout.readline().decode().strip()
if (shared_mem_fname == ""):
print("Erreur: Impossible de lire le nom du fichier de mémoire partagée.")
return
# Open the shared memory file and create a memory map
with open(shared_mem_fname, "r+b") as f:
mm = mmap.mmap(f.fileno(), 0)
shared_data = SharedMemory.from_buffer(mm)
running = True
cycle_count = 0
time_spent_ms = 0
print("PID du processus robotsim:", process.pid)
input("Appuyez sur Entrée pour lancer la simulation...")
pygame.init()
screen = pygame.display.set_mode((WIDTH, HEIGHT))
pygame.display.set_caption("Simulation de Robots")
clock = pygame.time.Clock()
while running:
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
start_time = time.perf_counter()
# Signal robotsim to perform a step
sem_start.release()
# Wait for the step to complete
sem_done.acquire()
end_time = time.perf_counter()
time_spent_ms += (end_time - start_time) * 1000
# Draw robots
screen.fill((255, 255, 255))
for robot in shared_data.robots[:ROBOT_COUNT]:
x = int(robot.x * WIDTH)
y = int((1 - robot.y) * HEIGHT)
angle = robot.t
points = [
(x + ROBOT_SIZE * math.cos(angle), y - ROBOT_SIZE * math.sin(angle)),
(x + ROBOT_SIZE * math.cos(angle + 2.6), y - ROBOT_SIZE * math.sin(angle + 2.6)),
(x + ROBOT_SIZE * math.cos(angle - 2.6), y - ROBOT_SIZE * math.sin(angle - 2.6))
]
pygame.draw.polygon(screen, (0, 0, 255), points)
pygame.display.flip()
cycle_count += 1
clock.tick(60)
print(f"Cycle count: {cycle_count}, average time per cycle: {time_spent_ms / cycle_count:.2f} ms")
pygame.quit()
shared_data.message = MSG_STOP
sem_start.release()
process.wait()
# NOTE: The subprocess has the responsibility to unlink the shared memory file
# Everything should be cleaned after the process is done.
sem_start.unlink()
sem_done.unlink()
if __name__ == "__main__":
main()