Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GB28181: Support external SIP server. v6.0.144 #4101

Merged
merged 10 commits into from
Jul 27, 2024
152 changes: 146 additions & 6 deletions trunk/src/app/srs_app_gb28181.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
#include <srs_app_pithy_print.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_protocol_raw_avc.hpp>
#include <srs_app_server.hpp>
#include <srs_protocol_json.hpp>
#include <srs_app_http_api.hpp>
#include <srs_app_statistic.hpp>
duiniuluantanqin marked this conversation as resolved.
Show resolved Hide resolved

#include <sstream>
using namespace std;
Expand Down Expand Up @@ -421,12 +425,12 @@ srs_error_t SrsGbListener::initialize(SrsConfDirective* conf)

bool sip_enabled = _srs_config->get_stream_caster_sip_enable(conf);
if (!sip_enabled) {
return srs_error_new(ERROR_GB_CONFIG, "GB SIP is required");
srs_warn("GB SIP is disabled.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one more thing to review. About the conf directives of sip.enable:

srs/trunk/conf/full.conf

Lines 681 to 683 in ea7e2c2

# Whether enable embedded SIP server.
# Default: on
enabled on;

The code here means: GB internal SIP is disabled, so don't break the GB process, SRS can still support the external SIP.

So, there are a few thing to consider:

  1. improve the log: GB [internal|embedded] SIP is disabled;
  2. Internal SIP is no longer a necessary one, so use srs_trace to replace srs_warn;
  3. Maybe rename sip_enable to internal_sip_enabled;
  4. External sip is the future, so no need to enable or disable it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In essence, the external SIP server is not part of SRS. You can look at my implementation, even use it directly, but it's not mandatory. So, I don't want to modify it for now.

} else {
int port = _srs_config->get_stream_caster_sip_listen(conf);
sip_listener_->set_endpoint(ip, port)->set_label("SIP-TCP");
}

int port = _srs_config->get_stream_caster_sip_listen(conf);
sip_listener_->set_endpoint(ip, port)->set_label("SIP-TCP");

return err;
}

Expand All @@ -442,6 +446,24 @@ srs_error_t SrsGbListener::listen()
return srs_error_wrap(err, "listen");
}

if ((err = listen_api()) != srs_success) {
return srs_error_wrap(err, "listen api");
}

return err;
}

srs_error_t SrsGbListener::listen_api()
{
srs_error_t err = srs_success;

// TODO: FIXME: Fetch api from hybrid manager, not from SRS.
ISrsHttpServeMux* http_api_mux = _srs_hybrid->srs()->instance()->api_server();

if ((err = http_api_mux->handle("/gb/v1/publish/", new SrsGoApiGbPublish(conf_))) != srs_success) {
return srs_error_wrap(err, "handle publish");
}

return err;
}

Expand Down Expand Up @@ -550,11 +572,16 @@ std::string SrsGbSipTcpConn::device_id()
return register_->device_id();
}

void SrsGbSipTcpConn::set_device_id(const std::string &id)
{
register_->from_address_user_ = id;
}

void SrsGbSipTcpConn::set_cid(const SrsContextId& cid)
{
if (owner_cid_) owner_cid_->set_cid(cid);
receiver_->set_cid(cid);
sender_->set_cid(cid);
if (receiver_) receiver_->set_cid(cid);
if (sender_) sender_->set_cid(cid);
cid_ = cid;
}

Expand Down Expand Up @@ -2684,5 +2711,118 @@ void srs_sip_parse_address(const std::string& address, std::string& user, std::s
}
}

SrsGoApiGbPublish::SrsGoApiGbPublish(SrsConfDirective* conf)
{
conf_ = conf->copy();
}

SrsGoApiGbPublish::~SrsGoApiGbPublish()
{
srs_freep(conf_);
}

srs_error_t SrsGoApiGbPublish::serve_http(ISrsHttpResponseWriter *w, ISrsHttpMessage *r)
{
srs_error_t err = srs_success;

SrsJsonObject* res = SrsJsonAny::object();
SrsAutoFree(SrsJsonObject, res);

if ((err = do_serve_http(w, r, res)) != srs_success) {
srs_warn("GB error %s", srs_error_desc(err).c_str());
res->set("code", SrsJsonAny::integer(srs_error_code(err)));
res->set("desc", SrsJsonAny::str(srs_error_code_str(err).c_str()));
srs_freep(err);
return srs_api_response(w, r, res->dumps());
}

return srs_api_response(w, r, res->dumps());
}

srs_error_t SrsGoApiGbPublish::do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res)
{
srs_error_t err = srs_success;

// For each GB session, we use short-term HTTP connection.
w->header()->set("Connection", "Close");

// Parse req, the request json object, from body.
SrsJsonObject* req = NULL;
SrsAutoFree(SrsJsonObject, req);
if (true) {
string req_json;
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_error_wrap(err, "read body");
}

SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (!json || !json->is_object()) {
return srs_error_new(ERROR_HTTP_DATA_INVALID, "invalid body %s", req_json.c_str());
}

req = json->to_object();
}

