From 1445086451d70d20738f04bf9896c437e1d5c24c Mon Sep 17 00:00:00 2001 From: winlin Date: Sat, 21 Feb 2015 19:14:05 +0800 Subject: [PATCH] for #179, update the metadata of flv dvr file. --- trunk/src/app/srs_app_dvr.cpp | 224 ++++++++++++++++++++++++-- trunk/src/app/srs_app_dvr.hpp | 36 ++++- trunk/src/kernel/srs_kernel_file.cpp | 5 + trunk/src/kernel/srs_kernel_file.hpp | 1 + trunk/src/protocol/srs_rtmp_amf0.cpp | 9 +- trunk/src/protocol/srs_rtmp_amf0.hpp | 4 + trunk/src/protocol/srs_rtmp_stack.cpp | 13 +- trunk/src/protocol/srs_rtmp_stack.hpp | 1 + 8 files changed, 269 insertions(+), 24 deletions(-) diff --git a/trunk/src/app/srs_app_dvr.cpp b/trunk/src/app/srs_app_dvr.cpp index bdacd0c14e..68a6620553 100644 --- a/trunk/src/app/srs_app_dvr.cpp +++ b/trunk/src/app/srs_app_dvr.cpp @@ -38,6 +38,11 @@ using namespace std; #include #include #include +#include +#include + +// update the flv duration and filesize every this interval in ms. +#define __SRS_DVR_UPDATE_DURATION_INTERVAL 60000 SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p) { @@ -58,6 +63,9 @@ SrsFlvSegment::SrsFlvSegment(SrsDvrPlan* p) stream_previous_pkt_time = -1; stream_duration = 0; + duration_offset = 0; + filesize_offset = 0; + _srs_config->subscribe(this); } @@ -150,6 +158,10 @@ int SrsFlvSegment::open(bool use_tmp_file) return ret; } } + + // update the duration and filesize offset. + duration_offset = 0; + filesize_offset = 0; srs_trace("dvr stream %s to file %s", req->stream.c_str(), path.c_str()); @@ -164,6 +176,11 @@ int SrsFlvSegment::close() if (!fs->is_open()) { return ret; } + + // update duration and filesize. + if ((ret = update_flv_metadata()) != ERROR_SUCCESS) { + return ret; + } fs->close(); @@ -203,17 +220,61 @@ int SrsFlvSegment::close() return ret; } -int SrsFlvSegment::write_metadata(SrsOnMetaDataPacket* metadata) +int SrsFlvSegment::write_metadata(SrsSharedPtrMessage* metadata) { int ret = ERROR_SUCCESS; - - int size = 0; - char* payload = NULL; - if ((ret = metadata->encode(size, payload)) != ERROR_SUCCESS) { + + if (duration_offset || filesize_offset) { + return ret; + } + + SrsStream stream; + if ((ret = stream.initialize(metadata->payload, metadata->size)) != ERROR_SUCCESS) { + return ret; + } + + SrsAmf0Any* name = SrsAmf0Any::str(); + SrsAutoFree(SrsAmf0Any, name); + if ((ret = name->read(&stream)) != ERROR_SUCCESS) { return ret; } + + SrsAmf0Object* obj = SrsAmf0Any::object(); + SrsAutoFree(SrsAmf0Object, obj); + if ((ret = obj->read(&stream)) != ERROR_SUCCESS) { + return ret; + } + + // remove duration and filesize. + obj->set("filesize", NULL); + obj->set("duration", NULL); + + // add properties. + obj->set("service", SrsAmf0Any::str(RTMP_SIG_SRS_SERVER)); + obj->set("filesize", SrsAmf0Any::number(0)); + obj->set("duration", SrsAmf0Any::number(0)); + + int size = name->total_size() + obj->total_size(); + char* payload = new char[size]; SrsAutoFree(char, payload); + + // 11B flv header, 3B object EOF, 8B number value, 1B number flag. + duration_offset = fs->tellg() + size + 11 - SrsAmf0Size::object_eof() - SrsAmf0Size::number(); + // 2B string flag, 8B number value, 8B string 'duration', 1B number flag + filesize_offset = duration_offset - SrsAmf0Size::utf8("duration") - SrsAmf0Size::number(); + + // convert metadata to bytes. + if ((ret = stream.initialize(payload, size)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = name->write(&stream)) != ERROR_SUCCESS) { + return ret; + } + if ((ret = obj->write(&stream)) != ERROR_SUCCESS) { + return ret; + } + // to flv file. if ((ret = enc->write_metadata(18, payload, size)) != ERROR_SUCCESS) { return ret; } @@ -287,6 +348,62 @@ int SrsFlvSegment::write_video(SrsSharedPtrMessage* __video) return ret; } +int SrsFlvSegment::update_flv_metadata() +{ + int ret = ERROR_SUCCESS; + + // no duration or filesize specified. + if (!duration_offset || !filesize_offset) { + return ret; + } + + int64_t cur = fs->tellg(); + + // buffer to write the size. + char* buf = new char[SrsAmf0Size::number()]; + SrsAutoFree(char, buf); + + SrsStream stream; + if ((ret = stream.initialize(buf, SrsAmf0Size::number())) != ERROR_SUCCESS) { + return ret; + } + + // filesize to buf. + SrsAmf0Any* size = SrsAmf0Any::number((double)cur); + SrsAutoFree(SrsAmf0Any, size); + + stream.skip(-1 * stream.pos()); + if ((ret = size->write(&stream)) != ERROR_SUCCESS) { + return ret; + } + + // update the flesize. + fs->lseek(filesize_offset); + if ((ret = fs->write(buf, SrsAmf0Size::number(), NULL)) != ERROR_SUCCESS) { + return ret; + } + + // duration to buf + SrsAmf0Any* dur = SrsAmf0Any::number((double)duration / 1000.0); + SrsAutoFree(SrsAmf0Any, dur); + + stream.skip(-1 * stream.pos()); + if ((ret = dur->write(&stream)) != ERROR_SUCCESS) { + return ret; + } + + // update the duration + fs->lseek(duration_offset); + if ((ret = fs->write(buf, SrsAmf0Size::number(), NULL)) != ERROR_SUCCESS) { + return ret; + } + + // reset the offset. + fs->lseek(cur); + + return ret; +} + string SrsFlvSegment::generate_path() { // the path in config, for example, @@ -498,7 +615,7 @@ int64_t SrsDvrPlan::filter_timestamp(int64_t timestamp) return timestamp; } -int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) +int SrsDvrPlan::on_meta_data(SrsSharedPtrMessage* __metadata) { int ret = ERROR_SUCCESS; @@ -506,7 +623,7 @@ int SrsDvrPlan::on_meta_data(SrsOnMetaDataPacket* metadata) return ret; } - return segment->write_metadata(metadata); + return segment->write_metadata(__metadata); } int SrsDvrPlan::on_audio(SrsSharedPtrMessage* __audio) @@ -606,6 +723,7 @@ void SrsDvrSessionPlan::on_unpublish() SrsDvrAppendPlan::SrsDvrAppendPlan() { + last_update_time = 0; } SrsDvrAppendPlan::~SrsDvrAppendPlan() @@ -638,16 +756,74 @@ void SrsDvrAppendPlan::on_unpublish() { } +int SrsDvrAppendPlan::on_audio(SrsSharedPtrMessage* audio) +{ + int ret = ERROR_SUCCESS; + + if ((ret = update_duration(audio)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = SrsDvrPlan::on_audio(audio)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvrAppendPlan::on_video(SrsSharedPtrMessage* video) +{ + int ret = ERROR_SUCCESS; + + if ((ret = update_duration(video)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = SrsDvrPlan::on_video(video)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + +int SrsDvrAppendPlan::update_duration(SrsSharedPtrMessage* msg) +{ + int ret = ERROR_SUCCESS; + + if (last_update_time <= 0) { + last_update_time = msg->timestamp; + return ret; + } + + if (msg->timestamp < last_update_time) { + last_update_time = msg->timestamp; + return ret; + } + + if (__SRS_DVR_UPDATE_DURATION_INTERVAL > msg->timestamp - last_update_time) { + return ret; + } + last_update_time = msg->timestamp; + + srs_assert(segment); + if (!segment->update_flv_metadata()) { + return ret; + } + + return ret; +} + SrsDvrSegmentPlan::SrsDvrSegmentPlan() { segment_duration = -1; - sh_video = sh_audio = NULL; + metadata = sh_video = sh_audio = NULL; } SrsDvrSegmentPlan::~SrsDvrSegmentPlan() { srs_freep(sh_video); srs_freep(sh_audio); + srs_freep(metadata); } int SrsDvrSegmentPlan::initialize(SrsSource* source, SrsRequest* req) @@ -695,6 +871,20 @@ void SrsDvrSegmentPlan::on_unpublish() { } +int SrsDvrSegmentPlan::on_meta_data(SrsSharedPtrMessage* __metadata) +{ + int ret = ERROR_SUCCESS; + + srs_freep(metadata); + metadata = __metadata->copy(); + + if ((ret = SrsDvrPlan::on_meta_data(__metadata)) != ERROR_SUCCESS) { + return ret; + } + + return ret; +} + int SrsDvrSegmentPlan::on_audio(SrsSharedPtrMessage* audio) { int ret = ERROR_SUCCESS; @@ -774,6 +964,9 @@ int SrsDvrSegmentPlan::update_duration(SrsSharedPtrMessage* msg) } // update sequence header + if (metadata && (ret = SrsDvrPlan::on_meta_data(metadata)) != ERROR_SUCCESS) { + return ret; + } if (sh_video && (ret = SrsDvrPlan::on_video(sh_video)) != ERROR_SUCCESS) { return ret; } @@ -828,8 +1021,19 @@ void SrsDvr::on_unpublish() int SrsDvr::on_meta_data(SrsOnMetaDataPacket* m) { int ret = ERROR_SUCCESS; - - if ((ret = plan->on_meta_data(m)) != ERROR_SUCCESS) { + + int size = 0; + char* payload = NULL; + if ((ret = m->encode(size, payload)) != ERROR_SUCCESS) { + return ret; + } + + SrsSharedPtrMessage metadata; + if ((ret = metadata.create(NULL, payload, size)) != ERROR_SUCCESS) { + return ret; + } + + if ((ret = plan->on_meta_data(&metadata)) != ERROR_SUCCESS) { return ret; } diff --git a/trunk/src/app/srs_app_dvr.hpp b/trunk/src/app/srs_app_dvr.hpp index a2eb8ec74f..e66ea9eb3d 100644 --- a/trunk/src/app/srs_app_dvr.hpp +++ b/trunk/src/app/srs_app_dvr.hpp @@ -66,6 +66,17 @@ class SrsFlvSegment : public ISrsReloadHandler SrsRtmpJitter* jitter; SrsRtmpJitterAlgorithm jitter_algorithm; SrsFileWriter* fs; +private: + /** + * the offset of file for duration value. + * the next 8 bytes is the double value. + */ + int64_t duration_offset; + /** + * the offset of file for filesize value. + * the next 8 bytes is the double value. + */ + int64_t filesize_offset; private: std::string tmp_flv_file; private: @@ -124,7 +135,7 @@ class SrsFlvSegment : public ISrsReloadHandler /** * write the metadata to segment. */ - virtual int write_metadata(SrsOnMetaDataPacket* metadata); + virtual int write_metadata(SrsSharedPtrMessage* metadata); /** * @param __audio, directly ptr, copy it if need to save it. */ @@ -133,6 +144,10 @@ class SrsFlvSegment : public ISrsReloadHandler * @param __video, directly ptr, copy it if need to save it. */ virtual int write_video(SrsSharedPtrMessage* __video); + /** + * update the flv metadata. + */ + virtual int update_flv_metadata(); private: /** * generate the flv segment path. @@ -178,7 +193,7 @@ class SrsDvrPlan /** * when got metadata. */ - virtual int on_meta_data(SrsOnMetaDataPacket* metadata); + virtual int on_meta_data(SrsSharedPtrMessage* __metadata); /** * @param __audio, directly ptr, copy it if need to save it. */ @@ -213,12 +228,24 @@ class SrsDvrSessionPlan : public SrsDvrPlan */ class SrsDvrAppendPlan : public SrsDvrPlan { +private: + int64_t last_update_time; public: SrsDvrAppendPlan(); virtual ~SrsDvrAppendPlan(); public: virtual int on_publish(); virtual void on_unpublish(); + /** + * @param audio, directly ptr, copy it if need to save it. + */ + virtual int on_audio(SrsSharedPtrMessage* audio); + /** + * @param video, directly ptr, copy it if need to save it. + */ + virtual int on_video(SrsSharedPtrMessage* video); +private: + virtual int update_duration(SrsSharedPtrMessage* msg); }; /** @@ -231,6 +258,7 @@ class SrsDvrSegmentPlan : public SrsDvrPlan int segment_duration; SrsSharedPtrMessage* sh_audio; SrsSharedPtrMessage* sh_video; + SrsSharedPtrMessage* metadata; public: SrsDvrSegmentPlan(); virtual ~SrsDvrSegmentPlan(); @@ -239,6 +267,10 @@ class SrsDvrSegmentPlan : public SrsDvrPlan virtual int on_publish(); virtual void on_unpublish(); /** + * when got metadata. + */ + virtual int on_meta_data(SrsSharedPtrMessage* __metadata); + /** * @param audio, directly ptr, copy it if need to save it. */ virtual int on_audio(SrsSharedPtrMessage* audio); diff --git a/trunk/src/kernel/srs_kernel_file.cpp b/trunk/src/kernel/srs_kernel_file.cpp index f5b08a5b6d..f8da03e248 100644 --- a/trunk/src/kernel/srs_kernel_file.cpp +++ b/trunk/src/kernel/srs_kernel_file.cpp @@ -116,6 +116,11 @@ bool SrsFileWriter::is_open() return fd > 0; } +void SrsFileWriter::lseek(int64_t offset) +{ + ::lseek(fd, (off_t)offset, SEEK_SET); +} + int64_t SrsFileWriter::tellg() { return (int64_t)::lseek(fd, 0, SEEK_CUR); diff --git a/trunk/src/kernel/srs_kernel_file.hpp b/trunk/src/kernel/srs_kernel_file.hpp index 31857ba13e..53a9f0e62c 100644 --- a/trunk/src/kernel/srs_kernel_file.hpp +++ b/trunk/src/kernel/srs_kernel_file.hpp @@ -54,6 +54,7 @@ class SrsFileWriter virtual void close(); public: virtual bool is_open(); + virtual void lseek(int64_t offset); virtual int64_t tellg(); public: /** diff --git a/trunk/src/protocol/srs_rtmp_amf0.cpp b/trunk/src/protocol/srs_rtmp_amf0.cpp index d5668ae267..6114181d56 100644 --- a/trunk/src/protocol/srs_rtmp_amf0.cpp +++ b/trunk/src/protocol/srs_rtmp_amf0.cpp @@ -448,11 +448,6 @@ SrsAmf0Any* SrsUnSortedHashtable::value_at(int index) void SrsUnSortedHashtable::set(string key, SrsAmf0Any* value) { - if (!value) { - srs_warn("add a NULL propertity %s", key.c_str()); - return; - } - std::vector::iterator it; for (it = properties.begin(); it != properties.end(); ++it) { @@ -467,7 +462,9 @@ void SrsUnSortedHashtable::set(string key, SrsAmf0Any* value) } } - properties.push_back(std::make_pair(key, value)); + if (value) { + properties.push_back(std::make_pair(key, value)); + } } SrsAmf0Any* SrsUnSortedHashtable::get_property(string name) diff --git a/trunk/src/protocol/srs_rtmp_amf0.hpp b/trunk/src/protocol/srs_rtmp_amf0.hpp index c3b32159ce..d92676753c 100644 --- a/trunk/src/protocol/srs_rtmp_amf0.hpp +++ b/trunk/src/protocol/srs_rtmp_amf0.hpp @@ -794,6 +794,10 @@ namespace _srs_internal virtual std::string key_at(int index); virtual const char* key_raw_at(int index); virtual SrsAmf0Any* value_at(int index); + /** + * set the value of hashtable. + * @param value, the value to set. NULL to delete the property. + */ virtual void set(std::string key, SrsAmf0Any* value); public: virtual SrsAmf0Any* get_property(std::string name); diff --git a/trunk/src/protocol/srs_rtmp_stack.cpp b/trunk/src/protocol/srs_rtmp_stack.cpp index 59dd5a0b75..9a33b8a3cc 100644 --- a/trunk/src/protocol/srs_rtmp_stack.cpp +++ b/trunk/src/protocol/srs_rtmp_stack.cpp @@ -434,7 +434,6 @@ int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int si { int ret = ERROR_SUCCESS; - srs_assert(pheader != NULL); if (ptr) { ret = ERROR_SYSTEM_ASSERT_FAILED; srs_error("should not set the payload twice. ret=%d", ret); @@ -446,11 +445,13 @@ int SrsSharedPtrMessage::create(SrsMessageHeader* pheader, char* payload, int si ptr = new __SrsSharedPtr(); // direct attach the data. - ptr->header.message_type = pheader->message_type; - ptr->header.payload_length = size; - ptr->header.perfer_cid = pheader->perfer_cid; - this->timestamp = pheader->timestamp; - this->stream_id = pheader->stream_id; + if (pheader) { + ptr->header.message_type = pheader->message_type; + ptr->header.payload_length = size; + ptr->header.perfer_cid = pheader->perfer_cid; + this->timestamp = pheader->timestamp; + this->stream_id = pheader->stream_id; + } ptr->payload = payload; ptr->size = size; diff --git a/trunk/src/protocol/srs_rtmp_stack.hpp b/trunk/src/protocol/srs_rtmp_stack.hpp index 10ede8c41b..be96b51c73 100644 --- a/trunk/src/protocol/srs_rtmp_stack.hpp +++ b/trunk/src/protocol/srs_rtmp_stack.hpp @@ -283,6 +283,7 @@ class SrsSharedPtrMessage * create shared ptr message, * from the header and payload. * @remark user should never free the payload. + * @param pheader, the header to copy to the message. NULL to ignore. */ virtual int create(SrsMessageHeader* pheader, char* payload, int size); /**