Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation for events data providers #6588

Open
Tracked by #6163
Guitarheroua opened this issue Oct 22, 2024 · 0 comments · May be fixed by #6766
Open
Tracked by #6163

[Access] Add implementation for events data providers #6588

Guitarheroua opened this issue Oct 22, 2024 · 0 comments · May be fixed by #6766
Assignees
Labels

Comments

@Guitarheroua
Copy link
Contributor

Guitarheroua commented Oct 22, 2024

Events data providers should be implemented as part of the new WebSocket pub/sub system based on the [Draft design of new WebSockets] (#6508).

Requirements:

  1. Events data provider constructors should be implemented. The constructor should create a corresponding subscription.Subscription based on input arguments, and store topic, subscription and other necessary parameters in a newly created instance. These constructors should be called in a DataProviderFactory::NewDataProvider method.

  2. Run should be implemented. It will start in a forever loop until the subscription is closed. This method collects streaming data from the subscription, similar to how we do it, for example in SubscribeEventsFromStartBlockID:

func (h *Handler) SubscribeEventsFromStartBlockID(request *executiondata.SubscribeEventsFromStartBlockIDRequest, stream executiondata.ExecutionDataAPI_SubscribeEventsFromStartBlockIDServer) error {
// check if the maximum number of streams is reached
if h.StreamCount.Load() >= h.MaxStreams {
return status.Errorf(codes.ResourceExhausted, "maximum number of streams reached")
}
h.StreamCount.Add(1)
defer h.StreamCount.Add(-1)
startBlockID, err := convert.BlockID(request.GetStartBlockId())
if err != nil {
return status.Errorf(codes.InvalidArgument, "could not convert start block ID: %v", err)
}
filter, err := h.getEventFilter(request.GetFilter())
if err != nil {
return err
}
sub := h.api.SubscribeEventsFromStartBlockID(stream.Context(), startBlockID, filter)
return subscription.HandleSubscription(sub, h.handleEventsResponse(stream.Send, request.HeartbeatInterval, request.GetEventEncodingVersion()))
}

Then the data is formatted in response messages for the client and written to the send callback.

Also, MessageIndex should be included in the response.

  1. The Close method should be implemented to gracefully shut down the subscription.

  2. ID and Topic getters should return UUID and the topic respectively.

@Guitarheroua Guitarheroua changed the title [Access] Add implementation EventStatusSubscriptionHandler for event subscriptions [Access] Add implementation for events data providers Nov 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
3 participants