From cbe9c1dabcb9c35e27b7bd4f34ad056c6a908c4e Mon Sep 17 00:00:00 2001 From: xialixin Date: Fri, 20 Mar 2020 00:01:48 +0800 Subject: [PATCH 1/2] Support gb28181 sip over udp, ps steam over rtp --- trunk/conf/push.gb28181.conf | 71 ++ trunk/configure | 4 +- trunk/src/app/srs_app_config.cpp | 174 +++- trunk/src/app/srs_app_config.hpp | 12 + trunk/src/app/srs_app_gb28181.cpp | 1438 ++++++++++++++++++++++++++ trunk/src/app/srs_app_gb28181.hpp | 330 ++++++ trunk/src/app/srs_app_listener.cpp | 10 +- trunk/src/app/srs_app_listener.hpp | 2 + trunk/src/app/srs_app_server.cpp | 23 +- trunk/src/app/srs_app_server.hpp | 10 + trunk/src/protocol/srs_sip_stack.cpp | 553 ++++++++++ trunk/src/protocol/srs_sip_stack.hpp | 143 +++ trunk/src/service/srs_service_st.cpp | 5 + trunk/src/service/srs_service_st.hpp | 1 + 14 files changed, 2771 insertions(+), 5 deletions(-) create mode 100644 trunk/conf/push.gb28181.conf create mode 100644 trunk/src/app/srs_app_gb28181.cpp create mode 100644 trunk/src/app/srs_app_gb28181.hpp create mode 100644 trunk/src/protocol/srs_sip_stack.cpp create mode 100644 trunk/src/protocol/srs_sip_stack.hpp diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf new file mode 100644 index 0000000000..b76887d987 --- /dev/null +++ b/trunk/conf/push.gb28181.conf @@ -0,0 +1,71 @@ +# push gb28281 stream to SRS. + +listen 1935; +max_connections 1000; +daemon off; +pid ./objs/srs28181.pid +srs_log_tank console; +stream_caster { + enabled on; + caster gb28181; + + #rtmp输出地址,可以参数化 + #[stream] 代表客户端sip设备编号 + #[timestamp] 时间戳 + output rtmp://127.0.0.1/live/[stream]; + #sip监听udp端口 + listen 15060; + + #服务器主机号,可以域名或ip地址 + #也就是设备端将媒体发送的地址,如果是服务器是内外网 + #需要写外网地址 + host 192.168.1.27; + + #服务器端编号 + #设备端配置编号需要与该值一致,否则无法注册 + serial 34020000002020000001; + + #服务器端域 + realm 3402000000; + + #是否转发音频流 + #目前只支持aac格式,所以需要设备支持aac格式 + #on:转发音频 + #off:不转发音频,只有视频 + #*注意*!!!:flv 只支持11025 22050 44100 三种 + #如果设备端没有三种中任何一个,转发时为自动选择一种格式 + #同时也会将adts的头封装在flv aac raw数据中 + #这样的话播放器为自动通过adts头自动选择采样频率 + #像ffplay, vlc都可以,但是flash是没有声音, + #因为flash,只支持11025 22050 44100 + audio_enable on; + + #服务端发送ack后,接收回应的超时时间,单位为秒 + #如果指定时间没有回应,认为失败 + ack_timeout 30; + + #设备心跳维持时间,如果指定时间内(秒)没有接收一个心跳 + #认为设备离线 + keepalive_timeout 30; + + #是否等待关键帧之后,再转发, + #off:不需等待,直接转发 + #on:等第一个关键帧后,再转发 + wait_keyframe off; + + #日志打印是否打印sip信息 + #off:不打印 + #on:打印接收或发送sip命令信息 + print_sip_message off; + + #rtp包空闲等待时间,如果指定时间没有收到任何包 + #rtp监听连接自动停止,发送BYE命令 + rtp_idle_timeout 30; + + #rtp接收监听端口范围,最小值 + rtp_port_min 58200; + #rtp接收监听端口范围,最大值 + rtp_port_max 58300; +} +vhost __defaultVhost__ { +} diff --git a/trunk/configure b/trunk/configure index d233f66dbb..0b50507e7f 100755 --- a/trunk/configure +++ b/trunk/configure @@ -213,7 +213,7 @@ MODULE_DEPENDS=("CORE" "KERNEL") ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot}) MODULE_FILES=("srs_protocol_amf0" "srs_protocol_io" "srs_rtmp_stack" "srs_rtmp_handshake" "srs_protocol_utility" "srs_rtmp_msg_array" "srs_protocol_stream" - "srs_raw_avc" "srs_rtsp_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json" + "srs_raw_avc" "srs_rtsp_stack" "srs_sip_stack" "srs_http_stack" "srs_protocol_kbps" "srs_protocol_json" "srs_protocol_format") PROTOCOL_INCS="src/protocol"; MODULE_DIR=${PROTOCOL_INCS} . auto/modules.sh PROTOCOL_OBJS="${MODULE_OBJS[@]}" @@ -257,7 +257,7 @@ if [ $SRS_EXPORT_LIBRTMP_PROJECT = NO ]; then "srs_app_mpegts_udp" "srs_app_rtsp" "srs_app_listener" "srs_app_async_call" "srs_app_caster_flv" "srs_app_process" "srs_app_ng_exec" "srs_app_hourglass" "srs_app_dash" "srs_app_fragment" "srs_app_dvr" - "srs_app_coworkers" "srs_app_hybrid") + "srs_app_coworkers" "srs_app_hybrid" "srs_app_gb28181") DEFINES="" # add each modules for app for SRS_MODULE in ${SRS_MODULES[*]}; do diff --git a/trunk/src/app/srs_app_config.cpp b/trunk/src/app/srs_app_config.cpp index 7a9f86a0ac..bb4fc793a6 100644 --- a/trunk/src/app/srs_app_config.cpp +++ b/trunk/src/app/srs_app_config.cpp @@ -275,6 +275,11 @@ bool srs_stream_caster_is_flv(string caster) return caster == "flv"; } +bool srs_stream_caster_is_gb28181(string caster) +{ + return caster == "gb28181"; +} + bool srs_config_apply_filter(SrsConfDirective* dvr_apply, SrsRequest* req) { static bool DEFAULT = true; @@ -2137,7 +2142,26 @@ srs_error_t SrsConfig::global_to_json(SrsJsonObject* obj) sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); } else if (sdir->name == "rtp_port_max") { sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); + } else if (sdir->name == "rtp_idle_timeout") { + sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); + } else if (sdir->name == "ack_timeout") { + sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); + } else if (sdir->name == "keepalive_timeout") { + sobj->set(sdir->name, sdir->dumps_arg0_to_integer()); + } else if (sdir->name == "audio_enable") { + sobj->set(sdir->name, sdir->dumps_arg0_to_boolean()); + } else if (sdir->name == "host") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); + } else if (sdir->name == "serial") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); + } else if (sdir->name == "realm") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); + } else if (sdir->name == "wait_keyframe") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); + } else if (sdir->name == "print_sip_message") { + sobj->set(sdir->name, sdir->dumps_arg0_to_str()); } + } obj->set(dir->name, sobj); } else { @@ -3647,7 +3671,11 @@ srs_error_t SrsConfig::check_normal_config() SrsConfDirective* conf = stream_caster->at(i); string n = conf->name; if (n != "enabled" && n != "caster" && n != "output" - && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max") { + && n != "listen" && n != "rtp_port_min" && n != "rtp_port_max" + && n != "rtp_idle_timeout" && n != "ack_timeout" && n != "keepalive_timeout" + && n != "host" && n != "serial" && n != "realm" + && n != "audio_enable" && n != "wait_keyframe" + && n != "print_sip_message") { return srs_error_new(ERROR_SYSTEM_CONFIG_INVALID, "illegal stream_caster.%s", n.c_str()); } } @@ -4179,6 +4207,150 @@ int SrsConfig::get_stream_caster_rtp_port_max(SrsConfDirective* conf) return ::atoi(conf->arg0().c_str()); } +int SrsConfig::get_stream_caster_gb28181_rtp_ide_timeout(SrsConfDirective* conf) +{ + static int DEFAULT = 30; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("rtp_ide_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf) +{ + static int DEFAULT = 30; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("ack_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +int SrsConfig::get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf) +{ + static int DEFAULT = 30; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("keepalive_timeout"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return ::atoi(conf->arg0().c_str()); +} + +string SrsConfig::get_stream_caster_gb28181_host(SrsConfDirective* conf) +{ + static string DEFAULT = "127.0.0.1"; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("host"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_stream_caster_gb28181_serial(SrsConfDirective* conf) +{ + static string DEFAULT = ""; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("serial"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + +string SrsConfig::get_stream_caster_gb28181_realm(SrsConfDirective* conf) +{ + static string DEFAULT = ""; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("realm"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return conf->arg0(); +} + +bool SrsConfig::get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf) +{ + static bool DEFAULT = true; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("audio_enable"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf) +{ + static bool DEFAULT = false; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("print_sip_message"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + +bool SrsConfig::get_stream_caster_gb28181_wait_keyframe(SrsConfDirective* conf) +{ + static bool DEFAULT = false; + + if (!conf) { + return DEFAULT; + } + + conf = conf->get("wait_keyframe"); + if (!conf || conf->arg0().empty()) { + return DEFAULT; + } + + return SRS_CONF_PERFER_FALSE(conf->arg0()); +} + SrsConfDirective* SrsConfig::get_vhost(string vhost, bool try_default_vhost) { srs_assert(root); diff --git a/trunk/src/app/srs_app_config.hpp b/trunk/src/app/srs_app_config.hpp index 90a57e0c30..ffae3baff5 100644 --- a/trunk/src/app/srs_app_config.hpp +++ b/trunk/src/app/srs_app_config.hpp @@ -119,6 +119,7 @@ extern bool srs_config_dvr_is_plan_session(std::string plan); extern bool srs_stream_caster_is_udp(std::string caster); extern bool srs_stream_caster_is_rtsp(std::string caster); extern bool srs_stream_caster_is_flv(std::string caster); +extern bool srs_stream_caster_is_gb28181(std::string caster); // Whether the dvr_apply active the stream specified by req. extern bool srs_config_apply_filter(SrsConfDirective* dvr_apply, SrsRequest* req); @@ -484,6 +485,17 @@ class SrsConfig virtual int get_stream_caster_rtp_port_min(SrsConfDirective* conf); // Get the max udp port for rtp of stream caster rtsp. virtual int get_stream_caster_rtp_port_max(SrsConfDirective* conf); + + virtual int get_stream_caster_gb28181_rtp_ide_timeout(SrsConfDirective* conf); + virtual int get_stream_caster_gb28181_ack_timeout(SrsConfDirective* conf); + virtual int get_stream_caster_gb28181_keepalive_timeout(SrsConfDirective* conf); + virtual bool get_stream_caster_gb28181_audio_enable(SrsConfDirective* conf); + virtual std::string get_stream_caster_gb28181_host(SrsConfDirective* conf); + virtual std::string get_stream_caster_gb28181_serial(SrsConfDirective* conf); + virtual std::string get_stream_caster_gb28181_realm(SrsConfDirective* conf); + virtual bool get_stream_caster_gb28181_print_sip_message(SrsConfDirective* conf); + virtual bool get_stream_caster_gb28181_wait_keyframe(SrsConfDirective* conf); + // vhost specified section public: // Get the vhost directive by vhost name. diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp new file mode 100644 index 0000000000..b4d647ce1c --- /dev/null +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -0,0 +1,1438 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#include +#include +#include +#include +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +//#define W_PS_FILE +//#define W_VIDEO_FILE +//#define W_AUDIO_FILE + +SrsPsRtpPacket::SrsPsRtpPacket() +{ +} + +SrsPsRtpPacket::~SrsPsRtpPacket() +{ +} + +srs_error_t SrsPsRtpPacket::decode(SrsBuffer* stream) +{ + srs_error_t err = srs_success; + + // 12bytes header + if (!stream->require(12)) { + return srs_error_new(ERROR_RTP_HEADER_CORRUPT, "requires 12 only %d bytes", stream->left()); + } + + int8_t vv = stream->read_1bytes(); + version = (vv >> 6) & 0x03; + padding = (vv >> 5) & 0x01; + extension = (vv >> 4) & 0x01; + csrc_count = vv & 0x0f; + + int8_t mv = stream->read_1bytes(); + marker = (mv >> 7) & 0x01; + payload_type = mv & 0x7f; + + sequence_number = stream->read_2bytes(); + timestamp = stream->read_4bytes(); + ssrc = stream->read_4bytes(); + + // TODO: FIXME: check sequence number. + + // video codec. + if (payload_type == 96) { + // ps stream atleast 4bytes content. + if (!stream->require(4)) { + return srs_error_new(ERROR_RTP_TYPE96_CORRUPT, "requires 4 only %d bytes", stream->left()); + } + + // append left bytes to payload. + payload->append(stream->data() + stream->pos() , stream->size()-stream->pos()); + + } + return err; +} + + +SrsPsRtpConn::SrsPsRtpConn(SrsGb28181Conn* conn, int p, std::string sid, bool b, bool k) +{ + gb28181 = conn; + _port = p; + session_id = sid; + // TODO: support listen at <[ip:]port> + listener = new SrsUdpListener(this, srs_any_address_for_listener(), p); + cache = new SrsPsRtpPacket(); + pprint = SrsPithyPrint::create_caster(); + pre_timestamp = -1; + + audio_enable = b; + first_keyframe_flag = false; + wait_first_keyframe = k; +} + +SrsPsRtpConn::~SrsPsRtpConn() +{ + ps_fw.close(); + video_fw.close(); + audio_fw.close(); + + dispose(); + + srs_freep(listener); + srs_freep(cache); + srs_freep(pprint); +} + +int SrsPsRtpConn::port() +{ + return _port; +} + +srs_error_t SrsPsRtpConn::listen() +{ + return listener->listen(); +} + +void SrsPsRtpConn::dispose() +{ + map::iterator it; + for (it = cache_payload.begin(); it != cache_payload.end(); ++it) { + srs_freep(it->second); + } + + cache_payload.clear(); + return; +} + +int64_t SrsPsRtpConn::parse_ps_timestamp(const uint8_t* p) +{ + unsigned long b; + //total 33 bits + unsigned long val, val2, val3; + + //1st byte, 5、6、7 bit + b = *p++; + val = (b & 0x0e); + + //2 byte, all bit + b = (*(p++)) << 8; + //3 bytes 1--7 bit + b += *(p++); + val2 = (b & 0xfffe) >> 1; + + //4 byte, all bit + b = (*(p++)) << 8; + //5 byte 1--7 bit + b += *(p++); + val3 = (b & 0xfffe) >> 1; + + //<32--val--30> <29----val2----15> <14----val3----0> + val = (val << 29) | (val2 << 15) | val3; + return val; +} + +srs_error_t SrsPsRtpConn::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + srs_error_t err = srs_success; + bool completed = false; + int keyframe = 0; + + pprint->elapse(); + + if (true) { + SrsBuffer stream(buf, nb_buf); + + SrsPsRtpPacket pkt; + + if ((err = pkt.decode(&stream)) != srs_success) { + srs_trace("decode error"); + return srs_success; + //return srs_error_wrap(err, "decode"); + } + + if (pre_timestamp == -1) { + pre_timestamp = pkt.timestamp; + } + + //cache pkt payload by timestamp + if (cache_payload.find(pkt.timestamp) == cache_payload.end()) { + cache_payload[pkt.timestamp] = new SrsSimpleStream(); + } + + cache_payload[pkt.timestamp]->append(pkt.payload); + + uint32_t cur_timestamp = pkt.timestamp; + + + if (pkt.marker) { + completed = true; + }else if (pre_timestamp != pkt.timestamp){ + if (cache_payload.find(pre_timestamp) != cache_payload.end()) { + completed = true; + cur_timestamp = pre_timestamp; + } + } + + pre_timestamp = pkt.timestamp; + + if (pprint->can_print()) { + srs_trace("<- " SRS_CONSTS_LOG_STREAM_CASTER " gb28181: client_id %s, ps rtp packet %dB, age=%d, vt=%d/%u, sts=%u/%u/%#x, paylod=%dB", + session_id.c_str(), nb_buf, pprint->age(), pkt.version, pkt.payload_type, pkt.sequence_number, pkt.timestamp, pkt.ssrc, + pkt.payload->length() + ); + } + + if (!completed){ + return err; + } + + //process completed frame data + char *payload = cache_payload[cur_timestamp]->bytes(); + int payload_len = cache_payload[cur_timestamp]->length(); + + on_ps_stream(payload, payload_len, cur_timestamp); + + //clear processed one ps frame + map::iterator key = cache_payload.find(cur_timestamp); + if(key!=cache_payload.end()) + { + srs_freep(key->second); + cache_payload.erase(key); + } + } + + return err; +} + +bool SrsPsRtpConn::can_send_ps_av_packet(){ + if (!wait_first_keyframe) + return true; + + if (first_keyframe_flag) + return true; + + return false; +} + + +srs_error_t SrsPsRtpConn::on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp) +{ + srs_error_t err = srs_success; + + int complete_len = 0; + int incomplete_len = ps_size; + char *next_ps_pack = ps_data; + + SrsSimpleStream video_stream; + SrsSimpleStream audio_stream; + uint64_t audio_pts = 0; + uint64_t video_pts = 0; + int keyframe = 0; + int pse_index = 0; + +#ifdef W_PS_FILE + if (!ps_fw.is_open()) { + std::string filename = "test_ps_" + session_id + ".mpg"; + ps_fw.open(filename.c_str()); + } + ps_fw.write(ps_data, ps_size, NULL); +#endif + + while(incomplete_len >= sizeof(SrsPsPacketStartCode)) + { + if (next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xBA) + { + //ps header + SrsPsPacketHeader *head = (SrsPsPacketHeader *)next_ps_pack; + unsigned char pack_stuffing_length = head->stuffing_length & 0x07; + + next_ps_pack = next_ps_pack + sizeof(SrsPsPacketHeader) + pack_stuffing_length; + complete_len = complete_len + sizeof(SrsPsPacketHeader) + pack_stuffing_length; + incomplete_len = ps_size - complete_len; + } + else if(next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xBB) + { + //ps system header + SrsPsPacketBBHeader *bbhead=(SrsPsPacketBBHeader *)(next_ps_pack); + int bbheaderlen = htons(bbhead->length); + next_ps_pack = next_ps_pack + sizeof(SrsPsPacketBBHeader) + bbheaderlen; + complete_len = complete_len + sizeof(SrsPsPacketBBHeader) + bbheaderlen; + incomplete_len = ps_size - complete_len; + + first_keyframe_flag = true; + } + else if(next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xBC) + { + //program stream map + + SrsPsMapPacket* psmap_pack = (SrsPsMapPacket*)next_ps_pack; + + psmap_pack->length = htons(psmap_pack->length); + + next_ps_pack = next_ps_pack + psmap_pack->length + sizeof(SrsPsMapPacket); + complete_len = complete_len + psmap_pack->length + sizeof(SrsPsMapPacket); + incomplete_len = ps_size - complete_len; + + } + else if(next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xE0) + { + //pse + + SrsPsePacket* pse_pack = (SrsPsePacket*)next_ps_pack; + + unsigned char pts_dts_flags = (pse_pack->info[0] & 0xF0) >> 6; + int64_t pts = 0; + if (pse_index == 0 && pts_dts_flags > 0) { + pts = parse_ps_timestamp((unsigned char*)next_ps_pack + 9); + //srs_trace("vvvvvvvvvvvvvvvvvvvvvvv ts=%u pkt_ts=%u", pts, timestamp); + } + + if (pse_index == 0) video_pts = pts; + + pse_index +=1; + + int packlength = htons(pse_pack->length); + int payloadlen = packlength - 2 - 1 - pse_pack->stuffing_length; + + next_ps_pack = next_ps_pack + 9 + pse_pack->stuffing_length; + complete_len = complete_len + 9 + pse_pack->stuffing_length; + + video_stream.append(next_ps_pack, payloadlen); + +#ifdef W_VIDEO_FILE + if (!video_fw.is_open()) { + std::string filename = "test_video_" + session_id + ".h264"; + video_fw.open(filename.c_str()); + } + video_fw.write(next_ps_pack, payloadlen, NULL); +#endif + + next_ps_pack = next_ps_pack + payloadlen; + complete_len = complete_len + payloadlen; + incomplete_len = ps_size - complete_len; + + + //srs_trace("====================== V pts=%u", pts); + } + else if (next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xBD) + { + //private stream + + SrsPsePacket* pse_pack = (SrsPsePacket*)next_ps_pack; + + int packlength = htons(pse_pack->length); + int payload_len = packlength - 2 - 1 - pse_pack->stuffing_length; + + next_ps_pack = next_ps_pack + payload_len + 9 + pse_pack->stuffing_length; + complete_len = complete_len + (payload_len + 9 + pse_pack->stuffing_length); + incomplete_len = ps_size - complete_len; + } + else if (next_ps_pack + && next_ps_pack[0] == (char)0x00 + && next_ps_pack[1] == (char)0x00 + && next_ps_pack[2] == (char)0x01 + && next_ps_pack[3] == (char)0xC0) + { + //audio stream + + SrsPsePacket* pse_pack = (SrsPsePacket*)next_ps_pack; + + unsigned char pts_dts_flags = (pse_pack->info[0] & 0xF0) >> 6; + if (pts_dts_flags > 0 ) { + audio_pts = parse_ps_timestamp((unsigned char*)next_ps_pack + 9); + //srs_trace("aaaaaaaaaaaaaaaaaaaaaaaaaa ts=%u pkt_ts=%u", audio_pts, timestamp); + } + + int packlength = htons(pse_pack->length); + int payload_len = packlength - 2 - 1 - pse_pack->stuffing_length; + next_ps_pack = next_ps_pack + 9 + pse_pack->stuffing_length; + + audio_stream.append(next_ps_pack, payload_len); + +#ifdef W_AUDIO_FILE + if (!audio_fw.is_open()) { + std::string filename = "test_audio_" + session_id + ".aac"; + audio_fw.open(filename.c_str()); + } + audio_fw.write(next_ps_pack, payload_len, NULL); +#endif + + next_ps_pack = next_ps_pack + payload_len; + complete_len = complete_len + (payload_len + 9 + pse_pack->stuffing_length); + incomplete_len = ps_size - complete_len; + + if (audio_enable && audio_stream.length() && can_send_ps_av_packet()) { + if ((err = gb28181->on_rtp_audio(&audio_stream, audio_pts)) != srs_success) { + srs_trace("process ps audio packet error %s", err); + //return srs_success; + //return srs_error_wrap(err, "process rtp packet"); + } + } + } + else + { + srs_trace("gb28181: client_id %s, unkonw ps data %02x %02x %02x %02x\n", + session_id.c_str(), next_ps_pack[0], next_ps_pack[1], next_ps_pack[2], next_ps_pack[3]); + //srs_trace(" ps ps_size=%d complete=%d h264len=%d\n", ps_size, complete_len, *h264length); + break; + } + } + + if (complete_len != ps_size){ + srs_trace("gb28181: client_id %s decode ps packet error! ps_size=%d complete=%d \n", + session_id.c_str(), ps_size, complete_len); + }else if (video_stream.length() && can_send_ps_av_packet()) { + if ((err = gb28181->on_rtp_video(&video_stream, video_pts, keyframe)) != srs_success) { + srs_trace("process ps video packet error"); + //return srs_success; + return srs_error_wrap(err, "process ps video packet error"); + } + } + + return err; +} + +SrsGb28281ClientInfo::SrsGb28281ClientInfo() +{ + req = new SrsSipRequest(); +} + +SrsGb28281ClientInfo::~SrsGb28281ClientInfo() +{ + srs_freep(req); +} + + +SrsGb28181Conn::SrsGb28181Conn(SrsGb28181Caster* c, std::string id) +{ + session_id = id; + + video_rtp = NULL; + audio_rtp = NULL; + + req = NULL; + caster = c; + info = new SrsGb28281ClientInfo; + pprint = SrsPithyPrint::create_caster(); + + skt = new SrsStSocket(); + sip = new SrsSipStack(); + trd = new SrsSTCoroutine("gb28181conn", this); + + sdk = NULL; + vjitter = new SrsRtspJitter(); + ajitter = new SrsRtspJitter(); + + avc = new SrsRawH264Stream(); + aac = new SrsRawAacStream(); + + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = false; + + reg_expires = 3600; + register_time = 0; + alive_time = 0; + invite_time = 0; + register_status = Srs28181Unkonw; + alive_status = Srs28181Unkonw; + invite_status = Srs28181Unkonw; +} + +SrsGb28181Conn::~SrsGb28181Conn() +{ + close(); + + srs_freep(info); + + srs_freep(video_rtp); + srs_freep(audio_rtp); + + srs_freep(trd); + srs_freep(skt); + srs_freep(sip); + + srs_freep(sdk); + srs_freep(req); + + srs_freep(vjitter); + srs_freep(ajitter); + + srs_freep(pprint); +} + +srs_error_t SrsGb28181Conn::serve() +{ + srs_error_t err = srs_success; + + if ((err = trd->start()) != srs_success) { + return srs_error_wrap(err, "gb28181 connection"); + } + + return err; +} + +std::string SrsGb28181Conn::remote_ip() +{ + return ""; +} + +void SrsGb28181Conn::set_request_info(SrsSipRequest *req) +{ + srs_assert(req != NULL); + info->req->copy(req); +} + +std::string SrsGb28181Conn::get_session_id() +{ + return session_id; +} + +srs_error_t SrsGb28181Conn::do_cycle() +{ + srs_error_t err = srs_success; + + // consume all sip messages. + while (true) { + + pprint->elapse(); + + if ((err = trd->pull()) != srs_success) { + //srs_trace("pull faild %s")); + //return srs_error_wrap(err, "sip cycle"); + } + + srs_utime_t now = srs_get_system_time(); + SrsGb28181Config config = caster->GetGb28181Config(); + + srs_utime_t reg_duration = 0; + srs_utime_t invite_duration = 0; + srs_utime_t alive_duration = 0; + srs_utime_t recv_rtp_duration = 0; + + if (register_status == Srs28181RegisterOk && register_time > 0) { + reg_duration = (now - register_time) / (1000*1000); + if (reg_duration > reg_expires) { + register_status = Srs28181Unkonw; + alive_status = Srs28181Unkonw; + invite_status = Srs28181Unkonw; + stop_rtp_listen(); + } + } + + if (alive_status == Srs28181AliveOk && alive_time > 0){ + alive_duration = (now - alive_time) / (1000*1000); + if (alive_duration > config.sip_keepalive_timeout) { + srs_trace("gb28181: client id=%s alive timeout, remove conn", session_id.c_str()); + break; + } + } + + if (invite_status && invite_time > 0) { + invite_duration = (now - invite_time) / (1000*1000); + if (invite_status == Srs28181Trying && invite_duration > config.sip_ack_timeout) { + invite_status = Srs28181Unkonw; + stop_rtp_listen(); + } + + recv_rtp_duration = (now - recv_rtp_time) / (1000*1000); + if (recv_rtp_duration > config.rtp_idle_timeout) { + invite_status = Srs28181Unkonw; + stop_rtp_listen(); + } + } + + if (pprint->can_print()) { + srs_trace("gb28181: client id=%s, status, druation reg=%u alive=%u invite=%u", + session_id.c_str(), reg_duration, alive_duration, invite_duration); + } + + srs_usleep(1000 * 1000); + } + + return err; +} + +srs_error_t SrsGb28181Conn::start_rtp_listen(int port) +{ + srs_error_t err = srs_success; + + SrsPsRtpConn* rtp = NULL; + srs_freep(video_rtp); + SrsGb28181Config config = caster->GetGb28181Config(); + rtp = video_rtp = new SrsPsRtpConn(this, port, session_id, + config.audio_enable, config.wait_keyframe); + + if ((err = rtp->listen()) != srs_success) { + return srs_error_wrap(err, "rtp listen"); + } + srs_trace("gb28181conn: start rtp ps stream over server-port=%d", port); + + return err; + +} + +srs_error_t SrsGb28181Conn::stop_rtp_listen() +{ + srs_error_t err = srs_success; + + if (video_rtp) { + caster->free_port(video_rtp->port(), video_rtp->port() + 1); + srs_freep(video_rtp); + } + + if (audio_rtp) { + caster->free_port(audio_rtp->port(), audio_rtp->port() + 1); + srs_freep(audio_rtp); + } + + //stop rtmp publish + close(); + + return err; +} + + +srs_error_t SrsGb28181Conn::cycle() +{ + // serve the sip client. + srs_error_t err = do_cycle(); + + stop_rtp_listen(); + + caster->remove(this); + srs_trace("gb28181conn: client id=%d conn is remove", session_id.c_str()); + + if (err == srs_success) { + srs_trace("client finished."); + } else if (srs_is_client_gracefully_close(err)) { + srs_warn("client disconnect peer. code=%d", srs_error_code(err)); + srs_freep(err); + } + + return err; +} + +srs_error_t SrsGb28181Conn::on_rtp_video(SrsSimpleStream *stream, int64_t fpts, int keyframe) +{ + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + if ((err = vjitter->correct(fpts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + + // ts tbn to flv tbn. + uint32_t dts = (uint32_t)(fpts / 90); + uint32_t pts = (uint32_t)(fpts / 90); + + recv_rtp_time = srs_get_system_time(); + + //srs_trace("==========================================VVV pts=%u", dts); + SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); + SrsAutoFree(SrsBuffer, avs); + // send each frame. + while (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + if ((err = avc->annexb_demux(avs, &frame, &frame_size)) != srs_success) { + return srs_error_wrap(err, "demux annexb"); + } + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // ignore the nalu type sei(6) aud(9) + if (nal_unit_type == SrsAvcNaluTypeAccessUnitDelimiter || + nal_unit_type == SrsAvcNaluTypeSEI) { + continue; + } + + // for sps + if (avc->is_sps(frame, frame_size)) { + std::string sps; + if ((err = avc->sps_demux(frame, frame_size, sps)) != srs_success) { + return srs_error_wrap(err, "demux sps"); + } + + if (h264_sps == sps) { + continue; + } + h264_sps_changed = true; + h264_sps = sps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // for pps + if (avc->is_pps(frame, frame_size)) { + std::string pps; + if ((err = avc->pps_demux(frame, frame_size, pps)) != srs_success) { + return srs_error_wrap(err, "demux pps"); + } + + if (h264_pps == pps) { + continue; + } + h264_pps_changed = true; + h264_pps = pps; + + if ((err = write_h264_sps_pps(dts, pts)) != srs_success) { + return srs_error_wrap(err, "write sps/pps"); + } + continue; + } + + // ibp frame. + // TODO: FIXME: we should group all frames to a rtmp/flv message from one ts message. + srs_info("gb28181: demux avc ibp frame size=%d, dts=%d", frame_size, dts); + if ((err = write_h264_ipb_frame(frame, frame_size, dts, pts)) != srs_success) { + return srs_error_wrap(err, "write frame"); + } + } + + return err; +} + +srs_error_t SrsGb28181Conn::on_rtp_audio(SrsSimpleStream* stream, int64_t fdts) +{ + srs_error_t err = srs_success; + + // ensure rtmp connected. + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + // sip tbn is ts tbn. + if ((err = ajitter->correct(fdts)) != srs_success) { + return srs_error_wrap(err, "jitter"); + } + + recv_rtp_time = srs_get_system_time(); + + // ts tbn to flv tbn. + uint32_t dts = (uint32_t)(fdts / 90); + + // send each frame. + SrsBuffer *avs = new SrsBuffer(stream->bytes(), stream->length()); + SrsAutoFree(SrsBuffer, avs); + if (!avs->empty()) { + char* frame = NULL; + int frame_size = 0; + SrsRawAacStreamCodec codec; + if ((err = aac->adts_demux(avs, &frame, &frame_size, codec)) != srs_success) { + return srs_error_wrap(err, "demux adts"); + } + + if (frame_size <= 0) { + return err; + } + + bool send_adts = false; + static int srs_aac_srates[] = { + 96000, 88200, 64000, 48000, + 44100, 32000, 24000, 22050, + 16000, 12000, 11025, 8000, + 7350, 0, 0, 0 + }; + switch (srs_aac_srates[codec.sampling_frequency_index]) { + case 11025: + codec.sound_rate = SrsAudioSampleRate11025; + break; + case 22050: + codec.sound_rate = SrsAudioSampleRate22050; + break; + case 44100: + codec.sound_rate = SrsAudioSampleRate44100; + break; + default: + send_adts = true; //raw with adts + break; + }; + + std::string sh; + if ((err = aac->mux_sequence_header(&codec, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + if (aac_specific_config != sh){ + std::string sh; + if ((err = aac->mux_sequence_header(&codec, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + aac_specific_config = sh; + codec.aac_packet_type = 0; + if ((err = write_audio_raw_frame((char*)sh.data(), (int)sh.length(), &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write raw audio frame"); + } + } + + codec.aac_packet_type = 1; + if (send_adts) { // audio raw data. with adts header + if ((err = write_audio_raw_frame(stream->bytes(), stream->length(), &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + }else { // audio raw data. without adts header + if ((err = write_audio_raw_frame(frame, frame_size, &codec, dts)) != srs_success) { + return srs_error_wrap(err, "write audio raw frame"); + } + } + }//end if (!avs->empty()) + + return err; +} + +srs_error_t SrsGb28181Conn::write_h264_sps_pps(uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + if (!h264_sps_changed || !h264_pps_changed) { + return err; + } + + // h264 raw to h264 packet. + std::string sh; + if ((err = avc->mux_sequence_header(h264_sps, h264_pps, dts, pts, sh)) != srs_success) { + return srs_error_wrap(err, "mux sequence header"); + } + + // h264 packet to flv packet. + int8_t frame_type = SrsVideoAvcFrameTypeKeyFrame; + int8_t avc_packet_type = SrsVideoAvcFrameTraitSequenceHeader; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(sh, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + if ((err = rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv)) != srs_success) { + return srs_error_wrap(err, "write packet"); + } + + // reset sps and pps. + h264_sps_changed = false; + h264_pps_changed = false; + h264_sps_pps_sent = true; + + return err; +} + +srs_error_t SrsGb28181Conn::write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts) +{ + srs_error_t err = srs_success; + + // 5bits, 7.3.1 NAL unit syntax, + // ISO_IEC_14496-10-AVC-2003.pdf, page 44. + // 7: SPS, 8: PPS, 5: I Frame, 1: P Frame + SrsAvcNaluType nal_unit_type = (SrsAvcNaluType)(frame[0] & 0x1f); + + // for IDR frame, the frame is keyframe. + SrsVideoAvcFrameType frame_type = SrsVideoAvcFrameTypeInterFrame; + if (nal_unit_type == SrsAvcNaluTypeIDR) { + frame_type = SrsVideoAvcFrameTypeKeyFrame; + } + + std::string ibp; + if ((err = avc->mux_ipb_frame(frame, frame_size, ibp)) != srs_success) { + return srs_error_wrap(err, "mux ibp frame"); + } + + int8_t avc_packet_type = SrsVideoAvcFrameTraitNALU; + char* flv = NULL; + int nb_flv = 0; + if ((err = avc->mux_avc2flv(ibp, frame_type, avc_packet_type, dts, pts, &flv, &nb_flv)) != srs_success) { + return srs_error_wrap(err, "mux avc to flv"); + } + + // the timestamp in rtmp message header is dts. + uint32_t timestamp = dts; + return rtmp_write_packet(SrsFrameTypeVideo, timestamp, flv, nb_flv); +} + +srs_error_t SrsGb28181Conn::write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts) +{ + srs_error_t err = srs_success; + + char* data = NULL; + int size = 0; + if ((err = aac->mux_aac2flv(frame, frame_size, codec, dts, &data, &size)) != srs_success) { + return srs_error_wrap(err, "mux aac to flv"); + } + + return rtmp_write_packet(SrsFrameTypeAudio, dts, data, size); +} + +srs_error_t SrsGb28181Conn::rtmp_write_packet(char type, uint32_t timestamp, char* data, int size) +{ + srs_error_t err = srs_success; + + if ((err = connect()) != srs_success) { + return srs_error_wrap(err, "connect"); + } + + SrsSharedPtrMessage* msg = NULL; + + if ((err = srs_rtmp_create_msg(type, timestamp, data, size, sdk->sid(), &msg)) != srs_success) { + return srs_error_wrap(err, "create message"); + } + srs_assert(msg); + + // send out encoded msg. + if ((err = sdk->send_and_free_message(msg)) != srs_success) { + close(); + return srs_error_wrap(err, "write message"); + } + + return err; +} + +srs_error_t SrsGb28181Conn::connect() +{ + srs_error_t err = srs_success; + + // Ignore when connected. + if (sdk) { + return err; + } + + // generate rtmp url to connect to. + std::string url; + if (true) { + std::string schema, host, vhost, app, param; + int port; + //srs_discovery_tc_url(rtmp_url, schema, host, vhost, app, rtsp_stream, port, param); + + // generate output by template. + std::string output = rtmp_url; + output = srs_string_replace(output, "[app]", "live"); + output = srs_string_replace(output, "[stream]", session_id); + output = srs_path_build_timestamp(output); + url = output; + } + // connect host. + srs_utime_t cto = SRS_CONSTS_RTMP_TIMEOUT; + srs_utime_t sto = SRS_CONSTS_RTMP_PULSE; + sdk = new SrsSimpleRtmpClient(url, cto, sto); + + srs_trace("gb28181: rtmp connect url=%s", url.c_str()); + if ((err = sdk->connect()) != srs_success) { + close(); + return srs_error_wrap(err, "connect %s failed, cto=%dms, sto=%dms.", url.c_str(), srsu2msi(cto), srsu2msi(sto)); + } + + // publish. + if ((err = sdk->publish(SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE)) != srs_success) { + close(); + return srs_error_wrap(err, "publish %s failed", url.c_str()); + } + + return err; +} + +void SrsGb28181Conn::close() +{ + srs_freep(sdk); +} + +SrsGb28181Config::SrsGb28181Config(SrsConfDirective* c) +{ + // TODO: FIXME: support reload. + output = _srs_config->get_stream_caster_output(c); + rtp_port_min = _srs_config->get_stream_caster_rtp_port_min(c); + rtp_port_max = _srs_config->get_stream_caster_rtp_port_max(c); + rtp_idle_timeout = _srs_config->get_stream_caster_gb28181_rtp_ide_timeout(c); + sip_ack_timeout = _srs_config->get_stream_caster_gb28181_ack_timeout(c); + sip_keepalive_timeout = _srs_config->get_stream_caster_gb28181_keepalive_timeout(c); + listen_port = _srs_config->get_stream_caster_listen(c); + sip_host = _srs_config->get_stream_caster_gb28181_host(c); + sip_realm = _srs_config->get_stream_caster_gb28181_realm(c); + sip_serial = _srs_config->get_stream_caster_gb28181_serial(c); + audio_enable = _srs_config->get_stream_caster_gb28181_audio_enable(c); + print_sip_message = _srs_config->get_stream_caster_gb28181_print_sip_message(c); + wait_keyframe = _srs_config->get_stream_caster_gb28181_wait_keyframe(c); +} + +SrsGb28181Config::~SrsGb28181Config() +{ + +} + +//gb28181 caster +SrsGb28181Caster::SrsGb28181Caster(SrsConfDirective* c) +{ + // TODO: FIXME: support reload. + //output = _srs_config->get_stream_caster_output(c); + //local_port_min = _srs_config->get_stream_caster_rtp_port_min(c); + //local_port_max = _srs_config->get_stream_caster_rtp_port_max(c); + sip = new SrsSipStack(); + manager = new SrsCoroutineManager(); + config = new SrsGb28181Config(c); + lfd = NULL; +} + +SrsGb28181Caster::~SrsGb28181Caster() +{ + used_ports.clear(); + + srs_freep(manager); + srs_freep(sip); + srs_freep(config); + + destroy(); +} + +srs_error_t SrsGb28181Caster::initialize() +{ + srs_error_t err = srs_success; + if ((err = manager->start()) != srs_success) { + return srs_error_wrap(err, "start manager"); + } + + return err; +} + +void SrsGb28181Caster::set_stfd(srs_netfd_t fd) +{ + lfd = fd; +} + +SrsGb28181Config SrsGb28181Caster::GetGb28181Config() +{ + return *config; +} + +srs_error_t SrsGb28181Caster::alloc_port(int* pport) +{ + srs_error_t err = srs_success; + + // use a pair of port. + for (int i = config->rtp_port_min; i < config->rtp_port_max - 1; i += 2) { + if (!used_ports[i]) { + used_ports[i] = true; + used_ports[i + 1] = true; + *pport = i; + break; + } + } + srs_info("gb28181: alloc port=%d-%d", *pport, *pport + 1); + + return err; +} + +void SrsGb28181Caster::free_port(int lpmin, int lpmax) +{ + for (int i = lpmin; i < lpmax; i++) { + used_ports[i] = false; + } + srs_trace("gb28181: free rtp port=%d-%d", lpmin, lpmax); +} + +srs_error_t SrsGb28181Caster::on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf) +{ + char address_string[64]; + char port_string[16]; + if(getnameinfo(from, fromlen, + (char*)&address_string, sizeof(address_string), + (char*)&port_string, sizeof(port_string), + NI_NUMERICHOST|NI_NUMERICSERV)) { + return srs_error_new(ERROR_SYSTEM_IP_INVALID, "bad address"); + } + std::string peer_ip = std::string(address_string); + int peer_port = atoi(port_string); + + // append to buffer. + //buffer->append(buf, nb_buf); + srs_error_t err = on_udp_bytes(peer_ip, peer_port, buf, nb_buf, (sockaddr*)from, fromlen); + if (err != srs_success) { + return srs_error_wrap(err, "process udp"); + } + return err; +} + +srs_error_t SrsGb28181Caster::on_udp_bytes(string peer_ip, int peer_port, + char* buf, int nb_buf, sockaddr* from, const int fromlen) +{ + srs_error_t err = srs_success; + + if (config->print_sip_message) + { + srs_trace("gb28181: request peer_ip=%s, peer_port=%d nbbuf=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request recv message=%s", buf); + } + + if (nb_buf < 10) { + return err; + } + + SrsSipRequest* req = NULL; + + if ((err = sip->parse_request(&req, buf, nb_buf)) != srs_success) { + return srs_error_wrap(err, "recv message"); + } + + if (config->print_sip_message) + { + srs_trace("gb28181: %s method=%s, uri=%s, version=%s ", + req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); + srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); + } + + req->peer_ip = peer_ip; + req->peer_port = peer_port; + + SrsAutoFree(SrsSipRequest, req); + + if (req->is_register()) { + std::vector serial = srs_string_split(srs_string_replace(req->uri,"sip:", ""), "@"); + if (serial.at(0) != config->sip_serial){ + srs_trace("gb28181: client:%s request serial and server serial inconformity(%s:%s)", + req->sip_auth_id.c_str(), serial.at(0).c_str(), config->sip_serial.c_str()); + return srs_success; + } + + srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", + req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); + srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); + + SrsGb28181Conn* conn = NULL; + + if ((err = fetch_or_create(req, &conn)) != srs_success) { + srs_trace("gb28181: conn create faild:%s", req->uri.c_str()); + return srs_error_wrap(err, "conn create faild");; + } + srs_assert(conn != NULL); + + if (conn->register_status == Srs28181Unkonw) + { + conn->serve(); + }else{ + srs_trace("gb28181: %s client is register", req->sip_auth_id.c_str()); + } + + send_status(req, from, fromlen); + conn->register_status = Srs28181RegisterOk; + conn->register_time = srs_get_system_time(); + conn->reg_expires = req->expires; + + + }else if (req->is_message()) { + SrsGb28181Conn* conn = fetch(req); + if (!conn){ + srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + return srs_success; + } + + if (conn->register_status == Srs28181Unkonw) { + send_bye(req, from, fromlen); + srs_trace("gb28181: %s client not register", req->sip_auth_id.c_str()); + return srs_success; + } + + send_status(req, from, fromlen); + conn->alive_status = Srs28181AliveOk; + conn->alive_time = srs_get_system_time(); + + if (conn->register_status == Srs28181RegisterOk && + conn->alive_status == Srs28181AliveOk && + conn->invite_status != Srs28181InviteOk) + { + int lpm = 0; + if (alloc_port(&lpm) != srs_success) { + return srs_error_wrap(err, "alloc port"); + } + + if (lpm){ + send_invite(req, from, fromlen, lpm); + conn->rtmp_url = config->output; + conn->start_rtp_listen(lpm); + conn->invite_status == Srs28181Trying; + conn->invite_time = srs_get_system_time(); + } + + } + + }else if (req->is_invite()) { + SrsGb28181Conn* conn = fetch(req); + + srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", + req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); + srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); + + if (!conn){ + send_bye(req, from, fromlen); + srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + return srs_success; + } + + if (conn->register_status == Srs28181Unkonw || + conn->alive_status == Srs28181Unkonw) { + send_bye(req, from, fromlen); + srs_trace("gb28181: %s client not register or alive", req->sip_auth_id.c_str()); + return srs_success; + } + + if (req->cmdtype == SrsSipCmdRespone && req->status == "200") { + srs_trace("gb28181: INVITE response %s client status=%s", req->sip_auth_id.c_str(), req->status.c_str()); + send_ack(req, from, fromlen); + conn->invite_status = Srs28181InviteOk; + conn->invite_time = srs_get_system_time(); + } + }else if (req->is_bye()) { + srs_trace("gb28181: request peer_ip=%s, peer_port=%d", peer_ip.c_str(), peer_port, nb_buf); + srs_trace("gb28181: request %s method=%s, uri=%s, version=%s ", + req->get_cmdtype_str().c_str(), req->method.c_str(), req->uri.c_str(), req->version.c_str()); + srs_trace("gb28281: request client id=%s", req->sip_auth_id.c_str()); + + SrsGb28181Conn* conn = fetch(req); + send_status(req, from, fromlen); + + if (!conn){ + srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + return srs_success; + } + + conn->stop_rtp_listen(); + conn->invite_status = Srs28181Bye; + conn->invite_time = 0; + + }else{ + srs_trace("gb28181: ingor request method=%s", req->method.c_str()); + } + + return err; +} + +srs_error_t SrsGb28181Caster::send_message(sockaddr* from, int fromlen, std::stringstream& ss) +{ + srs_error_t err = srs_success; + + std::string str = ss.str(); + if (config->print_sip_message) + srs_trace("gb28181: send_message:%s", str.c_str()); + srs_assert(!str.empty()); + + int ret = srs_sendto(lfd, (char*)str.c_str(), (int)str.length(), from, fromlen, SRS_UTIME_NO_TIMEOUT); + if (ret <= 0){ + return srs_error_wrap(err, "gb28181: send_message falid"); + } + + return err; +} + +srs_error_t SrsGb28181Caster::send_bye(SrsSipRequest *req, sockaddr *f, int l) +{ + srs_error_t err = srs_success; + srs_assert(req); + + std::stringstream ss; + + req->host = config->sip_host; + req->host_port = config->listen_port; + req->realm = config->sip_realm; + req->serial = config->sip_serial; + + sip->req_bye(ss, req); + send_message(f, l, ss); + + return err; + +} +srs_error_t SrsGb28181Caster::send_ack(SrsSipRequest *req, sockaddr *f, int l) +{ + srs_error_t err = srs_success; + srs_assert(req); + + std::stringstream ss; + + req->host = config->sip_host; + req->host_port = config->listen_port; + req->realm = config->sip_realm; + req->serial = config->sip_serial; + + sip->resp_ack(ss, req); + send_message(f, l, ss); + + return err; +} + +srs_error_t SrsGb28181Caster::send_invite(SrsSipRequest *req, sockaddr *f, int l, int port) +{ + srs_error_t err = srs_success; + srs_assert(req); + + std::stringstream ss; + + req->host = config->sip_host; + req->host_port = config->listen_port; + req->realm = config->sip_realm; + req->serial = config->sip_serial; + + sip->req_invite(ss, req, port); + send_message(f, l, ss); + + return err; + +} + +srs_error_t SrsGb28181Caster::send_status(SrsSipRequest *req, sockaddr *f, int l) +{ + srs_error_t err = srs_success; + srs_assert(req); + + std::stringstream ss; + + req->host = config->sip_host; + req->host_port = config->listen_port; + req->realm = config->sip_realm; + req->serial = config->sip_serial; + + sip->resp_status(ss, req); + send_message(f, l, ss); + + return err; + +} + + +srs_error_t SrsGb28181Caster::fetch_or_create(SrsSipRequest* r, SrsGb28181Conn** gb28181) +{ + srs_error_t err = srs_success; + + SrsGb28181Conn* conn = NULL; + if ((conn = fetch(r)) != NULL) { + *gb28181 = conn; + return err; + } + + string key = r->sip_auth_id; + conn = new SrsGb28181Conn(this, key); + conn->set_request_info(r); + if ((err = conn->serve()) != srs_success) { + return srs_error_wrap(err, "sipconn serve %s", key.c_str()); + } + clients[key] = conn; + *gb28181 = conn; + + return err; +} + +SrsGb28181Conn* SrsGb28181Caster::fetch(const SrsSipRequest* r) +{ + SrsGb28181Conn* conn = NULL; + + string key = r->sip_auth_id; + if (clients.find(key) == clients.end()) { + return NULL; + } + + conn = clients[key]; + return conn; +} + + +void SrsGb28181Caster::destroy() +{ + std::map::iterator it; + for (it = clients.begin(); it != clients.end(); ++it) { + SrsGb28181Conn* conn = it->second; + manager->remove(conn); + } + clients.clear(); +} + +void SrsGb28181Caster::remove(SrsGb28181Conn* conn) +{ + std::string id = conn->get_session_id(); + map::iterator key = clients.find(id); + if (key != clients.end()) { + clients.erase(key); + } + manager->remove(conn); +} diff --git a/trunk/src/app/srs_app_gb28181.hpp b/trunk/src/app/srs_app_gb28181.hpp new file mode 100644 index 0000000000..3d8bd13b0a --- /dev/null +++ b/trunk/src/app/srs_app_gb28181.hpp @@ -0,0 +1,330 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_APP_GB28181_HPP +#define SRS_APP_GB28181_HPP + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +class SrsStSocket; +class SrsRtpConn; +class SrsRtspConn; +class SrsRtspStack; +class SrsRtspCaster; +class SrsConfDirective; +class SrsRtpPacket; +class SrsRequest; +class SrsStSocket; +class SrsRtmpClient; +class SrsRawH264Stream; +class SrsRawAacStream; +struct SrsRawAacStreamCodec; +class SrsSharedPtrMessage; +class SrsAudioFrame; +class SrsSimpleStream; +class SrsPithyPrint; +class SrsSimpleRtmpClient; +class SrsSipStack; +class SrsGb28181Caster; +class SrsRtspJitter; +class SrsRtspAudioCache; +class SrsSipRequest; +class SrsGb28181Conn; +class SrsGb28281ClientInfo; + +/* gb28181 program stream struct define + +*/ + +struct SrsPsPacketStartCode +{ + uint8_t start_code[3]; + uint8_t stream_id[1]; +}; + +struct SrsPsPacketHeader +{ + SrsPsPacketStartCode start;// 4 + uint8_t info[9]; + uint8_t stuffing_length; +}; + +struct SrsPsPacketBBHeader +{ + SrsPsPacketStartCode start; + uint16_t length; +}; + +struct SrsPsePacket +{ + SrsPsPacketStartCode start; + uint16_t length; + uint8_t info[2]; + uint8_t stuffing_length; +}; + +struct SrsPsMapPacket +{ + SrsPsPacketStartCode start; + uint16_t length; +}; + + + +class SrsPsRtpPacket: public SrsRtpPacket +{ +public: + SrsPsRtpPacket(); + virtual ~SrsPsRtpPacket(); +public: + virtual srs_error_t decode(SrsBuffer* stream); +}; + +// A rtp connection which transport a stream. +class SrsPsRtpConn: public ISrsUdpHandler +{ +private: + SrsPithyPrint* pprint; + SrsUdpListener* listener; + SrsGb28181Conn* gb28181; + SrsPsRtpPacket* cache; + std::map cache_payload; + std::string session_id; + int _port; + uint32_t pre_timestamp; + + SrsFileWriter ps_fw; + SrsFileWriter video_fw; + SrsFileWriter audio_fw; + + bool first_keyframe_flag; + bool wait_first_keyframe; + bool audio_enable; + +public: + SrsPsRtpConn(SrsGb28181Conn* r, int p, std::string sid, bool a, bool k); + virtual ~SrsPsRtpConn(); + +private: + int64_t parse_ps_timestamp(const uint8_t* p); + +private: + bool can_send_ps_av_packet(); + void dispose(); +public: + virtual int port(); + virtual srs_error_t listen(); +// Interface ISrsUdpHandler +public: + virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); + virtual srs_error_t on_ps_stream(char* ps_data, int ps_size, uint32_t timestamp); +}; + +class SrsGb28281ClientInfo { +public: + SrsGb28281ClientInfo(); + virtual ~SrsGb28281ClientInfo(); + +public: + sockaddr* sock_from; + int sock_fromlen; + srs_netfd_t stfd; + SrsSipRequest *req; +}; + +enum Srs28181CtrlStatusType{ + Srs28181Unkonw = 0, + Srs28181RegisterOk = 1, + Srs28181AliveOk = 2, + Srs28181InviteOk = 3, + Srs28181Trying = 4, + Srs28181Bye = 5, +}; + +class SrsGb28181Conn : public ISrsCoroutineHandler, public ISrsConnection +{ +private: + std::string output_template; + SrsPithyPrint* pprint; +public: + Srs28181CtrlStatusType register_status; + Srs28181CtrlStatusType alive_status; + Srs28181CtrlStatusType invite_status; + srs_utime_t register_time; + srs_utime_t alive_time; + srs_utime_t invite_time; + srs_utime_t recv_rtp_time; + + std::string rtmp_url; + int reg_expires; + +private: + std::string session_id; + // video stream. + int video_id; + std::string video_codec; + SrsPsRtpConn* video_rtp; + // audio stream. + int audio_id; + std::string audio_codec; + int audio_sample_rate; + int audio_channel; + SrsPsRtpConn* audio_rtp; +public: + SrsGb28281ClientInfo* info; +private: + SrsStSocket* skt; + SrsSipStack* sip; + SrsGb28181Caster* caster; + SrsCoroutine* trd; +private: + SrsSipRequest* req; + SrsSimpleRtmpClient* sdk; + SrsRtspJitter* vjitter; + SrsRtspJitter* ajitter; +private: + SrsRawH264Stream* avc; + std::string h264_sps; + std::string h264_pps; + bool h264_sps_changed; + bool h264_pps_changed; + bool h264_sps_pps_sent; +private: + SrsRawAacStream* aac; + std::string aac_specific_config; +public: + SrsGb28181Conn(SrsGb28181Caster* c, std::string id); + virtual ~SrsGb28181Conn(); +public: + virtual srs_error_t serve(); + virtual std::string remote_ip(); + virtual void set_request_info(SrsSipRequest *req); + virtual std::string get_session_id(); +private: + virtual srs_error_t do_cycle(); +// internal methods +public: + virtual srs_error_t start_rtp_listen(int port); + virtual srs_error_t stop_rtp_listen(); +// Interface ISrsOneCycleThreadHandler +public: + virtual srs_error_t cycle(); +public: + virtual srs_error_t on_rtp_video(SrsSimpleStream* stream, int64_t dts, int keyframe); + virtual srs_error_t on_rtp_audio(SrsSimpleStream* stream, int64_t dts); +private: + virtual srs_error_t write_h264_sps_pps(uint32_t dts, uint32_t pts); + virtual srs_error_t write_h264_ipb_frame(char* frame, int frame_size, uint32_t dts, uint32_t pts); + virtual srs_error_t write_audio_raw_frame(char* frame, int frame_size, SrsRawAacStreamCodec* codec, uint32_t dts); + virtual srs_error_t rtmp_write_packet(char type, uint32_t timestamp, char* data, int size); +private: + // Connect to RTMP server. + virtual srs_error_t connect(); + // Close the connection to RTMP server. + virtual void close(); +}; + +class SrsGb28181Config +{ +public: + std::string sip_host; + std::string sip_port; + std::string sip_serial; + std::string sip_realm; + int sip_ack_timeout; + int sip_keepalive_timeout; + int rtp_idle_timeout; + bool audio_enable; + std::string output; + int rtp_port_min; + int rtp_port_max; + int listen_port; + bool print_sip_message; + bool wait_keyframe; +public: + SrsGb28181Config(SrsConfDirective* c); + virtual ~SrsGb28181Config(); +}; + +//gb28181 conn manager +class SrsGb28181Caster : public ISrsUdpHandler +{ +private: + SrsGb28181Config *config; + // The key: port, value: whether used. + std::map used_ports; + SrsSipStack *sip; + srs_netfd_t lfd; +private: + std::map clients; + SrsCoroutineManager* manager; + +public: + SrsGb28181Caster(SrsConfDirective* c); + virtual ~SrsGb28181Caster(); + +private: + srs_error_t fetch_or_create(SrsSipRequest* r, SrsGb28181Conn** gb28181); + virtual SrsGb28181Conn* fetch(const SrsSipRequest* r); + virtual void destroy(); +public: + // Alloc a rtp port from local ports pool. + // @param pport output the rtp port. + virtual srs_error_t alloc_port(int* pport); + // Free the alloced rtp port. + virtual void free_port(int lpmin, int lpmax); + virtual srs_error_t initialize(); + + virtual void set_stfd(srs_netfd_t fd); + virtual SrsGb28181Config GetGb28181Config(); + +// Interface ISrsUdpHandler +public: + virtual srs_error_t on_udp_packet(const sockaddr* from, const int fromlen, char* buf, int nb_buf); +private: + virtual srs_error_t on_udp_bytes(std::string host, int port, char* buf, int nb_buf, sockaddr* from, int fromlen); +// internal methods. +public: + virtual srs_error_t send_message(sockaddr* f, int l, std::stringstream& ss); + virtual srs_error_t send_bye(SrsSipRequest *req, sockaddr *f, int l); + virtual srs_error_t send_ack(SrsSipRequest *req, sockaddr *f, int l); + virtual srs_error_t send_invite(SrsSipRequest *req, sockaddr *f, int l, int port); + virtual srs_error_t send_status(SrsSipRequest *req, sockaddr *f, int l); + virtual void remove(SrsGb28181Conn* conn); +}; + +#endif + diff --git a/trunk/src/app/srs_app_listener.cpp b/trunk/src/app/srs_app_listener.cpp index b33b4f6f8e..0634e2855b 100755 --- a/trunk/src/app/srs_app_listener.cpp +++ b/trunk/src/app/srs_app_listener.cpp @@ -59,6 +59,13 @@ srs_error_t ISrsUdpHandler::on_stfd_change(srs_netfd_t /*fd*/) return srs_success; } +void ISrsUdpHandler::set_stfd(srs_netfd_t /*fd*/) +{ + +} + + + ISrsTcpHandler::ISrsTcpHandler() { } @@ -104,6 +111,8 @@ srs_error_t SrsUdpListener::listen() if ((err = srs_udp_listen(ip, port, &lfd)) != srs_success) { return srs_error_wrap(err, "listen %s:%d", ip.c_str(), port); } + + handler->set_stfd(lfd); srs_freep(trd); trd = new SrsSTCoroutine("udp", this); @@ -206,4 +215,3 @@ srs_error_t SrsTcpListener::cycle() return err; } - diff --git a/trunk/src/app/srs_app_listener.hpp b/trunk/src/app/srs_app_listener.hpp index d7d930e91c..5f112ae121 100644 --- a/trunk/src/app/srs_app_listener.hpp +++ b/trunk/src/app/srs_app_listener.hpp @@ -43,6 +43,8 @@ class ISrsUdpHandler // When fd changed, for instance, reload the listen port, // notify the handler and user can do something. virtual srs_error_t on_stfd_change(srs_netfd_t fd); + + virtual void set_stfd(srs_netfd_t fd); public: // When udp listener got a udp packet, notice server to process it. // @param type, the client type, used to create concrete connection, diff --git a/trunk/src/app/srs_app_server.cpp b/trunk/src/app/srs_app_server.cpp index 2edc525aa6..26106feaa9 100644 --- a/trunk/src/app/srs_app_server.cpp +++ b/trunk/src/app/srs_app_server.cpp @@ -51,6 +51,7 @@ using namespace std; #include #include #include +#include // system interval in srs_utime_t, // all resolution times should be times togother, @@ -108,6 +109,8 @@ std::string srs_listener_type2string(SrsListenerType type) return "RTSP"; case SrsListenerFlv: return "HTTP-FLV"; + case SrsListenerGb28181: + return "GB28181-SIP over UDP"; default: return "UNKONWN"; } @@ -298,7 +301,7 @@ srs_error_t SrsUdpStreamListener::listen(string i, int p) // the caller already ensure the type is ok, // we just assert here for unknown stream caster. - srs_assert(type == SrsListenerMpegTsOverUdp); + srs_assert(type == SrsListenerMpegTsOverUdp || type == SrsListenerGb28181); ip = i; port = p; @@ -336,6 +339,22 @@ SrsUdpCasterListener::~SrsUdpCasterListener() srs_freep(caster); } + +SrsGb28181Listener::SrsGb28181Listener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c) : SrsUdpStreamListener(svr, t, NULL) +{ + // the caller already ensure the type is ok, + // we just assert here for unknown stream caster. + srs_assert(type == SrsListenerGb28181); + if (type == SrsListenerGb28181) { + caster = new SrsGb28181Caster(c); + } +} + +SrsGb28181Listener::~SrsGb28181Listener() +{ + srs_freep(caster); +} + SrsSignalManager* SrsSignalManager::instance = NULL; SrsSignalManager::SrsSignalManager(SrsServer* s) @@ -1093,6 +1112,8 @@ srs_error_t SrsServer::listen_stream_caster() listener = new SrsRtspListener(this, SrsListenerRtsp, stream_caster); } else if (srs_stream_caster_is_flv(caster)) { listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster); + } else if (srs_stream_caster_is_gb28181(caster)) { + listener = new SrsGb28181Listener(this, SrsListenerGb28181, stream_caster); } else { return srs_error_new(ERROR_STREAM_CASTER_ENGINE, "invalid caster %s", caster.c_str()); } diff --git a/trunk/src/app/srs_app_server.hpp b/trunk/src/app/srs_app_server.hpp index f43d7fc049..46127b6c67 100644 --- a/trunk/src/app/srs_app_server.hpp +++ b/trunk/src/app/srs_app_server.hpp @@ -69,6 +69,8 @@ enum SrsListenerType SrsListenerRtsp = 4, // TCP stream, FLV stream over HTTP. SrsListenerFlv = 5, + // UDP stream, gb28181 stream + SrsListenerGb28181 = 6, }; // A common tcp listener, for RTMP/HTTP server. @@ -156,6 +158,14 @@ class SrsUdpCasterListener : public SrsUdpStreamListener virtual ~SrsUdpCasterListener(); }; +// A UDP sip listener, for sip server. +class SrsGb28181Listener : public SrsUdpStreamListener +{ +public: + SrsGb28181Listener(SrsServer* svr, SrsListenerType t, SrsConfDirective* c); + virtual ~SrsGb28181Listener(); +}; + // Convert signal to io, // @see: st-1.9/docs/notes.html class SrsSignalManager : public ISrsCoroutineHandler diff --git a/trunk/src/protocol/srs_sip_stack.cpp b/trunk/src/protocol/srs_sip_stack.cpp new file mode 100644 index 0000000000..aed80de1e8 --- /dev/null +++ b/trunk/src/protocol/srs_sip_stack.cpp @@ -0,0 +1,553 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#include + +#if !defined(SRS_EXPORT_LIBRTMP) + +#include +#include +#include +#include + +using namespace std; + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define SIP_MAX_HEADER_LEN 2049 + +unsigned int srs_sip_random(int min,int max) +{ + srand(int(time(0))); + return rand() % (max - min + 1) + min; +} + +std::string srs_sip_get_form_to_uri(std::string msg) +{ + //;tag=536961166 + //sip:34020000002000000001@3402000000 + + size_t pos = msg.find("<"); + if (pos == string::npos) { + return msg; + } + + msg = msg.substr(pos+1); + + size_t pos2 = msg.find(">"); + if (pos2 == string::npos) { + return msg; + } + + msg = msg.substr(0, pos2-1); + return msg; +} + + +std::string srs_sip_get_param(std::string msg, std::string param) +{ + std::vector vec_params = srs_string_split(msg, ";"); + + size_t min_pos = string::npos; + for (vector::iterator it = vec_params.begin(); it != vec_params.end(); ++it) { + string value = *it; + + size_t pos = value.find(param); + if (pos == string::npos) { + continue; + } + + std::vector v_pram = srs_string_split(value, "="); + + if (v_pram.size() > 0) { + return v_pram.at(1); + } + } + return ""; +} + +SrsSipRequest::SrsSipRequest() +{ + seq = 0; + content_length = 0; + sdp = NULL; + transport = NULL; + + method = ""; + uri = "";; + version = "";; + seq = 0; + content_type = ""; + content_length = 0; + call_id = ""; + from = ""; + to = ""; + via = ""; + from_tag = ""; + to_tag = ""; + contact = ""; + user_agent = ""; + branch = ""; + status = ""; + expires = 3600; + max_forwards = 70; + cmdtype = SrsSipCmdRequest; + + host = "127.0.0.1";; + host_port = 5060; + + serial = "";; + realm = "";; + + sip_auth_id = ""; + sip_auth_pwd = ""; + sip_username = ""; + peer_ip = ""; + peer_port = 0; +} + +SrsSipRequest::~SrsSipRequest() +{ + srs_freep(sdp); + srs_freep(transport); +} + +bool SrsSipRequest::is_register() +{ + return method == SRS_SIP_METHOD_REGISTER; +} + +bool SrsSipRequest::is_invite() +{ + return method == SRS_SIP_METHOD_INVITE; +} + +bool SrsSipRequest::is_ack() +{ + return method == SRS_SIP_METHOD_ACK; +} + +bool SrsSipRequest::is_message() +{ + return method == SRS_SIP_METHOD_MESSAGE; +} + +bool SrsSipRequest::is_bye() +{ + return method == SRS_SIP_METHOD_BYE; +} + +std::string SrsSipRequest::get_cmdtype_str() +{ + switch(cmdtype) { + case SrsSipCmdRequest: + return "request"; + case SrsSipCmdRespone: + return "respone"; + } + + return ""; +} + +void SrsSipRequest::copy(SrsSipRequest* src) +{ + if (!src){ + return; + } + + method = src->method; + uri = src->uri; + version = src->version; + seq = src->seq; + content_type = src->content_type; + content_length = src->content_length; + call_id = src->call_id; + from = src->from; + to = src->to; + via = src->via; + from_tag = src->from_tag; + to_tag = src->to_tag; + contact = src->contact; + user_agent = src->user_agent; + branch = src->branch; + status = src->status; + expires = src->expires; + max_forwards = src->max_forwards; + cmdtype = src->cmdtype; + + host = src->host; + host_port = src->host_port; + + serial = src->serial; + realm = src->realm; + + sip_auth_id = src->sip_auth_id; + sip_auth_pwd = src->sip_auth_pwd; + sip_username = src->sip_username; + peer_ip = src->peer_ip; + peer_port = src->peer_port; + +} + + +SrsSipStack::SrsSipStack() +{ + buf = new SrsSimpleStream(); +} + +SrsSipStack::~SrsSipStack() +{ + srs_freep(buf); +} + +srs_error_t SrsSipStack::parse_request(SrsSipRequest** preq, const char* recv_msg, int nb_len) +{ + srs_error_t err = srs_success; + + SrsSipRequest* req = new SrsSipRequest(); + if ((err = do_parse_request(req, recv_msg)) != srs_success) { + srs_freep(req); + return srs_error_wrap(err, "recv message"); + } + + *preq = req; + + return err; +} + +srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_msg) +{ + srs_error_t err = srs_success; + + std::vector header_body = srs_string_split(recv_msg, SRS_RTSP_CRLFCRLF); + std::string header = header_body.at(0); + std::string body = ""; + + if (header_body.size() > 1){ + body = header_body.at(1); + } + + //srs_trace("sip: header=%s\n", header.c_str()); + //srs_trace("sip: body=%s\n", body.c_str()); + + // parse one by one. + char* start = (char*)header.c_str(); + char* end = start + header.size(); + char* p = start; + char* newline_start = start; + std::string firstline = ""; + while (p < end) { + if (p[0] == '\r' && p[1] == '\n'){ + p +=2; + int linelen = (int)(p - newline_start); + std::string oneline(newline_start, linelen); + newline_start = p; + + if (firstline == ""){ + firstline = oneline; + //srs_trace("=== first line=%s", firstline.c_str()); + }else{ + size_t pos = oneline.find(":"); + if (pos != string::npos){ + if (pos != 0) { + //ex: CSeq: 100 MESSAGE header is 'CSeq:',content is '100 MESSAGE' + std::string head = oneline.substr(0, pos+1); + std::string content = oneline.substr(pos+1, oneline.length()-pos-1); + content = srs_string_replace(content, "\r\n", ""); + content = srs_string_trim_start(content, " "); + char *phead = (char*)head.c_str(); + + if (!strcasecmp(phead, "call-id:")) { + std::vector vec_callid = srs_string_split(content, " "); + req->call_id = vec_callid.at(0); + } + else if (!strcasecmp(phead, "contact:")) { + req->contact = content; + } + else if (!strcasecmp(phead, "content-encoding:")) { + srs_trace("sip: message head %s content=%s", phead, content.c_str()); + } + else if (!strcasecmp(phead, "content-length:")) { + srs_trace("sip: message head %s content=%s", phead, content.c_str()); + } + else if (!strcasecmp(phead, "content-type:")) { + srs_trace("sip: message head %s content=%s", phead, content.c_str()); + } + else if (!strcasecmp(phead, "cseq:")) { + std::vector vec_seq = srs_string_split(content, " "); + req->seq = strtoul(vec_seq.at(0).c_str(), NULL, 10); + req->method = vec_seq.at(1); + } + else if (!strcasecmp(phead, "from:")) { + content = srs_string_replace(content, "sip:", ""); + req->from = srs_sip_get_form_to_uri(content.c_str()); + if (srs_string_contains(content, "tag")) { + req->from_tag = srs_sip_get_param(content.c_str(), "tag"); + } + } + else if (!strcasecmp(phead, "to:")) { + content = srs_string_replace(content, "sip:", ""); + req->to = srs_sip_get_form_to_uri(content.c_str()); + if (srs_string_contains(content, "tag")) { + req->to_tag = srs_sip_get_param(content.c_str(), "tag"); + } + } + else if (!strcasecmp(phead, "via:")) { + req->via = content; + req->branch = srs_sip_get_param(content.c_str(), "branch"); + } + else if (!strcasecmp(phead, "expires:")){ + req->expires = strtoul(content.c_str(), NULL, 10); + } + else if (!strcasecmp(phead, "user-agent:")){ + req->user_agent = content; + } + else { + srs_trace("sip: unkonw message head %s content=%s", phead, content.c_str()); + } + } + } + //srs_trace("====new line=%s", oneline.c_str()); + } + }else{ + p++; + } + } + + std::vector method_uri_ver = srs_string_split(firstline, " "); + //respone first line text:SIP/2.0 200 OK + if (!strcasecmp(method_uri_ver.at(0).c_str(), "sip/2.0")) { + req->cmdtype = SrsSipCmdRespone; + //req->method= vec_seq.at(1); + req->status = method_uri_ver.at(1); + req->version = method_uri_ver.at(0); + req->uri = req->from; + + vector str = srs_string_split(req->to, "@"); + req->sip_auth_id = srs_string_replace(str.at(0), "sip:", ""); + + }else {//request first line text :MESSAGE sip:34020000002000000001@3402000000 SIP/2.0 + req->cmdtype = SrsSipCmdRequest; + req->method= method_uri_ver.at(0); + req->uri = method_uri_ver.at(1); + req->version = method_uri_ver.at(2); + + vector str = srs_string_split(req->from, "@"); + req->sip_auth_id = srs_string_replace(str.at(0), "sip:", ""); + } + + req->sip_username = req->sip_auth_id; + + //srs_trace("sip: method=%s uri=%s version=%s cmdtype=%s", + // req->method.c_str(), req->uri.c_str(), req->version.c_str(), req->get_cmdtype_str().c_str()); + // srs_trace("via=%s", req->via.c_str()); + // srs_trace("via_branch=%s", req->branch.c_str()); + //srs_trace("cseq=%d", req->seq); + // srs_trace("contact=%s", req->contact.c_str()); + //srs_trace("from=%s", req->from.c_str()); + //srs_trace("to=%s", req->to.c_str()); + //srs_trace("callid=%s", req->call_id.c_str()); + // srs_trace("status=%s", req->status.c_str()); + // srs_trace("from_tag=%s", req->from_tag.c_str()); + // srs_trace("to_tag=%s", req->to_tag.c_str()); + //srs_trace("sip_auth_id=%s", req->sip_auth_id.c_str()); + + return err; +} + +srs_error_t SrsSipStack::resp_keepalive(std::stringstream& ss, SrsSipRequest *req){ + ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF + << "Via: " << SRS_SIP_VERSION << "/UDP " << req->host << ":" << req->host_port << ";branch=" << req->branch << SRS_RTSP_CRLF + << "From: from.c_str() << ">;tag=" << req->from_tag << SRS_RTSP_CRLF + << "To: to.c_str() << ">\r\n" + << "Call-ID: " << req->call_id << SRS_RTSP_CRLF + << "CSeq: " << req->seq << " " << req->method << SRS_RTSP_CRLF + << "Contact: "<< req->contact << SRS_RTSP_CRLF + << "Max-Forwards: 70" << SRS_RTSP_CRLF + << "User-Agent: "<< SRS_SIP_USER_AGENT << SRS_RTSP_CRLF + << "Content-Length: 0" << SRS_RTSP_CRLFCRLF; + + return srs_success; +} + +srs_error_t SrsSipStack::resp_ack(std::stringstream& ss, SrsSipRequest *req){ + + ss << "ACK " << "sip:" << req->sip_auth_id << "@" << req->realm << " "<< SRS_SIP_VERSION << SRS_RTSP_CRLF + << "Via: " << SRS_SIP_VERSION << "/UDP " << req->host << ":" << req->host_port << ";branch=" << req->branch << SRS_RTSP_CRLF + << "From: serial << "@" << req->host + ":" << req->host_port << ">;tag=" << req->from_tag << SRS_RTSP_CRLF + << "To: sip_auth_id << "@" << req->realm << ">\r\n" + << "Call-ID: " << req->call_id << SRS_RTSP_CRLF + << "CSeq: " << req->seq << " " << req->method << SRS_RTSP_CRLF + << "Max-Forwards: 70" << SRS_RTSP_CRLF + << "User-Agent: "<< SRS_SIP_USER_AGENT << SRS_RTSP_CRLF + << "Content-Length: 0" << SRS_RTSP_CRLFCRLF; + + return srs_success; +} + +srs_error_t SrsSipStack::resp_status(stringstream& ss, SrsSipRequest *req) +{ + srs_error_t err = srs_success; + + if (req->method == "REGISTER"){ + ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF + << "Via: " << req->via << SRS_RTSP_CRLF + << "From: from << ">" << SRS_RTSP_CRLF + << "To: to << ">" << SRS_RTSP_CRLF + << "CSeq: "<< req->seq << " " << req->method << SRS_RTSP_CRLF + << "Call-ID: " << req->call_id << SRS_RTSP_CRLF + << "Contact: " << req->contact << SRS_RTSP_CRLF + << "User-Agent: " << SRS_SIP_USER_AGENT << SRS_RTSP_CRLF + << "Content-Length: 0" << SRS_RTSP_CRLFCRLF; + }else{ + ss << SRS_SIP_VERSION <<" 200 OK" << SRS_RTSP_CRLF + << "Via: " << req->via << SRS_RTSP_CRLF + << "From: from << ">" << SRS_RTSP_CRLF + << "To: to << ">" << SRS_RTSP_CRLF + << "CSeq: "<< req->seq << " " << req->method << SRS_RTSP_CRLF + << "Call-ID: " << req->call_id << SRS_RTSP_CRLF + << "User-Agent: " << SRS_SIP_USER_AGENT << SRS_RTSP_CRLF + << "Content-Length: 0" << SRS_RTSP_CRLFCRLF; + } + + return err; +} + +srs_error_t SrsSipStack::req_invite(stringstream& ss, SrsSipRequest *req, int port) +{ + /* + INVITE sip:34020000001320000001@3402000000 SIP/2.0 + Via: SIP/2.0/UDP 192.168.1.22:15060;rport;branch=z9hG4bK369961166 + From: ;tag=536961166 + To: + Call-ID: 929961057 + CSeq: 3 INVITE + Content-Type: APPLICATION/SDP + Contact: + Max-Forwards: 70 + User-Agent: XXXXXXX XXXXXXX + Subject: 34020000001320000001:0200000001,34020000002020000001:0 + Content-Length: 247 + + v=0 + o=34020000002000000001 0 0 IN IP4 192.168.1.23 + s=Play + c=IN IP4 192.168.1.23 + t=0 0 + m=video 30000 RTP/AVP 96 97 98 99 + a=recvonly + a=rtpmap:96 PS/90000 + a=rtpmap:97 MPEG4/90000 + a=rtpmap:98 H264/90000 + a=rtpmap:99 H265/90000 + y=0200000001 + */ + + srs_error_t err = srs_success; + int ssrc = srs_sip_random(10000, 99999); + std::stringstream sdp; + sdp << "v=0" << SRS_RTSP_CRLF + << "o=" << req->sip_auth_id << " 0 0 IN IP4 " << req->host << SRS_RTSP_CRLF + << "s=Play" << SRS_RTSP_CRLF + << "c=IN IP4 " << req->host << SRS_RTSP_CRLF + << "t=0 0" << SRS_RTSP_CRLF + << "m=video " << port <<" RTP/AVP 96 97 98 99" << SRS_RTSP_CRLF + << "a=recvonly" << SRS_RTSP_CRLF + << "a=rtpmap:96 PS/90000" << SRS_RTSP_CRLF + << "a=rtpmap:97 MPEG4/90000" << SRS_RTSP_CRLF + << "a=rtpmap:98 H264/90000" << SRS_RTSP_CRLF + << "a=rtpmap:99 H265/90000" << SRS_RTSP_CRLF + << "y=00181" << ssrc << SRS_RTSP_CRLF; + + //<< "a=streamMode:MAIN\r\n" + //<< "a=filesize:0\r\n" + + + int rand = srs_sip_random(1000, 9999); + std::stringstream from, to, uri; + //"INVITE sip:34020000001320000001@3402000000 SIP/2.0\r\n + uri << "sip:" << req->sip_auth_id << "@" << req->realm; + //From: ;tag=500485%d\r\n + from << req->serial << "@" << req->host << ":" << req->host_port; + to << req->sip_auth_id << "@" << req->realm; + + req->from = from.str(); + req->to = to.str(); + req->uri = uri.str(); + + ss << "INVITE " << req->uri << " " << SRS_SIP_VERSION << SRS_RTSP_CRLF + << "Via: " << SRS_SIP_VERSION << "/UDP "<< req->host << ":" << req->host_port << ";rport;branch=z9hG4bK3420" << rand << SRS_RTSP_CRLF + << "From: from << ">;tag=51235" << rand << SRS_RTSP_CRLF + << "To: to << ">" << SRS_RTSP_CRLF + << "Call-ID: 20000" << rand <to << ">" << SRS_RTSP_CRLF + << "Max-Forwards: 70" << " \r\n" + << "User-Agent: " << SRS_SIP_USER_AGENT <sip_auth_id << ":00181" << ssrc << "," << req->serial << ":0" << SRS_RTSP_CRLF + << "Content-Length: " << sdp.str().length() << SRS_RTSP_CRLFCRLF + << sdp.str(); + + return err; +} + +srs_error_t SrsSipStack::req_bye(std::stringstream& ss, SrsSipRequest *req) +{ + srs_error_t err = srs_success; + + std::stringstream from, to, uri; + uri << "sip:" << req->sip_auth_id << "@" << req->realm; + from << req->serial << "@" << req->host << ":" << req->host_port; + to << req->sip_auth_id << "@" << req->realm; + + req->from = from.str(); + req->to = to.str(); + req->uri = uri.str(); + + int rand = srs_sip_random(1000, 9999); + ss << "BYE " << req->uri << " "<< SRS_SIP_VERSION << SRS_RTSP_CRLF + << "Via: "<< SRS_SIP_VERSION << "/UDP "<< req->host << ":" << req->host_port << ";branch=z9hG4bK3420" << rand << SRS_RTSP_CRLF + << "From: from << ">;tag=51235" << rand << SRS_RTSP_CRLF + << "To: to << ">" << SRS_RTSP_CRLF + << "Call-ID: 20000" << rand << SRS_RTSP_CRLF + << "CSeq: 21 BYE" << SRS_RTSP_CRLF + << "Max-Forwards: 70" << SRS_RTSP_CRLF + << "User-Agent: " << SRS_SIP_USER_AGENT << SRS_RTSP_CRLF + << "Content-Length: 0" << SRS_RTSP_CRLFCRLF; + + return err; +} + + +#endif + diff --git a/trunk/src/protocol/srs_sip_stack.hpp b/trunk/src/protocol/srs_sip_stack.hpp new file mode 100644 index 0000000000..f4868a7244 --- /dev/null +++ b/trunk/src/protocol/srs_sip_stack.hpp @@ -0,0 +1,143 @@ +/** + * The MIT License (MIT) + * + * Copyright (c) 2013-2020 Winlin + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +#ifndef SRS_PROTOCOL_SIP_HPP +#define SRS_PROTOCOL_SIP_HPP + +#include + +#if !defined(SRS_EXPORT_LIBRTMP) + +#include +#include + +#include +#include + +class SrsBuffer; +class SrsSimpleStream; +class SrsAudioFrame; + +// SIP methods +#define SRS_SIP_METHOD_REGISTER "REGISTER" +#define SRS_SIP_METHOD_MESSAGE "MESSAGE" +#define SRS_SIP_METHOD_INVITE "INVITE" +#define SRS_SIP_METHOD_ACK "ACK" +#define SRS_SIP_METHOD_BYE "BYE" + +// SIP-Version +#define SRS_SIP_VERSION "SIP/2.0" +#define SRS_SIP_USER_AGENT RTMP_SIG_SRS_SERVER + + +enum SrsSipCmdType{ + SrsSipCmdRequest=0, + SrsSipCmdRespone=1 +}; + +class SrsSipRequest +{ +public: + //sip header member + std::string method; + std::string uri; + std::string version; + std::string status; + + std::string via; + std::string from; + std::string to; + std::string from_tag; + std::string to_tag; + std::string branch; + + std::string call_id; + long seq; + + std::string contact; + std::string user_agent; + + std::string content_type; + long content_length; + + long expires; + int max_forwards; + +public: + std::string serial; + std::string realm; + std::string sip_auth_id; + std::string sip_auth_pwd; + std::string sip_username; + std::string peer_ip; + int peer_port; + std::string host; + int host_port; + SrsSipCmdType cmdtype; + +public: + SrsRtspSdp* sdp; + SrsRtspTransport* transport; +public: + SrsSipRequest(); + virtual ~SrsSipRequest(); +public: + virtual bool is_register(); + virtual bool is_invite(); + virtual bool is_message(); + virtual bool is_ack(); + virtual bool is_bye(); + + virtual void copy(SrsSipRequest* src); +public: + virtual std::string get_cmdtype_str(); +}; + +// The gb28181 sip protocol stack. +class SrsSipStack +{ +private: + // The cached bytes buffer. + SrsSimpleStream* buf; +public: + SrsSipStack(); + virtual ~SrsSipStack(); +public: + virtual srs_error_t parse_request(SrsSipRequest** preq, const char *recv_msg, int nb_buf); +protected: + virtual srs_error_t do_parse_request(SrsSipRequest* req, const char *recv_msg); + +public: + virtual srs_error_t resp_status(std::stringstream& ss, SrsSipRequest *req); + virtual srs_error_t resp_keepalive(std::stringstream& ss, SrsSipRequest *req); + virtual srs_error_t resp_ack(std::stringstream& ss, SrsSipRequest *req); + + virtual srs_error_t req_invite(std::stringstream& ss, SrsSipRequest *req, int port); + virtual srs_error_t req_bye(std::stringstream& ss, SrsSipRequest *req); + +}; + +#endif + +#endif + diff --git a/trunk/src/service/srs_service_st.cpp b/trunk/src/service/srs_service_st.cpp index f63cd42794..27d860efb5 100644 --- a/trunk/src/service/srs_service_st.cpp +++ b/trunk/src/service/srs_service_st.cpp @@ -397,6 +397,11 @@ int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, in return st_recvfrom((st_netfd_t)stfd, buf, len, from, fromlen, (st_utime_t)timeout); } +int srs_sendto(srs_netfd_t stfd, void *msg, int len, struct sockaddr *to, int tolen, srs_utime_t timeout) +{ + return st_sendto((st_netfd_t)stfd, (const void*)msg, len, (const struct sockaddr *)to, tolen, (st_utime_t)timeout); +} + srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout) { return (srs_netfd_t)st_accept((st_netfd_t)stfd, addr, addrlen, (st_utime_t)timeout); diff --git a/trunk/src/service/srs_service_st.hpp b/trunk/src/service/srs_service_st.hpp index 510b9ba8ac..234b3a9a8c 100644 --- a/trunk/src/service/srs_service_st.hpp +++ b/trunk/src/service/srs_service_st.hpp @@ -88,6 +88,7 @@ extern srs_netfd_t srs_netfd_open_socket(int osfd); extern srs_netfd_t srs_netfd_open(int osfd); extern int srs_recvfrom(srs_netfd_t stfd, void *buf, int len, struct sockaddr *from, int *fromlen, srs_utime_t timeout); +extern int srs_sendto(srs_netfd_t stfd, void *msg, int len, struct sockaddr *to, int tolen, srs_utime_t timeout); extern srs_netfd_t srs_accept(srs_netfd_t stfd, struct sockaddr *addr, int *addrlen, srs_utime_t timeout); From 8b4f84e3364fb4ac8a646e4d287f82b78d8c5cd1 Mon Sep 17 00:00:00 2001 From: xialixin Date: Fri, 20 Mar 2020 10:38:38 +0800 Subject: [PATCH 2/2] fix push.gb28181.conf, gb28181conn repeat call serve() --- trunk/conf/push.gb28181.conf | 5 +++-- trunk/src/app/srs_app_gb28181.cpp | 29 ++++++++++++++-------------- trunk/src/protocol/srs_sip_stack.cpp | 7 +++++-- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/trunk/conf/push.gb28181.conf b/trunk/conf/push.gb28181.conf index b76887d987..cb2f848b2b 100644 --- a/trunk/conf/push.gb28181.conf +++ b/trunk/conf/push.gb28181.conf @@ -3,7 +3,8 @@ listen 1935; max_connections 1000; daemon off; -pid ./objs/srs28181.pid +pid ./objs/srs28181.pid; +srs_log_file ./objs/srs28181.log; srs_log_tank console; stream_caster { enabled on; @@ -23,7 +24,7 @@ stream_caster { #服务器端编号 #设备端配置编号需要与该值一致,否则无法注册 - serial 34020000002020000001; + serial 34020000002000000001; #服务器端域 realm 3402000000; diff --git a/trunk/src/app/srs_app_gb28181.cpp b/trunk/src/app/srs_app_gb28181.cpp index b4d647ce1c..46f7950ffc 100644 --- a/trunk/src/app/srs_app_gb28181.cpp +++ b/trunk/src/app/srs_app_gb28181.cpp @@ -571,8 +571,7 @@ srs_error_t SrsGb28181Conn::do_cycle() pprint->elapse(); if ((err = trd->pull()) != srs_success) { - //srs_trace("pull faild %s")); - //return srs_error_wrap(err, "sip cycle"); + return srs_error_wrap(err, "gb28181conn cycle"); } srs_utime_t now = srs_get_system_time(); @@ -616,8 +615,11 @@ srs_error_t SrsGb28181Conn::do_cycle() } if (pprint->can_print()) { - srs_trace("gb28181: client id=%s, status, druation reg=%u alive=%u invite=%u", + srs_trace("gb28181: client id=%s, druation reg=%u alive=%u invite=%u", session_id.c_str(), reg_duration, alive_duration, invite_duration); + + srs_trace("gb28181: client id=%s, status reg_status=%u alive_status=%u invite_status=%u", + session_id.c_str(), register_status, alive_status, invite_status); } srs_usleep(1000 * 1000); @@ -1188,12 +1190,11 @@ srs_error_t SrsGb28181Caster::on_udp_bytes(string peer_ip, int peer_port, } srs_assert(conn != NULL); - if (conn->register_status == Srs28181Unkonw) - { - conn->serve(); - }else{ - srs_trace("gb28181: %s client is register", req->sip_auth_id.c_str()); - } + // if (conn->register_status == Srs28181Unkonw) + // { + // }else{ + // srs_trace("gb28181: %s client is register", req->sip_auth_id.c_str()); + // } send_status(req, from, fromlen); conn->register_status = Srs28181RegisterOk; @@ -1204,13 +1205,13 @@ srs_error_t SrsGb28181Caster::on_udp_bytes(string peer_ip, int peer_port, }else if (req->is_message()) { SrsGb28181Conn* conn = fetch(req); if (!conn){ - srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return srs_success; } if (conn->register_status == Srs28181Unkonw) { send_bye(req, from, fromlen); - srs_trace("gb28181: %s client not register", req->sip_auth_id.c_str()); + srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return srs_success; } @@ -1247,14 +1248,14 @@ srs_error_t SrsGb28181Caster::on_udp_bytes(string peer_ip, int peer_port, if (!conn){ send_bye(req, from, fromlen); - srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return srs_success; } if (conn->register_status == Srs28181Unkonw || conn->alive_status == Srs28181Unkonw) { send_bye(req, from, fromlen); - srs_trace("gb28181: %s client not register or alive", req->sip_auth_id.c_str()); + srs_trace("gb28181: %s client not registered or not alive", req->sip_auth_id.c_str()); return srs_success; } @@ -1274,7 +1275,7 @@ srs_error_t SrsGb28181Caster::on_udp_bytes(string peer_ip, int peer_port, send_status(req, from, fromlen); if (!conn){ - srs_trace("gb28181: %s client not found", req->sip_auth_id.c_str()); + srs_trace("gb28181: %s client not registered", req->sip_auth_id.c_str()); return srs_success; } diff --git a/trunk/src/protocol/srs_sip_stack.cpp b/trunk/src/protocol/srs_sip_stack.cpp index aed80de1e8..e3031fd207 100644 --- a/trunk/src/protocol/srs_sip_stack.cpp +++ b/trunk/src/protocol/srs_sip_stack.cpp @@ -297,10 +297,10 @@ srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_m srs_trace("sip: message head %s content=%s", phead, content.c_str()); } else if (!strcasecmp(phead, "content-length:")) { - srs_trace("sip: message head %s content=%s", phead, content.c_str()); + req->content_length = strtoul(content.c_str(), NULL, 10); } else if (!strcasecmp(phead, "content-type:")) { - srs_trace("sip: message head %s content=%s", phead, content.c_str()); + req->content_type = content; } else if (!strcasecmp(phead, "cseq:")) { std::vector vec_seq = srs_string_split(content, " "); @@ -331,6 +331,9 @@ srs_error_t SrsSipStack::do_parse_request(SrsSipRequest* req, const char* recv_m else if (!strcasecmp(phead, "user-agent:")){ req->user_agent = content; } + else if (!strcasecmp(phead, "max-forwards:")){ + req->max_forwards = strtoul(content.c_str(), NULL, 10); + } else { srs_trace("sip: unkonw message head %s content=%s", phead, content.c_str()); }