From 79161b91f09f93b0aa8c69d643966c21f355c278 Mon Sep 17 00:00:00 2001 From: winlin Date: Mon, 9 Sep 2024 11:12:28 +0800 Subject: [PATCH] Refine names. --- proxy/api.go | 17 +++++++++++------ proxy/http.go | 13 ++++++++----- proxy/main.go | 32 ++++++++++++++++---------------- proxy/rtc.go | 21 ++++++++++++--------- proxy/rtmp.go | 13 ++++++++----- proxy/srt.go | 15 +++++++++------ 6 files changed, 64 insertions(+), 47 deletions(-) diff --git a/proxy/api.go b/proxy/api.go index b12fddf240..04baa92526 100644 --- a/proxy/api.go +++ b/proxy/api.go @@ -16,26 +16,28 @@ import ( "srs-proxy/logger" ) -type httpAPI struct { +// srsHTTPAPIServer is the proxy for SRS HTTP API, to proxy the WebRTC HTTP API like WHIP and WHEP, +// to proxy other HTTP API of SRS like the streams and clients, etc. +type srsHTTPAPIServer struct { // The underlayer HTTP server. server *http.Server // The WebRTC server. - rtc *rtcServer + rtc *srsWebRTCServer // The gracefully quit timeout, wait server to quit. gracefulQuitTimeout time.Duration // The wait group for all goroutines. wg sync.WaitGroup } -func NewHttpAPI(opts ...func(*httpAPI)) *httpAPI { - v := &httpAPI{} +func NewSRSHTTPAPIServer(opts ...func(*srsHTTPAPIServer)) *srsHTTPAPIServer { + v := &srsHTTPAPIServer{} for _, opt := range opts { opt(v) } return v } -func (v *httpAPI) Close() error { +func (v *srsHTTPAPIServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -44,7 +46,7 @@ func (v *httpAPI) Close() error { return nil } -func (v *httpAPI) Run(ctx context.Context) error { +func (v *srsHTTPAPIServer) Run(ctx context.Context) error { // Parse address to listen. addr := envHttpAPI() if !strings.Contains(addr, ":") { @@ -111,6 +113,9 @@ func (v *httpAPI) Run(ctx context.Context) error { return nil } +// systemAPI is the system HTTP API of the proxy server, for SRS media server to register the service +// to proxy server. It also provides some other system APIs like the status of proxy server, like exporter +// for Prometheus metrics. type systemAPI struct { // The underlayer HTTP server. server *http.Server diff --git a/proxy/http.go b/proxy/http.go index 4bfa133b97..f02af02a30 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -19,7 +19,10 @@ import ( "srs-proxy/logger" ) -type httpServer struct { +// srsHTTPStreamServer is the proxy server for SRS HTTP stream server, for HTTP-FLV, HTTP-TS, +// HLS, etc. The proxy server will figure out which SRS origin server to proxy to, then proxy +// the request to the origin server. +type srsHTTPStreamServer struct { // The underlayer HTTP server. server *http.Server // The gracefully quit timeout, wait server to quit. @@ -28,15 +31,15 @@ type httpServer struct { wg stdSync.WaitGroup } -func NewHttpServer(opts ...func(*httpServer)) *httpServer { - v := &httpServer{} +func NewSRSHTTPStreamServer(opts ...func(*srsHTTPStreamServer)) *srsHTTPStreamServer { + v := &srsHTTPStreamServer{} for _, opt := range opts { opt(v) } return v } -func (v *httpServer) Close() error { +func (v *srsHTTPStreamServer) Close() error { ctx, cancel := context.WithTimeout(context.Background(), v.gracefulQuitTimeout) defer cancel() v.server.Shutdown(ctx) @@ -45,7 +48,7 @@ func (v *httpServer) Close() error { return nil } -func (v *httpServer) Run(ctx context.Context) error { +func (v *srsHTTPStreamServer) Run(ctx context.Context) error { // Parse address to listen. addr := envHttpServer() if !strings.Contains(addr, ":") { diff --git a/proxy/main.go b/proxy/main.go index ea87484744..6327a7cf80 100644 --- a/proxy/main.go +++ b/proxy/main.go @@ -68,32 +68,32 @@ func doMain(ctx context.Context) error { } // Start the RTMP server. - rtmpServer := NewRtmpServer() - defer rtmpServer.Close() - if err := rtmpServer.Run(ctx); err != nil { + srsRTMPServer := NewSRSRTMPServer() + defer srsRTMPServer.Close() + if err := srsRTMPServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtmp server") } // Start the WebRTC server. - rtcServer := newRTCServer() - defer rtcServer.Close() - if err := rtcServer.Run(ctx); err != nil { + srsWebRTCServer := NewSRSWebRTCServer() + defer srsWebRTCServer.Close() + if err := srsWebRTCServer.Run(ctx); err != nil { return errors.Wrapf(err, "rtc server") } // Start the HTTP API server. - httpAPI := NewHttpAPI(func(server *httpAPI) { - server.gracefulQuitTimeout, server.rtc = gracefulQuitTimeout, rtcServer + srsHTTPAPIServer := NewSRSHTTPAPIServer(func(server *srsHTTPAPIServer) { + server.gracefulQuitTimeout, server.rtc = gracefulQuitTimeout, srsWebRTCServer }) - defer httpAPI.Close() - if err := httpAPI.Run(ctx); err != nil { + defer srsHTTPAPIServer.Close() + if err := srsHTTPAPIServer.Run(ctx); err != nil { return errors.Wrapf(err, "http api server") } // Start the SRT server. - srtServer := newSRTServer() - defer srtServer.Close() - if err := srtServer.Run(ctx); err != nil { + srsSRTServer := NewSRSSRTServer() + defer srsSRTServer.Close() + if err := srsSRTServer.Run(ctx); err != nil { return errors.Wrapf(err, "srt server") } @@ -107,11 +107,11 @@ func doMain(ctx context.Context) error { } // Start the HTTP web server. - httpServer := NewHttpServer(func(server *httpServer) { + srsHTTPStreamServer := NewSRSHTTPStreamServer(func(server *srsHTTPStreamServer) { server.gracefulQuitTimeout = gracefulQuitTimeout }) - defer httpServer.Close() - if err := httpServer.Run(ctx); err != nil { + defer srsHTTPStreamServer.Close() + if err := srsHTTPStreamServer.Run(ctx); err != nil { return errors.Wrapf(err, "http server") } diff --git a/proxy/rtc.go b/proxy/rtc.go index f63963de30..5a7d9936c7 100644 --- a/proxy/rtc.go +++ b/proxy/rtc.go @@ -19,7 +19,10 @@ import ( "srs-proxy/sync" ) -type rtcServer struct { +// srsWebRTCServer is the proxy for SRS WebRTC server via WHIP or WHEP protocol. It will figure out +// which backend server to proxy to. It will also replace the UDP port to the proxy server's in the +// SDP answer. +type srsWebRTCServer struct { // The UDP listener for WebRTC server. listener *net.UDPConn @@ -35,15 +38,15 @@ type rtcServer struct { wg stdSync.WaitGroup } -func newRTCServer(opts ...func(*rtcServer)) *rtcServer { - v := &rtcServer{} +func NewSRSWebRTCServer(opts ...func(*srsWebRTCServer)) *srsWebRTCServer { + v := &srsWebRTCServer{} for _, opt := range opts { opt(v) } return v } -func (v *rtcServer) Close() error { +func (v *srsWebRTCServer) Close() error { if v.listener != nil { _ = v.listener.Close() } @@ -52,7 +55,7 @@ func (v *rtcServer) Close() error { return nil } -func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *srsWebRTCServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -89,7 +92,7 @@ func (v *rtcServer) HandleApiForWHIP(ctx context.Context, w http.ResponseWriter, return nil } -func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { +func (v *srsWebRTCServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, r *http.Request) error { defer r.Body.Close() ctx = logger.WithContext(ctx) @@ -126,7 +129,7 @@ func (v *rtcServer) HandleApiForWHEP(ctx context.Context, w http.ResponseWriter, return nil } -func (v *rtcServer) proxyApiToBackend( +func (v *srsWebRTCServer) proxyApiToBackend( ctx context.Context, w http.ResponseWriter, r *http.Request, backend *SRSServer, remoteSDPOffer string, streamURL string, ) error { @@ -226,7 +229,7 @@ func (v *rtcServer) proxyApiToBackend( return nil } -func (v *rtcServer) Run(ctx context.Context) error { +func (v *srsWebRTCServer) Run(ctx context.Context) error { // Parse address to listen. endpoint := envWebRTCServer() if !strings.Contains(endpoint, ":") { @@ -268,7 +271,7 @@ func (v *rtcServer) Run(ctx context.Context) error { return nil } -func (v *rtcServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { +func (v *srsWebRTCServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { var connection *RTCConnection // If STUN binding request, parse the ufrag and identify the connection. diff --git a/proxy/rtmp.go b/proxy/rtmp.go index bf1c4ebea5..d93f04b3a6 100644 --- a/proxy/rtmp.go +++ b/proxy/rtmp.go @@ -18,7 +18,10 @@ import ( "srs-proxy/rtmp" ) -type rtmpServer struct { +// srsRTMPServer is the proxy for SRS RTMP server, to proxy the RTMP stream to backend SRS +// server. It will figure out the backend server to proxy to. Unlike the edge server, it will +// not cache the stream, but just proxy the stream to backend. +type srsRTMPServer struct { // The TCP listener for RTMP server. listener *net.TCPListener // The random number generator. @@ -27,8 +30,8 @@ type rtmpServer struct { wg sync.WaitGroup } -func NewRtmpServer(opts ...func(*rtmpServer)) *rtmpServer { - v := &rtmpServer{ +func NewSRSRTMPServer(opts ...func(*srsRTMPServer)) *srsRTMPServer { + v := &srsRTMPServer{ rd: rand.New(rand.NewSource(time.Now().UnixNano())), } for _, opt := range opts { @@ -37,7 +40,7 @@ func NewRtmpServer(opts ...func(*rtmpServer)) *rtmpServer { return v } -func (v *rtmpServer) Close() error { +func (v *srsRTMPServer) Close() error { if v.listener != nil { v.listener.Close() } @@ -46,7 +49,7 @@ func (v *rtmpServer) Close() error { return nil } -func (v *rtmpServer) Run(ctx context.Context) error { +func (v *srsRTMPServer) Run(ctx context.Context) error { endpoint := envRtmpServer() if !strings.Contains(endpoint, ":") { endpoint = ":" + endpoint diff --git a/proxy/srt.go b/proxy/srt.go index 758081117b..e4c629af8d 100644 --- a/proxy/srt.go +++ b/proxy/srt.go @@ -18,7 +18,10 @@ import ( "srs-proxy/sync" ) -type srtServer struct { +// srsSRTServer is the proxy for SRS server via SRT. It will figure out which backend server to +// proxy to. It only parses the SRT handshake messages, parses the stream id, and proxy to the +// backend server. +type srsSRTServer struct { // The UDP listener for SRT server. listener *net.UDPConn @@ -31,8 +34,8 @@ type srtServer struct { wg stdSync.WaitGroup } -func newSRTServer(opts ...func(*srtServer)) *srtServer { - v := &srtServer{ +func NewSRSSRTServer(opts ...func(*srsSRTServer)) *srsSRTServer { + v := &srsSRTServer{ start: time.Now(), } @@ -42,7 +45,7 @@ func newSRTServer(opts ...func(*srtServer)) *srtServer { return v } -func (v *srtServer) Close() error { +func (v *srsSRTServer) Close() error { if v.listener != nil { v.listener.Close() } @@ -51,7 +54,7 @@ func (v *srtServer) Close() error { return nil } -func (v *srtServer) Run(ctx context.Context) error { +func (v *srsSRTServer) Run(ctx context.Context) error { // Parse address to listen. endpoint := envSRTServer() if !strings.Contains(endpoint, ":") { @@ -93,7 +96,7 @@ func (v *srtServer) Run(ctx context.Context) error { return nil } -func (v *srtServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { +func (v *srsSRTServer) handleClientUDP(ctx context.Context, addr *net.UDPAddr, data []byte) error { socketID := srtParseSocketID(data) var pkt *SRTHandshakePacket