Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Channels #12042

Closed
wants to merge 6 commits into from
Closed

Added Channels #12042

wants to merge 6 commits into from

Conversation

amitmurthy
Copy link
Contributor

Edit: Current design iteration starts at : #12042 (comment)

  • Channels are queues providing a fast means of inter-task communication
  • Multiple writers and readers
  • Type aware
  • By default RemoteRefs are now backed by a Channel{Any} with a size of 1 and is a parametric type
  • It is also possible to have a RemoteRef that can hold more than one value via constructor RemoteRef(T::DataType, pid::Integer, sz::Int=1)

API

  • put!, take, fetch, isready, wait have the same action as for RemoteRefs
  • open(cref::ChannelRef, T::Type=Any, sz::Int=1) creates a remote channel and returns a cref with the channel id filled in.
  • put!, take, fetch, isready, wait work with ChannelRef s for remote access to a channel.
  • A remote channel created with open will have to be explicitly closed with a close(addr::ChannelRef) to be garbage collected. RemoteRefs continue to be collected by the distributed gc mechanism we currently have.

@amitmurthy amitmurthy added the parallelism Parallel or distributed computation label Jul 7, 2015
@amitmurthy amitmurthy mentioned this pull request Jul 7, 2015
3 tasks
@amitmurthy
Copy link
Contributor Author

@malmaud
Copy link
Contributor

malmaud commented Jul 7, 2015

Looks really good! I'm excited for this.

What do you think about implementing a select-like construct from Go? IMO it leads to really elegant code when combined with channels: https://gobyexample.com/select.

@malmaud
Copy link
Contributor

malmaud commented Jul 7, 2015

Another cool thing would be if channels could be backed by shared memory if all the workers are on the same machine. Then we might be able to approach Go speeds for most common uses of channels. Of course that could be implemented any time in the future, but I thought it might be worth considering now if we're going to talk about design.

@amitmurthy
Copy link
Contributor Author

Yes, Go's select seems nice. What would a Julian interface look like?

@amitmurthy
Copy link
Contributor Author

Channels via shared memory It should be easy enough to do at least for bitstypes channels. However, this would involve using a cross-platform library to provide portable synchronization primitives (or we roll our own). AFAIK libuv only provides for mutexes within a process.

function put!{T}(c::Channel{T, true}, v::T)
store = c.store
d = store.take_pos - store.put_pos
if (d == 1) || (d == -(store.szp1-1))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push! / pop! / shift! / unshift! should generally already take care of all of this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I implemented a circular buffer to optimize unnecessary memmoves on highly active channels. The flipside is that currently the buffer does not shrink from its peak size, but that can be implemented.

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

When we have this merged, would it be reasonable to implement what we discussed when you where here ? For the record :

  • RemoteRef use a distributed refcount
  • fetching implicitely decref and destroys the local ref
  • passing the remoteref to another worker is the only operation requiring explicit message traffic for refcount
  • [ maybe introduce a move operation to move a remoteref from a worker to another one while destroying the local one : this does not require message sending ]

all of this gets rid of the dependency of remoteref on gc finalizers. Finalizing a remoteref could then either : decref OR error saying that you should fetch (or close) a remoteref explicitely. A lot of time is spent right now in the explicit gc() call in the remote ref constructor because of the memory pressure issue.

Then, if you want a more long lived thing that you can fetch multiple times you use a channel and you control the lifetime explicitely.

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

It might also be a good time to add a type parameter to remote refs and be able to specify it explicitely in remotecall for performance (and it would default to Any).

@amitmurthy
Copy link
Contributor Author

RemoteRefs now have a type parameter. Will ensure put! and take! on RemoteRefs do annotate the return values from remotecall_fetch appropriately.

I would like to keep the distributed gc mechanism that we have for the simple reason that it is currently very user friendly. Just like folks do not worry about any gc issues in single processes mode, the distributed gc ensures the same for distributed mode too. It is totally transparent and there are lots of cases where fetch on remote refs are called repeatedly.

