Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
esquerbatua committed Sep 26, 2024
1 parent b1c9941 commit 7016094
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 32 deletions.
16 changes: 8 additions & 8 deletions vlib/picoev/loop_linux.c.v
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn (mut pv Picoev) update_events(fd int, events int) int {
mut ev := C.epoll_event{}

// fd belongs to loop
if events & picoev_del != target.events && target.loop_id != pv.loop.id {
if events & picoev_del != target.events && target.loop_id != pv.loop[0].id {
return -1
}

Expand All @@ -78,17 +78,17 @@ fn (mut pv Picoev) update_events(fd int, events int) int {
// nothing to do
} else if events & picoev_readwrite == 0 {
// delete the file if it exists
epoll_ret := C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_DEL, fd, &ev)
epoll_ret := C.epoll_ctl(pv.loop[0].epoll_fd, C.EPOLL_CTL_DEL, fd, &ev)

// check error
assert epoll_ret == 0
} else {
// change settings to 0
mut epoll_ret := C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_MOD, fd, &ev)
mut epoll_ret := C.epoll_ctl(pv.loop[0].epoll_fd, C.EPOLL_CTL_MOD, fd, &ev)
if epoll_ret != 0 {
// if the file is not present we want to add it
assert C.errno == C.ENOENT
epoll_ret = C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_ADD, fd, &ev)
epoll_ret = C.epoll_ctl(pv.loop[0].epoll_fd, C.EPOLL_CTL_ADD, fd, &ev)

// check error
assert epoll_ret == 0
Expand All @@ -102,20 +102,20 @@ fn (mut pv Picoev) update_events(fd int, events int) int {

@[direct_array_access]
fn (mut pv Picoev) poll_once(max_wait_in_sec int) int {
nevents := C.epoll_wait(pv.loop.epoll_fd, &pv.loop.events, max_fds, max_wait_in_sec * 1000)
nevents := C.epoll_wait(pv.loop[0].epoll_fd, &pv.loop[0].events, max_fds, max_wait_in_sec * 1000)

if nevents == -1 {
// timeout has occurred
return -1
}

for i := 0; i < nevents; i++ {
mut event := pv.loop.events[i]
mut event := pv.loop[0].events[i]
target := unsafe { pv.file_descriptors[event.data.fd] }
unsafe {
assert event.data.fd < max_fds
}
if pv.loop.id == target.loop_id && target.events & picoev_readwrite != 0 {
if pv.loop[0].id == target.loop_id && target.events & picoev_readwrite != 0 {
mut read_events := 0
if event.events & u32(C.EPOLLIN) != 0 {
read_events |= picoev_read
Expand All @@ -132,7 +132,7 @@ fn (mut pv Picoev) poll_once(max_wait_in_sec int) int {
// defer epoll delete
event.events = 0
unsafe {
C.epoll_ctl(pv.loop.epoll_fd, C.EPOLL_CTL_DEL, event.data.fd, &event)
C.epoll_ctl(pv.loop[0].epoll_fd, C.EPOLL_CTL_DEL, event.data.fd, &event)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion vlib/picoev/loop_macos.c.v
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ fn backend_build(next_fd int, events u32) int {

// get the lower 8 bits
@[inline]
fn backend_get_old_events(backend int) int {
fn backend_get_old_events(backend int) int {loop &LoopType,
return backend & 0xff
}

Expand Down
54 changes: 32 additions & 22 deletions vlib/picoev/picoev.v
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub:
max_write int = 8192
family net.AddrFamily = .ip6
host string
num_pools int = 1
}

// Core structure for managing the event loop and connections.
Expand All @@ -72,10 +73,12 @@ pub struct Picoev {
err_cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response, IError) = default_error_callback @[deprecated: 'use `error_callback` instead']
raw_cb fn (mut Picoev, int, int) = unsafe { nil } @[deprecated: 'use `raw_callback` instead']
mut:
loop &LoopType = unsafe { nil }
last_loop int
file_descriptors [max_fds]&Target
timeouts map[int]i64
num_loops int
num_pools int
req_loop &LoopType = unsafe { nul }
loop []&LoopType

buf &u8 = unsafe { nil }
idx [1024]int
Expand All @@ -90,8 +93,6 @@ pub:
pub fn (mut pv Picoev) init() {
assert max_fds > 0

pv.num_loops = 0

for i in 0 .. max_fds {
pv.file_descriptors[i] = &Target{}
}
Expand All @@ -107,7 +108,7 @@ pub fn (mut pv Picoev) add(fd int, events int, timeout int, callback voidptr) in
mut target := pv.file_descriptors[fd]
target.fd = fd
target.cb = callback
target.loop_id = pv.loop.id
target.loop_id = pv.req_loop.id
target.events = 0

if pv.update_events(fd, events | picoev_add) != 0 {
Expand All @@ -124,7 +125,7 @@ pub fn (mut pv Picoev) add(fd int, events int, timeout int, callback voidptr) in

// del remove a file descriptor from the event loop
@[deprecated: 'use delete() instead']
@[direct_array_access]
@[inline]
pub fn (mut pv Picoev) del(fd int) int {
return pv.delete(fd)
}
Expand Down Expand Up @@ -155,15 +156,15 @@ pub fn (mut pv Picoev) delete(fd int) int {
}

fn (mut pv Picoev) loop_once(max_wait_in_sec int) int {
pv.loop.now = get_time()
loop.now = get_time()

if pv.poll_once(max_wait_in_sec) != 0 {
eprintln('Error during poll_once')
return -1
}

if max_wait_in_sec != 0 {
pv.loop.now = get_time() // Update loop start time again if waiting occurred
loop.now = get_time() // Update loop start time again if waiting occurred
} else {
// If no waiting, skip timeout handling for potential performance optimization
return 0
Expand All @@ -179,7 +180,7 @@ fn (mut pv Picoev) loop_once(max_wait_in_sec int) int {
fn (mut pv Picoev) set_timeout(fd int, secs int) {
assert fd < max_fds
if secs != 0 {
pv.timeouts[fd] = pv.loop.now + secs
pv.timeouts[fd] = pv.loop[0].now + secs
} else {
pv.timeouts.delete(fd)
}
Expand All @@ -193,14 +194,14 @@ fn (mut pv Picoev) handle_timeout() {
mut to_remove := []int{}

for fd, timeout in pv.timeouts {
if timeout <= pv.loop.now {
if timeout <= pv.loop[0].now {
to_remove << fd
}
}

for fd in to_remove {
target := pv.file_descriptors[fd]
assert target.loop_id == pv.loop.id
assert target.loop_id == pv.loop[0].id
pv.timeouts.delete(fd)
unsafe { target.cb(fd, picoev_timeout, &pv) }
}
Expand Down Expand Up @@ -356,7 +357,8 @@ pub fn new(config Config) !&Picoev {
}

mut pv := &Picoev{
num_loops: 1
num_pools: config.num_pools
loop: unsafe { []&LoopType{len: config.num_pools, init: nil} }
cb: config.cb
error_callback: config.err_cb
raw_callback: config.raw_cb
Expand All @@ -372,18 +374,11 @@ pub fn new(config Config) !&Picoev {
pv.out = unsafe { malloc_noscan(max_fds * config.max_write + 1) }
}

// epoll on linux
// kqueue on macos and bsd
// select on windows and others
$if linux {
pv.loop = create_epoll_loop(0) or { panic(err) }
} $else $if freebsd || macos {
pv.loop = create_kqueue_loop(0) or { panic(err) }
} $else {
pv.loop = create_select_loop(0) or { panic(err) }
for loop_id in 0 .. config.num_pools {
pv.loop[loop_id] = pv.create_loop(loop_id)!
}

if pv.loop == unsafe { nil } {
if pv.loop[0] == unsafe { nil } {
eprintln('Failed to create loop')
close_socket(listening_socket_fd)
return unsafe { nil }
Expand All @@ -395,6 +390,21 @@ pub fn new(config Config) !&Picoev {
return pv
}

// create_loop - Creates a new Event Loop
pub fn (mut pv Picoev) create_loop(id int) !&LoopType {
// epoll on linux
// kqueue on macos and bsd
// select on windows and others
$if linux {
return create_epoll_loop(id) or { panic(err) }
} $else $if freebsd || macos {
return create_kqueue_loop(id) or { panic(err) }
} $else {
return create_select_loop(id) or { panic(err) }
}
return unsafe { nil }
}

// serve starts the event loop for accepting new connections
// See also picoev.new().
pub fn (mut pv Picoev) serve() {
Expand Down
3 changes: 2 additions & 1 deletion vlib/veb/veb.v
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub:
port int = 8080
show_startup_message bool = true
timeout_in_seconds int = 30
num_loops int = 1
}

struct FileResponse {
Expand Down Expand Up @@ -151,7 +152,7 @@ pub fn run_at[A, X](mut global_app A, params RunParams) ! {

if params.show_startup_message {
host := if params.host == '' { 'localhost' } else { params.host }
println('[veb] Running app on http://${host}:${params.port}/')
println('[veb] Running app on http://${host}:${params.port}/, num_loops: ${params.num_loops}')
}
flush_stdout()

Expand Down

0 comments on commit 7016094

Please sign in to comment.