diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 57e289b7eff..cc031028094 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -135,7 +135,19 @@ func NewSqlEngine( config.ClusterController.RegisterStoredProcedures(pro) pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) - pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook + + sqlEngine := &SqlEngine{} + + var dropDatabase = func(ctx context.Context, name string) error { + sqlCtx, err := sqlEngine.NewDefaultContext(ctx) + if err != nil { + return err + } + return pro.DropDatabase(sqlCtx, name) + } + + config.ClusterController.SetDropDatabase(dropDatabase) + pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook() // Create the engine engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{ @@ -223,12 +235,12 @@ func NewSqlEngine( } } - return &SqlEngine{ - provider: pro, - contextFactory: sqlContextFactory(), - dsessFactory: sessFactory, - engine: engine, - }, nil + sqlEngine.provider = pro + sqlEngine.contextFactory = sqlContextFactory() + sqlEngine.dsessFactory = sessFactory + sqlEngine.engine = engine + + return sqlEngine, nil } // NewRebasedSqlEngine returns a smalled rebased engine primarily used in filterbranch. diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go index 58861156901..6ac797942a8 100644 --- a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go @@ -208,6 +208,92 @@ func (*UpdateBranchControlResponse) Descriptor() ([]byte, []int) { return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP(), []int{3} } +type DropDatabaseRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // The name of the database to be dropped. + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (x *DropDatabaseRequest) Reset() { + *x = DropDatabaseRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DropDatabaseRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DropDatabaseRequest) ProtoMessage() {} + +func (x *DropDatabaseRequest) ProtoReflect() protoreflect.Message { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DropDatabaseRequest.ProtoReflect.Descriptor instead. +func (*DropDatabaseRequest) Descriptor() ([]byte, []int) { + return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP(), []int{4} +} + +func (x *DropDatabaseRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +type DropDatabaseResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *DropDatabaseResponse) Reset() { + *x = DropDatabaseResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *DropDatabaseResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DropDatabaseResponse) ProtoMessage() {} + +func (x *DropDatabaseResponse) ProtoReflect() protoreflect.Message { + mi := &file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DropDatabaseResponse.ProtoReflect.Descriptor instead. +func (*DropDatabaseResponse) Descriptor() ([]byte, []int) { + return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP(), []int{5} +} + var File_dolt_services_replicationapi_v1alpha1_replication_proto protoreflect.FileDescriptor var file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc = []byte{ @@ -230,35 +316,48 @@ var file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc = []byt 0x74, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x12, 0x73, 0x65, 0x72, 0x69, 0x61, 0x6c, 0x69, 0x7a, 0x65, 0x64, 0x43, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x1d, 0x0a, 0x1b, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, - 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0xd5, - 0x02, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x53, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x9f, 0x01, 0x0a, 0x14, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, - 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x12, 0x42, - 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, - 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, - 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x43, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, - 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, - 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x9c, 0x01, 0x0a, 0x13, 0x55, 0x70, 0x64, 0x61, - 0x74, 0x65, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x12, - 0x41, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, - 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, - 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x42, 0x72, - 0x61, 0x6e, 0x63, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x42, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x29, + 0x0a, 0x13, 0x44, 0x72, 0x6f, 0x70, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x16, 0x0a, 0x14, 0x44, 0x72, 0x6f, + 0x70, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x32, 0xdf, 0x03, 0x0a, 0x12, 0x52, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x9f, 0x01, 0x0a, 0x14, 0x55, 0x70, 0x64, + 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, + 0x73, 0x12, 0x42, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, + 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, 0x74, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x43, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x55, 0x73, 0x65, 0x72, 0x73, 0x41, 0x6e, 0x64, 0x47, 0x72, 0x61, 0x6e, + 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x9c, 0x01, 0x0a, 0x13, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, + 0x6f, 0x6c, 0x12, 0x41, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, 0x6c, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x5b, 0x5a, 0x59, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, - 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x6f, 0x6c, 0x74, 0x68, 0x75, 0x62, 0x2f, 0x64, 0x6f, 0x6c, - 0x74, 0x2f, 0x67, 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, - 0x6f, 0x6c, 0x74, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x70, - 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61, 0x6c, - 0x70, 0x68, 0x61, 0x31, 0x3b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, - 0x61, 0x70, 0x69, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x42, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x55, 0x70, + 0x64, 0x61, 0x74, 0x65, 0x42, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x43, 0x6f, 0x6e, 0x74, 0x72, 0x6f, + 0x6c, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x87, 0x01, 0x0a, 0x0c, 0x44, 0x72, + 0x6f, 0x70, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x12, 0x3a, 0x2e, 0x64, 0x6f, 0x6c, + 0x74, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, + 0x61, 0x31, 0x2e, 0x44, 0x72, 0x6f, 0x70, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x3b, 0x2e, 0x64, 0x6f, 0x6c, 0x74, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2e, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x44, + 0x72, 0x6f, 0x70, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x42, 0x5b, 0x5a, 0x59, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x64, 0x6f, 0x6c, 0x74, 0x68, 0x75, 0x62, 0x2f, 0x64, 0x6f, 0x6c, 0x74, 0x2f, 0x67, + 0x6f, 0x2f, 0x67, 0x65, 0x6e, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, 0x6f, 0x6c, 0x74, + 0x2f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, + 0x31, 0x3b, 0x72, 0x65, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x70, 0x69, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -273,20 +372,24 @@ func file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescGZIP() return file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDescData } -var file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_dolt_services_replicationapi_v1alpha1_replication_proto_goTypes = []interface{}{ (*UpdateUsersAndGrantsRequest)(nil), // 0: dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsRequest (*UpdateUsersAndGrantsResponse)(nil), // 1: dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsResponse (*UpdateBranchControlRequest)(nil), // 2: dolt.services.replicationapi.v1alpha1.UpdateBranchControlRequest (*UpdateBranchControlResponse)(nil), // 3: dolt.services.replicationapi.v1alpha1.UpdateBranchControlResponse + (*DropDatabaseRequest)(nil), // 4: dolt.services.replicationapi.v1alpha1.DropDatabaseRequest + (*DropDatabaseResponse)(nil), // 5: dolt.services.replicationapi.v1alpha1.DropDatabaseResponse } var file_dolt_services_replicationapi_v1alpha1_replication_proto_depIdxs = []int32{ 0, // 0: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateUsersAndGrants:input_type -> dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsRequest 2, // 1: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateBranchControl:input_type -> dolt.services.replicationapi.v1alpha1.UpdateBranchControlRequest - 1, // 2: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateUsersAndGrants:output_type -> dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsResponse - 3, // 3: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateBranchControl:output_type -> dolt.services.replicationapi.v1alpha1.UpdateBranchControlResponse - 2, // [2:4] is the sub-list for method output_type - 0, // [0:2] is the sub-list for method input_type + 4, // 2: dolt.services.replicationapi.v1alpha1.ReplicationService.DropDatabase:input_type -> dolt.services.replicationapi.v1alpha1.DropDatabaseRequest + 1, // 3: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateUsersAndGrants:output_type -> dolt.services.replicationapi.v1alpha1.UpdateUsersAndGrantsResponse + 3, // 4: dolt.services.replicationapi.v1alpha1.ReplicationService.UpdateBranchControl:output_type -> dolt.services.replicationapi.v1alpha1.UpdateBranchControlResponse + 5, // 5: dolt.services.replicationapi.v1alpha1.ReplicationService.DropDatabase:output_type -> dolt.services.replicationapi.v1alpha1.DropDatabaseResponse + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type 0, // [0:0] is the sub-list for extension type_name 0, // [0:0] is the sub-list for extension extendee 0, // [0:0] is the sub-list for field type_name @@ -346,6 +449,30 @@ func file_dolt_services_replicationapi_v1alpha1_replication_proto_init() { return nil } } + file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DropDatabaseRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_dolt_services_replicationapi_v1alpha1_replication_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*DropDatabaseResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -353,7 +480,7 @@ func file_dolt_services_replicationapi_v1alpha1_replication_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_dolt_services_replicationapi_v1alpha1_replication_proto_rawDesc, NumEnums: 0, - NumMessages: 4, + NumMessages: 6, NumExtensions: 0, NumServices: 1, }, diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go index 254605e2155..275cc34a395 100644 --- a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go @@ -45,6 +45,7 @@ type ReplicationServiceClient interface { // with OverwriteUsersAndGrantData. UpdateUsersAndGrants(ctx context.Context, in *UpdateUsersAndGrantsRequest, opts ...grpc.CallOption) (*UpdateUsersAndGrantsResponse, error) UpdateBranchControl(ctx context.Context, in *UpdateBranchControlRequest, opts ...grpc.CallOption) (*UpdateBranchControlResponse, error) + DropDatabase(ctx context.Context, in *DropDatabaseRequest, opts ...grpc.CallOption) (*DropDatabaseResponse, error) } type replicationServiceClient struct { @@ -73,6 +74,15 @@ func (c *replicationServiceClient) UpdateBranchControl(ctx context.Context, in * return out, nil } +func (c *replicationServiceClient) DropDatabase(ctx context.Context, in *DropDatabaseRequest, opts ...grpc.CallOption) (*DropDatabaseResponse, error) { + out := new(DropDatabaseResponse) + err := c.cc.Invoke(ctx, "/dolt.services.replicationapi.v1alpha1.ReplicationService/DropDatabase", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // ReplicationServiceServer is the server API for ReplicationService service. // All implementations must embed UnimplementedReplicationServiceServer // for forward compatibility @@ -85,6 +95,7 @@ type ReplicationServiceServer interface { // with OverwriteUsersAndGrantData. UpdateUsersAndGrants(context.Context, *UpdateUsersAndGrantsRequest) (*UpdateUsersAndGrantsResponse, error) UpdateBranchControl(context.Context, *UpdateBranchControlRequest) (*UpdateBranchControlResponse, error) + DropDatabase(context.Context, *DropDatabaseRequest) (*DropDatabaseResponse, error) mustEmbedUnimplementedReplicationServiceServer() } @@ -98,6 +109,9 @@ func (UnimplementedReplicationServiceServer) UpdateUsersAndGrants(context.Contex func (UnimplementedReplicationServiceServer) UpdateBranchControl(context.Context, *UpdateBranchControlRequest) (*UpdateBranchControlResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method UpdateBranchControl not implemented") } +func (UnimplementedReplicationServiceServer) DropDatabase(context.Context, *DropDatabaseRequest) (*DropDatabaseResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method DropDatabase not implemented") +} func (UnimplementedReplicationServiceServer) mustEmbedUnimplementedReplicationServiceServer() {} // UnsafeReplicationServiceServer may be embedded to opt out of forward compatibility for this service. @@ -147,6 +161,24 @@ func _ReplicationService_UpdateBranchControl_Handler(srv interface{}, ctx contex return interceptor(ctx, in, info, handler) } +func _ReplicationService_DropDatabase_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DropDatabaseRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ReplicationServiceServer).DropDatabase(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dolt.services.replicationapi.v1alpha1.ReplicationService/DropDatabase", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ReplicationServiceServer).DropDatabase(ctx, req.(*DropDatabaseRequest)) + } + return interceptor(ctx, in, info, handler) +} + // ReplicationService_ServiceDesc is the grpc.ServiceDesc for ReplicationService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -162,6 +194,10 @@ var ReplicationService_ServiceDesc = grpc.ServiceDesc{ MethodName: "UpdateBranchControl", Handler: _ReplicationService_UpdateBranchControl_Handler, }, + { + MethodName: "DropDatabase", + Handler: _ReplicationService_DropDatabase_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "dolt/services/replicationapi/v1alpha1/replication.proto", diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index a5bc5f8e0b2..7930a6ae1d9 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -36,7 +36,9 @@ import ( gmstypes "github.com/dolthub/go-mysql-server/sql/types" "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" @@ -94,6 +96,8 @@ type Controller struct { branchControlController *branch_control.Controller branchControlFilesys filesys.Filesys bcReplication *branchControlReplication + + dropDatabase func(context.Context, string) error } type sqlvars interface { @@ -310,12 +314,31 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) { store.Register(newTransitionToStandbyProcedure(c)) } -func (c *Controller) DropDatabaseHook(dbname string) { +// Incoming drop database replication requests need a way to drop a database in +// the sqle.DatabaseProvider. This is our callback for that functionality. +func (c *Controller) SetDropDatabase(dropDatabase func(context.Context, string) error) { if c == nil { return } c.mu.Lock() defer c.mu.Unlock() + c.dropDatabase = dropDatabase +} + +// Our DropDatabaseHook gets called when the database provider drops a +// database. This is how we learn that we need to replicate a drop database. +func (c *Controller) DropDatabaseHook() func(string) { + if c == nil { + return nil + } + return c.dropDatabaseHook +} + +func (c *Controller) dropDatabaseHook(dbname string) { + c.mu.Lock() + defer c.mu.Unlock() + + // We always cleanup the commithooks associated with that database. j := 0 for i := 0; i < len(c.commithooks); i++ { @@ -329,6 +352,43 @@ func (c *Controller) DropDatabaseHook(dbname string) { j += 1 } c.commithooks = c.commithooks[:j] + + if c.role != RolePrimary { + return + } + + // If we are the primary, we will replicate the drop to our standby replicas. + + for _, client := range c.replicationClients { + client := client + go c.replicateDropDatabase(client, dbname) + } +} + +func (c *Controller) replicateDropDatabase(client *replicationServiceClient, dbname string) { + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = time.Millisecond + bo.MaxInterval = time.Minute + bo.MaxElapsedTime = 0 + for { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + _, err := client.client.DropDatabase(ctx, &replicationapi.DropDatabaseRequest{ + Name: dbname, + }) + cancel() + if err == nil { + c.lgr.Tracef("successfully replicated drop of [%s] to %s", dbname, client.remote) + return + } + if status.Code(err) == codes.FailedPrecondition { + c.lgr.Warnf("drop of [%s] to %s will note be replicated; FailedPrecondition", dbname, client.remote) + return + } + c.lgr.Warnf("failed to replicate drop of [%s] to %s: %v", dbname, client.remote, err) + d := bo.NextBackOff() + c.lgr.Tracef("sleeping %v before next drop attempt for database [%s] at %s", d, dbname, client.remote) + time.Sleep(d) + } } func (c *Controller) ClusterDatabase() sql.Database { @@ -639,6 +699,7 @@ func (c *Controller) RegisterGrpcServices(srv *grpc.Server) { mysqlDb: c.mysqlDb, branchControl: c.branchControlController, branchControlFilesys: c.branchControlFilesys, + dropDatabase: c.dropDatabase, lgr: c.lgr.WithFields(logrus.Fields{}), }) } diff --git a/go/libraries/doltcore/sqle/cluster/replication_service.go b/go/libraries/doltcore/sqle/cluster/replication_service.go index 5e569f5a61b..528503f7350 100644 --- a/go/libraries/doltcore/sqle/cluster/replication_service.go +++ b/go/libraries/doltcore/sqle/cluster/replication_service.go @@ -20,6 +20,8 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/mysql_db" "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/utils/filesys" @@ -38,6 +40,8 @@ type replicationServiceServer struct { branchControl BranchControlPersistence branchControlFilesys filesys.Filesys + + dropDatabase func(context.Context, string) error } func (s *replicationServiceServer) UpdateUsersAndGrants(ctx context.Context, req *replicationapi.UpdateUsersAndGrantsRequest) (*replicationapi.UpdateUsersAndGrantsResponse, error) { @@ -66,3 +70,16 @@ func (s *replicationServiceServer) UpdateBranchControl(ctx context.Context, req } return &replicationapi.UpdateBranchControlResponse{}, nil } + +func (s *replicationServiceServer) DropDatabase(ctx context.Context, req *replicationapi.DropDatabaseRequest) (*replicationapi.DropDatabaseResponse, error) { + if s.dropDatabase == nil { + return nil, status.Error(codes.Unimplemented, "unimplemented") + } + + err := s.dropDatabase(ctx, req.Name) + s.lgr.Tracef("dropped database [%s] through sqle.DropDatabase. err: %v", req.Name, err) + if err != nil && !sql.ErrDatabaseNotFound.Is(err) { + return nil, err + } + return &replicationapi.DropDatabaseResponse{}, nil +} diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 8ffc7a8d0d6..635d78a9cb6 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -1482,3 +1482,91 @@ tests: result: columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] rows: [] +- name: dropped database is no longer present on replica + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'SET @@GLOBAL.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'create database repo1' + - exec: 'use repo1' + - exec: 'create table vals (i int primary key)' + - exec: 'create database repo2' + - exec: 'use repo2' + - exec: 'create table vals (i int primary key)' + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: + - ["repo1", "standby", "primary", "1", "1", "NULL"] + - ["repo2", "standby", "primary", "1", "1", "NULL"] + - exec: 'use repo1' + - exec: 'drop database repo2' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - query: 'show databases' + result: + columns: ["Database"] + rows: + - ["dolt_cluster"] + - ["information_schema"] + - ["mysql"] + - ["repo1"] + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: + - ["repo1", "standby", "primary", "1", "1", "NULL"] + - on: server2 + queries: + - query: 'show databases' + result: + columns: ["Database"] + rows: + - ["dolt_cluster"] + - ["information_schema"] + - ["mysql"] + - ["repo1"] + retry_attempts: 100 + - query: "select `database`, standby_remote, role, epoch from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch",] + rows: + - ["repo1", "standby", "standby", "1"] diff --git a/proto/dolt/services/replicationapi/v1alpha1/replication.proto b/proto/dolt/services/replicationapi/v1alpha1/replication.proto index 4189b2ec7d6..e6aed57c888 100644 --- a/proto/dolt/services/replicationapi/v1alpha1/replication.proto +++ b/proto/dolt/services/replicationapi/v1alpha1/replication.proto @@ -28,6 +28,8 @@ service ReplicationService { rpc UpdateUsersAndGrants(UpdateUsersAndGrantsRequest) returns (UpdateUsersAndGrantsResponse); rpc UpdateBranchControl(UpdateBranchControlRequest) returns (UpdateBranchControlResponse); + + rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse); } message UpdateUsersAndGrantsRequest { @@ -46,3 +48,11 @@ message UpdateBranchControlRequest { message UpdateBranchControlResponse { } + +message DropDatabaseRequest { + // The name of the database to be dropped. + string name = 1; +} + +message DropDatabaseResponse { +}