Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConflatedBroadcastChannel as ReadWriteProperty #245

Closed
jcornaz opened this issue Feb 15, 2018 · 20 comments
Closed

ConflatedBroadcastChannel as ReadWriteProperty #245

jcornaz opened this issue Feb 15, 2018 · 20 comments

Comments

@jcornaz
Copy link
Contributor

jcornaz commented Feb 15, 2018

Hello,

In our code base, we found useful to define the folowing extension functions:

operator fun <T> ConflatedBroadcastChannel<T>.getValue(thisRef: Any, property: KProperty<*>): T = value

operator fun <T> ConflatedBroadcastChannel<T>.setValue(thisRef: Any, property: KProperty<*>, value: T) {
  offer(value)
}

They allow us to use the following pattern:

class MyClass {
  private val _value = ConflatedBroadcastChannel(0)
  var value by _value 

  fun openValueSubscription() = _value.openSubscription()
}

May I ask what do you think about this ?
Do you think it is an anti-pattern ?

  • If yes, why? And how would you achieve better generic observer pattern with coroutines?
  • If no, what would you think about making ConflatedBroadcastChannel implement ReadWriteProperty?
@elizarov
Copy link
Contributor

It is interesting. Can you share your project (or maybe some smaller playground project if cannot) via github to see it in a larger context?

@fvasco
Copy link
Contributor

fvasco commented Feb 23, 2018

I have a similar issue, also.
I need to receive the value's update starting by the current one.
Ie: the state of the light depends by the state of the switch.
In the light constructor i load the switch value, I set up its state and then I subscribe the channel.

class Light(switch : Switch) {
 init {
  val state = switch.value
  setup(value)
  launch {
   switch.channel.consumeEach { setup(it) }
  }
 }
}

This code is weak, I can lost an update during the first setup invocation.

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 5, 2018

Sorry, but I cannot share the project. Anyway it is not used a lot (yet) as we are really not sure if it is good design or not.

The idea is to achieve generic observer pattern. It is inspired of what we can do with JavaFx and tornadofx:

class MyClass {
  val valueProperty = SimpleIntegerProperty(0)
  var value by valueProperty 
}

fun usage(obj: MyClass) {
  
  // get current (latest) value
  val currentValue = obj.value
  
  // subscribe
  obj.valueProperty.addListener { _, _, newValue -> println("new value: $newValue") }
}

And with a coroutines version I proposed in my first post, one could do:

fun usage(obj: MyClass) {
  
  // get current (latest) value
  val currentValue = obj.value
  
  // subscribe
  launch {
    obj.openValueSubscription().consumeEach { newValue -> println("new value: $newValue") }
  }
}

Actually I'm more asking:

  1. "What do you think about this designs in a software using coroutines?"
  2. "How do you achieve generic observer pattern with coroutines?"

@fvasco
Copy link
Contributor

fvasco commented Mar 5, 2018

Regarding setValue you should consider the closed channel case.

Having something like the reactive BehaviorSubject fix your issue?

ConflatedBroadcastChannel should have the function:

fun openSubscription(includeValue: Boolean = false): SubscriptionReceiveChannel

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 5, 2018

Regarding setValue you should consider the closed channel case.

Yes, but there is not a lot to do. I don't see how I can do more than throwing an exception. To achieve it one would simply implement setValue with sendBlocking(value) instead of offer(value).

But anyway, for this usage the broadcast wouldn't normally be closed, unless we know that setValue will never be called again (or we know that nobody care on the value and its changes anymore).

We indeed used BehaviorSubject with Rx to achieve it so far. Our goal is to replace usages BehaviorSubject by ConflatedBroadacastChannel and usages of PublishSubject by BroadacastChannel.

But there are no technical problem. From a technical point of view it does work as expected, and we don't have any issue. I'm really asking what you think about this design. And, if it appears it is a good design propose to implements getValue and setValue directly in kotlinx.coroutines.

@fvasco
Copy link
Contributor

fvasco commented Mar 5, 2018

getValue and setValue are error prone in concurrent programming.

Your usage function have the same bug of my Light example (as said above, the issue is caused by load the current value outside the subscription).

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 5, 2018

getValue and setValue are error prone in concurrent programming.

I agree. That's why an openValueSubcription() is provided. Or would you have a better alternative to propose?

Your usage function have the same bug of my Light example (as said above, the issue is caused by load the current value outside the subscription).

Indeed, if the value changes many times quickly the listener may not receive all values but only the latest one. But it is usually not a problem, as (in most cases*) we only care about the most recent value.

*: This is not the case of the light example.

@fvasco
Copy link
Contributor

fvasco commented Mar 5, 2018

That's why an openValueSubcription() is provided.

Sorry, I missed it.
I remain skeptical about using a ConflatedBroadcastChannel as a variable, this hides too much (purely personal opinion).

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 5, 2018

I remain skeptical about using a ConflatedBroadcastChannel as a variable, this hides too much (purely personal opinion)

I think it is a perfectly valid opinion. Actually I'm really not sure if it is a good idea either.

One alternative could be:

class MyClass {
  private val broadcast = ConflatedBroadcastChannel(0)
  
  fun setValue(newValue: Int) {
    broadcast.sendBlocking(newValue)
  }

  fun openValueSubscription() = broadcast.openSubscription()
}

With this latest example, the only way to get a value is to subscribe with openValueSubscription(). The drawback is that it is not possible to set the value like this anymore: obj.value = 42.

@fvasco, Do you think this latest pattern is preferable to my first example? Or would you propose something different?

@fvasco
Copy link
Contributor

fvasco commented Mar 6, 2018

Hi @jcornaz,
I took some time to think about this issue, unfortunately I didn't solve it, so I decided to share my opinion with you to understand better this problem.
I dump you some code with ugly names with the only purpose to be clear.

My use case requires an interface like this

interface ObservableVariable<T> {
    var value: T
    fun openValueSubscription(): SubscriptionReceiveChannel<T>
}

in such case delegate a variable to the ObservableVariable is a reasonable choice.
The ObservableVariable is like a regular variable and you can observe it, you cannot close or something similar.
Personally for my use case I use the ObservableVariable (mutable) and ObservableValue (read-only), so it is explicit the emitter and the receivers.

We can choose to implement ObservableVariable using ConflatedBroadcastChannel (my current implementation) or ConflatedBroadcastChannel : ObservableVariable.

First choice is pretty simple and safe.
Second choice is almost ready, ConflatedBroadcastChannel.value is a val but the change is pretty trivial.

What are the downside of the second choice.
ConflatedBroadcastChannel is a complex class, this is a personal opinion so I consider it a bad motivation against it.
ConflatedBroadcastChannel can be closed, this is not a great feature. Can I invalidate all variable references? (with great power comes great responsibility)
ConflatedBroadcastChannel is a read-write channel, damn! We have SendChannel and ReadChannel but both BroadcastChannel and ConflatedBroadcastChannel are read-write, is this a desing flaws?

Finally we should choose a good practice for a common case:

class MyClass(observableVariable: ObservableValue<Int>) {
    var double: Int

    init {
        double = observableVariable.value * 2
        launch {
            observableVariable.openValueSubscription().consumeEach {
                double = it * 2
            }
        }
    }
}

a simple solution should be:

class MyClass(observableVariable: ObservableValue<Int>) {
    var double: Int by observableVariable.mapValue { it * 2 }
}

Can you compare my considerations with you experience, please?
Thank you.

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 6, 2018

I have to say we came up with an interface like the one you proposed too.

Our version of the ObservableVariable interface is just a bit more complex as it allows to subscribe to invalidation events too. (This is inspired from the ObservableValue interface of JavaFx). Being able to open invalidation subscription make possible to have lazy calculations. Nevermind, it doesn't change the design choice you propose.

Then I'd like to say, I really like the naming you used (ObservableVariable and ObservableValue). Though, I would rename them to SubscribableVariable and SubscribableValue to avoid confusion with JavaFx ObservableValue.

I personaly like the option of making ConflatedBroadcastChannel implement ObservableVariable. Even if I agree with your considerations (already complex, possibility to be closed), I think it is semantically what ConflatedBroadcastChannel already is. It does already provide a value read property and an openSubscription() which start by the current value. So making it implement this interface would only enforce this semantic and make it more clear. But it wouldn't change its implementation.

About the possibility to be closed, I would say like you "with great power comes great responsibility". I think it fine to say that's the responsibility to the client code to have a correct usage of it. If one close a property and then try to update it, it should throw an exception. I totally agree with your point. But what else can we do?

ConflatedBroadcastChannel is a read-write channel, damn! We have SendChannel and ReadChannel but both BroadcastChannel and ConflatedBroadcastChannel are read-write, is this a desing flaws?

Either I don't understand your point, or it is incorrect. ConflatedBroadcastChannel and BroadcastChannel are SendChannel only. They allow to open new ReadChannel via openSubscription(), but they are not ReadChannel. Or am I missing something ?

Finally, as you proposed, I would love to have map and combine operators, allowing the following usage:

class MyClass(observableVariable: ObservableValue<Int>) {
    val doubleObservable: ObservableValue<Int> = observableVariable.map { it * 2 }
    val sumObservable: ObservableValue<Int> = combine(observableVariable, double) { x, y -> x + y }
    val double: Double by doubleObservable
}

Note: for your first usage example it is not necessary to do double = observableVariable.value * 2, as openSubscription() would anyway start by sending the first value.

So globally I'd say your proposition is going in a good direction (in my opinion). We already have something similar implemented in our code base (with two operators map and combine). So far, it works fine, and I am not aware of a better design.

@fvasco
Copy link
Contributor

fvasco commented Mar 6, 2018

Well, really nice.

Either I don't understand your point, or it is incorrect. ConflatedBroadcastChannel and BroadcastChannel are SendChannel only. They allow to open new ReadChannel via openSubscription(), but they are not ReadChannel. Or am I missing something ?

You are right, I try to explain it better adding SubscribableChannel and ConflatedBroadcastChannel.

interface SubscribableChannel<E> { // like ReceiveBroadcastChannel
    fun openSubscription(): SubscriptionReceiveChannel<E>
}

interface BroadcastChannel<E> : SendChannel<E>, SubscribableChannel<E> {
}

interface ConflatedSubscribableChannel<E> : SubscribableChannel<E> {
    val value: E
}

interface ConflatedBroadcastChannel<E> : BroadcastChannel<E>, ConflatedSubscribableChannel<E> {
    override var value: E
}

It does already provide a value read property and an openSubscription() which start by the current value

Yes, whould be nice to preserve both behaviors.

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 6, 2018

Thanks @fvasco, I like the interfaces you propose.

I would only propose these changes:

  • Rename SubscribableChannel by Subscribable and ConflatedSubscribableChannel by ConflatedSubscribable because they are not necessarily Channel. They only provide a way to open new channels.
  • Add out variance for SubscribableChannel and ConflatedSubscribableChannel

And I would either:

Make ConflatedSubscribable implement ReadOnlyPropery and ConflatedBroadcastChannel implement ReadWriteProperty

Or provide:

operator fun <T> ConflatedSubscribable<T>.getValue(thisRef: Any, property: KProperty<*>): T = value
operator fun <T> ConflatedBroadcastChannel<T>.setValue(thisRef: Any, property: KProperty<*>, value: T) { this.value = value }

@fvasco
Copy link
Contributor

fvasco commented Mar 6, 2018

Make ConflatedSubscribable implement ReadOnlyPropery and ConflatedBroadcastChannel implement ReadWriteProperty

I consider better extending the interface (your alternative).

@fvasco
Copy link
Contributor

fvasco commented Mar 7, 2018

I notify this post regarding the "replay channel" (https://discuss.kotlinlang.org/t/replay-channel/6933): the ObservableVariable is, de facto, a similar to a regular ConflatedBroadcastChannel with replay=1.

@elizarov
Copy link
Contributor

elizarov commented Mar 7, 2018

So far I like what I see here. You are welcome to send pull request with those interfaces.

I also see that abstraction for cold streams (see #254) could implement the same interfaces and it should provide a lighter-weight implementation of those interfaces than ConflatedBroadcastChannel. This would allow for a lazier and lighter-weight implementation of combinators like map and filter in the future.

@fvasco
Copy link
Contributor

fvasco commented Mar 7, 2018

Hi @jcornaz,
are you interested to engage for the pull request?

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 8, 2018

Hi @fvasco
Yes, I've drafted a PR yesterday and I'd like to submit it this week.

Just one thing, may I suggest to use this naming scheme: Subscribable, SubscribableValue and SubscribableVariable?

I prefer this naming scheme for two reasons: First, ConflatedBroadcastChannel already exist and is a class, not an interface. Second, I think it is not very clear that ConflatedBroadcastChannel is the mutable version of ConflatedSubscribable.

@fvasco
Copy link
Contributor

fvasco commented Mar 8, 2018

Hi @jcornaz,
can you write a draft to understand better your proposed changes?

Otherwise open a draft pull request to pin up some comments.

@jcornaz
Copy link
Contributor Author

jcornaz commented Mar 8, 2018

@fvasco Yes, I've open a draft PR. let's move this discussion there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants