Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for the equivalent of javascript's setInterval #230

Open
rundel opened this issue Aug 9, 2019 · 11 comments
Open

Support for the equivalent of javascript's setInterval #230

rundel opened this issue Aug 9, 2019 · 11 comments

Comments

@rundel
Copy link
Contributor

rundel commented Aug 9, 2019

Most of the usage I've seen of httpuv's websocket functionality seems to use a method where the client will send a message which is then handled by the server which then responds with its own message.

For a project I am working on, a simpler solution is to have everything originate from the server which then sends a message on a scheduled basis out to the clients, i.e. something similar to what is possible with javascript's setInterval.

I've been able to achieve something like this by having a function recursively call itself with later on the desired interval,

onWSOpen = function(ws) {
  interval = 2
  
  f = function() {
    msg = paste("Hello", Sys.time())
    ws$send(msg)
    later::later(f, interval)
  }
  
  f()
}

but this feels a bit hacky, and I'm currently stuck with the later calls persisting even after the server is terminated. I feel like I will be able to find a way of using the ws object and httpuv::listServers to include a halt condition, but again this feels super hacky.

@rundel
Copy link
Contributor Author

rundel commented Aug 9, 2019

A full reprex of my example above:

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    f = function() {
      ws$send(paste("Hello", Sys.time()))
      later::later(f, 2)
    }
    f()
  }
)

browseURL("http://localhost:9454/")
startServer(host, port, app)

@wch
Copy link
Collaborator

wch commented Aug 9, 2019

I think this should do it. The change here is that there's an is_open flag, which gets set to FALSE when the websocket is closed; when it's FALSE, the function doesn't reschedule itself.

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    is_open <- TRUE
    
    ws$onClose(function() {
      message("in onClose")
      is_open <<- FALSE
    })

    send_and_reschedule <- function() {
      message("in send_and_reschedule")
      if (is_open) {
        message("websocket is open")
        ws$send(paste("Hello", Sys.time()))
        later::later(send_and_reschedule, 2)
      } else {
        message("websocket is closed")
      }
    }
    send_and_reschedule()
  }
)

browseURL("http://localhost:9454/")
startServer(host, port, app)

The only slightly weird thing is that there's one last call to send_and_reschedule() that occurs after the websocket is closed.

If you have the development version of later, that can also be addressed. The dev version of later supports cancelling a callback. So you can have the onClose callback immediately cancel the pending call to send_and_reschedule(), and not use the is_open flag at all.

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    cancel_send_and_reschedule <- function() {}
    
    ws$onClose(function() {
      message("in onClose")
      cancel_send_and_reschedule()
    })

    send_and_reschedule <- function() {
      message("in send_and_reschedule")
      message("sending")
      ws$send(paste("Hello", Sys.time()))
      cancel_send_and_reschedule <<- later::later(send_and_reschedule, 2)
    }
    send_and_reschedule()
  }
)

@rundel
Copy link
Contributor Author

rundel commented Aug 9, 2019

That looks much cleaner than what I was able to come up with, it gracefully handles both the server shutdown case and the client disconnect case (which I hadn't considered).

It seems like including something similar to this in the included demos might be useful for others like me who are looking for a more server push based approach.

@wch
Copy link
Collaborator

wch commented Aug 9, 2019

Good idea. I'll leave this issue open as a reminder to add a demo.

@rundel
Copy link
Contributor Author

rundel commented Aug 9, 2019

A tangential (but somewhat related question), do you have any thoughts on how to structure something like this to handle a moderate number of concurrent users where the task that is contained within send_and_reschedule is moderately involved (e.g. reading a file).

With the current setup each new session will trigger onWSOpen and get their own unique chain of later scheduled functions which will not necessarily be synchronized with one another. I can setup some kind of global cache that they all look at but it feels like I'll potentially be wasting a bunch of cycles unnecessarily.

@wch
Copy link
Collaborator

wch commented Aug 9, 2019

You can keep a registry of all currently-open websockets and iterate over it, sending a message to each one, with a single app-level callback that reschedules itself.

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

websockets <- new.env(parent = emptyenv())
next_ws_id <- 0

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list('Content-Type' = 'text/html'),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {
    # Create an ID for this websocket and add it to the registry
    ws_id <- sprintf("%09d", next_ws_id)
    next_ws_id <<- next_ws_id + 1
    
    websockets[[ws_id]] <- ws

    ws$onClose(function() {
      message("in onClose for ", ws_id)
      rm(list = ws_id, envir = websockets)
    })
  }
)

send_and_reschedule <- function() {
  message("send_and_reschedule")
  if (s$isRunning()) {
    later::later(send_and_reschedule, 2)
  } else {
    # If app has closed, clear out websocket registry and exit
    rm(list = ls(websockets, all.names = TRUE), envir = websockets)
    return()
  }
  
  # Send a message to each websocket
  for (id in names(websockets)) {
    message("Sending message to ", id)
    websockets[[id]]$send(paste0("Hello ID ", id, "  ",  Sys.time()))
  }
}

s <- startServer(host, port, app)
send_and_reschedule()

browseURL("http://localhost:9454/")

Some notes:

  • In this example, it doesn't send a message immediately when a connection is made, but you could easily add that.
  • The overhead from scheduling callbacks with later() is pretty insignificant, so that alone shouldn't cause performance issues.
  • If you have expensive code in the callback (like reading a file) that would be the same for each user, you could also use some sort of caching (e.g. the memoise package).

@rundel
Copy link
Contributor Author

rundel commented Aug 13, 2019

Thanks again for the help, I'm slowly wrapping my head around websockets and httpuv and your code has been invaluable in that process.

The approach I ended up with when trying to implement this myself looks like the following:

library(httpuv)

host = "0.0.0.0"
port = 9454

html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'

httpuv:::AppWrapper$set(
  "public", "get_wsconns",
  function() {
    private$wsconns
  }
)

WebServer2 <- R6::R6Class(
  "WebServer2",
  cloneable = FALSE,
  inherit = httpuv:::WebServer,
  public = list(
    initialize = function(host, port, app, interval = 5) {
      super$initialize(host, port, app)

      ws = private$appWrapper$get_wsconns()

      send_and_reschedule <- function() {
        if (!self$isRunning())
          return()

        print(Sys.time())
        #print(names(ws))

        for(n in names(ws)) {
          if (is.null(ws[[n]]))
            next
          msg = paste("Hello", n, Sys.time())
          message("Sent: ", msg)
          ws[[n]]$send(msg)
        }

        later::later(send_and_reschedule, interval)
      }
      send_and_reschedule()
    }
  )
)

app <- list(
  call = function(req) {
    list(
      status = 200L,
      headers = list(
        'Content-Type' = 'text/html'
      ),
      body = glue::glue(html, host = host, port = port)
    )
  },
  onWSOpen = function(ws) {

  }
)

z = WebServer2$new(host, port, app)
browseURL("http://localhost:9454/")
browseURL("http://localhost:9454/")
browseURL("http://localhost:9454/")

it has a lot more messing with the internals of the R6 classes but it avoids the need for the global variables.

A couple of questions that came up as I was working through this:

  • This approach involves monkey patching AppWrapper, was there a reason an accessor function wasn't included for wsconns?

  • wsconns is an environment and currently AppWrapper$onWSClose sets the named wsconn address to NULL but doesn't actually remove the variable. It seems like the latter would be preferable.

@wch
Copy link
Collaborator

wch commented Aug 13, 2019

Instead of dealing with the internals of WebServer and AppWrapper, it would be better to
just wrap your application object in a local environment. Here I've done it with local(), but you could also use a function to generate the app object -- that is, instead of app <- local({ ... }), you have create_app <- function() { ... }; app <- create_app().

library(httpuv)


app <- local({
  host = "0.0.0.0"
  port = 9454
  
  html = '<!DOCTYPE HTML>
<html>
  <head>
    <script>
      var host = "ws://{host}:{port}";
      var ws = new WebSocket(host);

      ws.onmessage = function(msg) {{
        document.getElementsByTagName("p")[0].innerHTML = msg.data;
      }};
    </script>
  </head>
  <body>
    <p>
    </p>
  </body>
</html>
'
  
  websockets <- new.env(parent = emptyenv())
  next_ws_id <- 0
  
  send_and_reschedule <- function() {
    message("send_and_reschedule")
    if (s$isRunning()) {
      later::later(send_and_reschedule, 2)
    } else {
      # If app has closed, clear out websocket registry and exit
      rm(list = ls(websockets, all.names = TRUE), envir = websockets)
      return()
    }
    
    # Send a message to each websocket
    for (id in names(websockets)) {
      message("Sending message to ", id)
      websockets[[id]]$send(paste0("Hello ID ", id, "  ",  Sys.time()))
    }
  }
  
  list(
    call = function(req) {
      list(
        status = 200L,
        headers = list('Content-Type' = 'text/html'),
        body = glue::glue(html, host = host, port = port)
      )
    },
    onWSOpen = function(ws) {
      # On the first websocket connection only, kick off the
      # send_and_reschedule() polling.
      if (next_ws_id == 0) {
        send_and_reschedule()
      }
      
      # Create an ID for this websocket and add it to the registry
      ws_id <- sprintf("%09d", next_ws_id)
      next_ws_id <<- next_ws_id + 1
      
      websockets[[ws_id]] <- ws
      
      ws$onClose(function() {
        message("in onClose for ", ws_id)
        rm(list = ws_id, envir = websockets)
      })
    }
  )
})

s <- startServer(host, port, app)

browseURL("http://localhost:9454/")
  • I think the reason wsconns isn't public is because we never needed it to be.
  • Setting the wsconns entry to NULL instead of removing it seems like a bug. Can you file an issue on that?

@jcheng5
Copy link
Member

jcheng5 commented Aug 13, 2019

@wch What, no fastmap? 😁

@rundel
Copy link
Contributor Author

rundel commented Aug 13, 2019

The local({...}) bit makes this a lot more palatable, but deep down in my soul I can't help but feel wrong using <<-. And this is definitely more than enough for what I'm trying to get working.

The only other bit I feel a bit dissatisfied with is that this seems a lot like reinventing the wheel since AppWrapper is already doing all this internally. Is there a reason why it wouldn't be possible to pass each WebSocket the equivalent of a context pointer back to the parent AppWrapper? Does this kind of circular dependency cause grief with R6 classes?

Thanks again for all the help, you've gone way above and beyond.

I'll submit and issue and a PR for cleaning up the wsconns entry.

@jcheng5
Copy link
Member

jcheng5 commented Aug 13, 2019

Don't feel bad at all about using <<- in this context; it's only wrong IMHO if you don't know with 100% certainty what scope is going to end up taking the assignment. In this case, it's totally well-defined, since next_ws_id is defined right there in the parent scope.

That said, if it makes you feel better, you could replace the line next_ws_id <- 0 with:

get_next_ws_id <- local({
  next_ws_id <- 0L
  function() {
    (next_ws_id <<- next_ws_id + 1L)
  }
})

And then whenever you need a new id, call get_next_ws_id(). This would at least put the variable and the super-assignment right next to each other, and wrap it behind a function that's impossible to use incorrectly.

rundel added a commit to rundel/livecode that referenced this issue Aug 21, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants