Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Draft design of new WebSockets #6508

Closed
Tracked by #6163
Guitarheroua opened this issue Sep 26, 2024 · 6 comments
Closed
Tracked by #6163

[Access] Draft design of new WebSockets #6508

Guitarheroua opened this issue Sep 26, 2024 · 6 comments
Assignees
Labels

Comments

@Guitarheroua
Copy link
Contributor

Guitarheroua commented Sep 26, 2024

User Story: WebSocket Subscription Management

  1. Connection Establishment:
    The client establishes a single WebSocket connection with the Access Node (AN), e.g., via ws://localhost:8080/ws. This connection is maintained until closed by either the AN or the client.

  2. Subscription Mechanism:
    The client sends a subscription request through the WebSocket to subscribe to topics. The AN responds with either success or failure. Upon success, a unique subscription ID is generated:

    ws.send(JSON.stringify({
      action: 'subscribe',
      topic: 'events',
      arguments: {}
    }));

    The client can subscribe to multiple topics through the same connection, managing the messages received accordingly.

    If needed, the client can pass initial parameters via the arguments field:

    ws.send(JSON.stringify({
      action: 'subscribe',
      topic: 'events',
      arguments: {
        start_height: '123456789'
      }
    }));

    Updates from the AN are received as follows:

    ws.onmessage = (event) => {
      const message = JSON.parse(event.data);
      
      /*
      Example message structure:
      {
        id: 'sub123',
        topic: 'events',
        data: [...]
      }
      */
    
      switch (message.topic) {
        case 'events':
          // Handle events
          break;
        default:
          console.log('Received message for unsupported topic:', message.topic);
      }
    };
  3. Unsubscription:
    To unsubscribe the client sends the following message:

    ws.send(JSON.stringify({
      action: 'unsubscribe',
      id: 'sub123'
    }));
  4. List Active Subscriptions:
    The client can request the list of active subscriptions:

    ws.send(JSON.stringify({
      action: 'list_subscriptions'
    }));
  5. Closing the Connection:
    The client can close the connection manually, or the AN may do so. When the connection closes, all subscriptions are lost.

Access Node Implementation Requirements

WebSocketController Requirements

The new WebSocketController is similar to the existing WebSocketController but includes several key improvements:

  1. Connection Management:
    The new controller should establish the WebSocket connection during construction but avoid subscribing to topics immediately. It will handle ping/pong messages for connection tracking and error management.

  2. Messages Responce
    The response messages for the client should have the following format:

{
    id: 'sub123',
    topic: 'events',
    data: [...], // optional, will be present when receiving data from node
    action: 'subscribe', // optional, will be present when a message is related to the action status
    success: true/false // optional, only present, in case of response to the action. If true - the action is processed successfully, if false - check the error for a reason.
    error_message: 'failed to create subscription' // optional, will be present when an action processing is failed
}

The list_subscriptions action`s response will be different and should have the following format:

{
    action: 'list_subscriptions',
    subscriptions: [
        { topic: 'events', id: 'sub123' },
        { topic: 'blocks', id: 'sub456' }
    ]
}
  1. Message Handling: The new controller listens for incoming client messages and processes actions like subscribe, unsubscribe, and list_subscriptions. Supported topics include:

    • events
    • account_statuses
    • blocks
    • block_headers
    • block_digests
    • transaction_statuses
  2. Handler Creation:
    For each new subscription, the new controller creates a DataProvider specific to the topic. The handler formats and sends the appropriate response to the client, using the correct topic and data.

  3. Unsubscription:
    The new controller should allow unsubscribing by subscription ID.

  4. List Subscriptions:
    The new controller should return a list of all active subscriptions for the client upon request.

  5. Limitations
    The new controller should implement limits on the maximum number of subscriptions per connection, the maximum number of responses per second, and the send timeout.

  6. Connection Handling:
    The new controller should manage connectivity by handling ping/pong messages and unsubscriptions. If the client fails to respond to ping/pong messages, the new controller should gracefully close the connection and clean up all associated subscriptions.

A visual representation of the new REST subscription process:

websockets drawio

New Pub/Sub API Description

1. The router.go

A new AddWsPubSubRoute function will configure the route for the new subscription mechanism, using a distinct address such as v1/ws, separate from the current v1/subscribe_events route. There will be one main route and one handler for the pub/sub mechanism. Different topics (akin to REST routes) will be handled by the new WebSocketController, which reacts to messages from the client.

2. The WebSocketController

The WebSocketController manages subscriptions within a single connection between the client and the node.

type WebSocketController struct {
    conn      *websocket.Conn
    subs      map[string]DataProvider // The key is the subscription ID
    communicationChannel chan interface{}
    /*
    Other fields similar to the current WebSocketController,
    except for `api`, `eventFilterConfig`, and `heartbeatInterval`,
    which are specific to event streaming.
    */
}

The conn field represents the WebSocket connection for bidirectional communication with the client. It handles incoming messages from the client and broadcasts messages back to the client based on subscribed topics. Additionally, it manages ping/pong messages, error handling, and connectivity issues.

The methods associated with the conn field include:

  1. readMessages:
    This method runs while the connection is active. It retrieves, validates, and processes client messages. Actions handled include subscribe, unsubscribe, and list_subscriptions. Additional actions can be added as needed.

  2. writeMessages:
    This method runs while the connection is active, listening on the broadcast channel. It retrieves responses and sends them to the client.

  3. broadcastMessage:
    This method will be called by each DataProvider, who will receive formatted subscription messages and write them to the broadcast channel.

  4. pingPongHandler:
    This method periodically checks connection availability using ping/pong messages and will terminate the connection if the client becomes unresponsive.

The methods associated with the subs field include:

  1. subscribe:
    Triggered by the readMessages method when the action is subscribe. It takes the topic from the message’s topic field, creates the appropriate DataProvider for the topic using a factory function CreateSubscription, and adds an instance of the new handler to the subs map. The client receives a notification confirming the successful subscription along with the specific ID.

  2. unsubscribe:
    It is triggered by the readMessages method when the action is unsubscribe. It removes the relevant handler from the subs map by calling DataProvider::Close and notifying the client of successful unsubscription.

  3. listSubscriptions:
    It is triggered by the readMessages method when the action is list_subscriptions. It gathers all active subscriptions for the current connection, formats the response, and sends it back to the client.

3. The DataProvider

type DataProvider interface {
	ID() string
	Topic() string
	Close() error
}

type DataProvidersFactory struct {
	eventFilterConfig state_stream.EventFilterConfig

	stateStreamApi state_stream.API
	accessApi      access.API
}

func (s *DataProvidersFactory) CreateSubscriptionHandler(topic string, arguments map[string]interface{}, broadcastMessage func(interface{})) (DataProvider, error) {
	switch topic {
	// TODO: Implemented handlers for each topic should be added in respective case
	case EventsTopic,
		AccountStatusesTopic,
		BlocksTopic,
		BlockHeadersTopic,
		BlockDigestsTopic,
		TransactionStatusesTopic:
		return nil, fmt.Errorf("topic \"%s\" not implemented yet", topic)
	default:
		return nil, fmt.Errorf("unsupported topic \"%s\"", topic)
	}
}

The DataProvider interface abstracts the actual subscriptions used by the WebSocketController. Concrete DataProvider implementations will be created during the WebSocketController::subscribe call, depending on the topic provided by the client. For example, the topic events will have an EventsDataProvider implementation managing event subscriptions.

  1. New[Concrete]DataProvider:
    Each constructor function takes a topic, the arguments from the client’s message, and the broadcastMessage callback function. It stores these values and creates the corresponding subscription.Subscription on the backend. Each subscription is unique, identified by an ID from subscription.Subscription and linked to an individual instance of DataProvider. This ensures the client can subscribe to the same topic multiple times with different parameters.

  2. messagesHandler:
    Each handler includes a method that processes messages received from the backend and formats them for the client. This formatted message is passed to the new web socket for further processing by calling the broadcastMessage callback.

  3. Close:
    The method gracefully shuts down the subscription when called.

  4. ID:
    The method should return the subscription.Subscription ID.

  5. Topic:
    The method should return the subscription topic.

  6. NewDataProvider:
    A factory function, part of the DataProvider module that creates concrete DataProvider based on the topic from the WebSocketController and returns a new instance of DataProvider.

WebSocketController and DataProvider Relationship

  1. Receiving a Subscription Request:

    • The client sends a subscription request over the WebSocket, which includes a subscribe action and the topic to subscribe to.
    • The WebSocketController processes this message inside the readMessages() method. It parses the message, and extracts the topic and arguments. Then the subscribe() method is called.
    • Inside the subscribe() method, based on the topic from the subscription request, the WebSocketController calls CreateSubscription to instantiate the appropriate DataProvider for the topic.
    • The NewDataProvider function is a factory method responsible for returning the correct DataProvider (e.g., EventsDataProvider, BlocksSubscriptionHandler, etc.).
    • The created DataProvider is stored in the subs map of the WebSocketController using the topic and a generated subscription ID.
    • The controller then sends a confirmation message back to the client with the subscription ID using broadcastMessage.
  2. Handling Subscription Data:

    • The DataProvider listens for updates from the backend relevant to its topic.
    • When new data is received, the DataProvider calls the broadcastMessage(data) callback function provided by the WebSocketController.
    • The broadcastMessage method then sends this data to the broadcast channel, which the new controller monitors.
  3. Broadcasting Data to the Client:

    • The WebSocketController listens on the broadcast channel using the writeMessages() method.
    • When new data is available, writeMessages() retrieves it from the channel and sends it to the client over the WebSocket connection.
@peterargue
Copy link
Contributor

peterargue commented Oct 9, 2024

Looks great @Guitarheroua. A few comments:

  • I think the subscribe/unsubscribe needs an ID field to uniquely identify a subscription on a topic that could have multiple instances (e.g. events, account statuses)
  • re: “The router should maintain both - the new and old WebSocket (WS) connections for backward compatibility”
    • I think we should create the new system hosted on a new endpoint, and leave the old system in place with no interoperability between them.
    • this would be the simplest and incentivize everyone to move over.
  • We may also need a type field on responses to differentiate between different response message types.
  • We should include special response messages from the subscribe and unsubscribe actions returning details about the subscription (like ID)
  • let's include a list_subscriptions action that returns a list of all active subscriptions

@Guitarheroua
Copy link
Contributor Author

Guitarheroua commented Oct 10, 2024

@peterargue What is described here is the Web Application Messaging Protocol, or simply WAMP Protocol, which has a few implementations in Go. The most popular one is NEXUS, which implements the WAMP protocol and includes the features we need. It's also actively maintained, with a new version released this year. While it seems like a good fit for our requirements, we should first discuss the pros and cons of using it. My main concern is the subscription model on the client side. To me, it adds an extra layer of complexity, and clients might not be happy with that.

@Guitarheroua
Copy link
Contributor Author

@peterargue What is described here is the Web Application Messaging Protocol, or simply WAMP Protocol, which has a few implementations in Go. The most popular one is NEXUS, which implements the WAMP protocol and includes the features we need. It's also actively maintained, with a new version released this year. While it seems like a good fit for our requirements, we should first discuss the pros and cons of using it. My main concern is the subscription model on the client side. To me, it adds an extra layer of complexity, and clients might not be happy with that.

We agreed that the Nexus library offers many useful features, but it also includes a lot of unnecessary functionality that we won't use. Additionally, the WAMP protocol implemented by this library adds an extra layer of complexity, particularly on the client side, making it more challenging to handle.

@peterargue
Copy link
Contributor

Thanks for the updates.

I think we can consolidate the endpoints a bit. With grpc, we get some input validation for free. Since we don't get that with websockets, I think it makes sense to group the endpoints and add explicit argument checks, similar to the rest endpoints. e.g.

events
account_statuses
blocks
block_headers
block_digests
transaction_statuses

each have optional args start_height and start_block_id

For SubscriptionHandler, I think this design works, but I would suggest not including the create method in the interface, since that would require that you already have a handler object instantiated. Instead, have a factory function CreateSubscription(topic string, arguments map[string]interface{}, broadcast chan []byte) (string, error) that returns a new SubscriptionHandler. Also, I'd suggest using Close() error so it implements the more common io.Closer interface.

What does the Endpoint to SubscriptionHandler interface look like? You mentioned using a channel to feed messages back to the broker. How is data passed back from the endpoint to the handler, then to the broker? Do you have any thoughts on what the response messages to the client will look like?

What will the endpoint logic look like? Will it be similar to the existing events endpoint where it simply passes the backend subscription object to the SubscriptionHandler? Are there commonalities between the SubscriptionHandler implementations so they could be reused, or do you think we will need separate implementations for each?

@peterargue
Copy link
Contributor

Subscription Tracking:
The broker should track active subscriptions by topic and subscription ID, ensuring it does not subscribe to the same topic more than once under the same connection.

I don't think we'll want this on all topics, so I don't think it's worth the complexity of adding it for only a subset. It's not harmful to have duplicate streams, and may limit some usecases. Best to use a combination of max subscriptions per connection, max responses per second, and send timeouts.

Unsubscription:
The broker should allow unsubscribing from a topic using both the topic name and subscription ID.

I think a client should only need the subscription ID to unsubscribe

@peterargue
Copy link
Contributor

Looks good @Guitarheroua.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants