-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce server plugin helpers #1312
Conversation
@repeatedly Could you take a look for API design and basic features? If you are ok, I'll write tests and move forward to implement socket plugin helper. |
router.emit(tag, time, record) | ||
option = msg[3] | ||
end | ||
|
||
# return option for response | ||
option | ||
ensure | ||
p(here: "ensure of on_message", error: $!) if $! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use p
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's just about debugging. I missed to remote it.
end | ||
|
||
class TCPServerHandler < Coolio::TCPSocket | ||
PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No use in new implementation.
end | ||
|
||
def initialize(sock, resolve_name, max_bytes, flags, log, under_plugin_development, &callback) | ||
raise ArgumentError, "socket is a UDPSocket" unless sock.is_a?(UDPSocket) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
socket must be a UDPSocket: socket = #{sock}
or socket is not a UDPSocket: socket = #{sock}
is better.
Basic implementation looks good. Off topic: How about moving |
def on_read_without_connection(data) | ||
@data_callback.call(data) | ||
rescue => e | ||
p(here: "error without connection", error: e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p
debug.
def on_read_with_connection(data) | ||
@data_callback.call(data, self) | ||
rescue => e | ||
p(here: "error with connection", error: e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
57e1c9f
to
a8a6ccc
Compare
@repeatedly No way. It's very bad practice to use Engine for such purpose. Engine is a singleton object, and using Engine for such purpose makes many messy code to overriding |
a8a6ccc
to
fe38634
Compare
I pushed updated commits to follow review comments, and add some changes:
I changed to pass an instance object of CallbackSocket to callbacks, to avoid to pass instances of cool.io classes. If plugin helper passes instances of cool.io classes, users may use methods of these classes (even if we want to keep these methods as semi-private). |
There's no change about server plugin helper API for plugins in updated commits. |
a364153
to
b6565b7
Compare
b6565b7
to
722c9b6
Compare
I pushed all commits to add/fix server plugin helper, and tests which pass on my laptop. |
d563fa5
to
aaaaa1c
Compare
desc 'The payload is read up to this character.' | ||
config_param :delimiter, :string, default: "\n" # syslog family add "\n" to each message and this seems only way to split messages in tcp stream | ||
|
||
def listen(callback) | ||
log.info "listening tcp socket on #{@bind}:#{@port}" | ||
def configure(conf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should check source_host_key
and source_hostname_key
?
if !@source_host_key.nil? && @source_hostname_key.nil?
@source_hostname_key = @source_host_key
end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right.
tag = extract_tag_from_record(record) | ||
tag ||= @tag | ||
time ||= extract_time_from_record(record) || Fluent::EventTime.now | ||
record[@source_host_key] = conn.remote_host if @source_host_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use @source_hostname_key
instead.
tag = extract_tag_from_record(record) | ||
tag ||= @tag | ||
time ||= extract_time_from_record(record) || Fluent::EventTime.now | ||
record[@source_host_key] = sock.remote_host if @source_host_key |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
|
||
attr_reader :_event_loop # for tests | ||
|
||
def event_loop_attach(watcher) | ||
@_event_loop_mutex.synchronize do | ||
@_event_loop.attach(watcher) | ||
@_event_loop_attached_watchers << watcher |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means Loop#watchers
sometimes returns incorrect watchers?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Loop#watchers
contains watchers which are NOT registered by event loop helpers, like TCP connection I/O watchers registered by TCP server watcher.
These watchers should be detached/closed by these watchers itself. Detaching such watchers by event loop helpers forcedly makes troubles.
when :data | ||
@sock.data(&callback) | ||
when :write_complete | ||
cb = ->(){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use one line for :close
code.
SOCK_OPT_FORMAT = 'I!I!' # { int l_onoff; int l_linger; } | ||
|
||
def initialize(sock, close_callback, resolve_name, linger_timeout, log, under_plugin_development, connect_callback) | ||
raise ArgumentError, "socket must be a TCPSocket" unless sock.is_a?(TCPSocket) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add : sock = #{sock}
to show actual object.
a7f83ce
to
0e6c4ea
Compare
…hostname only for 127.0.0.1
08d2360
to
4e63ef4
Compare
Server plugin helpers are to make plugin authors to write network servers very easily.
These make authors free from writing code to create sockets, handle event loops or threads.
The key features are:
The full features of socket/server plugin helpers and TBDs are:
I committed all changes for this pull-request
To reduce the size of diff, keepalive support and socket plugin helper support will be implemented in following pull requests.