Skip to content

Commit

Permalink
Merge pull request #105 from digital-fabric/remove_io_internals
Browse files Browse the repository at this point in the history
Remove usage of IO internals
  • Loading branch information
noteflakes authored Jun 1, 2023
2 parents ecdad5d + 7cd5139 commit edfa3fa
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 116 deletions.
1 change: 0 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
- issue #102 - test and see what this is about
- Look at RPC benchmark more closely: is there a way to reduce the overhead of
the `backend_base_switch_fiber` function?

Expand Down
2 changes: 1 addition & 1 deletion docs/cheat-sheet.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def calculate_some_stuff(n)
acc += big_calc(acc, i)
snooze if (i % 1000) == 0
end
end
end
```

### Suspend fiber
Expand Down
28 changes: 14 additions & 14 deletions examples/core/rpc_benchmark.rb
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ def bm_fiber_raw
$server_raw.transfer 3
end

p bm_raw
p bm_send
p bm_fiber
p bm_fiber_optimized
p bm_fiber_single
# p bm_raw
# p bm_send
# p bm_fiber
# p bm_fiber_optimized
# p bm_fiber_single
p bm_fiber_raw
p bm_fiber_schedule

Expand All @@ -116,17 +116,17 @@ def warmup_jit

puts "warming up JIT..."

3.times do
warmup_jit
sleep 1
end
# 3.times do
# warmup_jit
# sleep 1
# end

Benchmark.ips do |x|
x.report("raw") { bm_raw }
x.report("send") { bm_send }
x.report("fiber") { bm_fiber }
x.report("fiber_optimized") { bm_fiber_optimized }
x.report("fiber_single") { bm_fiber_single }
# x.report("raw") { bm_raw }
# x.report("send") { bm_send }
# x.report("fiber") { bm_fiber }
# x.report("fiber_optimized") { bm_fiber_optimized }
# x.report("fiber_single") { bm_fiber_single }
x.report("fiber_raw") { bm_fiber_raw }
x.report("fiber_schedule") { bm_fiber_schedule }
x.compare!
Expand Down
68 changes: 68 additions & 0 deletions examples/core/stream_mockup.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# frozen_string_literal: true

require 'bundler/setup'
require 'polyphony'

class Stream
def initialize(io)
@io = io
@buffer = +''
@length = 0
@pos = 0
end

def getbyte
if @pos == @length
return nil if !fill_buffer
end
byte = @buffer[@pos].getbyte(0)
@pos += 1
byte
end

def getc
if @pos == @length
return nil if !fill_buffer
end
char = @buffer[@pos]
@pos += 1
char
end

def ungetc(c)
@buffer.insert(@pos, c)
@length += 1
c
end

def gets
end

def read
end

def readpartial
end

private

def fill_buffer
Polyphony.backend_read(@io, @buffer, 8192, false, -1)
@length = @buffer.size
end
end

i, o = IO.pipe
s = Stream.new(i)

f = spin do
loop do
b = s.getbyte
p getbyte: b
s.ungetc(b.to_s) if rand > 0.5
end
end

o << 'hello'
sleep 0.1

8 changes: 3 additions & 5 deletions ext/polyphony/backend_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,12 @@ inline void set_fd_blocking_mode(int fd, int blocking) {
#endif
}

inline void io_verify_blocking_mode(rb_io_t *fptr, VALUE io, VALUE blocking) {
inline void io_verify_blocking_mode(VALUE io, int fd, VALUE blocking) {
VALUE blocking_mode = rb_ivar_get(io, ID_ivar_blocking_mode);
if (blocking == blocking_mode) return;

rb_ivar_set(io, ID_ivar_blocking_mode, blocking);
set_fd_blocking_mode(fptr->fd, blocking == Qtrue);
set_fd_blocking_mode(fd, blocking == Qtrue);
}

inline void backend_run_idle_tasks(struct Backend_base *base) {
Expand Down Expand Up @@ -455,9 +455,7 @@ VALUE Backend_stats(VALUE self) {
}

VALUE Backend_verify_blocking_mode(VALUE self, VALUE io, VALUE blocking) {
rb_io_t *fptr;
GetOpenFile(io, fptr);
io_verify_blocking_mode(fptr, io, blocking);
io_verify_blocking_mode(io, rb_io_descriptor(io), blocking);
return self;
}

Expand Down
11 changes: 10 additions & 1 deletion ext/polyphony/backend_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@
#include "ruby/io.h"
#include "runqueue.h"

#ifndef HAVE_RB_IO_DESCRIPTOR
static int rb_io_descriptor_fallback(VALUE io) {
rb_io_t *fptr;
GetOpenFile(io, fptr);
return fptr->fd;
}
#define rb_io_descriptor rb_io_descriptor_fallback
#endif

struct backend_stats {
unsigned int runqueue_size;
unsigned int runqueue_length;
Expand Down Expand Up @@ -145,7 +154,7 @@ VALUE Backend_stats(VALUE self);
VALUE Backend_verify_blocking_mode(VALUE self, VALUE io, VALUE blocking);
void backend_run_idle_tasks(struct Backend_base *base);
void set_fd_blocking_mode(int fd, int blocking);
void io_verify_blocking_mode(rb_io_t *fptr, VALUE io, VALUE blocking);
void io_verify_blocking_mode(VALUE io, int fd, VALUE blocking);
void backend_setup_stats_symbols();
int backend_getaddrinfo(VALUE host, VALUE port, struct sockaddr **ai_addr);
VALUE name_to_addrinfo(void *name, socklen_t len);
Expand Down
12 changes: 6 additions & 6 deletions ext/polyphony/backend_io_uring.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ VALUE SYM_write;
VALUE eArgumentError;

#ifdef POLYPHONY_UNSET_NONBLOCK
#define io_unset_nonblock(fptr, io) io_verify_blocking_mode(fptr, io, Qtrue)
#define io_unset_nonblock(io, fd) io_verify_blocking_mode(io, fd, Qtrue)
#else
#define io_unset_nonblock(fptr, io)
#define io_unset_nonblock(io, fd)
#endif

typedef struct Backend_t {
Expand Down Expand Up @@ -389,10 +389,10 @@ static inline int fd_from_io(VALUE io, rb_io_t **fptr, int write_mode, int recti
if (underlying_io != Qnil) io = underlying_io;

GetOpenFile(io, *fptr);
io_unset_nonblock(*fptr, io);
int fd = rb_io_descriptor(io);
io_unset_nonblock(io, fd);
if (rectify_file_pos) rectify_io_file_pos(*fptr);

return (*fptr)->fd;
return fd;
}
}

Expand Down Expand Up @@ -1376,7 +1376,7 @@ VALUE Backend_wait_io(VALUE self, VALUE io, VALUE write) {

// if (fd < 0) return Qnil;

// io_unset_nonblock(fptr, io);
// io_unset_nonblock(io, fd);

// ctx = context_store_acquire(&backend->store, OP_CLOSE);
// sqe = io_uring_backend_get_sqe(backend);
Expand Down
10 changes: 5 additions & 5 deletions ext/polyphony/backend_libev.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ static inline int fd_from_io(VALUE io, rb_io_t **fptr, int write_mode, int recti
if (underlying_io != Qnil) io = underlying_io;

GetOpenFile(io, *fptr);
io_verify_blocking_mode(*fptr, io, Qfalse);
int fd = rb_io_descriptor(io);
io_verify_blocking_mode(io, fd, Qfalse);
if (rectify_file_pos) rectify_io_file_pos(*fptr);

return (*fptr)->fd;
return rb_io_descriptor(io);
}
}

Expand Down Expand Up @@ -681,7 +681,7 @@ VALUE Backend_accept(VALUE self, VALUE server_socket, VALUE socket_class) {
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
io_verify_blocking_mode(fp, socket, Qfalse);
io_verify_blocking_mode(socket, fd, Qfalse);
rb_io_synchronized(fp);

// if (rsock_do_not_reverse_lookup) {
Expand Down Expand Up @@ -736,7 +736,7 @@ VALUE Backend_accept_loop(VALUE self, VALUE server_socket, VALUE socket_class) {
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
rb_io_ascii8bit_binmode(socket);
io_verify_blocking_mode(fp, socket, Qfalse);
io_verify_blocking_mode(socket, fd, Qfalse);
rb_io_synchronized(fp);

rb_yield(socket);
Expand Down
6 changes: 6 additions & 0 deletions ext/polyphony/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,10 @@ def define_bool(name, value)

have_header('ruby/io/buffer.h')

have_func('rb_io_path')
have_func('rb_io_descriptor')
have_func('rb_io_get_write_io')
have_func('rb_io_closed_p')
have_func('rb_io_open_descriptor')

create_makefile 'polyphony_ext'
Loading

0 comments on commit edfa3fa

Please sign in to comment.