The explicit call to gc is after every 200 remote ref creations. We could remove that and document that user code requiring immediate release of remote refs must manually call the finalizer. And ensure that RemoteRefs are not serializable once the finalizer has been called. Calling the finalizer sends a "delete" message to the process storing the ref value.

The lifetime of channels must be explicitly managed only if they have been created with channel_at for remote access. Locally created and accessible channels are managed by the regular gc mechanism.

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

Well we really can't have this be the default IMO. It's extremely inefficient and is useless to 99% of the use case of remoteref which is an RPC call you don't want to block on ?

I don't care if it's called a remoteref or something else, but the return type from remotecall cannot be something with a complicated lifetime implementation since it is such a basic primitive of our system.

Essentially, what people want most of the time is :

r = @async remotecall_fetch(...)

because this way it doesn't incur distributed gc and you are still not blocking. This should be the default way of doing things that's all I'm saying. Complex lifetime should be opt-in and something you go out of your way for.

The problem with the @async way I wrote is that if you want to pass this remoteref to someone else, then it will have an indirection through the caller, whereas with the scheme I described in my previous post the third party can fetch the ref directly (and have no gc messages).

Please correct me if I'm wrong, because I'm not very familiar with our rpc system.

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

Also what are the use cases you have in mind for remoteref that are fetched multiple times and would not be covered by a channel ?

@malmaud
Copy link
Contributor

malmaud commented Jul 7, 2015

What if we had both a peek and a fetch function, where the former behaves like today's fetch, and the latter instantly decrements the remote value's reference count?

@amitmurthy
Copy link
Contributor Author

t = @async remotecall_fetch(...) does not return a remote ref, does not trigger distributed gc, and executes asynchronously - it returns a task that you can wait for completion. If you wanted to collect the response, you would use wait(t) which would then return the result that can passed to a different process if required.

@andreasnoack examples involved fetch of the same ref multiple times. But, more important is the usability. Transparent distributed gc is a good thing. Folks have enough to worry about when writing distributed code as is. For the all the RPC only cases, distributed gc should not be required.

@amitmurthy
Copy link
Contributor Author

@malmaud not required! As I said, unless you are explicitly calling put!, take! or a fetch on a remote ref, there is no distributed gc. remotecall_fetch does not create a remote reference, it executes and returns the result. @async can execute a remotecall_fetch asynchronously.

@amitmurthy
Copy link
Contributor Author

OTOH, remotecall_wait and remotecall return remote refs. These calls should be used when user code explicitly requires storage of the RPC result on the remote process in order to be shared or just processed later.

@spawn and @spawnat both return remote refs too. I think we need a version of these that executes the expressions asynchronously, but fetches and stores the result in a local ref which is returned.

r = @spawn_fetch expr where the ref returned is a local ref, expr is executed asynchronously and r does not trigger distributed gc.

Better documentation, and more examples are required to explain these subtle differences between these calls and their effect on performance.

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

I don't think we should pretend that distributed gc is transparent if it is inefficient, people will just get bitten and use a workaround. That's not usable. Calling gc() manually is never fine, it is extremely slow (runs gc twice, one of those being a full collection).

Maybe I didn't express myself clearly. In fact, what I don't believe should be the default is mutable remoterefs. If it is immutable it is always a bad idea to fetch it twice (and we could actually just destroy the ref on fetch and cache the result to allow fetching twice). If the remoteref is immutable then we can make the optimization I'm talking about to have efficient distributed gc without finalizers.

Yes, @async remotecall_fetch does what I want for the most part, apart from the fact that I can't send the result of this call to someone else as efficiently as in the way I described (with immutable remoteref moving).

So my question is essentially : why do we need mutable remoteref to be the default ? We should at least have immutable ones for efficiency, make it the default, and have people ask for the mutable one when they want it. In my mind that's what I call a channel of length 1 but it could be called remoteref (the difference being explicit lifetime).

In other words, remotecall is a nice short name, so it should be the good way of doing things, not a very general thing that does way too much for most of use cases and thus ends up being inefficient.

@amitmurthy
Copy link
Contributor Author

We will remove the gc() call. Explicitly finalizing RemoteRefs will solve the lifetime issue and not trigger a full gc().

The other issue is the design of DistributedArrays and SharedArrays, where everytime the distributed array or the shared array object is sent to any worker, all the remote refs of all the parts are sent over. Changing these implementations to use an explicitly managed channel which stores the references, will drastically reduce the number of "add_client" messages that we saw on @andreasnoack test code the other day.

I am uncomfortable having a fetch decrement ref count implementation. User code can fetch multiple times at different places in the call stack, and it will just lead to hard-to-debug bugs.

As I understand the mutable/immutable difference is relevant only for use cases where the result of an RPC needs to shared across processes. With a ref count implementation you save on a "delete_msg" from each worker, right?

@carnaval
Copy link
Contributor

carnaval commented Jul 7, 2015

Well, the problem is the gc() call was added for a reason, probably because otherwise those would pile up. So now users would have to be aware of it and finalize manually ? Then there is no real point in having those finalizers as convenience anyway.

When you say user code might fetch multiple times the same ref, is it expecting the same value everytime ? (in most cases) If yes, then we should have ImmutableRemoteRef be the default and then we can cache the result on the receiver side and decref on (the first) fetch, while still allowing multiple fetch without going over the network. So the immutable thing also "solves" the multiple fetch problem.

I'm sure everything I want is doable now, but it's more about what is the default (and encouraged) behavior. I would argue that the only time you want a shared cell that holds some data everyone can modify, you are quite aware of its lifetime and it would be fine to have channels for this case.

@amitmurthy
Copy link
Contributor Author

A background task pushes "del_msgs" whenever local refs are collected by the local gc as a matter of course. The explicit gc was added to handle the case where the local gc is not collecting refs due to a lack of memory pressure (given that they are only 24 bytes each).

You would explicitly call finalizers only when you feel that the local gc is not collecting ref objects in time.

Yes, an immutable with a local cache would work.

@malmaud
Copy link
Contributor

malmaud commented Jul 7, 2015

I wanted to try out this branch, but I got an error with addprocs:

julia> addprocs(1)
ERROR (unhandled task failure): TypeError: subtype: expected Type{T}, got Symbol
 in tmerge at ./inference.jl:1156
 in abstract_call_gf at ./inference.jl:726
 in abstract_call at ./inference.jl:882
 in abstract_eval_call at ./inference.jl:934
 in abstract_eval at ./inference.jl:961
 in abstract_interpret at ./inference.jl:1120
 in typeinf_uncached at ./inference.jl:1549
 in typeinf at ./inference.jl:1339
 in typeinf at ./inference.jl:1289
 in abstract_call_gf at ./inference.jl:725
 in abstract_call at ./inference.jl:882
 in abstract_eval_call at ./inference.jl:934
 in abstract_eval at ./inference.jl:961
 in typeinf_uncached at ./inference.jl:1622
 in typeinf at ./inference.jl:1339
 in typeinf_ext at ./inference.jl:1283
 in anonymous at task.jl:13

@amitmurthy
Copy link
Contributor Author

@malmaud Sorry about that, some issue with a few precompile lines that were working so far. I have commented them out for now, you ought to be able to try it now.

@amitmurthy amitmurthy force-pushed the amitm/channels branch 2 times, most recently from b5800db to 835ec94 Compare July 13, 2015 05:03
@amitmurthy amitmurthy changed the title RFC/WIP : Added Channels Added Channels Jul 13, 2015
@amitmurthy
Copy link
Contributor Author

@JeffBezanson , could you please review this? I'll add documentation and can merge after a review.

  • Adds two new exports - Channel and ChannelRef .
  • ChannelRef s are references to remote channels - created with a open and freed with a close. They are NOT collected via the distributed gc.
  • RemoteRefs use a Channel as their backing store and are collected via the distributed gc mechanism that we have.

Channels are about as fast as produce-consume for inter-task communication, with the added advantage of a) length > 1 and b) being type-aware.

julia> function foo(t::Task)
           while true
               consume(t)
           end
       end
foo (generic function with 1 method)

julia> function foon(n)
           self = current_task()
           @schedule foo(self)
           for i in 1:10^n
               produce(i)
           end
       end
foon (generic function with 1 method)

julia> foon(1)

julia> @time foon(6)
   2.291 seconds      (2005 k allocations: 31482 KB, 0.17% gc time)

julia> function bar(c::Channel)
           while true
               take!(c)
           end
       end
bar (generic function with 1 method)

julia> function barn(n)
           c=Channel(Int64,1)
           @schedule bar(c)
           for i in 1:10^n
               put!(c, i)
           end
       end
barn (generic function with 1 method)

julia> barn(1)

julia> @time barn(6)
   2.460 seconds      (4000 k allocations: 183 MB, 0.61% gc time)

@JeffBezanson
Copy link
Member

The Channel type looks really good. We should probably remove produce and consume entirely (at some point).

On the RemoteRef side, things are starting to feel complicated: we have channels, remoterefs which include channels (plus RemoteValues), and remote channels. Something seems amiss here. This is also reflected in the two flavors of create_and_register_channel. There seem to be two abstractions: channels and remote references, where remote references can be managed either manually (open/close) or by DGC.

It might be simpler, and in line with @carnaval 's suggestion, to make RemoteRefs immutable, and only support put! and take! if they point to a Channel. This could make things more efficient when a RemoteRef only needs to point to one value, which is a common case. RemoteRefs could also have a no-DGC option, making them behave like ChannelRef. Although I'm not sure if a manually-managed ref should be an option or a different type.

@amitmurthy
Copy link
Contributor Author

  • Have added documentation
  • Channel() without a size specified creates a typemax(Int) sized channel. Previously was of size 1.
  • @jakebolewski , having both ChannelDataSingle and ChannelDataMultiple was actually slower. Have removed them and now inter-task communication via channels is slightly faster than produce/consume
  • Both Channel and remote channels, i.e. ChannelRef are now iterable.

@malmaud
Copy link
Contributor

malmaud commented Jul 20, 2015

What's the motivation for the change in the default Channel size? That puts us at odds with Go, where channels are unbuffered by default.

@amitmurthy
Copy link
Contributor Author

Given the name "channel" I felt that the default of size 1 was non-intuitive. No other reason.

@malmaud
Copy link
Contributor

malmaud commented Jul 20, 2015

That does make sense, although if we ever implement same-node channels via shared memory, I think we'd have to allocate the maximum buffer size at the time the channel was created. Obviously that's not great if the default size is typemax(Int).

@amitmurthy
Copy link
Contributor Author

A saner default then? 32?

@amitmurthy
Copy link
Contributor Author

Or 42.

@malmaud
Copy link
Contributor

malmaud commented Jul 20, 2015

Any integer less than a million sounds good to me.

@JeffBezanson
Copy link
Member

I'm ok with adding Channels in 0.4 but the totality of this change feels like too much right now.

We should use Channel{T}(...) instead of Channel(T, ...). Only arrays use the second form, but even they support the first syntax now.

I find the name open_channel unacceptable. We try very hard to avoid underscore names. I think the underscore could be removed with more thought about the API. I'm still uncomfortable with the apparent non-orthogonality of having a certain kind of remote reference that only points to channels. Very little about ChannelRef is specific to channels.

Is it possible for multiple processors to talk to the same remote channel? It's not clear to me how that works. For example, you could serialize a ChannelRef to another processor, but I don't see where references are added.

@amitmurthy
Copy link
Contributor Author

I see your point. RemoteValue can wrap an AbstractChannel instead of a Channel as is being done.
An AbstractChannel implements put!, take!, fetch, isready and wait.

Instead of open_channel, we will have register{T<:}(pid, T, args...) which will return a RegisteredRef.

put! and take! on a RegisteredRef will be of the form put!(rr, args...), take!(rr, args...), fetch(rr, args...)

For example, this will allow users to implement remote dictionaries as an AbstractChannel.

register(2, Channel, sz) will register a Channel on pid 2 returning a RegisteredRef.

