Skip to content

Commit

Permalink
DM/Openapi: use reverse proxy instead of redirect (#5390) (#5465)
Browse files Browse the repository at this point in the history
close #5179
  • Loading branch information
ti-chi-bot authored May 24, 2022
1 parent 0e16e79 commit 1feef71
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 13 deletions.
36 changes: 31 additions & 5 deletions dm/dm/master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ package master

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/http/httputil"

"github.com/pingcap/failpoint"

"github.com/deepmap/oapi-codegen/pkg/middleware"
"github.com/labstack/echo/v4"
Expand All @@ -45,7 +48,7 @@ const (

// redirectRequestToLeaderMW a middleware auto redirect request to leader.
// because the leader has some data in memory, only the leader can process the request.
func (s *Server) redirectRequestToLeaderMW() echo.MiddlewareFunc {
func (s *Server) reverseRequestToLeaderMW(tlsCfg *tls.Config) echo.MiddlewareFunc {
return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) error {
ctx2 := ctx.Request().Context()
Expand All @@ -58,13 +61,36 @@ func (s *Server) redirectRequestToLeaderMW() echo.MiddlewareFunc {
if err != nil {
return err
}
return ctx.Redirect(http.StatusTemporaryRedirect, fmt.Sprintf("http://%s%s", leaderOpenAPIAddr, ctx.Request().RequestURI))

failpoint.Inject("MockNotSetTls", func() {
tlsCfg = nil
})
// simpleProxy just reverses to leader host
simpleProxy := httputil.ReverseProxy{
Director: func(req *http.Request) {
if tlsCfg != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
req.URL.Host = leaderOpenAPIAddr
req.Host = leaderOpenAPIAddr
},
}
if tlsCfg != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsCfg
simpleProxy.Transport = transport
}
log.L().Info("reverse request to leader", zap.String("Request URL", ctx.Request().URL.String()), zap.String("leader", leaderOpenAPIAddr), zap.Bool("hasTLS", tlsCfg != nil))
simpleProxy.ServeHTTP(ctx.Response(), ctx.Request())
return nil
}
}
}

// InitOpenAPIHandles init openapi handlers.
func (s *Server) InitOpenAPIHandles() error {
func (s *Server) InitOpenAPIHandles(tlsCfg *tls.Config) error {
swagger, err := openapi.GetSwagger()
if err != nil {
return err
Expand All @@ -77,7 +103,7 @@ func (s *Server) InitOpenAPIHandles() error {
// set logger
e.Use(openapi.ZapLogger(logger))
e.Use(echomiddleware.Recover())
e.Use(s.redirectRequestToLeaderMW())
e.Use(s.reverseRequestToLeaderMW(tlsCfg))
// disables swagger server name validation. it seems to work poorly
swagger.Servers = nil
// use our validation middleware to check all requests against the OpenAPI schema.
Expand Down
103 changes: 100 additions & 3 deletions dm/dm/master/openapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"context"
"fmt"
"net/http"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/DATA-DOG/go-sqlmock"
"github.com/deepmap/oapi-codegen/pkg/testutil"
"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -75,7 +78,7 @@ func (t *openAPISuite) SetUpTest(c *check.C) {
c.Assert(ha.ClearTestInfoOperation(t.etcdTestCli), check.IsNil)
}

func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
func (t *openAPISuite) TestReverseRequestToLeader(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -134,9 +137,103 @@ func (t *openAPISuite) TestRedirectRequestToLeader(c *check.C) {
c.Assert(resultListSource.Data, check.HasLen, 0)
c.Assert(resultListSource.Total, check.Equals, 0)

// list source not from leader will get a redirect
// list source from non-leader will get result too
result2 := testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
c.Assert(result2.Code(), check.Equals, http.StatusTemporaryRedirect)
c.Assert(result2.Code(), check.Equals, http.StatusOK)
var resultListSource2 openapi.GetSourceListResponse
err = result2.UnmarshalBodyToObject(&resultListSource2)
c.Assert(err, check.IsNil)
c.Assert(resultListSource2.Data, check.HasLen, 0)
c.Assert(resultListSource2.Total, check.Equals, 0)
}

func (t *openAPISuite) TestReverseRequestToHttpsLeader(c *check.C) {
pwd, err := os.Getwd()
require.NoError(t.testT, err)
caPath := pwd + "/tls_for_test/ca.pem"
certPath := pwd + "/tls_for_test/dm.pem"
keyPath := pwd + "/tls_for_test/dm.key"

// master1
masterAddr1 := tempurl.Alloc()[len("http://"):]
peerAddr1 := tempurl.Alloc()[len("http://"):]
cfg1 := NewConfig()
require.NoError(t.testT, cfg1.Parse([]string{
"--name=dm-master-tls-1",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr1),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr1),
fmt.Sprintf("--peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr1),
fmt.Sprintf("--initial-cluster=dm-master-tls-1=https://%s", peerAddr1),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg1.ExperimentalFeatures.OpenAPI = true
s1 := NewServer(cfg1)
ctx1, cancel1 := context.WithCancel(context.Background())
require.NoError(t.testT, s1.Start(ctx1))
defer func() {
cancel1()
s1.Close()
}()
// wait the first one become the leader
require.True(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s1.election.IsLeader() && s1.scheduler.Started()
}))

// master2
masterAddr2 := tempurl.Alloc()[len("http://"):]
peerAddr2 := tempurl.Alloc()[len("http://"):]
cfg2 := NewConfig()
require.NoError(t.testT, cfg2.Parse([]string{
"--name=dm-master-tls-2",
fmt.Sprintf("--data-dir=%s", t.testT.TempDir()),
fmt.Sprintf("--master-addr=https://%s", masterAddr2),
fmt.Sprintf("--advertise-addr=https://%s", masterAddr2),
fmt.Sprintf("--peer-urls=https://%s", peerAddr2),
fmt.Sprintf("--advertise-peer-urls=https://%s", peerAddr2),
"--ssl-ca=" + caPath,
"--ssl-cert=" + certPath,
"--ssl-key=" + keyPath,
}))
cfg2.ExperimentalFeatures.OpenAPI = true
cfg2.Join = s1.cfg.MasterAddr // join to an existing cluster
s2 := NewServer(cfg2)
ctx2, cancel2 := context.WithCancel(context.Background())
require.NoError(t.testT, s2.Start(ctx2))
defer func() {
cancel2()
s2.Close()
}()
// wait the second master ready
require.False(t.testT, utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s2.election.IsLeader()
}))

baseURL := "/api/v1/sources"
// list source from leader
result := testutil.NewRequest().Get(baseURL).Go(t.testT, s1.echo)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource))
require.Len(t.testT, resultListSource.Data, 0)
require.Equal(t.testT, 0, resultListSource.Total)

// with tls, list source not from leader will get result too
result = testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
require.Equal(t.testT, http.StatusOK, result.Code())
var resultListSource2 openapi.GetSourceListResponse
require.NoError(t.testT, result.UnmarshalBodyToObject(&resultListSource2))
require.Len(t.testT, resultListSource2.Data, 0)
require.Equal(t.testT, 0, resultListSource2.Total)

// without tls, list source not from leader will be 502
require.NoError(t.testT, failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls", `return()`))
result = testutil.NewRequest().Get(baseURL).Go(t.testT, s2.echo)
require.Equal(t.testT, http.StatusBadGateway, result.Code())
require.NoError(t.testT, failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/MockNotSetTls"))
}

func (t *openAPISuite) TestOpenAPIWillNotStartInDefaultConfig(c *check.C) {
Expand Down
8 changes: 7 additions & 1 deletion dm/dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,14 @@ func (s *Server) Start(ctx context.Context) (err error) {
"/status": getStatusHandle(),
"/debug/": getDebugHandler(),
}

if s.cfg.ExperimentalFeatures.OpenAPI {
if initOpenAPIErr := s.InitOpenAPIHandles(); initOpenAPIErr != nil {
// tls3 is used to openapi reverse proxy
tls3, err1 := toolutils.NewTLS(s.cfg.SSLCA, s.cfg.SSLCert, s.cfg.SSLKey, s.cfg.AdvertiseAddr, s.cfg.CertAllowedCN)
if err1 != nil {
return terror.ErrMasterTLSConfigNotValid.Delegate(err1)
}
if initOpenAPIErr := s.InitOpenAPIHandles(tls3.TLSConfig()); initOpenAPIErr != nil {
return terror.ErrOpenAPICommonError.Delegate(initOpenAPIErr)
}
userHandles["/api/v1/"] = s.echo
Expand Down
39 changes: 36 additions & 3 deletions dm/tests/openapi/client/openapi_source_check
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python
import sys
import requests
import ssl

SOURCE1_NAME = "mysql-01"
SOURCE2_NAME = "mysql-02"
Expand All @@ -11,6 +12,10 @@ WORKER2_NAME = "worker2"
API_ENDPOINT = "http://127.0.0.1:8261/api/v1/sources"
API_ENDPOINT_NOT_LEADER = "http://127.0.0.1:8361/api/v1/sources"

API_ENDPOINT_HTTPS = "https://127.0.0.1:8261/api/v1/sources"
API_ENDPOINT_NOT_LEADER_HTTPS = "https://127.0.0.1:8361/api/v1/sources"



def create_source_failed():
resp = requests.post(url=API_ENDPOINT)
Expand Down Expand Up @@ -47,6 +52,19 @@ def create_source2_success():
print("create_source1_success resp=", resp.json())
assert resp.status_code == 201

def create_source_success_https(ssl_ca, ssl_cert, ssl_key):
req = {
"case_sensitive": False,
"enable_gtid": False,
"host": "127.0.0.1",
"password": "123456",
"port": 3306,
"source_name": SOURCE1_NAME,
"user": "root",
}
resp = requests.post(url=API_ENDPOINT_HTTPS, json=req, verify=ssl_ca, cert=(ssl_cert, ssl_key))
print("create_source_success_https resp=", resp.json())
assert resp.status_code == 201

def list_source_success(source_count):
resp = requests.get(url=API_ENDPOINT)
Expand All @@ -55,6 +73,12 @@ def list_source_success(source_count):
print("list_source_by_openapi_success resp=", data)
assert data["total"] == int(source_count)

def list_source_success_https(source_count, ssl_ca, ssl_cert, ssl_key):
resp = requests.get(url=API_ENDPOINT_HTTPS, verify=ssl_ca, cert=(ssl_cert, ssl_key))
assert resp.status_code == 200
data = resp.json()
print("list_source_success_https resp=", data)
assert data["total"] == int(source_count)

def list_source_with_status_success(source_count, status_count):
resp = requests.get(url=API_ENDPOINT + "?with_status=true")
Expand All @@ -66,13 +90,19 @@ def list_source_with_status_success(source_count, status_count):
assert len(data["data"][i]["status_list"]) == int(status_count)


def list_source_with_redirect(source_count):
def list_source_with_reverse(source_count):
resp = requests.get(url=API_ENDPOINT_NOT_LEADER)
assert resp.status_code == 200
data = resp.json()
print("list_source_by_openapi_redirect resp=", data)
print("list_source_with_reverse resp=", data)
assert data["total"] == int(source_count)

def list_source_with_reverse_https(source_count, ssl_ca, ssl_cert, ssl_key):
resp = requests.get(url=API_ENDPOINT_NOT_LEADER_HTTPS, verify=ssl_ca, cert=(ssl_cert, ssl_key))
assert resp.status_code == 200
data = resp.json()
print("list_source_with_reverse_https resp=", data)
assert data["total"] == int(source_count)

def delete_source_success(source_name):
resp = requests.delete(url=API_ENDPOINT + "/" + source_name)
Expand Down Expand Up @@ -215,8 +245,11 @@ if __name__ == "__main__":
"create_source_failed": create_source_failed,
"create_source1_success": create_source1_success,
"create_source2_success": create_source2_success,
"create_source_success_https": create_source_success_https,
"list_source_success": list_source_success,
"list_source_with_redirect": list_source_with_redirect,
"list_source_success_https": list_source_success_https,
"list_source_with_reverse_https": list_source_with_reverse_https,
"list_source_with_reverse": list_source_with_reverse,
"list_source_with_status_success": list_source_with_status_success,
"delete_source_failed": delete_source_failed,
"delete_source_success": delete_source_success,
Expand Down
61 changes: 60 additions & 1 deletion dm/tests/openapi/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ function test_source() {
openapi_source_check "delete_source_failed" "mysql-01"

# send request to not leader node
openapi_source_check "list_source_with_redirect" 0
openapi_source_check "list_source_with_reverse" 0

echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: SOURCE SUCCESS"
}
Expand Down Expand Up @@ -294,6 +294,62 @@ function test_noshard_task() {
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: NO SHARD TASK SUCCESS"
}

function test_reverse_https() {
echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>START TEST OPENAPI: REVERSE HTTPS"
cleanup_data openapi
cleanup_process

cp $cur/tls_conf/dm-master1.toml $WORK_DIR/
cp $cur/tls_conf/dm-master2.toml $WORK_DIR/
cp $cur/tls_conf/dm-worker1.toml $WORK_DIR/
cp $cur/tls_conf/dm-worker2.toml $WORK_DIR/
sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-master1.toml
sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-master2.toml
sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-worker1.toml
sed -i "s%dir-placeholer%$cur\/tls_conf%g" $WORK_DIR/dm-worker2.toml

# run dm-master1
run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $WORK_DIR/dm-master1.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"
# join master2
run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $WORK_DIR/dm-master2.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"
# run dm-worker1
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $WORK_DIR/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"
# run dm-worker2
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $WORK_DIR/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"

prepare_database
# create source successfully
openapi_source_check "create_source_success_https" "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"

# get source list success
openapi_source_check "list_source_success_https" 1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"

# send request to not leader node
openapi_source_check "list_source_with_reverse_https" 1 "$cur/tls_conf/ca.pem" "$cur/tls_conf/dm.pem" "$cur/tls_conf/dm.key"

cleanup_data openapi
cleanup_process

# run dm-master1
run_dm_master $WORK_DIR/master1 $MASTER_PORT1 $cur/conf/dm-master1.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT1
# join master2
run_dm_master $WORK_DIR/master2 $MASTER_PORT2 $cur/conf/dm-master2.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT2
# run dm-worker1
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
# run dm-worker2
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

echo ">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>TEST OPENAPI: REVERSE HTTPS"
}

function test_cluster() {
# list master and worker node
openapi_cluster_check "list_master_success" 2
Expand Down Expand Up @@ -333,6 +389,9 @@ function run() {

test_shard_task
test_noshard_task
test_reverse_https

# NOTE: this test case MUST running at last, because it will offline some members of cluster
test_cluster
}

Expand Down
8 changes: 8 additions & 0 deletions dm/tests/openapi/tls_conf/ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-----BEGIN CERTIFICATE-----
MIIBGDCBwAIJAOjYXLFw5V1HMAoGCCqGSM49BAMCMBQxEjAQBgNVBAMMCWxvY2Fs
aG9zdDAgFw0yMDAzMTcxMjAwMzNaGA8yMjkzMTIzMTEyMDAzM1owFDESMBAGA1UE
AwwJbG9jYWxob3N0MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEglCIJD8uVBfD
kuM+UQP+VA7Srbz17WPLA0Sqc+sQ2p6fT6HYKCW60EXiZ/yEC0925iyVbXEEbX4J
xCc2Heow5TAKBggqhkjOPQQDAgNHADBEAiAILL3Zt/3NFeDW9c9UAcJ9lc92E0ZL
GNDuH6i19Fex3wIgT0ZMAKAFSirGGtcLu0emceuk+zVKjJzmYbsLdpj/JuQ=
-----END CERTIFICATE-----
Loading

0 comments on commit 1feef71

Please sign in to comment.