diff --git a/internal/federation/server.go b/internal/federation/server.go index 02fb7de1..f20629e3 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -213,7 +213,7 @@ func (s *Server) MustSendTransaction(t *testing.T, deployment *docker.Deployment // SendFederationRequest signs and sends an arbitrary federation request from this server. // // The requests will be routed according to the deployment map in `deployment`. -func (s *Server) SendFederationRequest(deployment *docker.Deployment, req gomatrixserverlib.FederationRequest, resBody interface{}) error { +func (s *Server) SendFederationRequest(ctx context.Context, deployment *docker.Deployment, req gomatrixserverlib.FederationRequest, resBody interface{}) error { if err := req.Sign(gomatrixserverlib.ServerName(s.serverName), s.KeyID, s.Priv); err != nil { return err } @@ -224,7 +224,7 @@ func (s *Server) SendFederationRequest(deployment *docker.Deployment, req gomatr } httpClient := gomatrixserverlib.NewClient(gomatrixserverlib.WithTransport(&docker.RoundTripper{Deployment: deployment})) - return httpClient.DoRequestAndParseResponse(context.Background(), httpReq, resBody) + return httpClient.DoRequestAndParseResponse(ctx, httpReq, resBody) } // MustCreateEvent will create and sign a new latest event for the given room. diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 3fa1092a..5fa56c48 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -7,8 +7,11 @@ package tests import ( + "context" "encoding/json" "fmt" + "io/ioutil" + "net" "net/http" "net/url" "strconv" @@ -16,9 +19,12 @@ import ( "testing" "time" + "github.com/gorilla/mux" "github.com/tidwall/gjson" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/matrix-org/complement/internal/b" "github.com/matrix-org/complement/internal/client" @@ -141,76 +147,122 @@ func TestPartialStateJoin(t *testing.T) { // the HS will make an /event_auth request for the event federation.HandleEventAuthRequests()(psjResult.Server) + event := psjResult.CreateMessageEvent(t, "derek", nil) + t.Logf("Derek created event with ID %s", event.EventID()) + // derek sends an event in the room - event := psjResult.Server.MustCreateEvent(t, psjResult.ServerRoom, b.Event{ - Type: "m.room.message", - Sender: psjResult.Server.UserID("derek"), - Content: map[string]interface{}{ - "msgtype": "m.text", - "body": "Message", - }, - }) - psjResult.ServerRoom.AddEvent(event) - psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) - t.Logf("Derek sent event event ID %s", event.EventID()) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) + }) - /* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks. - * https://github.com/matrix-org/synapse/issues/13146 - alice.MustSyncUntil(t, - client.SyncReq{ - Filter: buildLazyLoadingSyncFilter(nil), - }, - client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, event.EventID()), - ) - */ + // we should be able to receive events with a missing prev event over federation during the resync + t.Run("CanReceiveEventsWithMissingParentsDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") - // still, Alice should be able to see the event with an /event request. We might have to try it a few times. - start := time.Now() - for { - if time.Since(start) > time.Second { - t.Fatalf("timeout waiting for received event to be visible") - } - res := alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}) - eventResBody := client.ParseJSON(t, res) - if res.StatusCode == 200 { - t.Logf("Successfully fetched received event %s", event.EventID()) - break - } - if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { - t.Logf("Fetching received event failed with M_NOT_FOUND; will retry") - time.Sleep(100 * time.Millisecond) - continue - } - t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) - } + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() - // allow the partial join to complete - psjResult.FinishStateRequest() - alice.MustSyncUntil(t, - client.SyncReq{}, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), - ) + // we construct the following event graph: + // ... <-- M <-- A <-- B + // + // M is @alice:hs1's join event. + // A and B are regular m.room.messsage events created by @derek on the Complement homeserver. + // + // initially, hs1 only knows about event M. + // we send only event B to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) + + // the HS will make an /event_auth request for event A + federation.HandleEventAuthRequests()(psjResult.Server) - // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation - stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", - fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", - url.PathEscape(psjResult.ServerRoom.RoomID), - url.QueryEscape(event.EventID()), - ), - ) - var respStateIDs gomatrixserverlib.RespStateIDs - if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { - t.Errorf("/state_ids request returned non-200: %s", err) - return - } - var gotState, expectedState []interface{} - for _, ev := range respStateIDs.StateEventIDs { - gotState = append(gotState, ev) - } - for _, ev := range psjResult.ServerRoom.AllCurrentState() { - expectedState = append(expectedState, ev.EventID()) - } - must.CheckOffAll(t, gotState, expectedState) + // the HS will make a /get_missing_events request for the missing prev events of event B + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventA}) + + // send event B to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + }) + + // we should be able to receive events with partially missing prev events over federation during the resync + t.Run("CanReceiveEventsWithHalfMissingParentsDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we construct the following event graph: + // +---------+ + // v \ + // ... <-- M <-- A <-- B + // + // M is @alice:hs1's join event. + // A and B are regular m.room.messsage events created by @derek on the Complement homeserver. + // + // initially, hs1 only knows about event M. + // we send only event B to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) + + // the HS will make an /event_auth request for event A + federation.HandleEventAuthRequests()(psjResult.Server) + + // the HS will make a /get_missing_events request for the missing prev event of event B + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventA}) + + // send event B to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + }) + + // we should be able to receive events with a missing prev event, with half missing prev events, + // over federation during the resync + t.Run("CanReceiveEventsWithHalfMissingGrandparentsDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we construct the following event graph: + // +---------+ + // v \ + // ... <-- M <-- A <-- B <-- C + // + // M is @alice:hs1's join event. + // A, B and C are regular m.room.messsage events created by @derek on the Complement homeserver. + // + // initially, hs1 only knows about event M. + // we send only event C to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) + eventC := psjResult.CreateMessageEvent(t, "derek", []string{eventB.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) + t.Logf("Derek created event C with ID %s", eventC.EventID()) + + // the HS will make a /get_missing_events request for the missing prev event of event C, + // to which we respond with event B only. + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventB}) + + // dedicated state_ids and state handlers for event A + handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) + handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) + + // send event C to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) }) // a request to (client-side) /members?at= should block until the (federation) /state request completes @@ -513,6 +565,104 @@ func TestPartialStateJoin(t *testing.T) { }) } +// test reception of an event over federation during a resync +// sends the given event to the homeserver under test, checks that a client can see it and checks +// the state at the event +func testReceiveEventDuringPartialStateJoin( + t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, +) { + // send the event to the homeserver + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + + /* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks. + * https://github.com/matrix-org/synapse/issues/13146 + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, event.EventID()), + ) + */ + + // still, Alice should be able to see the event with an /event request. We might have to try it a few times. + alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}, + client.WithRetryUntil(time.Second, func(res *http.Response) bool { + if res.StatusCode == 200 { + return true + } + eventResBody := client.ParseJSON(t, res) + if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { + return false + } + t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) + return false + }), + ) + t.Logf("Successfully fetched received event %s", event.EventID()) + + // fire off a /state_ids request for the last event. + // it must either: + // * block because the homeserver does not have full state at the last event + // * or 403 because the homeserver does not have full state yet and does not consider the + // Complement homeserver to be in the room + // Synapse's behaviour will likely change once https://github.com/matrix-org/synapse/issues/13288 + // is resolved. For now, we use this to check whether Synapse has calculated the partial state + // flag for the last event correctly. + + stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", + url.PathEscape(psjResult.ServerRoom.RoomID), + url.QueryEscape(event.EventID()), + ), + ) + var respStateIDs gomatrixserverlib.RespStateIDs + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err := psjResult.Server.SendFederationRequest(ctx, deployment, stateReq, &respStateIDs) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) + } else if httpErr, ok := err.(gomatrix.HTTPError); ok && httpErr.Code == 403 { + t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) + } else { + t.Errorf("/state_ids request returned non-200: %s", err) + } + } else { + // since we have not yet given the homeserver the full state at the join event and allowed + // the partial join to complete, it can't possibly know the full state at the last event. + // While it may be possible for the response to be correct by some accident of state res, + // the homeserver is still wrong in spirit. + t.Fatalf("/state_ids request for event %s did not block when it should have", event.EventID()) + } + + // allow the partial join to complete + psjResult.FinishStateRequest() + alice.MustSyncUntil(t, + client.SyncReq{}, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + + // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation + stateReq = gomatrixserverlib.NewFederationRequest("GET", "hs1", + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", + url.PathEscape(psjResult.ServerRoom.RoomID), + url.QueryEscape(event.EventID()), + ), + ) + if err := psjResult.Server.SendFederationRequest(context.Background(), deployment, stateReq, &respStateIDs); err != nil { + t.Errorf("/state_ids request returned non-200: %s", err) + return + } + var gotState, expectedState []interface{} + for _, ev := range respStateIDs.StateEventIDs { + gotState = append(gotState, ev) + } + for _, ev := range psjResult.ServerRoom.AllCurrentState() { + expectedState = append(expectedState, ev.EventID()) + } + must.CheckOffAll(t, gotState, expectedState) +} + // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { timelineFilter := map[string]interface{}{ @@ -631,6 +781,28 @@ func (psj *partialStateJoinResult) Destroy() { } } +// send a message into the room without letting the homeserver under test know about it. +func (psj *partialStateJoinResult) CreateMessageEvent(t *testing.T, senderLocalpart string, prevEventIDs []string) *gomatrixserverlib.Event { + var prevEvents interface{} + if prevEventIDs == nil { + prevEvents = nil + } else { + prevEvents = prevEventIDs + } + + event := psj.Server.MustCreateEvent(t, psj.ServerRoom, b.Event{ + Type: "m.room.message", + Sender: psj.Server.UserID(senderLocalpart), + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Message", + }, + PrevEvents: prevEvents, + }) + psj.ServerRoom.AddEvent(event) + return event +} + // wait for a /state_ids request for the test room to arrive func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") @@ -664,7 +836,7 @@ func handleStateIdsRequests( if sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state_ids request") } - t.Logf("Replying to /state_ids request") + t.Logf("Replying to /state_ids request for event %s", queryParams["event_id"]) res := gomatrixserverlib.RespStateIDs{ AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChainForEvents(roomState)), @@ -718,6 +890,50 @@ func handleStateRequests( ) } +// register a handler for `/get_missing_events` requests +func handleGetMissingEventsRequests( + t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + eventsToReturn []*gomatrixserverlib.Event, +) { + srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) { + roomID := mux.Vars(req)["roomID"] + if roomID != serverRoom.RoomID { + t.Fatalf("Received unexpected /get_missing_events request for room: %s", roomID) + } + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("unable to read request body: %v", err) + } + var getMissingEventsRequest struct { + EarliestEvents []string `json:"earliest_events"` + LatestEvents []string `json:"latest_events"` + Limit int `json:"int"` + MinDepth int `json:"min_depth"` + } + err = json.Unmarshal(body, &getMissingEventsRequest) + if err != nil { + errResp := util.MessageResponse(400, err.Error()) + w.WriteHeader(errResp.Code) + b, _ := json.Marshal(errResp.JSON) + w.Write(b) + return + } + + t.Logf("Incoming get_missing_events request for prev events of %s in room %s", getMissingEventsRequest.LatestEvents, roomID) + + // TODO: return events based on those requested + w.WriteHeader(200) + res := struct { + Events []*gomatrixserverlib.Event `json:"events"` + }{ + Events: eventsToReturn, + } + responseBytes, _ := json.Marshal(&res) + w.Write(responseBytes) + }).Methods("POST") +} + func eventIDsFromEvents(he []*gomatrixserverlib.Event) []string { eventIDs := make([]string, len(he)) for i := range he { diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index ce6af1dc..50849f2c 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -385,7 +385,7 @@ func testValidationForSendMembershipEndpoint(t *testing.T, baseApiPath, expected } var res interface{} - err := srv.SendFederationRequest(deployment, req, &res) + err := srv.SendFederationRequest(context.Background(), deployment, req, &res) if err == nil { t.Errorf("send request returned 200") return