Multiple processors can talk to the same remote channel, or in the modified form, a RegisteredRef. References are not added because these are manually managed, not by DGC. As long as no process has "closed"ed a RegisteredRef, serialized references will work. close on a RegisteredRef removes the RemoteValue and makes it available for gc on the remote process. Parallel operations on a closed RegisteredRef throw a InvalidStateException

@amitmurthy
Copy link
Contributor Author

Have reworked the interface.

Channel constructors are now

const DEF_CHANNEL_SZ=32
Channel() = Channel(DEF_CHANNEL_SZ)
Channel(sz::Int) = Channel{Any}(sz)

and of course Channel{T}(sz)

The remote references type hierarchy is now

abstract AbstractRemoteRef{T}
abstract SysManagedRef{T} <: AbstractRemoteRef{T}
abstract UserManagedRef{T} <: AbstractRemoteRef{T}
type Future{T} <: SysManagedRef{T}
type RegisteredRef{T} <: UserManagedRef{T}

Registered Refs are created with

RegisteredRef(;pid::Int=myid(), reftype::Type=Channel{Any}, sz::Int=DEF_CHANNEL_SZ)
RegisteredRef(f::Function, pid::Int=myid())

f() above must return an AbstractChannel when executed on pid

A RegisteredRef points to a AbstractChannel - basically any type that implements put!, take!, etc.

Objects pointed to by a RegisteredRef must be explicitly closed with a close.

Some of the changes in this PR are not directly related to Channels but came about while implementing the same. Will submit them as separate PRs in order to make this a bit more easily reviewable.

@ArchRobison
Copy link
Contributor

One of my colleagues, Jim Cownie, who once worked on Transputers, noted that the isready method is going to lead to races once we have multithreading. As far as I can tell, Go omits isready. I'm passing on his comments, in paraphrased form below, on possible ways to deal with the problem:

  1. Implement Occam ALT (or Go select) allowing a “true” case which lets you poll a set of inputs, receive on one which is ready or return immediately. Then isready is no longer necessary.
  2. Replace isready with tryfetch and trytake. The analogy with locks is useful here. We always have a function that claims a lock or returns on failure rather than an "isfree" predicate, precisely to avoid this race.

MPI now recognises that MPI_Probe is a mistake for exactly the reason that it’s racy when the communication endpoint is shared between threads.

@malmaud
Copy link
Contributor

malmaud commented Jul 31, 2015

I totally agree that select is the way to go - you can see earlier #12042 (comment) it's mainly a matter of designing a nice Julia API for it.

@amitmurthy
Copy link
Contributor Author

isready can lead to race conditions even with distributed Julia as noted in the documentation - http://docs.julialang.org/en/latest/stdlib/parallel/#Base.isready .

A Go type of select which actually removes the data (unlike socket select which indicates readiness) would be a nice API. With Julia processes distributed across nodes, the challenges of synchronizing across networks remain (for an example - JuliaLang/Distributed.jl#28).

A solution would be to spawn individual tasks that take from individual references and write to a common channel. The caller reads only from this channel and processes all values written to it.

@StefanKarpinski
Copy link
Member

+1 to Go-style select.

@tbreloff
Copy link

+1 on select (or something comparable)

On Friday, July 31, 2015, Stefan Karpinski notifications@github.com wrote:

+1 to Go-style select.


Reply to this email directly or view it on GitHub
#12042 (comment).

@ViralBShah
Copy link
Member

Channels need a mention in NEWS.

@amitmurthy
Copy link
Contributor Author

I added a mention few hours ago.

On Tue, Aug 11, 2015 at 6:54 PM, Viral B. Shah notifications@github.com
wrote:

Channels need a mention in NEWS.


Reply to this email directly or view it on GitHub
#12042 (comment).

@malmaud
Copy link
Contributor

malmaud commented Oct 11, 2015

Is this PR obsolete at this point?

@amitmurthy
Copy link
Contributor Author

Yes. Ideas discussed here should be incorporated in a new PR. Closing this one.

@amitmurthy amitmurthy closed this Oct 12, 2015
@amitmurthy amitmurthy deleted the amitm/channels branch December 22, 2015 03:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
parallelism Parallel or distributed computation
Projects
None yet
Development

Successfully merging this pull request may close these issues.