// Fetch params from req object.
SrsJsonAny* prop = NULL;
if ((prop = req->ensure_property_string("id")) == NULL) {
return srs_error_wrap(err, "not id");
}
string id = prop->to_str();

if ((prop = req->ensure_property_string("ssrc")) == NULL) {
return srs_error_wrap(err, "not ssrc");
}
uint64_t ssrc = atoi(prop->to_str().c_str());

if ((err = bind_session(id, ssrc)) != srs_success) {
return srs_error_wrap(err, "bind session");
}

res->set("code", SrsJsonAny::integer(ERROR_SUCCESS));
int port = _srs_config->get_stream_caster_listen(conf_);
res->set("port", SrsJsonAny::integer(port));
res->set("is_tcp", SrsJsonAny::boolean(true)); // only tcp supported

srs_trace("GB publish id: %s, ssrc=%lu", id.c_str(), ssrc);

return err;
}

srs_error_t SrsGoApiGbPublish::bind_session(std::string id, uint64_t ssrc)
duiniuluantanqin marked this conversation as resolved.
Show resolved Hide resolved
{
srs_error_t err = srs_success;

SrsSharedResource<SrsGbSession>* session = NULL;
session = dynamic_cast<SrsSharedResource<SrsGbSession>*>(_srs_gb_manager->find_by_id(id));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "stream already exists");
}

session = dynamic_cast<SrsSharedResource<SrsGbSession>*>(_srs_gb_manager->find_by_fast_id(ssrc));
if (session) {
return srs_error_new(ERROR_SYSTEM_STREAM_BUSY, "ssrc already exists");
}

// Create new GB session.
SrsGbSession* raw_session = new SrsGbSession();
raw_session->setup(conf_);

session = new SrsSharedResource<SrsGbSession>(raw_session);
_srs_gb_manager->add_with_id(id, session);
_srs_gb_manager->add_with_fast_id(ssrc, session);

SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_gb_manager, session, raw_session, raw_session);
raw_session->setup_owner(session, executor, executor);
raw_session->sip_transport()->set_device_id(id);

if ((err = executor->start()) != srs_success) {
srs_freep(executor);
return srs_error_wrap(err, "gb session");
}

return err;
}

SrsResourceManager* _srs_gb_manager = NULL;

34 changes: 34 additions & 0 deletions trunk/src/app/srs_app_gb28181.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ enum SrsGbSipState
};
std::string srs_gb_sip_state(SrsGbSipState state);

// For external SIP server mode, where SRS acts only as a media relay server
// 1. SIP server POST request via HTTP API with stream ID and SSRC
// 2. SRS create session using ID and SSRC, return a port for receiving media streams (indicated in conf).
// 3. External streaming service connect to the port, and send RTP stream (with the above SSRC)
// 4. SRS forward the stream to RTMP stream, named after ID
//
// Request:
// POST /gb/v1/publish/
// {
// "id": "...",
// "ssrc": "..."
// }
// Response:
// {"port":9000, "is_tcp": true}
class SrsGoApiGbPublish : public ISrsHttpHandler
{
private:
SrsConfDirective* conf_;
public:
SrsGoApiGbPublish(SrsConfDirective* conf);
virtual ~SrsGoApiGbPublish();
public:
virtual srs_error_t serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r);
private:
virtual srs_error_t do_serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage* r, SrsJsonObject* res);
srs_error_t bind_session(std::string stream, uint64_t ssrc);
};

// The main logic object for GB, the session.
// Each session contains a SIP object and a media object, that are managed by session. This means session always
// lives longer than SIP and media, and session will dispose SIP and media when session disposed. In another word,
Expand Down Expand Up @@ -191,6 +219,8 @@ class SrsGbListener : public ISrsListener, public ISrsTcpHandler
// Interface ISrsTcpHandler
public:
virtual srs_error_t on_tcp_client(ISrsListener* listener, srs_netfd_t stfd);
private:
srs_error_t listen_api();
};

// A GB28181 TCP SIP connection.
Expand Down Expand Up @@ -234,6 +264,10 @@ class SrsGbSipTcpConn : public ISrsResource, public ISrsCoroutineHandler, public
public:
// Get the SIP device id.
std::string device_id();
// For use with external SIP signaling server ONLY
// When using an external SIP signaling server, device id are not available, so manual configuration is required
// This id will be used as the stream name in the RTMP protocol
void set_device_id(const std::string& id);
// Set the cid of all coroutines.
virtual void set_cid(const SrsContextId& cid);
private:
Expand Down
Loading