Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove usage of IO internals #105

Merged
merged 2 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
- issue #102 - test and see what this is about
- Look at RPC benchmark more closely: is there a way to reduce the overhead of
the `backend_base_switch_fiber` function?

Expand Down
2 changes: 1 addition & 1 deletion docs/cheat-sheet.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def calculate_some_stuff(n)
acc += big_calc(acc, i)
snooze if (i % 1000) == 0
end
end
end
```

### Suspend fiber
Expand Down
28 changes: 14 additions & 14 deletions examples/core/rpc_benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def bm_fiber_raw
$server_raw.transfer 3
end

p bm_raw
p bm_send
p bm_fiber
p bm_fiber_optimized
p bm_fiber_single
# p bm_raw
# p bm_send
# p bm_fiber
# p bm_fiber_optimized
# p bm_fiber_single
p bm_fiber_raw
p bm_fiber_schedule

Expand All @@ -116,17 +116,17 @@ def warmup_jit

puts "warming up JIT..."

3.times do
warmup_jit
sleep 1
end
# 3.times do
# warmup_jit
# sleep 1
# end

Benchmark.ips do |x|
x.report("raw") { bm_raw }
x.report("send") { bm_send }
x.report("fiber") { bm_fiber }
x.report("fiber_optimized") { bm_fiber_optimized }
x.report("fiber_single") { bm_fiber_single }
# x.report("raw") { bm_raw }
# x.report("send") { bm_send }
# x.report("fiber") { bm_fiber }
# x.report("fiber_optimized") { bm_fiber_optimized }
# x.report("fiber_single") { bm_fiber_single }
x.report("fiber_raw") { bm_fiber_raw }
x.report("fiber_schedule") { bm_fiber_schedule }
x.compare!
Expand Down
68 changes: 68 additions & 0 deletions examples/core/stream_mockup.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

require 'bundler/setup'
require 'polyphony'

class Stream
def initialize(io)
@io = io
@buffer = +''
@length = 0
@pos = 0
end

def getbyte
if @pos == @length
return nil if !fill_buffer
end
byte = @buffer[@pos].getbyte(0)
@pos += 1
byte
end

def getc
if @pos == @length
return nil if !fill_buffer
end
char = @buffer[@pos]
@pos += 1
char
end

def ungetc(c)
@buffer.insert(@pos, c)
@length += 1
c
end

def gets
end

def read
end

def readpartial
end

private

def fill_buffer
Polyphony.backend_read(@io, @buffer, 8192, false, -1)
@length = @buffer.size
end
end

i, o = IO.pipe
s = Stream.new(i)

f = spin do
loop do
b = s.getbyte
p getbyte: b
s.ungetc(b.to_s) if rand > 0.5
end
end

o << 'hello'
sleep 0.1

8 changes: 3 additions & 5 deletions ext/polyphony/backend_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ inline void set_fd_blocking_mode(int fd, int blocking) {
#endif
}

inline void io_verify_blocking_mode(rb_io_t *fptr, VALUE io, VALUE blocking) {
inline void io_verify_blocking_mode(VALUE io, int fd, VALUE blocking) {
VALUE blocking_mode = rb_ivar_get(io, ID_ivar_blocking_mode);
if (blocking == blocking_mode) return;

rb_ivar_set(io, ID_ivar_blocking_mode, blocking);
set_fd_blocking_mode(fptr->fd, blocking == Qtrue);
set_fd_blocking_mode(fd, blocking == Qtrue);
}

inline void backend_run_idle_tasks(struct Backend_base *base) {
Expand Down Expand Up @@ -455,9 +455,7 @@ VALUE Backend_stats(VALUE self) {
}

VALUE Backend_verify_blocking_mode(VALUE self, VALUE io, VALUE blocking) {
rb_io_t *fptr;
GetOpenFile(io, fptr);
io_verify_blocking_mode(fptr, io, blocking);
io_verify_blocking_mode(io, rb_io_descriptor(io), blocking);
return self;
}

Expand Down
11 changes: 10 additions & 1 deletion ext/polyphony/backend_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
#include "ruby/io.h"
#include "runqueue.h"

#ifndef HAVE_RB_IO_DESCRIPTOR
static int rb_io_descriptor_fallback(VALUE io) {
rb_io_t *fptr;
GetOpenFile(io, fptr);
return fptr->fd;
}
#define rb_io_descriptor rb_io_descriptor_fallback
#endif

struct backend_stats {
unsigned int runqueue_size;
unsigned int runqueue_length;
Expand Down Expand Up @@ -145,7 +154,7 @@ VALUE Backend_stats(VALUE self);
VALUE Backend_verify_blocking_mode(VALUE self, VALUE io, VALUE blocking);
void backend_run_idle_tasks(struct Backend_base *base);
void set_fd_blocking_mode(int fd, int blocking);
void io_verify_blocking_mode(rb_io_t *fptr, VALUE io, VALUE blocking);
void io_verify_blocking_mode(VALUE io, int fd, VALUE blocking);
void backend_setup_stats_symbols();
int backend_getaddrinfo(VALUE host, VALUE port, struct sockaddr **ai_addr);
VALUE name_to_addrinfo(void *name, socklen_t len);
Expand Down
12 changes: 6 additions & 6 deletions ext/polyphony/backend_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ VALUE SYM_write;
VALUE eArgumentError;

#ifdef POLYPHONY_UNSET_NONBLOCK
#define io_unset_nonblock(fptr, io) io_verify_blocking_mode(fptr, io, Qtrue)
#define io_unset_nonblock(io, fd) io_verify_blocking_mode(io, fd, Qtrue)
#else
#define io_unset_nonblock(fptr, io)
#define io_unset_nonblock(io, fd)
#endif

typedef struct Backend_t {
Expand Down Expand Up @@ -389,10 +389,10 @@ static inline int fd_from_io(VALUE io, rb_io_t **fptr, int write_mode, int recti
if (underlying_io != Qnil) io = underlying_io;

GetOpenFile(io, *fptr);
io_unset_nonblock(*fptr, io);
int fd = rb_io_descriptor(io);
io_unset_nonblock(io, fd);
if (rectify_file_pos) rectify_io_file_pos(*fptr);

return (*fptr)->fd;
return fd;
}
}

Expand Down Expand Up @@ -1376,7 +1376,7 @@ VALUE Backend_wait_io(VALUE self, VALUE io, VALUE write) {

// if (fd < 0) return Qnil;

// io_unset_nonblock(fptr, io);
// io_unset_nonblock(io, fd);

// ctx = context_store_acquire(&backend->store, OP_CLOSE);
// sqe = io_uring_backend_get_sqe(backend);
Expand Down
10 changes: 5 additions & 5 deletions ext/polyphony/backend_libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ static inline int fd_from_io(VALUE io, rb_io_t **fptr, int write_mode, int recti
if (underlying_io != Qnil) io = underlying_io;

GetOpenFile(io, *fptr);
io_verify_blocking_mode(*fptr, io, Qfalse);
int fd = rb_io_descriptor(io);
io_verify_blocking_mode(io, fd, Qfalse);
if (rectify_file_pos) rectify_io_file_pos(*fptr);

return (*fptr)->fd;
return rb_io_descriptor(io);
}
}

Expand Down Expand Up @@ -681,7 +681,7 @@ VALUE Backend_accept(VALUE self, VALUE server_socket, VALUE socket_class) {
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
io_verify_blocking_mode(fp, socket, Qfalse);
io_verify_blocking_mode(socket, fd, Qfalse);
rb_io_synchronized(fp);

// if (rsock_do_not_reverse_lookup) {
Expand Down Expand Up @@ -736,7 +736,7 @@ VALUE Backend_accept_loop(VALUE self, VALUE server_socket, VALUE socket_class) {
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
io_verify_blocking_mode(fp, socket, Qfalse);
io_verify_blocking_mode(socket, fd, Qfalse);
rb_io_synchronized(fp);

rb_yield(socket);
Expand Down
6 changes: 6 additions & 0 deletions ext/polyphony/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@ def define_bool(name, value)

have_header('ruby/io/buffer.h')

have_func('rb_io_path')
have_func('rb_io_descriptor')
have_func('rb_io_get_write_io')
have_func('rb_io_closed_p')
have_func('rb_io_open_descriptor')

create_makefile 'polyphony_ext'
Loading