Skip to content

Commit

Permalink
For ossrs#1500, move all 28181 codes to new files
Browse files Browse the repository at this point in the history
  • Loading branch information
basson099 committed Mar 14, 2020
1 parent e4dee82 commit a3be0c8
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 1,093 deletions.
204 changes: 40 additions & 164 deletions trunk/src/app/srs_app_gb28181.cpp

Large diffs are not rendered by default.

82 changes: 26 additions & 56 deletions trunk/src/app/srs_app_gb28181.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

class SrsStSocket;
class SrsConfDirective;
class SrsRtpPacket;
class Srs2SRtpPacket;
class SrsRequest;
class SrsStSocket;
class SrsRtmpClient;
Expand Down Expand Up @@ -92,26 +92,21 @@ class Srs28181StreamServer
virtual void remove();
};

// A common listener, for RTMP/HTTP server.
// A base listener
class Srs28181Listener
{
protected:
//SrsListenerType type;
protected:
std::string ip;
int port;
//SrsServer* server;
public:
Srs28181Listener();
//Srs28181Listener(SrsServer* svr, SrsListenerType t);
virtual ~Srs28181Listener();
public:
//virtual SrsListenerType listen_type();
virtual srs_error_t listen(std::string i, int p) = 0;
};

// A TCP listener
//class Srs28181TcpStreamListener : public Srs28181Listener, public SrsTcpListener
class Srs28181TcpStreamListener : public Srs28181Listener, public ISrsTcpHandler
{
private:
Expand All @@ -138,8 +133,9 @@ class Srs28181TcpStreamListener : public Srs28181Listener, public ISrsTcpHandler
// problem in multiple threads. For SRS, we only use single thread module,
// like NGINX to get very high performance, with asynchronous and non-blocking
// sockets.

/*
*
* SrsOneCycleCoroutine has these features:
* 1.will exit thread if it returns from cycle function
* 2.no pull function
* 3.can destory itself in cycle
Expand Down Expand Up @@ -187,7 +183,7 @@ class SrsOneCycleCoroutine: public ISrsCoroutineHandler
static void* pfn(void* arg);
};

// Bind a udp port, start thread to recv packet and handler it.
// Bind a udp port, start a thread to recv packet and handler it.
class SrsLiveUdpListener : public ISrsCoroutineHandler
{
private:
Expand All @@ -209,18 +205,16 @@ class SrsLiveUdpListener : public ISrsCoroutineHandler
public:
virtual int fd();
virtual srs_netfd_t stfd();
// set timeout value
//virtual srs_error_t wait(srs_utime_t tm);
public:
uint64_t nb_packet();
public:
virtual srs_error_t listen();
// Interface ISrsReusableThreadHandler.
public:
virtual srs_error_t cycle();
};


// A guard thread
class SrsLifeGuardThread : public SrsOneCycleCoroutine
{
private:
Expand All @@ -235,7 +229,6 @@ class SrsLifeGuardThread : public SrsOneCycleCoroutine
virtual void wait(srs_utime_t tm);
};


// 28181 udp stream linstener
class Srs28181UdpStreamListener : public Srs28181Listener, public ISrsUdpHandler, public ISrsCoroutineHandler
{
Expand All @@ -247,7 +240,6 @@ class Srs28181UdpStreamListener : public Srs28181Listener, public ISrsUdpHandler
uint64_t nb_packet;
bool workdone;
public:
//Srs28181UdpStreamListener(SrsServer* svr, SrsListenerType t, ISrsUdpHandler* c);
Srs28181UdpStreamListener(Srs28181StreamServer * srv, std::string suuid);
virtual ~Srs28181UdpStreamListener();
private:
Expand Down Expand Up @@ -303,20 +295,11 @@ class Srs28181StreamCore
// video stream.
int video_id;
std::string video_codec;
//SrsRtpConn* video_rtp;
// audio stream.
int audio_id;
std::string audio_codec;
int audio_sample_rate;
int audio_channel;
//SrsRtpConn* audio_rtp;
private:
//srs_netfd_t stfd;
//SrsStSocket* skt;
////SrsRtspStack* rtsp;
////SrsRtspCaster* caster;
//Srs28181TcpStreamListener* listener;
//SrsCoroutine* trd;
private:
////SrsRequest* req;
SrsSimpleRtmpClient* sdk;
Expand All @@ -339,45 +322,34 @@ class Srs28181StreamCore

int stream_id;

SrsRtpPacket* cache_;
Srs2SRtpPacket* cache_;

// the timestamp of a rtp group
uint32_t group_timestamp;
// if timestamp boundary flag enabled
// true says using rtp timestamp decode rtp group
// true says using timestamp boundary
bool first_rtp_tsb_enabled_;
// first rtp with new timestamp in a rtp group
SrsRtpPacket * first_rtp_tsb_;
Srs2SRtpPacket * first_rtp_tsb_;

// indicates rtp group boundary decode type: marker or timestamp
int boundary_type_;
public:
//Srs28181StreamCore(Srs28181TcpStreamListener* l, srs_netfd_t fd, std::string o);
Srs28181StreamCore(std::string suuid);
virtual ~Srs28181StreamCore();

// used in tcp but not needed in udp
//public:
//virtual srs_error_t init();
//private:
//virtual srs_error_t do_cycle();

public:
// decode rtp using MB boundary
virtual int decode_packet(char* buf, int nb_buf);
// decode rtp using TSB/MB boundary
virtual int decode_packet_v2(char* buf, int nb_buf);
// internal methods
public:
virtual srs_error_t on_stream_packet(SrsRtpPacket* pkt, int stream_id);
virtual srs_error_t on_stream_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
// Interface ISrsOneCycleThreadHandler
//public:
//virtual srs_error_t cycle();
virtual srs_error_t on_stream_packet(Srs2SRtpPacket* pkt, int stream_id);
virtual srs_error_t on_stream_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts);
private:
virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts);
virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts);
virtual srs_error_t on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts);
virtual srs_error_t on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts);
virtual srs_error_t kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts);
private:
virtual srs_error_t write_sequence_header();
virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts);
Expand All @@ -391,32 +363,29 @@ class Srs28181StreamCore
virtual void close();
};

// TODO: will rewrite blow codes for TCP mode in future
// The 28181 tcp stream connection
class Srs28181TcpStreamConn : public ISrsCoroutineHandler
{
private:
std::string output;
std::string output_template;
std::string target_tcUrl;//rtsp_tcUrl;
std::string stream_name;//rtsp_stream;
std::string target_tcUrl;
std::string stream_name;
SrsPithyPrint* pprint;
private:
std::string session;
// video stream.
int video_id;
std::string video_codec;
//SrsRtpConn* video_rtp;
// audio stream.
int audio_id;
std::string audio_codec;
int audio_sample_rate;
int audio_channel;
//SrsRtpConn* audio_rtp;
private:
srs_netfd_t stfd;
SrsStSocket* skt;
//SrsRtspStack* rtsp;
//SrsRtspCaster* caster;
Srs28181TcpStreamListener* listener;
SrsCoroutine* trd;
private:
Expand All @@ -441,15 +410,15 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler

int stream_id;

SrsRtpPacket* cache_;
Srs2SRtpPacket* cache_;

// the timestamp of a rtp group
uint32_t group_timestamp;
// if timestamp boundary flag enabled
// true says using rtp timestamp decode rtp group
bool first_rtp_tsb_enabled_;
// first rtp with new timestamp in a rtp group
SrsRtpPacket * first_rtp_tsb_;
Srs2SRtpPacket * first_rtp_tsb_;

// indicates rtp group boundary decode type: marker or timestamp
int boundary_type_;
Expand All @@ -468,15 +437,15 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler
virtual int decode_packet_v2(char* buf, int nb_buf);
// internal methods
public:
virtual srs_error_t on_rtp_packet(SrsRtpPacket* pkt, int stream_id);
virtual srs_error_t on_rtp_video_adv(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
virtual srs_error_t on_rtp_packet(Srs2SRtpPacket* pkt, int stream_id);
virtual srs_error_t on_rtp_video_adv(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts);
// Interface ISrsOneCycleThreadHandler
public:
virtual srs_error_t cycle();
private:
virtual srs_error_t on_rtp_video(SrsRtpPacket* pkt, int64_t dts, int64_t pts);
virtual srs_error_t on_rtp_audio(SrsRtpPacket* pkt, int64_t dts);
virtual srs_error_t kickoff_audio_cache(SrsRtpPacket* pkt, int64_t dts);
virtual srs_error_t on_rtp_video(Srs2SRtpPacket* pkt, int64_t dts, int64_t pts);
virtual srs_error_t on_rtp_audio(Srs2SRtpPacket* pkt, int64_t dts);
virtual srs_error_t kickoff_audio_cache(Srs2SRtpPacket* pkt, int64_t dts);
private:
virtual srs_error_t write_sequence_header();
virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts);
Expand All @@ -488,4 +457,5 @@ class Srs28181TcpStreamConn : public ISrsCoroutineHandler
virtual srs_error_t connect();
// Close the connection to RTMP server.
virtual void close();
};
};
#endif
4 changes: 4 additions & 0 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1301,22 +1301,26 @@ SrsGoApi28181StreamCreation::~SrsGoApi28181StreamCreation()
srs_error_t SrsGoApi28181StreamCreation::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r)
{
srs_error_t err = srs_success;

std::string suuid = "";
int port = 0;
std::stringstream sstream;
SrsJsonObject* content = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, content);

if((err = server->create_28181stream_listener(SrsListener28181UdpStream,port,suuid))!=srs_success)
{
srs_warn("SrsGoApi28181StreamCreation - create listener failed[%d]",srs_error_code(err));
content->set("status",SrsJsonAny::str("failed"));
content->set("desc",SrsJsonAny::str("create listener failed"));
return srs_api_response(w, r, content->dumps());
}

sstream<<port;
content->set("status",SrsJsonAny::str("successful"));
content->set("stream-uuid",SrsJsonAny::str(suuid.c_str()));
content->set("listen-port",SrsJsonAny::str(sstream.str().c_str()));

return srs_api_response(w, r, content->dumps());
}

Expand Down
22 changes: 20 additions & 2 deletions trunk/src/app/srs_app_rtsp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ srs_error_t SrsRtspConn::serve()
return err;
}

std::string SrsRtspConn::remote_ip()
{
// TODO: FIXME: Implement it.
return "";
}

srs_error_t SrsRtspConn::do_cycle()
{
srs_error_t err = srs_success;
Expand Down Expand Up @@ -684,17 +690,29 @@ SrsRtspCaster::SrsRtspCaster(SrsConfDirective* c)
output = _srs_config->get_stream_caster_output(c);
local_port_min = _srs_config->get_stream_caster_rtp_port_min(c);
local_port_max = _srs_config->get_stream_caster_rtp_port_max(c);
manager = new SrsCoroutineManager();
}

SrsRtspCaster::~SrsRtspCaster()
{
std::vector<SrsRtspConn*>::iterator it;
for (it = clients.begin(); it != clients.end(); ++it) {
SrsRtspConn* conn = *it;
srs_freep(conn);
manager->remove(conn);
}
clients.clear();
used_ports.clear();

srs_freep(manager);
}

srs_error_t SrsRtspCaster::initialize()
{
srs_error_t err = srs_success;
if ((err = manager->start()) != srs_success) {
return srs_error_wrap(err, "start manager");
}
return err;
}

srs_error_t SrsRtspCaster::alloc_port(int* pport)
Expand Down Expand Up @@ -747,6 +765,6 @@ void SrsRtspCaster::remove(SrsRtspConn* conn)
}
srs_info("rtsp: remove connection from caster.");

srs_freep(conn);
manager->remove(conn);
}

5 changes: 4 additions & 1 deletion trunk/src/app/srs_app_rtsp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class SrsRtspJitter
};

// The rtsp connection serve the fd.
class SrsRtspConn : public ISrsCoroutineHandler
class SrsRtspConn : public ISrsCoroutineHandler, public ISrsConnection
{
private:
std::string output_template;
Expand Down Expand Up @@ -143,6 +143,7 @@ class SrsRtspConn : public ISrsCoroutineHandler
virtual ~SrsRtspConn();
public:
virtual srs_error_t serve();
virtual std::string remote_ip();
private:
virtual srs_error_t do_cycle();
// internal methods
Expand Down Expand Up @@ -179,6 +180,7 @@ class SrsRtspCaster : public ISrsTcpHandler
std::map<int, bool> used_ports;
private:
std::vector<SrsRtspConn*> clients;
SrsCoroutineManager* manager;
public:
SrsRtspCaster(SrsConfDirective* c);
virtual ~SrsRtspCaster();
Expand All @@ -188,6 +190,7 @@ class SrsRtspCaster : public ISrsTcpHandler
virtual srs_error_t alloc_port(int* pport);
// Free the alloced rtp port.
virtual void free_port(int lpmin, int lpmax);
virtual srs_error_t initialize();
// Interface ISrsTcpHandler
public:
virtual srs_error_t on_tcp_client(srs_netfd_t stfd);
Expand Down
Loading

0 comments on commit a3be0c8

Please sign in to comment.