diff --git a/base/CMakeLists.txt b/base/CMakeLists.txt index a0891c043..864ad0c80 100755 --- a/base/CMakeLists.txt +++ b/base/CMakeLists.txt @@ -195,6 +195,9 @@ SET(CORE_FILES src/MotionVectorExtractor.cpp src/OverlayModule.cpp src/OrderedCacheOfFiles.cpp + src/SimpleControlModule.cpp + src/APErrorObject.cpp + src/APHealthObject.cpp ) SET(CORE_FILES_H @@ -257,6 +260,10 @@ SET(CORE_FILES_H include/OrderedCacheOfFiles.h include/TestSignalGeneratorSrc.h include/AbsControlModule.h + include/SimpleControlModule.h + include/APErrorObject.h + include/APCallback.h + include/APHealthObject.h ) IF(ENABLE_WINDOWS) diff --git a/base/include/AIPExceptions.h b/base/include/AIPExceptions.h index 6cb7d8849..6488c6128 100755 --- a/base/include/AIPExceptions.h +++ b/base/include/AIPExceptions.h @@ -38,6 +38,7 @@ #define MP4_OCOF_INVALID_DUR 7823 #define MP4_UNEXPECTED_STATE 7824 #define MODULE_ENROLLMENT_FAILED 7825 +#define CTRL_MODULE_INVALID_STATE 7826 #define AIPException_LOG_SEV(severity,type) for(std::ostringstream stream; Logger::getLogger()->push(severity, stream);) Logger::getLogger()->aipexceptionPre(stream, severity,type) diff --git a/base/include/APCallback.h b/base/include/APCallback.h new file mode 100644 index 000000000..ebdce2160 --- /dev/null +++ b/base/include/APCallback.h @@ -0,0 +1,7 @@ +#pragma once +#include "APErrorObject.h" +#include "APHealthObject.h" +#include + +using APErrorCallback = std::function; +using APHealthCallback = std::function; diff --git a/base/include/APErrorObject.h b/base/include/APErrorObject.h new file mode 100644 index 000000000..d266b1bb5 --- /dev/null +++ b/base/include/APErrorObject.h @@ -0,0 +1,28 @@ +#pragma once +#include + +class APErrorObject { +private: + int mErrorCode; + std::string mErrorMessage; + std::string mModuleName; + std::string mModuleId; + std::string mTimestamp; + + std::string getCurrentTimestamp() const; + +public: + APErrorObject(int errCode, const std::string &errorMsg); + + int getErrorCode() const; + std::string getErrorMessage() const; + std::string getModuleName() const; + std::string getModuleId() const; + std::string getTimestamp() const; + + void displayError() const; + void setErrorCode(int errCode); + void setErrorMessage(const std::string &errorMsg); + void setModuleName(const std::string &modName); + void setModuleId(const std::string &modId); +}; diff --git a/base/include/APHealthObject.h b/base/include/APHealthObject.h new file mode 100644 index 000000000..8e8798194 --- /dev/null +++ b/base/include/APHealthObject.h @@ -0,0 +1,19 @@ +#pragma once +#include + +class APHealthObject +{ +private: + std::string mModuleId; + std::string mTimestamp; + + std::string getCurrentTimestamp() const; + +public: + APHealthObject(const std::string &modId); + + std::string getModuleId() const; + std::string getTimestamp() const; + + void setModuleId(const std::string &modId); +}; diff --git a/base/include/AbsControlModule.h b/base/include/AbsControlModule.h index ad8aa6b34..e0d04aea3 100644 --- a/base/include/AbsControlModule.h +++ b/base/include/AbsControlModule.h @@ -1,4 +1,5 @@ #pragma once +#include "APCallback.h" #include "Command.h" #include "Module.h" #include @@ -6,35 +7,39 @@ class PipeLine; class AbsControlModuleProps : public ModuleProps { public: - AbsControlModuleProps() {} + AbsControlModuleProps() {} }; class AbsControlModule : public Module { public: - AbsControlModule(AbsControlModuleProps _props); - ~AbsControlModule(); - bool init(); - bool term(); - std::string enrollModule(std::string pName, std::string role, - boost::shared_ptr module); - std::pair> getModuleofRole(std::string pName, - std::string role); - virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {} - virtual void handleMMQExport(Command cmd, bool priority = false) {} - virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {} - virtual void handleSendMMQTSCmd(uint64_t mmqBeginTS, uint64_t mmqEndTS, bool priority = false) {} - virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {} - virtual void handleGoLive(bool goLive, bool priority) {} - virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {} - boost::container::deque> pipelineModules; - std::map> moduleRoles; + AbsControlModule(AbsControlModuleProps _props); + ~AbsControlModule(); + bool init(); + bool term(); + bool enrollModule(std::string role, boost::shared_ptr module); + boost::shared_ptr getModuleofRole(std::string role); + virtual void handleMp4MissingVideotrack(std::string previousVideoFile, std::string nextVideoFile) {} + virtual void handleMMQExport(Command cmd, bool priority = false) {} + virtual void handleMMQExportView(uint64_t startTS, uint64_t endTS = 9999999999999, bool playabckDirection = true, bool Mp4ReaderExport = false, bool priority = false) {} + virtual void handleSendMMQTSCmd(uint64_t mmqBeginTS, uint64_t mmqEndTS, bool priority = false) {} + virtual void handleLastGtkGLRenderTS(uint64_t latestGtkGlRenderTS, bool priority) {} + virtual void handleGoLive(bool goLive, bool priority) {} + virtual void handleDecoderSpeed(DecoderPlaybackSpeed cmd, bool priority) {} + boost::container::deque> pipelineModules; + std::map> moduleRoles; + virtual void handleError(const APErrorObject &error) {} + virtual void handleHealthCallback(const APHealthObject &healthObj) {} + protected: - bool process(frame_container &frames); - bool handleCommand(Command::CommandType type, frame_sp &frame); - bool handlePropsChange(frame_sp &frame); + bool process(frame_container& frames); + bool handleCommand(Command::CommandType type, frame_sp& frame); + bool handlePropsChange(frame_sp& frame); + virtual void sendEOS() {} + virtual void sendEOS(frame_sp& frame) {} + virtual void sendEOPFrame() {} private: - class Detail; - boost::shared_ptr mDetail; + class Detail; + boost::shared_ptr mDetail; }; \ No newline at end of file diff --git a/base/include/BrightnessContrastControlXform.h b/base/include/BrightnessContrastControlXform.h index 3eb54ec42..331bc56be 100644 --- a/base/include/BrightnessContrastControlXform.h +++ b/base/include/BrightnessContrastControlXform.h @@ -44,7 +44,6 @@ class BrightnessContrastControl : public Module bool validateInputPins(); bool validateOutputPins(); void addInputPin(framemetadata_sp &metadata, string &pinId); - void setProps(BrightnessContrastControl); bool handlePropsChange(frame_sp &frame); private: diff --git a/base/include/GtkGlRenderer.h b/base/include/GtkGlRenderer.h index 8c8d5f880..5056eaade 100644 --- a/base/include/GtkGlRenderer.h +++ b/base/include/GtkGlRenderer.h @@ -4,9 +4,10 @@ #include #include -class GtkGlRendererProps : public ModuleProps { +class GtkGlRendererProps : public ModuleProps +{ public: - GtkGlRendererProps(GtkWidget* _glArea, int _windowWidth, int _windowHeight, bool _isPlaybackRenderer = true) : ModuleProps() // take gtk string + GtkGlRendererProps(GtkWidget *_glArea, int _windowWidth, int _windowHeight, bool _isPlaybackRenderer = true) : ModuleProps() // take gtk string { // gladeFileName = _gladeFileName; glArea = _glArea; @@ -14,33 +15,33 @@ class GtkGlRendererProps : public ModuleProps { windowHeight = _windowHeight; isPlaybackRenderer = _isPlaybackRenderer; } - GtkWidget* glArea; + GtkWidget *glArea; int windowWidth = 0; int windowHeight = 0; bool isPlaybackRenderer = true; }; -class GtkGlRenderer : public Module { +class GtkGlRenderer : public Module +{ public: - GtkGlRenderer(GtkGlRendererProps props); - ~GtkGlRenderer(); - - bool init(); - bool term(); - bool changeProps(GtkWidget *glArea, int windowWidth, int windowHeight); - + GtkGlRenderer(GtkGlRendererProps props); + ~GtkGlRenderer(); + bool init(); + bool term(); + bool changeProps(GtkWidget *glArea, int windowWidth, int windowHeight); protected: - bool process(frame_container& frames); + bool process(frame_container &frames); bool processSOS(frame_sp &frame); bool validateInputPins(); bool shouldTriggerSOS(); bool handleCommand(Command::CommandType type, frame_sp &frame); void pushFrame(frame_sp frame); + private: - class Detail; - boost::shared_ptr mDetail; - std::chrono::steady_clock::time_point lastFrameTime = - std::chrono::steady_clock::now(); - std::queue frameQueue; - std::mutex queueMutex; + class Detail; + boost::shared_ptr mDetail; + std::chrono::steady_clock::time_point lastFrameTime = + std::chrono::steady_clock::now(); + std::queue frameQueue; + std::mutex queueMutex; }; diff --git a/base/include/ImageEncoderCV.h b/base/include/ImageEncoderCV.h index 918d536d6..362f76c2e 100644 --- a/base/include/ImageEncoderCV.h +++ b/base/include/ImageEncoderCV.h @@ -19,7 +19,6 @@ class ImageEncoderCV : public Module virtual ~ImageEncoderCV(); bool init(); bool term(); - protected: bool process(frame_container& frames); bool processSOS(frame_sp& frame); @@ -27,7 +26,6 @@ class ImageEncoderCV : public Module bool validateOutputPins(); private: - void setMetadata(framemetadata_sp& metadata); int mFrameType; ImageEncoderCVProps props; class Detail; diff --git a/base/include/Module.h b/base/include/Module.h index 409fcbd50..842dd2df7 100644 --- a/base/include/Module.h +++ b/base/include/Module.h @@ -19,6 +19,7 @@ #include "FIndexStrategy.h" #include "Command.h" #include "BufferMaker.h" +#include "APCallback.h" using namespace std; @@ -28,399 +29,453 @@ class PaceMaker; class ModuleProps { -public: - enum FrameFetchStrategy { - PUSH, - PULL - }; - ModuleProps() - { - fps = 60; - qlen = 20; - logHealth = false; - logHealthFrequency = 1000; - quePushStrategyType = QuePushStrategy::BLOCKING; - maxConcurrentFrames = 0; - fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; - frameFetchStrategy = FrameFetchStrategy::PUSH; - } - - ModuleProps(float _fps) - { - fps = _fps; - qlen = 20; - logHealth = false; - logHealthFrequency = 1000; - quePushStrategyType = QuePushStrategy::BLOCKING; - maxConcurrentFrames = 0; - fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; - frameFetchStrategy = FrameFetchStrategy::PUSH; - } - - ModuleProps(float _fps, size_t _qlen, bool _logHealth) - { - fps = _fps; - qlen = _qlen; - logHealth = _logHealth; - logHealthFrequency = 1000; - quePushStrategyType = QuePushStrategy::BLOCKING; - maxConcurrentFrames = 0; - fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; - frameFetchStrategy = FrameFetchStrategy::PUSH; - } - - ModuleProps(FrameFetchStrategy _frameFetchStrategy) - { - fps = 60; - qlen = 20; - logHealth = false; - logHealthFrequency = 1000; - quePushStrategyType = QuePushStrategy::BLOCKING; - maxConcurrentFrames = 0; - fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; - frameFetchStrategy = _frameFetchStrategy; - } - - size_t getQLen() - { - return qlen; - } - - virtual size_t getSerializeSize() - { - // 1024 is for boost serialize - return 1024 + sizeof(fps) + sizeof(qlen) + sizeof(logHealth) + sizeof(logHealthFrequency) + sizeof(maxConcurrentFrames) + sizeof(skipN) + sizeof(skipD) + sizeof(quePushStrategyType) + sizeof(fIndexStrategyType); - } - - float fps; // can be updated during runtime with setProps - size_t qlen; // run time changing doesn't effect this - bool logHealth; // can be updated during runtime with setProps - int logHealthFrequency; // 1000 by default - logs the health stats frequency - - // used for VimbaSource where we want to create the max frames and keep recycling it - // for the VimbaDrive we announce frames after init - 100/200 - // see VimbaSource.cpp on how it is used - size_t maxConcurrentFrames; - - // 0/1 - skipN == 0 - don't skip any - process all - // 1/1 - skipN == skipD - skip all - don't process any - // 1/2 skips every alternate frame - // 1/3 skips 1 out of every 3 frames - // 2/3 skips 2 out of every 3 frames - // 5/6 skips 5 out of every 6 frames - // skipD >= skipN - int skipN = 0; - int skipD = 1; - //have one more enum and then in module.cpp dont call run if the enum is pull type. - FrameFetchStrategy frameFetchStrategy; - QuePushStrategy::QuePushStrategyType quePushStrategyType; - FIndexStrategy::FIndexStrategyType fIndexStrategyType; - +public: + enum FrameFetchStrategy + { + PUSH, + PULL + }; + ModuleProps() + { + fps = 60; + qlen = 20; + logHealth = false; + logHealthFrequency = 1000; + quePushStrategyType = QuePushStrategy::BLOCKING; + maxConcurrentFrames = 0; + fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; + frameFetchStrategy = FrameFetchStrategy::PUSH; + enableHealthCallBack = false; + healthUpdateIntervalInSec = 5; + } + + ModuleProps(float _fps) + { + fps = _fps; + qlen = 20; + logHealth = false; + logHealthFrequency = 1000; + quePushStrategyType = QuePushStrategy::BLOCKING; + maxConcurrentFrames = 0; + fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; + frameFetchStrategy = FrameFetchStrategy::PUSH; + enableHealthCallBack = false; + healthUpdateIntervalInSec = 5; + } + + ModuleProps(float _fps, size_t _qlen, bool _logHealth) + { + fps = _fps; + qlen = _qlen; + logHealth = _logHealth; + logHealthFrequency = 1000; + quePushStrategyType = QuePushStrategy::BLOCKING; + maxConcurrentFrames = 0; + fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; + frameFetchStrategy = FrameFetchStrategy::PUSH; + enableHealthCallBack = false; + healthUpdateIntervalInSec = 5; + } + + ModuleProps(FrameFetchStrategy _frameFetchStrategy) + { + fps = 60; + qlen = 20; + logHealth = false; + logHealthFrequency = 1000; + quePushStrategyType = QuePushStrategy::BLOCKING; + maxConcurrentFrames = 0; + fIndexStrategyType = FIndexStrategy::FIndexStrategyType::AUTO_INCREMENT; + frameFetchStrategy = _frameFetchStrategy; + enableHealthCallBack = false; + healthUpdateIntervalInSec = 5; + } + + size_t getQLen() + { + return qlen; + } + + virtual size_t getSerializeSize() + { + // 1024 is for boost serialize + return 1024 + sizeof(fps) + sizeof(qlen) + sizeof(logHealth) + + sizeof(logHealthFrequency) + sizeof(maxConcurrentFrames) + + sizeof(skipN) + sizeof(skipD) + sizeof(quePushStrategyType) + + sizeof(fIndexStrategyType) + sizeof(enableHealthCallBack); + } + + float fps; // can be updated during runtime with setProps + size_t qlen; // run time changing doesn't effect this + bool logHealth; // can be updated during runtime with setProps + int logHealthFrequency; // 1000 by default - logs the health stats frequency + + // used for VimbaSource where we want to create the max frames and keep + // recycling it for the VimbaDrive we announce frames after init - 100/200 see + // VimbaSource.cpp on how it is used + size_t maxConcurrentFrames; + + // 0/1 - skipN == 0 - don't skip any - process all + // 1/1 - skipN == skipD - skip all - don't process any + // 1/2 skips every alternate frame + // 1/3 skips 1 out of every 3 frames + // 2/3 skips 2 out of every 3 frames + // 5/6 skips 5 out of every 6 frames + // skipD >= skipN + int skipN = 0; + int skipD = 1; + // have one more enum and then in module.cpp dont call run if the enum is pull + // type. + FrameFetchStrategy frameFetchStrategy; + QuePushStrategy::QuePushStrategyType quePushStrategyType; + FIndexStrategy::FIndexStrategyType fIndexStrategyType; + bool enableHealthCallBack; // ToEnable HealthCallback we need to set ModuleProps as true, Will get Callbacks only if ControlModule is there + int healthUpdateIntervalInSec; // Health Callback Interval Defined in Sec, if Value is + // set to 5, then it means after every 5 sec Control Module will receive health callback private: - friend class Module; - - friend class boost::serialization::access; - template - void serialize(Archive & ar, const unsigned int /* file_version */) { - ar & fps & qlen & logHealth & logHealthFrequency & maxConcurrentFrames & skipN & skipD & quePushStrategyType & fIndexStrategyType & frameFetchStrategy; - } + friend class Module; + + friend class boost::serialization::access; + template + void serialize(Archive &ar, const unsigned int /* file_version */) + { + ar & fps & qlen & logHealth & logHealthFrequency & maxConcurrentFrames & + skipN & skipD & quePushStrategyType & fIndexStrategyType & + frameFetchStrategy & enableHealthCallBack; + } }; -class Module { - +class Module +{ + public: - enum Kind { - SOURCE, - TRANSFORM, - SINK - }; - enum ModuleState { - Initialized, - Running, - EndOfStreamNormal, - EndOfStreamSocketError - }; - Module(Kind nature, string name, ModuleProps _props); - virtual ~Module(); - Kind getNature() { return myNature; } - string getName() { return myName; } - string getId() { return myId; } - double getPipelineFps(); - uint64_t getTickCounter(); - - string addOutputPin(framemetadata_sp& metadata); // throw exception - vector getAllOutputPinsByType(int type); - void addOutputPin(framemetadata_sp& metadata, string& pinId); - bool setNext(boost::shared_ptr next, vector& pinIdArr, bool open = true); - virtual bool setNext(boost::shared_ptr next, bool open = true, bool sieve = true); // take all the output pins - bool addFeedback(boost::shared_ptr next, vector& pinIdArr, bool open = true); - bool addFeedback(boost::shared_ptr next, bool open = true); // take all the output pins - boost_deque> getConnectedModules(); - - bool relay(boost::shared_ptr next, bool open, bool priority = false); - - virtual bool init(); - void operator()(); //to support boost::thread - virtual bool run(); - bool play(float speed, bool direction = true); - bool play(bool play); - bool queueStep(); - virtual bool step(); - virtual bool stop(); - virtual bool term(); - virtual bool isFull(); - bool isNextModuleQueFull(); - - void adaptQueue(boost::shared_ptr queAdapter); - - void register_consumer(boost::function, bool bFatal=false); - boost::shared_ptr getPacer() { return pacer; } - static frame_sp getFrameByType(frame_container& frames, int frameType); - virtual void flushQue(); - bool getPlayDirection() { return mDirection; } - virtual void flushQueRecursive(); + enum Kind + { + SOURCE, + TRANSFORM, + SINK, + CONTROL + }; + enum ModuleState + { + Initialized, + Running, + EndOfStreamNormal, + EndOfStreamSocketError + }; + Module(Kind nature, string name, ModuleProps _props); + virtual ~Module(); + Kind getNature() { return myNature; } + string getName() { return myName; } + string getId() { return myId; } + double getPipelineFps(); + uint64_t getTickCounter(); + + string addOutputPin(framemetadata_sp &metadata); // throw exception + vector getAllOutputPinsByType(int type); + void addOutputPin(framemetadata_sp &metadata, string &pinId); + bool setNext(boost::shared_ptr next, vector &pinIdArr, + bool open = true); + virtual bool setNext(boost::shared_ptr next, bool open = true, + bool sieve = true); // take all the output pins + bool addFeedback(boost::shared_ptr next, vector &pinIdArr, + bool open = true); + bool addFeedback(boost::shared_ptr next, + bool open = true); // take all the output pins + boost_deque> getConnectedModules(); + + bool relay(boost::shared_ptr next, bool open, bool priority = false); + + virtual bool init(); + void operator()(); // to support boost::thread + virtual bool run(); + bool play(float speed, bool direction = true); + bool play(bool play); + bool queueStep(); + virtual bool step(); + virtual bool stop(); + virtual bool term(); + virtual bool isFull(); + bool isNextModuleQueFull(); + + void adaptQueue(boost::shared_ptr queAdapter); + + void register_consumer(boost::function, bool bFatal = false); + boost::shared_ptr getPacer() { return pacer; } + static frame_sp getFrameByType(frame_container &frames, int frameType); + virtual void flushQue(); + bool getPlayDirection() { return mDirection; } + virtual void flushQueRecursive(); + virtual void addControlModule(boost::shared_ptr cModule); + void registerHealthCallback(APHealthCallback callback); + void executeErrorCallback(const APErrorObject &error); + void registerErrorCallback(APErrorCallback callback); + + ModuleProps getProps(); + protected: - virtual boost_deque getFrames(frame_container& frames); - virtual bool process(frame_container& frames) { return false; } - virtual bool processEOS(string& pinId) { return true; } // EOS is propagated in stepNonSource for every encountered EOSFrame - pinId is first stream in the map - virtual bool processSOS(frame_sp& frame) { return true; } // SOS is Start of Stream - virtual bool shouldTriggerSOS(); - virtual bool produce() { return false; } - bool stepNonSource(frame_container& frames); - bool preProcessNonSource(frame_container& frames); - bool isRunning() { return mRunning; } - - ModuleProps getProps(); - void setProps(ModuleProps& props); - void fillProps(ModuleProps& props); - template - void addPropsToQueue(T& props, bool priority = false) - { - auto size = props.getSerializeSize(); - auto frame = makeCommandFrame(size, mPropsChangeMetadata); - - // serialize - serialize(props, frame); - // add to que - frame_container frames; - frames.insert(make_pair("props_change", frame)); - if(!priority) - { - Module::push(frames); - } - else - { - Module::push_back(frames); - } - } - virtual bool handlePropsChange(frame_sp& frame); - virtual bool handleCommand(Command::CommandType type, frame_sp& frame); - template - bool handlePropsChange(frame_sp& frame, T& props) - { - //deserialize - deSerialize(props, frame); - - // set props - Module::setProps(props); - - return true; - } - - template - bool queueCommand(T& cmd, bool priority = false) - { - auto size = cmd.getSerializeSize(); - auto frame = makeCommandFrame(size, mCommandMetadata); - - Utils::serialize(cmd, frame->data(), size); - - // add to que - frame_container frames; - frames.insert(make_pair("command", frame)); - if(priority) - { - Module::push_back(frames); - } - else - { - Module::push(frames); - } - return true; - } - - template - void getCommand(T& cmd, frame_sp& frame) - { - Utils::deSerialize(cmd, frame->data(), frame->size()); - } - - bool queuePlayPauseCommand(PlayPauseCommand ppCmd, bool priority = false); - frame_sp makeCommandFrame(size_t size, framemetadata_sp& metadata); - frame_sp makeFrame(size_t size, string& pinId); - frame_sp makeFrame(size_t size); // use only if 1 output pin is there - frame_sp makeFrame(); - frame_sp makeFrame(frame_sp& bigFrame, size_t& newSize, string& pinId); - frame_sp getEOSFrame(); - frame_sp getEmptyFrame(); - - void setMetadata(std::string& pinId, framemetadata_sp& metadata); - - virtual bool send(frame_container& frames, bool forceBlockingPush=false); - virtual void sendEOS(); - virtual void sendEOS(frame_sp& frame); - virtual void sendMp4ErrorFrame(frame_sp& frame); - virtual void sendEoPFrame(); - - boost::function onStepFail; - //various behaviours for stepFail: - void ignore(int times); //do nothing - void stop_onStepfail(); - void emit_event(unsigned short eventID); //regular events - void emit_fatal(unsigned short eventID); //fatal events need a permanent handler - - friend class PipeLine; - - boost::function event_consumer; - boost::function fatal_event_consumer; - - enum ModuleState module_state; - void setModuleState(enum ModuleState es) { module_state = es; } - ModuleState getModuleState() { - return module_state; - } - - virtual bool validateInputPins(); // invoked with setInputPin - virtual bool validateOutputPins(); // invoked with addOutputPin - virtual bool validateInputOutputPins() { return validateInputPins() && validateOutputPins(); } // invoked during Module::init before anything else - - size_t getNumberOfOutputPins(bool implicit = true) - { - auto pinCount = mOutputPinIdFrameFactoryMap.size(); - // override the implicit behaviour - if (!implicit) - { - pinCount += mInputPinIdMetadataMap.size(); - } - return pinCount; - } - size_t getNumberOfInputPins() { return mInputPinIdMetadataMap.size(); } - framemetadata_sp getFirstInputMetadata(); - framemetadata_sp getFirstOutputMetadata(); - framemetadata_sp getOutputMetadata(string outPinID); - metadata_by_pin& getInputMetadata() { return mInputPinIdMetadataMap; } - framefactory_by_pin& getOutputFrameFactory() { return mOutputPinIdFrameFactoryMap; } - framemetadata_sp getInputMetadataByType(int type); - int getNumberOfInputsByType(int type); - int getNumberOfOutputsByType(int type); - framemetadata_sp getOutputMetadataByType(int type); - bool isMetadataEmpty(framemetadata_sp& metadata); - bool isFrameEmpty(frame_sp& frame); - string getInputPinIdByType(int type); - string getOutputPinIdByType(int type); - - bool setNext(boost::shared_ptr next, bool open, bool isFeedback, bool sieve); // take all the output pins - bool setNext(boost::shared_ptr next, vector& pinIdArr, bool open, bool isFeedback, bool sieve); - void addInputPin(framemetadata_sp& metadata, string& pinId, bool isFeedback); - virtual void addInputPin(framemetadata_sp& metadata, string& pinId); // throws exception if validation fails - boost::shared_ptr getQue() { return mQue; } - - bool getPlayState() { return mPlay; } - - // only for unit test - Connections getConnections() { return mConnections; } - - //following is useful for testing to know whats in queue - frame_container try_pop(); - frame_container pop(); - - bool processSourceQue(); - bool handlePausePlay(bool play); - virtual bool handlePausePlay(float speed = 1, bool direction = true); - virtual void notifyPlay(bool play) {} - - //makes buffers from frameFactory - class FFBufferMaker : public BufferMaker { - public: - FFBufferMaker(Module& module); - virtual void* make(size_t dataSize); - frame_sp getFrame() { - return frameIMade; - } - private: - Module& myModule; - frame_sp frameIMade; - }; - - FFBufferMaker createFFBufferMaker(); - boost::shared_ptr controlModule = nullptr; -private: - void setSieveDisabledFlag(bool sieve); - frame_sp makeFrame(size_t size, framefactory_sp& framefactory); - bool push(frame_container frameContainer); //exchanges the buffer - bool push_back(frame_container frameContainer); - bool try_push(frame_container frameContainer); //tries to exchange the buffer - - bool addEoPFrame(frame_container& frames); - bool handleStop(); - - template - void serialize(T& props, frame_sp& frame) - { - boost::iostreams::basic_array_sink device_sink((char*)frame->data(), frame->size()); - boost::iostreams::stream > s_sink(device_sink); - - boost::archive::binary_oarchive oa(s_sink); - oa << props; - } - - template - void deSerialize(T& props, frame_sp& frame) - { - boost::iostreams::basic_array_source device((char*)frame->data(), frame->size()); - boost::iostreams::stream > s(device); - boost::archive::binary_iarchive ia(s); - - ia >> props; - } - - bool shouldForceStep(); - bool shouldSkip(); - - bool isFeedbackEnabled(std::string& moduleId); // get pins and call - - bool mPlay; - bool mDirection; - float mSpeed; - uint32_t mForceStepCount; - int mSkipIndex; - Kind myNature; - string myName; - string myId; - boost::thread myThread; - boost::shared_ptr mQue; - bool mRunning; - uint32_t mStopCount; - uint32_t mForwardPins; - bool mIsSieveDisabledForAny = false; - boost::shared_ptr mpFrameFactory; - boost::shared_ptr mpCommandFactory; - boost::shared_ptr pacer; - - Connections mConnections; // For each module, all the required pins - map> mModules; - map mRelay; - - std::map mInputPinsDirection; - metadata_by_pin mInputPinIdMetadataMap; - framefactory_by_pin mOutputPinIdFrameFactoryMap; - std::shared_ptr mFIndexStrategy; - - class Profiler; - boost::shared_ptr mProfiler; - boost::shared_ptr mProps; - boost::shared_ptr mQuePushStrategy; - - framemetadata_sp mCommandMetadata; - framemetadata_sp mPropsChangeMetadata; -}; \ No newline at end of file + virtual boost_deque getFrames(frame_container &frames); + virtual bool process(frame_container &frames) { return false; } + virtual bool processEOS(string &pinId) { return true; } // EOS is propagated in stepNonSource for every encountered EOSFrame - pinId is first stream in the map + virtual bool processSOS(frame_sp &frame) { return true; } // SOS is Start of Stream + virtual bool shouldTriggerSOS(); + virtual bool produce() { return false; } + bool stepNonSource(frame_container &frames); + bool preProcessNonSource(frame_container &frames); + bool preProcessControl(frame_container &frames); + bool isRunning() { return mRunning; } + + void setProps(ModuleProps &props); + void fillProps(ModuleProps &props); + template + void addPropsToQueue(T &props, bool priority = false) + { + auto size = props.getSerializeSize(); + auto frame = makeCommandFrame(size, mPropsChangeMetadata); + + // serialize + serialize(props, frame); + // add to que + frame_container frames; + frames.insert(make_pair("props_change", frame)); + if (!priority) + { + Module::push(frames); + } + else + { + Module::push_back(frames); + } + } + virtual bool handlePropsChange(frame_sp &frame); + virtual bool handleCommand(Command::CommandType type, frame_sp &frame); + template + bool handlePropsChange(frame_sp &frame, T &props) + { + // deserialize + deSerialize(props, frame); + + // set props + Module::setProps(props); + + return true; + } + + template + bool queuePriorityCommand(T &cmd) + { + queueCommand(cmd, true); + } + + template + bool queueCommand(T &cmd, bool priority = false) + { + auto size = cmd.getSerializeSize(); + auto frame = makeCommandFrame(size, mCommandMetadata); + + Utils::serialize(cmd, frame->data(), size); + + // add to que + frame_container frames; + frames.insert(make_pair("command", frame)); + if (priority) + { + Module::push_back(frames); + } + else + { + Module::push(frames); + } + return true; + } + + template + void getCommand(T &cmd, frame_sp &frame) + { + Utils::deSerialize(cmd, frame->data(), frame->size()); + } + + bool queuePlayPauseCommand(PlayPauseCommand ppCmd, bool priority = false); + frame_sp makeCommandFrame(size_t size, framemetadata_sp &metadata); + frame_sp makeFrame(size_t size, string &pinId); + frame_sp makeFrame(size_t size); // use only if 1 output pin is there + frame_sp makeFrame(); + frame_sp makeFrame(frame_sp &bigFrame, size_t &newSize, string &pinId); + frame_sp getEOSFrame(); + frame_sp getEmptyFrame(); + + void setMetadata(std::string &pinId, framemetadata_sp &metadata); + + virtual bool send(frame_container &frames, bool forceBlockingPush = false); + virtual void sendEOS(); + virtual void sendEOS(frame_sp &frame); + virtual void sendMp4ErrorFrame(frame_sp &frame); + virtual void sendEoPFrame(); + + boost::function onStepFail; + // various behaviours for stepFail: + void ignore(int times); // do nothing + void stop_onStepfail(); + void emit_event(unsigned short eventID); // regular events + void + emit_fatal(unsigned short eventID); // fatal events need a permanent handler + + friend class PipeLine; + + boost::function event_consumer; + boost::function fatal_event_consumer; + + enum ModuleState module_state; + void setModuleState(enum ModuleState es) { module_state = es; } + ModuleState getModuleState() { return module_state; } + + virtual bool validateInputPins(); // invoked with setInputPin + virtual bool validateOutputPins(); // invoked with addOutputPin + virtual bool validateInputOutputPins() + { + return validateInputPins() && validateOutputPins(); + } // invoked during Module::init before anything else + + size_t getNumberOfOutputPins(bool implicit = true) + { + auto pinCount = mOutputPinIdFrameFactoryMap.size(); + // override the implicit behaviour + if (!implicit) + { + pinCount += mInputPinIdMetadataMap.size(); + } + return pinCount; + } + size_t getNumberOfInputPins() { return mInputPinIdMetadataMap.size(); } + framemetadata_sp getFirstInputMetadata(); + framemetadata_sp getFirstOutputMetadata(); + framemetadata_sp getOutputMetadata(string outPinID); + metadata_by_pin &getInputMetadata() { return mInputPinIdMetadataMap; } + framefactory_by_pin &getOutputFrameFactory() + { + return mOutputPinIdFrameFactoryMap; + } + framemetadata_sp getInputMetadataByType(int type); + int getNumberOfInputsByType(int type); + int getNumberOfOutputsByType(int type); + framemetadata_sp getOutputMetadataByType(int type); + bool isMetadataEmpty(framemetadata_sp &metadata); + bool isFrameEmpty(frame_sp &frame); + string getInputPinIdByType(int type); + string getOutputPinIdByType(int type); + + bool setNext(boost::shared_ptr next, bool open, bool isFeedback, + bool sieve); // take all the output pins + bool setNext(boost::shared_ptr next, vector &pinIdArr, + bool open, bool isFeedback, bool sieve); + void addInputPin(framemetadata_sp &metadata, string &pinId, bool isFeedback); + virtual void + addInputPin(framemetadata_sp &metadata, + string &pinId); // throws exception if validation fails + boost::shared_ptr getQue() { return mQue; } + + bool getPlayState() { return mPlay; } + + // only for unit test + Connections getConnections() { return mConnections; } + + // following is useful for testing to know whats in queue + frame_container try_pop(); + frame_container pop(); + + bool processSourceQue(); + bool handlePausePlay(bool play); + virtual bool handlePausePlay(float speed = 1, bool direction = true); + virtual void notifyPlay(bool play) {} + + // makes buffers from frameFactory + class FFBufferMaker : public BufferMaker + { + public: + FFBufferMaker(Module &module); + virtual void *make(size_t dataSize); + frame_sp getFrame() + { + return frameIMade; + } + + private: + Module &myModule; + frame_sp frameIMade; + }; + + FFBufferMaker createFFBufferMaker(); + boost::shared_ptr controlModule = nullptr; + +private: + void setSieveDisabledFlag(bool sieve); + frame_sp makeFrame(size_t size, framefactory_sp &framefactory); + bool push(frame_container frameContainer); // exchanges the buffer + bool push_back(frame_container frameContainer); + bool try_push(frame_container frameContainer); // tries to exchange the buffer + + bool addEoPFrame(frame_container &frames); + bool handleStop(); + + template + void serialize(T &props, frame_sp &frame) + { + boost::iostreams::basic_array_sink device_sink((char *)frame->data(), frame->size()); + boost::iostreams::stream> s_sink(device_sink); + + boost::archive::binary_oarchive oa(s_sink); + oa << props; + } + + template + void deSerialize(T &props, frame_sp &frame) + { + boost::iostreams::basic_array_source device((char *)frame->data(), frame->size()); + boost::iostreams::stream> s(device); + boost::archive::binary_iarchive ia(s); + + ia >> props; + } + + bool shouldForceStep(); + bool shouldSkip(); + + bool isFeedbackEnabled(std::string &moduleId); // get pins and call + + bool mPlay; + bool mDirection; + float mSpeed; + uint32_t mForceStepCount; + int mSkipIndex; + Kind myNature; + string myName; + string myId; + boost::thread myThread; + boost::shared_ptr mQue; + bool mRunning; + uint32_t mStopCount; + uint32_t mForwardPins; + bool mIsSieveDisabledForAny = false; + boost::shared_ptr mpFrameFactory; + boost::shared_ptr mpCommandFactory; + boost::shared_ptr pacer; + APErrorCallback mErrorCallback; + + Connections mConnections; // For each module, all the required pins + map> mModules; + map mRelay; + + std::map mInputPinsDirection; + metadata_by_pin mInputPinIdMetadataMap; + framefactory_by_pin mOutputPinIdFrameFactoryMap; + std::shared_ptr mFIndexStrategy; + + class Profiler; + boost::shared_ptr mProfiler; + boost::shared_ptr mProps; + boost::shared_ptr mQuePushStrategy; + + framemetadata_sp mCommandMetadata; + framemetadata_sp mPropsChangeMetadata; + APHealthCallback mHealthCallback; +}; diff --git a/base/include/SimpleControlModule.h b/base/include/SimpleControlModule.h new file mode 100644 index 000000000..be151463b --- /dev/null +++ b/base/include/SimpleControlModule.h @@ -0,0 +1,32 @@ +#pragma once +#include "AbsControlModule.h" + +class SimpleControlModuleProps : public AbsControlModuleProps { +public: + SimpleControlModuleProps() {} +}; + +class SimpleControlModule : public AbsControlModule +{ +public: + SimpleControlModule(SimpleControlModuleProps _props) : AbsControlModule(_props) + { + } + + ~SimpleControlModule() + { + + } + + void handleError(const APErrorObject &error); + void handleHealthCallback(const APHealthObject &healthObj); + + // ErrorCallbacks +protected: + void sendEOS(); + void sendEOS(frame_sp& frame); + void sendEOPFrame(); +private: + class Detail; + boost::shared_ptr mDetail; +}; diff --git a/base/include/TextOverlayXForm.h b/base/include/TextOverlayXForm.h index 0baff6b19..77ac9d786 100644 --- a/base/include/TextOverlayXForm.h +++ b/base/include/TextOverlayXForm.h @@ -46,7 +46,6 @@ class TextOverlayXForm : public Module bool validateInputPins(); bool validateOutputPins(); void addInputPin(framemetadata_sp &metadata, string &pinId); - void setProps(TextOverlayXForm); bool handlePropsChange(frame_sp &frame); private: diff --git a/base/include/VirtualPTZ.h b/base/include/VirtualPTZ.h index 973b69836..0d638cfd8 100644 --- a/base/include/VirtualPTZ.h +++ b/base/include/VirtualPTZ.h @@ -52,7 +52,6 @@ class VirtualPTZ : public Module bool validateInputPins(); bool validateOutputPins(); void addInputPin(framemetadata_sp &metadata, string &pinId); - void setProps(VirtualPTZ); bool handlePropsChange(frame_sp &frame); private: diff --git a/base/src/APErrorObject.cpp b/base/src/APErrorObject.cpp new file mode 100644 index 000000000..3ca183c1d --- /dev/null +++ b/base/src/APErrorObject.cpp @@ -0,0 +1,58 @@ +#include "APErrorObject.h" +#include "Logger.h" +#include +#include +#include +#include +#include + +APErrorObject::APErrorObject(int errCode, const std::string &errorMsg) + : mErrorCode(errCode), mErrorMessage(errorMsg), mModuleName(""), + mModuleId("") +{ + mTimestamp = getCurrentTimestamp(); +} + +int APErrorObject::getErrorCode() const { return mErrorCode; } + +std::string APErrorObject::getCurrentTimestamp() const +{ + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + std::tm tm = *std::localtime(&now_time); + std::stringstream ss; + ss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} + +std::string APErrorObject::getErrorMessage() const { return mErrorMessage; } + +std::string APErrorObject::getModuleName() const { return mModuleName; } + +std::string APErrorObject::getModuleId() const { return mModuleId; } + +std::string APErrorObject::getTimestamp() const { return mTimestamp; } + +void APErrorObject::displayError() const +{ + LOG_ERROR << "Module Name < " << mModuleName << " > Module Id < " << mModuleId + << " > Time Stamp < " << mTimestamp << " > Error Message < " + << mErrorMessage << " >"; +} + +void APErrorObject::setErrorCode(int errCode) +{ + mErrorCode = errCode; +} + +void APErrorObject::setErrorMessage(const std::string &errorMsg) +{ + mErrorMessage = errorMsg; +} + +void APErrorObject::setModuleName(const std::string &modName) +{ + mModuleName = modName; +} + +void APErrorObject::setModuleId(const std::string &modId) { mModuleId = modId; } diff --git a/base/src/APHealthObject.cpp b/base/src/APHealthObject.cpp new file mode 100644 index 000000000..d5fbf2d23 --- /dev/null +++ b/base/src/APHealthObject.cpp @@ -0,0 +1,25 @@ +#include "APHealthObject.h" +#include "Logger.h" +#include +#include + +APHealthObject::APHealthObject(const std::string &modId) : mModuleId(modId) +{ + mTimestamp = getCurrentTimestamp(); +} + +std::string APHealthObject::getCurrentTimestamp() const +{ + auto now = std::chrono::system_clock::now(); + std::time_t now_time = std::chrono::system_clock::to_time_t(now); + std::tm tm = *std::localtime(&now_time); + std::stringstream ss; + ss << std::put_time(&tm, "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} + +std::string APHealthObject::getModuleId() const { return mModuleId; } + +std::string APHealthObject::getTimestamp() const { return mTimestamp; } + +void APHealthObject::setModuleId(const std::string &modId) { mModuleId = modId; } diff --git a/base/src/AbsControlModule.cpp b/base/src/AbsControlModule.cpp index 01bbefce6..387397d6f 100644 --- a/base/src/AbsControlModule.cpp +++ b/base/src/AbsControlModule.cpp @@ -8,78 +8,87 @@ class AbsControlModule::Detail { public: - Detail(AbsControlModuleProps& _props) : mProps(_props) - { - } + Detail(AbsControlModuleProps& _props) : mProps(_props) + { + } - ~Detail() - { - } + ~Detail() + { + } - std::string getPipelineRole(std::string pName, std::string role) - { - return pName + "_" + role; - } - - AbsControlModuleProps mProps; + AbsControlModuleProps mProps; }; AbsControlModule::AbsControlModule(AbsControlModuleProps _props) - :Module(TRANSFORM, "AbsControlModule", _props) + :Module(CONTROL, "AbsControlModule", _props) { - mDetail.reset(new Detail(_props)); + mDetail.reset(new Detail(_props)); } AbsControlModule::~AbsControlModule() {} bool AbsControlModule::handleCommand(Command::CommandType type, frame_sp& frame) { - return true; + return true; } bool AbsControlModule::handlePropsChange(frame_sp& frame) { - return true; + return true; } bool AbsControlModule::init() { - if (!Module::init()) - { - return false; - } - return true; + if (!Module::init()) + { + return false; + } + return true; } bool AbsControlModule::term() { - return Module::term(); + return Module::term(); } bool AbsControlModule::process(frame_container& frames) { - return true; + // Commands are already processed by the time we reach here. + return true; } -std::string AbsControlModule::enrollModule(std::string pName, std::string role, boost::shared_ptr module) +bool AbsControlModule::enrollModule(std::string role, boost::shared_ptr module) { - std::string pipelineRole = mDetail->getPipelineRole(pName, role); - if (moduleRoles.find(pipelineRole) != moduleRoles.end()) - { - std::string errMsg = "Enrollment Failed: This role <" + role + "> already registered with the Module <" + moduleRoles[pipelineRole]->getName() + "> in PipeLine <" + pName + ">"; - LOG_ERROR << errMsg; - throw AIPException(MODULE_ENROLLMENT_FAILED, errMsg); - } - moduleRoles[pipelineRole] = module; - return pipelineRole; + if (moduleRoles.find(role) != moduleRoles.end()) + { + LOG_ERROR << "Role already registered with the control module."; + return false; + } + + moduleRoles[role] = module; + + // NOTE: If you want error callback and health callback to work with a module, registering it with control is mandatory. + module->registerErrorCallback( + [this](const APErrorObject& error) { handleError(error); }); + + if (module->getProps().enableHealthCallBack) + { + module->registerHealthCallback( + [this](const APHealthObject& message) { handleHealthCallback(message); }); + } + + return true; } -std::pair> AbsControlModule::getModuleofRole(std::string pName, std::string role) +boost::shared_ptr AbsControlModule::getModuleofRole(std::string role) { - std::string pipelineRole = mDetail->getPipelineRole(pName, role); - if (moduleRoles.find(pipelineRole) == moduleRoles.end()) - { - return std::make_pair>(false, nullptr); - } - std::pair> res(true, moduleRoles[pipelineRole]); - return res; + boost::shared_ptr moduleWithRole = nullptr; + try + { + moduleWithRole = moduleRoles[role]; + } + catch (std::out_of_range) + { + LOG_ERROR << "no module with the role <" << role << "> registered with the control module."; + } + return moduleWithRole; } \ No newline at end of file diff --git a/base/src/GtkGlRenderer.cpp b/base/src/GtkGlRenderer.cpp index 1061b26f7..9bacee9f2 100644 --- a/base/src/GtkGlRenderer.cpp +++ b/base/src/GtkGlRenderer.cpp @@ -15,13 +15,15 @@ #include "GTKSetup.h" #include "GTKView.h" -struct signal { +struct signal +{ const gchar *signal; GCallback handler; GdkEventMask mask; }; -class GtkGlRenderer::Detail { +class GtkGlRenderer::Detail +{ public: Detail(GtkGlRendererProps &_props) : mProps(_props) { isMetadataSet = false; } @@ -29,51 +31,56 @@ class GtkGlRenderer::Detail { ~Detail() {} static void on_resize(GtkGLArea *area, gint width, gint height, - gpointer data) { - LOG_INFO << "GL Area Width " << width << "Height " << height; + gpointer data) + { view_set_window(width, height); background_set_window(width, height); } void setProps(GtkGlRendererProps &props) { mProps = props; } + static gboolean on_render(GtkGLArea *glarea, GdkGLContext *context, - gpointer data) { + gpointer data) + { GtkGlRenderer::Detail *detailInstance = (GtkGlRenderer::Detail *)data; - if (detailInstance->isMetadataSet == false) { + if (detailInstance->isMetadataSet == false) + { LOG_TRACE << "Metadata is Not Set "; return TRUE; } gint x, y; // Check if the child widget is realized (has an associated window) - if (gtk_widget_get_realized(GTK_WIDGET(glarea))) { + if (gtk_widget_get_realized(GTK_WIDGET(glarea))) + { // Get the immediate parent of the child GtkWidget *parent = gtk_widget_get_parent(GTK_WIDGET(glarea)); // Check if the parent is realized - if (parent && gtk_widget_get_realized(parent)) { + if (parent && gtk_widget_get_realized(parent)) + { // Get the position of the child relative to its parent gtk_widget_translate_coordinates(GTK_WIDGET(glarea), parent, 0, 0, &x, &y); - } else { - // g_print("Error: Child's parent is not realized.\n"); } - } else { - // g_print("Error: Child widget is not realized.\n"); } - if (!detailInstance->cachedFrame.get()) { - LOG_ERROR << "Got Empty Frame"; + if (!detailInstance->cachedFrame.get()) + { + LOG_TRACE << "Got Empty Frame"; return TRUE; } detailInstance->renderFrame = detailInstance->cachedFrame; void *frameToRender; - if (detailInstance->isDmaMem) { + if (detailInstance->isDmaMem) + { #if defined(__arm__) || defined(__aarch64__) frameToRender = static_cast(detailInstance->renderFrame->data()) ->getHostPtr(); #endif - } else { + } + else + { frameToRender = detailInstance->renderFrame->data(); } glClear(GL_COLOR_BUFFER_BIT | GL_DEPTH_BUFFER_BIT); @@ -90,7 +97,8 @@ class GtkGlRenderer::Detail { { gtk_gl_area_make_current(glarea); - if (gtk_gl_area_get_error(glarea) != NULL) { + if (gtk_gl_area_get_error(glarea) != NULL) + { LOG_ERROR << "Failed to initialize buffer"; return FALSE; } @@ -126,14 +134,17 @@ class GtkGlRenderer::Detail { } static void on_unrealize(GtkGLArea *glarea, gint width, gint height, - gpointer data) { + gpointer data) + { LOG_ERROR << "UNREALIZE " "SIGNAL==================================>>>>>>>>>>>>>>>>>"; } static gboolean on_scroll(GtkWidget *widget, GdkEventScroll *event, - gpointer data) { - switch (event->direction) { + gpointer data) + { + switch (event->direction) + { case GDK_SCROLL_UP: view_z_decrease(); break; @@ -150,14 +161,17 @@ class GtkGlRenderer::Detail { } void connect_signals(GtkWidget *widget, struct signal *signals, - size_t members) { - FOREACH_NELEM(signals, members, s) { + size_t members) + { + FOREACH_NELEM(signals, members, s) + { gtk_widget_add_events(widget, s->mask); g_signal_connect(widget, s->signal, s->handler, this); } } - void connect_window_signals(GtkWidget *window) { + void connect_window_signals(GtkWidget *window) + { struct signal signals[] = { {"destroy", G_CALLBACK(gtk_main_quit), (GdkEventMask)0}, }; @@ -165,7 +179,8 @@ class GtkGlRenderer::Detail { connect_signals(window, signals, NELEM(signals)); } - void connect_glarea_signals(GtkWidget *glarea) { + void connect_glarea_signals(GtkWidget *glarea) + { std::chrono::time_point t = std::chrono::system_clock::now(); auto dur = std::chrono::duration_cast( @@ -178,48 +193,58 @@ class GtkGlRenderer::Detail { // g_signal_connect(glarea, "unrealize", G_CALLBACK(on_unrealize), this); } - void disconnect_glarea_signals(GtkWidget *glarea) { + void disconnect_glarea_signals(GtkWidget *glarea) + { g_signal_handler_disconnect(glarea, realizeId); g_signal_handler_disconnect(glarea, renderId); g_signal_handler_disconnect(glarea, resizeId); } - bool init() { + bool init() + { connect_glarea_signals(glarea); return true; } - GtkWidget *glarea; - int windowWidth, windowHeight; - uint64_t frameWidth, frameHeight; - frame_sp cachedFrame, renderFrame; - void *frameToRender; - bool isDmaMem; - bool isMetadataSet; - GtkGlRendererProps mProps; - guint realizeId; - guint renderId; - guint resizeId; - bool isPlaybackRenderer = true; + GtkWidget *glarea; + int windowWidth, windowHeight; + uint64_t frameWidth, frameHeight; + frame_sp cachedFrame, renderFrame; + void *frameToRender; + bool isDmaMem; + bool isMetadataSet; + GtkGlRendererProps mProps; + guint realizeId; + guint renderId; + guint resizeId; + bool isPlaybackRenderer = true; + + // Error Object + APErrorCallback mErrorCallback; + std::string mModuleId; + std::string mModuleName; + }; GtkGlRenderer::GtkGlRenderer(GtkGlRendererProps props) : Module(SINK, "GtkGlRenderer", props) { - mDetail.reset(new Detail(props)); - mDetail->glarea = props.glArea; - mDetail->windowWidth = props.windowWidth; - mDetail->windowHeight = props.windowHeight; - mDetail->isPlaybackRenderer = props.isPlaybackRenderer; - //LOG_ERROR<<"i am creating gtkgl renderer width and height is "<mProps.windowWidth; + mDetail.reset(new Detail(props)); + mDetail->glarea = props.glArea; + mDetail->windowWidth = props.windowWidth; + mDetail->windowHeight = props.windowHeight; + mDetail->isPlaybackRenderer = props.isPlaybackRenderer; } GtkGlRenderer::~GtkGlRenderer() {} -bool GtkGlRenderer::init() { - if (!Module::init()) { +bool GtkGlRenderer::init() +{ + if (!Module::init()) + { return false; } - if (!mDetail->init()) { + if (!mDetail->init()) + { LOG_ERROR << "Failed To Initialize GtkGl Area "; return false; } @@ -229,37 +254,29 @@ bool GtkGlRenderer::init() { bool GtkGlRenderer::process(frame_container &frames) { - auto myId = Module::getId(); - auto frame = frames.cbegin()->second; - mDetail->cachedFrame = frame; - + auto myId = Module::getId(); + auto frame = frames.cbegin()->second; + mDetail->cachedFrame = frame; - if ((controlModule != nullptr && mDetail->isPlaybackRenderer == true)) - { - auto currentFrameTs = frames.cbegin()->second->timestamp; - boost::shared_ptrctl = boost::dynamic_pointer_cast(controlModule); - ctl->handleLastGtkGLRenderTS(currentFrameTs, true); - } - return true; + if ((controlModule != nullptr && mDetail->isPlaybackRenderer == true)) + { + auto currentFrameTs = frames.cbegin()->second->timestamp; + boost::shared_ptr ctl = boost::dynamic_pointer_cast(controlModule); + ctl->handleLastGtkGLRenderTS(currentFrameTs, true); + } + return true; } -void GtkGlRenderer::pushFrame(frame_sp frame) { +void GtkGlRenderer::pushFrame(frame_sp frame) +{ std::lock_guard lock(queueMutex); frameQueue.push(frame); } -// Need to check on Mem Type Supported -// Already Checked With CPU , Need to check with -// framemetadata_sp metadata = getFirstInputMetadata(); -// FrameMetadata::MemType memType = metadata->getMemType(); -// if (memType != FrameMetadata::MemType::DMABUF) -// { -// LOG_ERROR << "<" << getId() << ">::validateInputPins input memType is -// expected to be DMABUF. Actual<" << memType << ">"; return false; -// } - -bool GtkGlRenderer::validateInputPins() { - if (getNumberOfInputPins() < 1) { +bool GtkGlRenderer::validateInputPins() +{ + if (getNumberOfInputPins() < 1) + { LOG_ERROR << "<" << getId() << ">::validateInputPins size is expected to be 1. Actual<" << getNumberOfInputPins() << ">"; @@ -269,13 +286,15 @@ bool GtkGlRenderer::validateInputPins() { return true; } -bool GtkGlRenderer::term() { +bool GtkGlRenderer::term() +{ bool res = Module::term(); return res; } bool GtkGlRenderer::changeProps(GtkWidget *glArea, int windowWidth, - int windowHeight) { + int windowHeight) +{ mDetail->disconnect_glarea_signals(GTK_WIDGET(mDetail->glarea)); mDetail->glarea = glArea; mDetail->windowWidth = windowWidth; @@ -285,27 +304,35 @@ bool GtkGlRenderer::changeProps(GtkWidget *glArea, int windowWidth, return true; } -bool GtkGlRenderer::shouldTriggerSOS() { - if (!mDetail->isMetadataSet) { +bool GtkGlRenderer::shouldTriggerSOS() +{ + if (!mDetail->isMetadataSet) + { LOG_TRACE << "WIll Trigger SOS"; return true; } return false; } -bool GtkGlRenderer::processSOS(frame_sp &frame) { +bool GtkGlRenderer::processSOS(frame_sp &frame) +{ auto inputMetadata = frame->getMetadata(); auto frameType = inputMetadata->getFrameType(); LOG_TRACE << "GOT METADATA " << inputMetadata->getFrameType(); int width = 0; int height = 0; - switch (frameType) { - case FrameMetadata::FrameType::RAW_IMAGE: { + switch (frameType) + { + case FrameMetadata::FrameType::RAW_IMAGE: + { auto metadata = FrameMetadataFactory::downcast(inputMetadata); if (metadata->getImageType() != ImageMetadata::RGBA && - metadata->getImageType() != ImageMetadata::RGB) { + metadata->getImageType() != ImageMetadata::RGB) + { + APErrorObject error(0, "Unsupported Image Type"); + executeErrorCallback(error); throw AIPException(AIP_FATAL, "Unsupported ImageType, Currently Only RGB " ", BGR , BGRA and RGBA is supported<" + std::to_string(frameType) + ">"); @@ -316,18 +343,23 @@ bool GtkGlRenderer::processSOS(frame_sp &frame) { metadata->getMemType() == FrameMetadata::MemType::DMABUF; LOG_INFO << "Width is " << metadata->getWidth() << "Height is " - << metadata->getHeight(); + << metadata->getHeight(); FrameMetadata::MemType memType = metadata->getMemType(); { if (memType != FrameMetadata::MemType::DMABUF) LOG_INFO << "Memory Type Is Not DMA but it's a interleaved Image"; } - } break; - case FrameMetadata::FrameType::RAW_IMAGE_PLANAR: { + } + break; + case FrameMetadata::FrameType::RAW_IMAGE_PLANAR: + { auto metadata = FrameMetadataFactory::downcast(inputMetadata); - if (metadata->getImageType() != ImageMetadata::RGBA) { + if (metadata->getImageType() != ImageMetadata::RGBA) + { + APErrorObject error(0, "Unsupported Image Type"); + executeErrorCallback(error); throw AIPException(AIP_FATAL, "Unsupported ImageType, Currently Only " "RGB, BGR, BGRA and RGBA is supported<" + std::to_string(frameType) + ">"); @@ -337,12 +369,14 @@ bool GtkGlRenderer::processSOS(frame_sp &frame) { mDetail->isDmaMem = metadata->getMemType() == FrameMetadata::MemType::DMABUF; LOG_INFO << "Width is " << metadata->getWidth(0) << "Height is " - << metadata->getHeight(0); + << metadata->getHeight(0); FrameMetadata::MemType memType = metadata->getMemType(); - if (memType != FrameMetadata::MemType::DMABUF) { + if (memType != FrameMetadata::MemType::DMABUF) + { LOG_INFO << "Memory Type Is Not DMA but it's a planar Image"; } - } break; + } + break; default: throw AIPException(AIP_FATAL, "Unsupported FrameType<" + std::to_string(frameType) + ">"); @@ -351,6 +385,7 @@ bool GtkGlRenderer::processSOS(frame_sp &frame) { return true; } -bool GtkGlRenderer::handleCommand(Command::CommandType type, frame_sp &frame) { +bool GtkGlRenderer::handleCommand(Command::CommandType type, frame_sp &frame) +{ return Module::handleCommand(type, frame); } \ No newline at end of file diff --git a/base/src/ImageEncoderCV.cpp b/base/src/ImageEncoderCV.cpp index 62df9e772..a09a71cd1 100644 --- a/base/src/ImageEncoderCV.cpp +++ b/base/src/ImageEncoderCV.cpp @@ -61,10 +61,8 @@ class ImageEncoderCV::Detail cv::Mat iImg; vector flags; - private: ImageEncoderCVProps props; - }; ImageEncoderCV::ImageEncoderCV(ImageEncoderCVProps _props) : Module(TRANSFORM, "ImageEncoderCV", _props) @@ -143,7 +141,7 @@ bool ImageEncoderCV::process(frame_container &frames) return true; } vector buf; - + mDetail->iImg.data = static_cast(frame->data()); cv::imencode(".jpg",mDetail->iImg,buf,mDetail->flags); auto outFrame = makeFrame(buf.size()); @@ -158,4 +156,4 @@ bool ImageEncoderCV::processSOS(frame_sp &frame) auto metadata = frame->getMetadata(); mDetail->setMetadata(metadata); return true; -} +} \ No newline at end of file diff --git a/base/src/Module.cpp b/base/src/Module.cpp index 53b581983..661b7ee15 100644 --- a/base/src/Module.cpp +++ b/base/src/Module.cpp @@ -1,1589 +1,1757 @@ #include "stdafx.h" #include -#include #include +#include -#include -#include -#include "Module.h" #include "AIPExceptions.h" #include "Frame.h" #include "FrameContainerQueue.h" #include "FrameMetadata.h" +#include "Module.h" +#include +#include -#include "PaceMaker.h" #include "BufferMaker.h" +#include "PaceMaker.h" #include "PausePlayMetadata.h" // makes frames from this module's frame factory -Module::FFBufferMaker::FFBufferMaker(Module& module):myModule(module){} -void * Module::FFBufferMaker::make(size_t dataSize) +Module::FFBufferMaker::FFBufferMaker(Module &module) : myModule(module) {} +void *Module::FFBufferMaker::make(size_t dataSize) { - if(frameIMade.get()!=nullptr) - { throw AIPException(AIP_NOTEXEPCTED,"The frame was already made"); } + if (frameIMade.get() != nullptr) + { + throw AIPException(AIP_NOTEXEPCTED, "The frame was already made"); + } - frameIMade=myModule.makeFrame(dataSize); - return frameIMade->data(); + frameIMade = myModule.makeFrame(dataSize); + return frameIMade->data(); } - class Module::Profiler { - using sys_clock = std::chrono::system_clock; + using sys_clock = std::chrono::system_clock; public: - Profiler(string &id, bool _shouldLog, int _printFrequency, std::function _getPoolHealthRecord) : moduleId(id), shouldLog(_shouldLog), mPipelineFps(0), printFrequency(_printFrequency) - { - getPoolHealthRecord = _getPoolHealthRecord; - } - - void setShouldLog(bool _shouldLog) - { - shouldLog = _shouldLog; - } - - virtual ~Profiler() - { - } - - void startPipelineLap() - { - - pipelineStart = sys_clock::now(); - processingStart = pipelineStart; - } - - void startProcessingLap() - { - - processingStart = sys_clock::now(); - } - - void endLap(size_t _queSize) - { - sys_clock::time_point end = sys_clock::now(); - std::chrono::nanoseconds diff = end - pipelineStart; - totalPipelineDuration += diff.count(); - diff = end - processingStart; - totalProcessingDuration += diff.count(); - queSize += _queSize; - - counter += 1; - if (counter % printFrequency == 0) - { - auto processingDurationInSeconds = totalProcessingDuration / 1000000000.0; - double processingFps = printFrequency / processingDurationInSeconds; - auto pipelineDurationInSeconds = totalPipelineDuration / 1000000000.0; - double pipelineFps = printFrequency / pipelineDurationInSeconds; - auto idleWaitingTime = pipelineDurationInSeconds - processingDurationInSeconds; - - if (shouldLog) - { - LOG_INFO << moduleId << " processed<" << printFrequency << "> frames. Pipeline Time<" << pipelineDurationInSeconds << "> PipelineAvgFps<" << std::setprecision(5) << pipelineFps << "> Processing Time<" << processingDurationInSeconds << "> ProcessingAvgFps<" << std::setprecision(5) << processingFps << "> AvgQue<" << std::setprecision(5) << (queSize / printFrequency) << "> IdleTime<" << idleWaitingTime << "> Que<" << _queSize << "> " << getPoolHealthRecord(); - } - - totalPipelineDuration = 0; - totalProcessingDuration = 0; - queSize = 0; - - mPipelineFps = pipelineFps; - } - } - - uint64_t getTickCounter() - { - return counter; - } - - double getPipelineFps() - { - return mPipelineFps; - } - - void resetStats() - { - mPipelineFps = 0; - } + Profiler(string &id, bool _shouldLog, int _printFrequency, int _healthUpdateIntervalInSec, + std::function _getPoolHealthRecord, + APHealthCallback _healthCallback) + : moduleId(id), shouldLog(_shouldLog), mPipelineFps(0), + printFrequency(_printFrequency), healthCallback(_healthCallback) + { + getPoolHealthRecord = _getPoolHealthRecord; + lastHealthCallbackTime = sys_clock::now(); + mHealthUpdateIntervalInSec = _healthUpdateIntervalInSec; + } + + void setShouldLog(bool _shouldLog) { shouldLog = _shouldLog; } + + virtual ~Profiler() {} + + void startPipelineLap() + { + + pipelineStart = sys_clock::now(); + processingStart = pipelineStart; + } + + void startProcessingLap() { processingStart = sys_clock::now(); } + + void endLap(size_t _queSize) + { + sys_clock::time_point end = sys_clock::now(); + std::chrono::nanoseconds diff = end - pipelineStart; + totalPipelineDuration += diff.count(); + diff = end - processingStart; + totalProcessingDuration += diff.count(); + queSize += _queSize; + + counter += 1; + if (counter % printFrequency == 0) + { + auto processingDurationInSeconds = totalProcessingDuration / 1000000000.0; + double processingFps = printFrequency / processingDurationInSeconds; + auto pipelineDurationInSeconds = totalPipelineDuration / 1000000000.0; + double pipelineFps = printFrequency / pipelineDurationInSeconds; + auto idleWaitingTime = + pipelineDurationInSeconds - processingDurationInSeconds; + + if (shouldLog) + { + LOG_INFO << moduleId << " processed<" << printFrequency + << "> frames. Pipeline Time<" << pipelineDurationInSeconds + << "> PipelineAvgFps<" << std::setprecision(5) << pipelineFps + << "> Processing Time<" << processingDurationInSeconds + << "> ProcessingAvgFps<" << std::setprecision(5) + << processingFps << "> AvgQue<" << std::setprecision(5) + << (queSize / printFrequency) << "> IdleTime<" + << idleWaitingTime << "> Que<" << _queSize << "> " + << getPoolHealthRecord(); + } + + totalPipelineDuration = 0; + totalProcessingDuration = 0; + queSize = 0; + + mPipelineFps = pipelineFps; + } + + auto now = sys_clock::now(); + auto elapsed = std::chrono::duration_cast( + now - lastHealthCallbackTime) + .count(); + if (elapsed >= mHealthUpdateIntervalInSec) + { + if (healthCallback) + { + APHealthObject healthObject(moduleId); + healthCallback(healthObject); + } + lastHealthCallbackTime = now; + } + } + + uint64_t getTickCounter() { return counter; } + + double getPipelineFps() { return mPipelineFps; } + + void resetStats() { mPipelineFps = 0; } + + void setHealthCallback(APHealthCallback _healthCallback) + { + healthCallback = _healthCallback; + } private: - string moduleId; - sys_clock::time_point processingStart; - sys_clock::time_point pipelineStart; - int printFrequency; - uint64_t counter = 0; - double totalProcessingDuration = 0; - double totalPipelineDuration = 0; - double queSize = 0; - bool shouldLog = false; - std::function getPoolHealthRecord; - - double mPipelineFps; + string moduleId; + sys_clock::time_point processingStart; + sys_clock::time_point pipelineStart; + sys_clock::time_point lastHealthCallbackTime; + int printFrequency; + uint64_t counter = 0; + double totalProcessingDuration = 0; + double totalPipelineDuration = 0; + double queSize = 0; + bool shouldLog = false; + std::function getPoolHealthRecord; + + double mPipelineFps; + APHealthCallback healthCallback; + int mHealthUpdateIntervalInSec; }; -Module::Module(Kind nature, string name, ModuleProps _props) : mRunning(false), mPlay(true), mDirection(true), mForceStepCount(0), mStopCount(0), mForwardPins(0), myNature(nature), myName(name), mSkipIndex(0) +Module::Module(Kind nature, string name, ModuleProps _props) + : mRunning(false), mPlay(true), mDirection(true), mForceStepCount(0), + mStopCount(0), mForwardPins(0), myNature(nature), myName(name), + mSkipIndex(0), mHealthCallback(nullptr) { - static int moduleCounter = 0; - moduleCounter += 1; - myId = name + "_" + std::to_string(moduleCounter); + static int moduleCounter = 0; + moduleCounter += 1; + myId = name + "_" + std::to_string(moduleCounter); - mQue.reset(new FrameContainerQueue(_props.qlen)); + mQue.reset(new FrameContainerQueue(_props.qlen)); - onStepFail = boost::bind(&Module::ignore, this, 0); + onStepFail = boost::bind(&Module::ignore, this, 0); + LOG_INFO << "Setting Module tolerance for step failure as: " << "<0>. Currently there is no way to change this."; - pacer = boost::shared_ptr(new PaceMaker(_props.fps)); - auto tempId = getId(); - mProfiler.reset(new Profiler(tempId, _props.logHealth, _props.logHealthFrequency, [&]() -> std::string { - if(!mpFrameFactory.get()){ - return ""; - } - return mpFrameFactory->getPoolHealthRecord(); - })); - if (_props.skipN > _props.skipD) - { - throw AIPException(AIP_ROI_OUTOFRANGE, "skipN <= skipD"); - } - mProps.reset(new ModuleProps(_props)); // saving for restoring later + pacer = boost::shared_ptr(new PaceMaker(_props.fps)); + auto tempId = getId(); + mProfiler.reset(new Profiler( + tempId, _props.logHealth, _props.logHealthFrequency, _props.healthUpdateIntervalInSec, + [&]() -> std::string + { + if (!mpFrameFactory.get()) + { + return ""; + } + return mpFrameFactory->getPoolHealthRecord(); + }, + mHealthCallback)); + if (_props.skipN > _props.skipD) + { + throw AIPException(AIP_ROI_OUTOFRANGE, "skipN <= skipD"); + } + mProps.reset(new ModuleProps(_props)); // saving for restoring later - mCommandMetadata.reset(new FrameMetadata(FrameMetadata::FrameType::COMMAND)); - mPropsChangeMetadata.reset(new FrameMetadata(FrameMetadata::FrameType::PROPS_CHANGE)); -} -Module::~Module() -{ + mCommandMetadata.reset(new FrameMetadata(FrameMetadata::FrameType::COMMAND)); + mPropsChangeMetadata.reset( + new FrameMetadata(FrameMetadata::FrameType::PROPS_CHANGE)); } +Module::~Module() {} bool Module::term() { - mQue->clear(); - // in case of cyclic dependency - one module holds the reference of the other and hence they never get freed - mModules.clear(); - mProfiler->resetStats(); + mQue->clear(); + // in case of cyclic dependency - one module holds the reference of the other + // and hence they never get freed + mModules.clear(); + mProfiler->resetStats(); - return true; + return true; } -double Module::getPipelineFps() -{ - return mProfiler->getPipelineFps(); -} +double Module::getPipelineFps() { return mProfiler->getPipelineFps(); } -uint64_t Module::getTickCounter() -{ - return mProfiler->getTickCounter(); -} +uint64_t Module::getTickCounter() { return mProfiler->getTickCounter(); } void Module::setProps(ModuleProps &props) { - if (props.qlen != mProps->qlen) - { - throw AIPException(AIP_NOTIMPLEMENTED, string("qlen cannot be changed")); - } + if (props.qlen != mProps->qlen) + { + throw AIPException(AIP_NOTIMPLEMENTED, string("qlen cannot be changed")); + } - pacer->setFps(props.fps); - mProfiler->setShouldLog(props.logHealth); - if (props.skipN > props.skipD) - { - // processing all - props.skipN = 0; - props.skipD = 1; - } - mProps.reset(new ModuleProps(props)); + pacer->setFps(props.fps); + mProfiler->setShouldLog(props.logHealth); + if (props.skipN > props.skipD) + { + // processing all + props.skipN = 0; + props.skipD = 1; + } + mProps.reset(new ModuleProps(props)); } -ModuleProps Module::getProps() -{ - return *mProps.get(); -} +ModuleProps Module::getProps() { return *mProps.get(); } void Module::fillProps(ModuleProps &props) { - props.fps = mProps->fps; - props.qlen = mProps->qlen; - props.logHealth = mProps->logHealth; + props.fps = mProps->fps; + props.qlen = mProps->qlen; + props.logHealth = mProps->logHealth; } string Module::addOutputPin(framemetadata_sp &metadata) { - std::string pinId = myId + "_pin_" + std::to_string(mOutputPinIdFrameFactoryMap.size() + 1); - addOutputPin(metadata, pinId); + std::string pinId = + myId + "_pin_" + std::to_string(mOutputPinIdFrameFactoryMap.size() + 1); + addOutputPin(metadata, pinId); - return pinId; + return pinId; } void Module::addOutputPin(framemetadata_sp &metadata, string &pinId) { - if (mOutputPinIdFrameFactoryMap.find(pinId) != mOutputPinIdFrameFactoryMap.end()) - { - // key alread exist exception - auto msg = "<" + getId() + "> pinId<" + pinId + "> Already Exist. Please give unique name."; - throw AIPException(AIP_UNIQUE_CONSTRAINT_FAILED, msg); - } - mOutputPinIdFrameFactoryMap.insert(std::make_pair(pinId, framefactory_sp(new FrameFactory(metadata, mProps->maxConcurrentFrames)))); - - if (!validateOutputPins()) - { - mOutputPinIdFrameFactoryMap.erase(pinId); - auto msg = "<" + getId() + "> Output Pins Validation Failed."; - throw AIPException(AIP_PINS_VALIDATION_FAILED, msg); - } + if (mOutputPinIdFrameFactoryMap.find(pinId) != + mOutputPinIdFrameFactoryMap.end()) + { + // key alread exist exception + auto msg = "<" + getId() + "> pinId<" + pinId + + "> Already Exist. Please give unique name."; + throw AIPException(AIP_UNIQUE_CONSTRAINT_FAILED, msg); + } + mOutputPinIdFrameFactoryMap.insert( + std::make_pair(pinId, framefactory_sp(new FrameFactory( + metadata, mProps->maxConcurrentFrames)))); + + if (!validateOutputPins()) + { + mOutputPinIdFrameFactoryMap.erase(pinId); + auto msg = "<" + getId() + "> Output Pins Validation Failed."; + throw AIPException(AIP_PINS_VALIDATION_FAILED, msg); + } } void Module::setSieveDisabledFlag(bool sieve) { - // mIsSieveDisabledForAny is true when atleast one downstream connection of this module has sieve disabled - if (!sieve) - { - mIsSieveDisabledForAny = !sieve; - } -} - -bool Module::setNext(boost::shared_ptr next, vector &pinIdArr, bool open, bool isFeedback, bool sieve) -{ - if (next->getNature() < this->getNature()) - { - LOG_ERROR << "Can not connect these modules " << this->getId() << " -> " << next->getId(); - return false; - } - - if (pinIdArr.size() == 0) - { - LOG_ERROR << "No Pins to connect. " << this->getId() << " -> " << next->getId(); - return false; - } - - auto nextModuleId = next->getId(); - if (mModules.find(nextModuleId) != mModules.end()) - { - LOG_ERROR << "<" << getId() << "> Connection for <" << nextModuleId << " > already done."; - return false; - } - mModules[nextModuleId] = next; - mConnections.insert(make_pair(nextModuleId, boost::container::deque())); - - if (sieve) - { - for (auto &pinId : pinIdArr) - { - if (mOutputPinIdFrameFactoryMap.find(pinId) == mOutputPinIdFrameFactoryMap.end()) - { - auto msg = "pinId<" + pinId + "> doesn't exist in <" + this->getId() + ">"; - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - throw AIPException(AIP_PIN_NOTFOUND, msg); - } - - framemetadata_sp metadata = mOutputPinIdFrameFactoryMap[pinId]->getFrameMetadata(); - // Set input meta here - try - { - next->addInputPin(metadata, pinId, isFeedback); // addInputPin throws exception from validateInputPins - } - catch (AIP_Exception& exception) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - throw exception; - } - catch (...) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - LOG_FATAL << ""; - throw AIPException(AIP_FATAL, "<" + getId() + "> addInputPin. PinId<" + pinId + ">. Unknown exception."); - } - - // add next module here - mConnections[nextModuleId].push_back(pinId); - } - } - else - { - // important - flag used to send enough number of EOP frames - setSieveDisabledFlag(sieve); - for (auto& pinId : pinIdArr) - { - bool pinFound = false; - if (mOutputPinIdFrameFactoryMap.find(pinId) != mOutputPinIdFrameFactoryMap.end()) - { - pinFound = true; - framemetadata_sp metadata = mOutputPinIdFrameFactoryMap[pinId]->getFrameMetadata(); - // Set input meta here - try - { - next->addInputPin(metadata, pinId, isFeedback); // addInputPin throws exception from validateInputPins - } - catch (AIP_Exception& exception) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - throw exception; - } - catch (...) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - LOG_FATAL << ""; - throw AIPException(AIP_FATAL, "<" + getId() + "> addInputPin. PinId<" + pinId + ">. Unknown exception."); - } - } - if (mInputPinIdMetadataMap.find(pinId) != mInputPinIdMetadataMap.end()) - { - pinFound = true; - framemetadata_sp metadata = mInputPinIdMetadataMap[pinId]; - - // Set input meta here - try - { - next->addInputPin(metadata, pinId, isFeedback); // addInputPin throws exception from validateInputPins - } - catch (AIP_Exception& exception) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - throw exception; - } - catch (...) - { - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - LOG_FATAL << ""; - throw AIPException(AIP_FATAL, "<" + getId() + "> addInputPin. PinId<" + pinId + ">. Unknown exception."); - } - } - - if (!pinFound) - { - auto msg = "pinId<" + pinId + "> doesn't exist in <" + this->getId() + ">"; - mModules.erase(nextModuleId); - mConnections.erase(nextModuleId); - throw AIPException(AIP_PIN_NOTFOUND, msg); - } - // add next module here - mConnections[nextModuleId].push_back(pinId); - } - } - - mRelay[nextModuleId] = open; - - return true; + // mIsSieveDisabledForAny is true when atleast one downstream connection of + // this module has sieve disabled + if (!sieve) + { + mIsSieveDisabledForAny = !sieve; + } +} + +bool Module::setNext(boost::shared_ptr next, vector &pinIdArr, + bool open, bool isFeedback, bool sieve) +{ + if (next->getNature() < this->getNature()) + { + LOG_ERROR << "Can not connect these modules " << this->getId() << " -> " + << next->getId(); + return false; + } + + if (pinIdArr.size() == 0) + { + LOG_ERROR << "No Pins to connect. " << this->getId() << " -> " + << next->getId(); + return false; + } + + auto nextModuleId = next->getId(); + if (mModules.find(nextModuleId) != mModules.end()) + { + LOG_ERROR << "<" << getId() << "> Connection for <" << nextModuleId + << " > already done."; + return false; + } + mModules[nextModuleId] = next; + mConnections.insert( + make_pair(nextModuleId, boost::container::deque())); + + if (sieve) + { + for (auto &pinId : pinIdArr) + { + if (mOutputPinIdFrameFactoryMap.find(pinId) == + mOutputPinIdFrameFactoryMap.end()) + { + auto msg = + "pinId<" + pinId + "> doesn't exist in <" + this->getId() + ">"; + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + throw AIPException(AIP_PIN_NOTFOUND, msg); + } + + framemetadata_sp metadata = + mOutputPinIdFrameFactoryMap[pinId]->getFrameMetadata(); + // Set input meta here + try + { + next->addInputPin( + metadata, pinId, + isFeedback); // addInputPin throws exception from validateInputPins + } + catch (AIP_Exception &exception) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + throw exception; + } + catch (...) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + LOG_FATAL << ""; + throw AIPException(AIP_FATAL, "<" + getId() + "> addInputPin. PinId<" + + pinId + ">. Unknown exception."); + } + + // add next module here + mConnections[nextModuleId].push_back(pinId); + } + } + else + { + // important - flag used to send enough number of EOP frames + setSieveDisabledFlag(sieve); + for (auto &pinId : pinIdArr) + { + bool pinFound = false; + if (mOutputPinIdFrameFactoryMap.find(pinId) != + mOutputPinIdFrameFactoryMap.end()) + { + pinFound = true; + framemetadata_sp metadata = + mOutputPinIdFrameFactoryMap[pinId]->getFrameMetadata(); + // Set input meta here + try + { + next->addInputPin(metadata, pinId, + isFeedback); // addInputPin throws exception from + // validateInputPins + } + catch (AIP_Exception &exception) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + throw exception; + } + catch (...) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + LOG_FATAL << ""; + throw AIPException(AIP_FATAL, "<" + getId() + + "> addInputPin. PinId<" + pinId + + ">. Unknown exception."); + } + } + if (mInputPinIdMetadataMap.find(pinId) != mInputPinIdMetadataMap.end()) + { + pinFound = true; + framemetadata_sp metadata = mInputPinIdMetadataMap[pinId]; + + // Set input meta here + try + { + next->addInputPin(metadata, pinId, + isFeedback); // addInputPin throws exception from + // validateInputPins + } + catch (AIP_Exception &exception) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + throw exception; + } + catch (...) + { + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + LOG_FATAL << ""; + throw AIPException(AIP_FATAL, "<" + getId() + + "> addInputPin. PinId<" + pinId + + ">. Unknown exception."); + } + } + + if (!pinFound) + { + auto msg = + "pinId<" + pinId + "> doesn't exist in <" + this->getId() + ">"; + mModules.erase(nextModuleId); + mConnections.erase(nextModuleId); + throw AIPException(AIP_PIN_NOTFOUND, msg); + } + // add next module here + mConnections[nextModuleId].push_back(pinId); + } + } + + mRelay[nextModuleId] = open; + + return true; } // default - open, sieve is enabled - feedback false bool Module::setNext(boost::shared_ptr next, bool open, bool sieve) { - return setNext(next, open, false, sieve); + return setNext(next, open, false, sieve); } -bool Module::setNext(boost::shared_ptr next, bool open, bool isFeedback, bool sieve) +bool Module::setNext(boost::shared_ptr next, bool open, bool isFeedback, + bool sieve) { - pair me; // map element - vector pinIdArr; - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - pinIdArr.push_back(me.first); - } + pair me; // map element + vector pinIdArr; + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + pinIdArr.push_back(me.first); + } - if (!sieve) - { - pair me; // map element - BOOST_FOREACH(me, mInputPinIdMetadataMap) - { - pinIdArr.push_back(me.first); - } - } + if (!sieve) + { + pair me; // map element + BOOST_FOREACH (me, mInputPinIdMetadataMap) + { + pinIdArr.push_back(me.first); + } + } - // sending all the outputpins - return setNext(next, pinIdArr, open, isFeedback, sieve); + // sending all the outputpins + return setNext(next, pinIdArr, open, isFeedback, sieve); } -bool Module::setNext(boost::shared_ptr next, vector &pinIdArr, bool open) +bool Module::setNext(boost::shared_ptr next, vector &pinIdArr, + bool open) { - return setNext(next, pinIdArr, open, false, true); + return setNext(next, pinIdArr, open, false, true); } -bool Module::addFeedback(boost::shared_ptr next, vector &pinIdArr, bool open) +bool Module::addFeedback(boost::shared_ptr next, + vector &pinIdArr, bool open) { - return setNext(next, pinIdArr, open, true, true); + return setNext(next, pinIdArr, open, true, true); } bool Module::addFeedback(boost::shared_ptr next, bool open) { - return setNext(next, open, true, true); + return setNext(next, open, true, true); } -void Module::addInputPin(framemetadata_sp &metadata, string &pinId, bool isFeedback) +void Module::addInputPin(framemetadata_sp &metadata, string &pinId, + bool isFeedback) { - addInputPin(metadata, pinId); - if (isFeedback) - { - mForwardPins--; - mInputPinsDirection[pinId] = false; // feedback - } + addInputPin(metadata, pinId); + if (isFeedback) + { + mForwardPins--; + mInputPinsDirection[pinId] = false; // feedback + } } void Module::addInputPin(framemetadata_sp &metadata, string &pinId) { - if (mInputPinIdMetadataMap.find(pinId) != mInputPinIdMetadataMap.end()) - { - auto msg = "<" + getId() + "> pinId <" + pinId + "> already added for <" + getId() + ">"; - throw AIPException(AIP_UNIQUE_CONSTRAINT_FAILED, msg); - } + if (mInputPinIdMetadataMap.find(pinId) != mInputPinIdMetadataMap.end()) + { + auto msg = "<" + getId() + "> pinId <" + pinId + "> already added for <" + + getId() + ">"; + throw AIPException(AIP_UNIQUE_CONSTRAINT_FAILED, msg); + } - mInputPinIdMetadataMap[pinId] = metadata; + mInputPinIdMetadataMap[pinId] = metadata; - if (!validateInputPins()) - { - mInputPinIdMetadataMap.erase(pinId); - auto msg = "Input Pins Validation Failed. <" + getId() + ">"; - throw AIPException(AIP_PINS_VALIDATION_FAILED, msg); - } + if (!validateInputPins()) + { + mInputPinIdMetadataMap.erase(pinId); + auto msg = "Input Pins Validation Failed. <" + getId() + ">"; + throw AIPException(AIP_PINS_VALIDATION_FAILED, msg); + } - mForwardPins++; - mInputPinsDirection[pinId] = true; // forward + mForwardPins++; + mInputPinsDirection[pinId] = true; // forward } bool Module::isFeedbackEnabled(std::string &moduleId) { - auto &pinIdArr = mConnections[moduleId]; - auto childModule = mModules[moduleId]; - for (auto itr = pinIdArr.begin(); itr != pinIdArr.end(); itr++) - { - auto &pinId = *itr; - if (childModule->mInputPinsDirection[pinId]) - { - // forward pin found - so feedback not enabled - return false; - } - } + auto &pinIdArr = mConnections[moduleId]; + auto childModule = mModules[moduleId]; + for (auto itr = pinIdArr.begin(); itr != pinIdArr.end(); itr++) + { + auto &pinId = *itr; + if (childModule->mInputPinsDirection[pinId]) + { + // forward pin found - so feedback not enabled + return false; + } + } - return true; + return true; } bool Module::validateInputPins() { - if (myNature == SOURCE && getNumberOfInputPins() == 0) - { - return true; - } - - return false; + if (myNature == SOURCE && getNumberOfInputPins() == 0) + { + return true; + } + else if (myNature == CONTROL && getNumberOfInputPins() > 0) + { + throw AIPException(CTRL_MODULE_INVALID_STATE, "Illegal: Control module does not take any input pins."); + } + return false; } bool Module::validateOutputPins() { - if (myNature == SINK && getNumberOfOutputPins() == 0) - { - return true; - } - - return false; + if (myNature == SINK && getNumberOfOutputPins() == 0) + { + return true; + } + else if (myNature == CONTROL && getNumberOfOutputPins() > 0) + { + throw AIPException(CTRL_MODULE_INVALID_STATE, "Illegal: Control module does not take any input pins."); + } + return false; } framemetadata_sp Module::getFirstInputMetadata() { - return mInputPinIdMetadataMap.begin()->second; + return mInputPinIdMetadataMap.begin()->second; } framemetadata_sp Module::getFirstOutputMetadata() { - return mOutputPinIdFrameFactoryMap.begin()->second->getFrameMetadata(); + return mOutputPinIdFrameFactoryMap.begin()->second->getFrameMetadata(); } -boost::container::deque> Module::getConnectedModules() +boost::container::deque> +Module::getConnectedModules() { - boost::container::deque> nextModules; + boost::container::deque> nextModules; - for (map>::const_iterator it = mModules.cbegin(); it != mModules.cend(); ++it) - { - auto pModule = it->second; - nextModules.push_back(pModule); - } + for (map>::const_iterator it = + mModules.cbegin(); + it != mModules.cend(); ++it) + { + auto pModule = it->second; + nextModules.push_back(pModule); + } - return nextModules; + return nextModules; } bool Module::init() { - auto ret = validateInputOutputPins(); - if (!ret) - { - return ret; - } - - mQue->accept(); - if (mModules.size() == 1 && mProps->quePushStrategyType == QuePushStrategy::NON_BLOCKING_ALL_OR_NONE) - { - mProps->quePushStrategyType = QuePushStrategy::NON_BLOCKING_ANY; - } - mQuePushStrategy = QuePushStrategy::getStrategy(mProps->quePushStrategyType, myId); - // loop all the downstream modules and set the que - for (map>::const_iterator it = mModules.begin(); it != mModules.end(); ++it) - { - auto pModule = it->second; - auto que = pModule->getQue(); - mQuePushStrategy->addQue(it->first, que); - } - - if (myNature == TRANSFORM && getNumberOfInputPins() == 1 && getNumberOfOutputPins() == 1) - { - // propagate hint - // currently propagating if 1 input and 1 output - - auto in = getFirstInputMetadata(); - auto out = getFirstOutputMetadata(); - out->copyHint(*in.get()); - } - if (myNature == SOURCE) - { - pair me; // map element - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - auto metadata = me.second->getFrameMetadata(); - if(!metadata->isSet()) - { - throw AIPException(AIP_FATAL, "Source FrameFactory is constructed without metadata set"); - } - mOutputPinIdFrameFactoryMap[me.first].reset(new FrameFactory(metadata, mProps->maxConcurrentFrames)); - } - } - mpCommandFactory.reset(new FrameFactory(mCommandMetadata)); - - mStopCount = 0; - - mFIndexStrategy = FIndexStrategy::create(mProps->fIndexStrategyType); - - return ret; + auto ret = validateInputOutputPins(); + if (!ret) + { + return ret; + } + + mQue->accept(); + + if (mModules.size() == 1 && mProps->quePushStrategyType == QuePushStrategy::NON_BLOCKING_ALL_OR_NONE) + { + mProps->quePushStrategyType = QuePushStrategy::NON_BLOCKING_ANY; + } + mQuePushStrategy = QuePushStrategy::getStrategy(mProps->quePushStrategyType, myId); + // loop all the downstream modules and set the que + for (map>::const_iterator it = mModules.begin(); it != mModules.end(); ++it) + { + auto pModule = it->second; + auto que = pModule->getQue(); + mQuePushStrategy->addQue(it->first, que); + } + + if (myNature == TRANSFORM && getNumberOfInputPins() == 1 && getNumberOfOutputPins() == 1) + { + // propagate hint + // currently propagating if 1 input and 1 output + + auto in = getFirstInputMetadata(); + auto out = getFirstOutputMetadata(); + out->copyHint(*in.get()); + } + if (myNature == SOURCE) + { + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + auto metadata = me.second->getFrameMetadata(); + if (!metadata->isSet()) + { + throw AIPException(AIP_FATAL, "Source FrameFactory is constructed without metadata set"); + } + mOutputPinIdFrameFactoryMap[me.first].reset(new FrameFactory(metadata, mProps->maxConcurrentFrames)); + } + } + mpCommandFactory.reset(new FrameFactory(mCommandMetadata)); + + mStopCount = 0; + + mFIndexStrategy = FIndexStrategy::create(mProps->fIndexStrategyType); + + return ret; } bool Module::push(frame_container frameContainer) { - mQue->push(frameContainer); - return true; + mQue->push(frameContainer); + return true; } bool Module::push_back(frame_container frameContainer) { - mQue->push_back(frameContainer); - return true; + mQue->push_back(frameContainer); + return true; } bool Module::try_push(frame_container frameContainer) { - auto rc = mQue->try_push(frameContainer); - return rc; + auto rc = mQue->try_push(frameContainer); + return rc; } -frame_container Module::try_pop() -{ - return mQue->try_pop(); -} +frame_container Module::try_pop() { return mQue->try_pop(); } -frame_container Module::pop() -{ - return mQue->pop(); -} +frame_container Module::pop() { return mQue->pop(); } bool Module::isFull() { - bool ret = false; - map> mModules; - for (auto it = mModules.cbegin(); it != mModules.end(); it++) - { - if (it->second->isFull()) - { - ret = true; - break; - } - } + bool ret = false; + map> mModules; + for (auto it = mModules.cbegin(); it != mModules.end(); it++) + { + if (it->second->isFull()) + { + ret = true; + break; + } + } - return ret; + return ret; } bool Module::isNextModuleQueFull() { - bool ret = false; - for (auto it = mModules.cbegin(); it != mModules.end(); it++) - { - if (it->second->mQue->isFull()) - { - auto modID = it->second->myId; - ret = true; - break; - } - } + bool ret = false; + for (auto it = mModules.cbegin(); it != mModules.end(); it++) + { + if (it->second->mQue->isFull()) + { + auto modID = it->second->myId; + ret = true; + break; + } + } - return ret; + return ret; } bool Module::send(frame_container &frames, bool forceBlockingPush) { - // mFindex may be propagated for EOS, EOP, Command, PropsChange also - which is wrong - uint64_t fIndex = 0; - uint64_t timestamp = 0; - if (frames.size() != 0) - { - if (myNature == TRANSFORM && getNumberOfInputPins() == 1) - { - // propagating fIndex2 - auto pinId = getInputMetadata().begin()->first; - if (frames.find(pinId) != frames.end()) - { - auto fIndex2 = frames[pinId]->fIndex2; - for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) - { - if (frames.find(me->first) != frames.end()) - { - frames[me->first]->fIndex2 = fIndex2; - } - } - } - } - - if (myNature != SOURCE) - { - // first input pin - auto pinId = getInputMetadata().begin()->first; - if (frames.find(pinId) != frames.end()) - { - fIndex = frames[pinId]->fIndex; - timestamp = frames[pinId]->timestamp; - } - else - { - // try output pins - muxer comes here - for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) - { - auto &pinId = me->first; - if (frames.find(pinId) != frames.end()) - { - fIndex = frames[pinId]->fIndex; - timestamp = frames[pinId]->timestamp; - break; - } - } - } - } - else - { - // try for all output pins - for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) - { - auto &pinId = me->first; - if (frames.find(pinId) != frames.end()) - { - fIndex = frames[pinId]->fIndex; - timestamp = frames[pinId]->timestamp; - break; - } - } - } - } - - fIndex = mFIndexStrategy->getFIndex(fIndex); - - for (auto it = frames.cbegin(); it != frames.cend(); it++) - { - if (mOutputPinIdFrameFactoryMap.find(it->first) == mOutputPinIdFrameFactoryMap.end()) - { - continue; - } - it->second->fIndex = fIndex; - it->second->timestamp = timestamp; - } - - auto ret = true; - // loop over all the modules and send - for (Connections::const_iterator it = mConnections.begin(); it != mConnections.end(); it++) - { - auto &nextModuleId = it->first; - if (!mRelay[nextModuleId] && !forceBlockingPush) - { - // This is dangerous - the callers may assume that all the frames go through - but since it is relay - they wont go through - // so using forceBlockingPush to open the relay for important messages - // currently only EOS and EOP frames can break the relay - continue; - } - - auto pinsArr = it->second; - frame_container requiredPins; - - for (auto i = pinsArr.begin(); i != pinsArr.end(); i++) - { - auto pinId = *i; - if (frames.find(pinId) == frames.end()) - { - // pinId not found - continue; - } - requiredPins.insert(make_pair(pinId, frames[pinId])); // only required pins map is created - } - - if (requiredPins.size() == 0) - { - // no pins found - continue; - } - - // next module push - if (!forceBlockingPush) - { - //LOG_ERROR << "forceBlocking Push myID" << myId << "sending to <" << nextModuleId; - mQuePushStrategy->push(nextModuleId, requiredPins); - } - else - { - //LOG_ERROR << "normal push myID" << myId << "sending to <" << nextModuleId; - mModules[nextModuleId]->push(requiredPins); - } - } - - return mQuePushStrategy->flush(); + if (myNature == CONTROL) + { + throw AIPException(CTRL_MODULE_INVALID_STATE, "Illegal: Control module can not send data frames."); + } + + // mFindex may be propagated for EOS, EOP, Command, PropsChange also - which is wrong + uint64_t fIndex = 0; + uint64_t timestamp = 0; + if (frames.size() != 0) + { + if (myNature == TRANSFORM && getNumberOfInputPins() == 1) + { + // propagating fIndex2 + auto pinId = getInputMetadata().begin()->first; + if (frames.find(pinId) != frames.end()) + { + auto fIndex2 = frames[pinId]->fIndex2; + for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) + { + if (frames.find(me->first) != frames.end()) + { + frames[me->first]->fIndex2 = fIndex2; + } + } + } + } + + if (myNature != SOURCE) + { + // first input pin + auto pinId = getInputMetadata().begin()->first; + if (frames.find(pinId) != frames.end()) + { + fIndex = frames[pinId]->fIndex; + timestamp = frames[pinId]->timestamp; + } + else + { + // try output pins - muxer comes here + for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) + { + auto &pinId = me->first; + if (frames.find(pinId) != frames.end()) + { + fIndex = frames[pinId]->fIndex; + timestamp = frames[pinId]->timestamp; + break; + } + } + } + } + else + { + // try for all output pins + for (auto me = mOutputPinIdFrameFactoryMap.cbegin(); me != mOutputPinIdFrameFactoryMap.cend(); me++) + { + auto &pinId = me->first; + if (frames.find(pinId) != frames.end()) + { + fIndex = frames[pinId]->fIndex; + timestamp = frames[pinId]->timestamp; + break; + } + } + } + } + + fIndex = mFIndexStrategy->getFIndex(fIndex); + + for (auto it = frames.cbegin(); it != frames.cend(); it++) + { + if (mOutputPinIdFrameFactoryMap.find(it->first) == mOutputPinIdFrameFactoryMap.end()) + { + continue; + } + it->second->fIndex = fIndex; + it->second->timestamp = timestamp; + } + + auto ret = true; + // loop over all the modules and send + for (Connections::const_iterator it = mConnections.begin(); it != mConnections.end(); it++) + { + auto &nextModuleId = it->first; + if (!mRelay[nextModuleId] && !forceBlockingPush) + { + // This is dangerous - the callers may assume that all the frames go through - but since it is relay - they wont go through + // so using forceBlockingPush to open the relay for important messages + // currently only EOS and EOP frames can break the relay + continue; + } + + auto pinsArr = it->second; + frame_container requiredPins; + + for (auto i = pinsArr.begin(); i != pinsArr.end(); i++) + { + auto pinId = *i; + if (frames.find(pinId) == frames.end()) + { + // pinId not found + continue; + } + requiredPins.insert(make_pair(pinId, frames[pinId])); // only required pins map is created + } + + if (requiredPins.size() == 0) + { + // no pins found + continue; + } + + // next module push + if (!forceBlockingPush) + { + // LOG_ERROR << "forceBlocking Push myID" << myId << "sending to <" << nextModuleId; + mQuePushStrategy->push(nextModuleId, requiredPins); + } + else + { + // LOG_ERROR << "normal push myID" << myId << "sending to <" << nextModuleId; + mModules[nextModuleId]->push(requiredPins); + } + } + + return mQuePushStrategy->flush(); } boost_deque Module::getFrames(frame_container &frames) { - boost_deque frames_arr; - for (frame_container::const_iterator it = frames.begin(); it != frames.end(); it++) - { - frames_arr.push_back(it->second); - } + boost_deque frames_arr; + for (frame_container::const_iterator it = frames.begin(); it != frames.end(); + it++) + { + frames_arr.push_back(it->second); + } - return frames_arr; + return frames_arr; } string getPinIdByType(int type, metadata_by_pin &metadataMap) { - pair me; // map element - BOOST_FOREACH (me, metadataMap) - { - if (me.second->getFrameType() == type) - { - return me.first; - } - } + pair me; // map element + BOOST_FOREACH (me, metadataMap) + { + if (me.second->getFrameType() == type) + { + return me.first; + } + } - return ""; + return ""; } string getPinIdByType(int type, framefactory_by_pin &metadataMap) { - pair me; // map element - BOOST_FOREACH (me, metadataMap) - { - if (me.second->getFrameMetadata()->getFrameType() == type) - { - return me.first; - } - } + pair me; // map element + BOOST_FOREACH (me, metadataMap) + { + if (me.second->getFrameMetadata()->getFrameType() == type) + { + return me.first; + } + } - return ""; + return ""; } vector Module::getAllOutputPinsByType(int type) { - vector pins; + vector pins; - pair me; // map element - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - if (me.second->getFrameMetadata()->getFrameType() == type) - { - pins.push_back(me.first); - } - } + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + if (me.second->getFrameMetadata()->getFrameType() == type) + { + pins.push_back(me.first); + } + } - return pins; + return pins; } string Module::getInputPinIdByType(int type) { - return getPinIdByType(type, mInputPinIdMetadataMap); + return getPinIdByType(type, mInputPinIdMetadataMap); } string Module::getOutputPinIdByType(int type) { - return getPinIdByType(type, mOutputPinIdFrameFactoryMap); + return getPinIdByType(type, mOutputPinIdFrameFactoryMap); } framemetadata_sp getMetadataByType(int type, metadata_by_pin &metadataMap) { - pair me; // map element - BOOST_FOREACH (me, metadataMap) - { - if (me.second->getFrameType() == type) - { - return me.second; - } - } + pair me; // map element + BOOST_FOREACH (me, metadataMap) + { + if (me.second->getFrameType() == type) + { + return me.second; + } + } - return framemetadata_sp(); + return framemetadata_sp(); } int getNumberOfPinsByType(int type, metadata_by_pin &metadataMap) { - int count = 0; - pair me; // map element - BOOST_FOREACH (me, metadataMap) - { - if (me.second->getFrameType() == type) - { - count += 1; - } - } + int count = 0; + pair me; // map element + BOOST_FOREACH (me, metadataMap) + { + if (me.second->getFrameType() == type) + { + count += 1; + } + } - return count; + return count; } -// instead of global functions - make a detail class and make these functions public inside detail -framemetadata_sp getMetadataByType(int type, framefactory_by_pin &frameFactoryMap) +// instead of global functions - make a detail class and make these functions +// public inside detail +framemetadata_sp getMetadataByType(int type, + framefactory_by_pin &frameFactoryMap) { - pair me; // map element - BOOST_FOREACH (me, frameFactoryMap) - { - if (me.second->getFrameMetadata()->getFrameType() == type) - { - return me.second->getFrameMetadata(); - } - } + pair me; // map element + BOOST_FOREACH (me, frameFactoryMap) + { + if (me.second->getFrameMetadata()->getFrameType() == type) + { + return me.second->getFrameMetadata(); + } + } - return framemetadata_sp(); + return framemetadata_sp(); } int getNumberOfPinsByType(int type, framefactory_by_pin &frameFactoryMap) { - int count = 0; - pair me; // map element - BOOST_FOREACH (me, frameFactoryMap) - { - if (me.second->getFrameMetadata()->getFrameType() == type) - { - count += 1; - } - } + int count = 0; + pair me; // map element + BOOST_FOREACH (me, frameFactoryMap) + { + if (me.second->getFrameMetadata()->getFrameType() == type) + { + count += 1; + } + } - return count; + return count; } framemetadata_sp Module::getInputMetadataByType(int type) { - return getMetadataByType(type, mInputPinIdMetadataMap); + return getMetadataByType(type, mInputPinIdMetadataMap); } framemetadata_sp Module::getOutputMetadataByType(int type) { - return getMetadataByType(type, mOutputPinIdFrameFactoryMap); + return getMetadataByType(type, mOutputPinIdFrameFactoryMap); } framemetadata_sp Module::getOutputMetadata(string outPinID) { - auto it = mOutputPinIdFrameFactoryMap.find(outPinID); - - if (it == mOutputPinIdFrameFactoryMap.end()) - { - throw AIPException(AIP_FATAL, string("No metadata defined for output pin ")+ outPinID); - } - return it->second->getFrameMetadata(); + auto it = mOutputPinIdFrameFactoryMap.find(outPinID); + + if (it == mOutputPinIdFrameFactoryMap.end()) + { + throw AIPException( + AIP_FATAL, string("No metadata defined for output pin ") + outPinID); + } + return it->second->getFrameMetadata(); } int Module::getNumberOfInputsByType(int type) { - return getNumberOfPinsByType(type, mInputPinIdMetadataMap); + return getNumberOfPinsByType(type, mInputPinIdMetadataMap); } int Module::getNumberOfOutputsByType(int type) { - return getNumberOfPinsByType(type, mOutputPinIdFrameFactoryMap); + return getNumberOfPinsByType(type, mOutputPinIdFrameFactoryMap); } bool Module::isMetadataEmpty(framemetadata_sp &metatata) { - return !metatata.get(); + return !metatata.get(); } -bool Module::isFrameEmpty(frame_sp &frame) -{ - return !frame.get(); -} +bool Module::isFrameEmpty(frame_sp &frame) { return !frame.get(); } frame_sp Module::getFrameByType(frame_container &frames, int frameType) { - // This returns only the first matched frametype - // remmeber the map is ordered by pin ids - for (auto it = frames.cbegin(); it != frames.cend(); it++) - { - auto frame = it->second; - if (frame->getMetadata()->getFrameType() == frameType) - { - return frame; - } - } + // This returns only the first matched frametype + // remmeber the map is ordered by pin ids + for (auto it = frames.cbegin(); it != frames.cend(); it++) + { + auto frame = it->second; + if (frame->getMetadata()->getFrameType() == frameType) + { + return frame; + } + } - return frame_sp(); + return frame_sp(); } frame_sp Module::makeFrame() { - auto size = mOutputPinIdFrameFactoryMap.begin()->second->getFrameMetadata()->getDataSize(); - auto pinId = mOutputPinIdFrameFactoryMap.begin()->first; - return makeFrame(size,pinId); + auto size = mOutputPinIdFrameFactoryMap.begin() + ->second->getFrameMetadata() + ->getDataSize(); + auto pinId = mOutputPinIdFrameFactoryMap.begin()->first; + return makeFrame(size, pinId); } frame_sp Module::makeFrame(size_t size) { - return makeFrame(size, mOutputPinIdFrameFactoryMap.begin()->second); + return makeFrame(size, mOutputPinIdFrameFactoryMap.begin()->second); } frame_sp Module::makeFrame(size_t size, string &pinId) { - return makeFrame(size,mOutputPinIdFrameFactoryMap[pinId]); + return makeFrame(size, mOutputPinIdFrameFactoryMap[pinId]); } -frame_sp Module::makeCommandFrame(size_t size,framemetadata_sp& metadata) +frame_sp Module::makeCommandFrame(size_t size, framemetadata_sp &metadata) { - auto frame = mpCommandFactory->create(size, mpCommandFactory, metadata); - return frame; + auto frame = mpCommandFactory->create(size, mpCommandFactory, metadata); + return frame; } -frame_sp Module::makeFrame(size_t size,framefactory_sp& frameFactory) +frame_sp Module::makeFrame(size_t size, framefactory_sp &frameFactory) { - return frameFactory->create(size, frameFactory); + return frameFactory->create(size, frameFactory); } frame_sp Module::makeFrame(frame_sp &bigFrame, size_t &size, string &pinId) { - return mOutputPinIdFrameFactoryMap[pinId]->create(bigFrame, size, mOutputPinIdFrameFactoryMap[pinId]); -} - -void Module::setMetadata(std::string& pinId, framemetadata_sp& metadata){ - mOutputPinIdFrameFactoryMap[pinId]->setMetadata(metadata); - return; + return mOutputPinIdFrameFactoryMap[pinId]->create( + bigFrame, size, mOutputPinIdFrameFactoryMap[pinId]); } -frame_sp Module::getEOSFrame() +void Module::setMetadata(std::string &pinId, framemetadata_sp &metadata) { - return mpCommandFactory->getEOSFrame(); + mOutputPinIdFrameFactoryMap[pinId]->setMetadata(metadata); + return; } -frame_sp Module::getEmptyFrame() -{ - return mpCommandFactory->getEmptyFrame(); -} +frame_sp Module::getEOSFrame() { return mpCommandFactory->getEOSFrame(); } + +frame_sp Module::getEmptyFrame() { return mpCommandFactory->getEmptyFrame(); } void Module::operator()() { - if (mProps->frameFetchStrategy == ModuleProps::FrameFetchStrategy::PUSH) - { - run(); - } + if (mProps->frameFetchStrategy == ModuleProps::FrameFetchStrategy::PUSH) + { + run(); + } } bool Module::run() { - LOG_INFO << "Starting " << myId << " on " << myThread.get_id(); - mRunning = true; - handlePausePlay(mPlay); - while (mRunning) - { - if (!step()) - { - stop_onStepfail(); - break; - } - } - LOG_INFO << "Ending " << myId << " on " << myThread.get_id(); - term(); //my job is done - return true; + LOG_INFO << "Starting " << myId << " on " << myThread.get_id(); + mRunning = true; + handlePausePlay(mPlay); + while (mRunning) + { + if (!step()) + { + stop_onStepfail(); + break; + } + } + LOG_INFO << "Ending " << myId << " on " << myThread.get_id(); + term(); // my job is done + return true; } bool isMetadatset(metadata_by_pin &metadataMap) { - bool bSet = true; + bool bSet = true; - pair me; // map element - BOOST_FOREACH (me, metadataMap) - { - if (!me.second->isSet()) - { - bSet = false; - break; - } - } + pair me; // map element + BOOST_FOREACH (me, metadataMap) + { + if (!me.second->isSet()) + { + bSet = false; + break; + } + } - return bSet; + return bSet; } bool isMetadatset(framefactory_by_pin &framefactoryMap) { - bool bSet = true; + bool bSet = true; - pair me; // map element - BOOST_FOREACH (me, framefactoryMap) - { - if (!me.second->getFrameMetadata()->isSet()) - { - bSet = false; - break; - } - } + pair me; // map element + BOOST_FOREACH (me, framefactoryMap) + { + if (!me.second->getFrameMetadata()->isSet()) + { + bSet = false; + break; + } + } - return bSet; + return bSet; } bool Module::shouldTriggerSOS() { - if (!isMetadatset(mInputPinIdMetadataMap) || !isMetadatset(mOutputPinIdFrameFactoryMap)) - { - return true; - } + if (!isMetadatset(mInputPinIdMetadataMap) || + !isMetadatset(mOutputPinIdFrameFactoryMap)) + { + return true; + } - return false; + return false; } bool Module::queuePlayPauseCommand(PlayPauseCommand ppCmd, bool priority) { - auto metadata = framemetadata_sp(new PausePlayMetadata()); - auto frame = makeCommandFrame(ppCmd.getSerializeSize(), metadata); - Utils::serialize(ppCmd, frame->data(), ppCmd.getSerializeSize()); - - // add to que - frame_container frames; - frames.insert(make_pair("pause_play", frame)); - if (!priority) - { - if (!Module::try_push(frames)) - { - LOG_ERROR << "failed to push play command to the que"; - return false; - } - } - else - { - Module::push_back(frames); - } - return true; + auto metadata = framemetadata_sp(new PausePlayMetadata()); + auto frame = makeCommandFrame(ppCmd.getSerializeSize(), metadata); + Utils::serialize(ppCmd, frame->data(), ppCmd.getSerializeSize()); + + // add to que + frame_container frames; + frames.insert(make_pair("pause_play", frame)); + if (!priority) + { + if (!Module::try_push(frames)) + { + LOG_ERROR << "failed to push play command to the que"; + return false; + } + } + else + { + Module::push_back(frames); + } + return true; } bool Module::play(bool _play) { - if (_play) - { - return play(1, mDirection); - } + if (_play) + { + return play(1, mDirection); + } - return play(0, mDirection); + return play(0, mDirection); } bool Module::play(float speed, bool direction) { - if (!mRunning) - { - // comes here if module is not running in a thread - // comes here when pipeline is started with run_all_threaded_withpause - return handlePausePlay(speed, direction); - } - PlayPauseCommand ppCmd(speed, direction); - return queuePlayPauseCommand(ppCmd); + if (!mRunning) + { + // comes here if module is not running in a thread + // comes here when pipeline is started with run_all_threaded_withpause + return handlePausePlay(speed, direction); + } + PlayPauseCommand ppCmd(speed, direction); + return queuePlayPauseCommand(ppCmd); } bool Module::queueStep() { - auto cmd = StepCommand(); - return queueCommand(cmd); + auto cmd = StepCommand(); + return queueCommand(cmd); } bool Module::relay(boost::shared_ptr next, bool open, bool priority) { - auto nextModuleId = next->getId(); - if (mModules.find(nextModuleId) == mModules.end()) - { - LOG_ERROR << "<" << getId() << "> Connection for <" << nextModuleId << " > doesn't exist."; - return false; - } + auto nextModuleId = next->getId(); + if (mModules.find(nextModuleId) == mModules.end()) + { + LOG_ERROR << "<" << getId() << "> Connection for <" << nextModuleId + << " > doesn't exist."; + return false; + } + + auto cmd = RelayCommand(nextModuleId, open); + return queueCommand(cmd, priority); +} + +void Module::addControlModule(boost::shared_ptr cModule) +{ + controlModule = cModule; +} + +void Module::registerErrorCallback(APErrorCallback callback) +{ + mErrorCallback = callback; +} - auto cmd = RelayCommand(nextModuleId, open); - return queueCommand(cmd, priority); +void Module::executeErrorCallback(const APErrorObject& _error) +{ + APErrorObject error = _error; + if(mErrorCallback) + { + error.setModuleId(myId); + error.setModuleName(myName); + mErrorCallback(error); + } } void Module::flushQueRecursive() { - flushQue(); + flushQue(); - // recursively call the flushQue for children modules - for (auto it = mModules.begin(); it != mModules.end(); ++it) - { - it->second->flushQueRecursive(); - } + // recursively call the flushQue for children modules + for (auto it = mModules.begin(); it != mModules.end(); ++it) + { + it->second->flushQueRecursive(); + } } void Module::flushQue() { - LOG_INFO << "mQue flushed for <" << myId << ">"; - mQue->flush(); + LOG_INFO << "mQue flushed for <" << myId << ">"; + mQue->flush(); } bool Module::processSourceQue() { - frame_container frames; - while ((frames = mQue->try_pop()).size()) - { - auto it = frames.cbegin(); - while (it != frames.cend()) - { - auto frame = it->second; - auto pinId = it->first; - it++; - - if (frame->isPausePlay()) - { - PlayPauseCommand ppCmd; - getCommand(ppCmd, frame); - handlePausePlay(ppCmd.speed, ppCmd.direction); - } - else if (frame->isPropsChange()) - { - handlePropsChange(frame); - } - else if (frame->isCommand()) - { - auto cmdType = NoneCommand::getCommandType(frame->data(), frame->size()); - handleCommand(cmdType, frame); - } - else if (frame->isEoP()) - { - handleStop(); - return false; - } - else - { - LOG_ERROR << frame->getMetadata()->getFrameType() << "<> not handled"; - } - } - } - - return true; + frame_container frames; + while ((frames = mQue->try_pop()).size()) + { + auto it = frames.cbegin(); + while (it != frames.cend()) + { + auto frame = it->second; + auto pinId = it->first; + it++; + + if (frame->isPausePlay()) + { + PlayPauseCommand ppCmd; + getCommand(ppCmd, frame); + handlePausePlay(ppCmd.speed, ppCmd.direction); + } + else if (frame->isPropsChange()) + { + handlePropsChange(frame); + } + else if (frame->isCommand()) + { + auto cmdType = + NoneCommand::getCommandType(frame->data(), frame->size()); + handleCommand(cmdType, frame); + } + else if (frame->isEoP()) + { + handleStop(); + return false; + } + else + { + LOG_ERROR << frame->getMetadata()->getFrameType() << "<> not handled"; + } + } + } + + return true; } bool Module::handlePausePlay(float speed, bool direction) { - mDirection = direction; - return handlePausePlay(speed > 0); + mDirection = direction; + return handlePausePlay(speed > 0); } bool Module::handlePausePlay(bool play) { - mPlay = play; - notifyPlay(mPlay); - mSpeed = mPlay ? 1 : 0; - return true; + mPlay = play; + notifyPlay(mPlay); + mSpeed = mPlay ? 1 : 0; + return true; } bool Module::step() { - bool ret = false; - if (myNature == SOURCE) - { - if (!processSourceQue()) - { - return true; - } - bool forceStep = shouldForceStep(); - - pacer->start(); - - if (mPlay || forceStep) - { - mProfiler->startPipelineLap(); - ret = produce(); - mProfiler->endLap(0); - } - else - { - ret = true; - // ret false will kill the thread - } - - pacer->end(); - } - else - { - mProfiler->startPipelineLap(); - - //LOG_ERROR << "Module Id is " << Module::getId() << "Module FPS is " << Module::getPipelineFps() << mProps->fps; - auto frames = mQue->pop(); - preProcessNonSource(frames); - - if (frames.size() == 0 || shouldSkip()) - { - // it can come here only if frames.erase from processEOS or processSOS or processEoP or isPropsChange() or isCommand() - return true; - } - - if(mPlay) - { - mProfiler->startProcessingLap(); - ret = stepNonSource(frames); - mProfiler->endLap(mQue->size()); - } - else - { - ret = true; - } - } - - return ret; + bool ret = false; + if (myNature == SOURCE) + { + if (!processSourceQue()) + { + return true; + } + bool forceStep = shouldForceStep(); + + pacer->start(); + + if (mPlay || forceStep) + { + mProfiler->startPipelineLap(); + ret = produce(); + mProfiler->endLap(0); + } + else + { + ret = true; + // ret false will kill the thread + } + + pacer->end(); + } + else if (myNature == CONTROL) + { + auto frames = mQue->pop(); + preProcessControl(frames); + if (frames.size() != 0) + { + throw AIPException(CTRL_MODULE_INVALID_STATE, "Unexpected: " + std::to_string(frames.size()) + " frames remain unprocessed in control module."); + } + } + else + { + mProfiler->startPipelineLap(); + + // LOG_ERROR << "Module Id is " << Module::getId() << "Module FPS is " << Module::getPipelineFps() << mProps->fps; + auto frames = mQue->pop(); + preProcessNonSource(frames); + + if (frames.size() == 0 || shouldSkip()) + { + // it can come here only if frames.erase from processEOS or processSOS or processEoP or isPropsChange() or isCommand() + return true; + } + + if (mPlay) + { + mProfiler->startProcessingLap(); + ret = stepNonSource(frames); + mProfiler->endLap(mQue->size()); + } + else + { + ret = true; + } + } + + return ret; +} + +void Module::registerHealthCallback(APHealthCallback callback) +{ + mHealthCallback = callback; + mProfiler->setHealthCallback(mHealthCallback); } void Module::sendEOS() { - if (myNature == SINK) - { - return; - } - - frame_container frames; - auto frame = frame_sp(new EoSFrame()); - pair me; // map element - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - frames.insert(make_pair(me.first, frame)); - } - - send(frames, true); -} - -void Module::sendEOS(frame_sp& frame) -{ - if (myNature == SINK) - { - return; - } - frame_container frames; - pair me; // map element - BOOST_FOREACH(me, mOutputPinIdFrameFactoryMap) - { - frames.insert(make_pair(me.first, frame)); - } - - send(frames, true); -} - -void Module::sendMp4ErrorFrame(frame_sp& frame) -{ - if (myNature == SINK) - { - return; - } - - frame_container frames; - pair me; // map element - BOOST_FOREACH(me, mOutputPinIdFrameFactoryMap) { - frames.insert(make_pair(me.first, frame)); - } - - send(frames, true); + if (myNature == SINK || myNature == CONTROL) + { + return; + } + + frame_container frames; + auto frame = frame_sp(new EoSFrame()); + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + frames.insert(make_pair(me.first, frame)); + } + + send(frames, true); +} + +void Module::sendEOS(frame_sp &frame) +{ + if (myNature == SINK || myNature == CONTROL) + { + return; + } + frame_container frames; + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + frames.insert(make_pair(me.first, frame)); + } + + send(frames, true); +} + +void Module::sendMp4ErrorFrame(frame_sp &frame) +{ + if (myNature == SINK) + { + return; + } + + frame_container frames; + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + frames.insert(make_pair(me.first, frame)); + } + + send(frames, true); +} + +bool Module::preProcessControl(frame_container &frames) // ctrl: continue on this. +{ + bool eosEncountered = false; + auto it = frames.cbegin(); + while (it != frames.cend()) + { + // increase the iterator manually + auto frame = it->second; + auto pinId = it->first; + it++; + if (frame->isEOS()) + { + // EOS Strategy + processEOS(pinId); + if (!eosEncountered) + { + sendEOS(); // propagating eosframe with every eos encountered + } + frames.erase(pinId); + eosEncountered = true; + continue; + } + + if (frame->isPropsChange()) + { + if (!handlePropsChange(frame)) + { + throw AIPException(AIP_FATAL, string("Handle PropsChange failed")); + } + frames.erase(pinId); + continue; + } + + if (frame->isEoP()) + { + handleStop(); + frames.erase(pinId); + continue; + } + + if (frame->isCommand()) + { + auto cmdType = NoneCommand::getCommandType(frame->data(), frame->size()); + handleCommand(cmdType, frame); + frames.erase(pinId); + continue; + } + + throw AIPException(CTRL_MODULE_INVALID_STATE, "Unexpected data frame recieved in control module"); + } + + return true; } bool Module::preProcessNonSource(frame_container &frames) { - auto bTriggerSOS = shouldTriggerSOS(); // donot calculate every time - store the state when condition changes - - bool eosEncountered = false; - auto it = frames.cbegin(); - while (it != frames.cend()) - { - // increase the iterator manually - - auto frame = it->second; - auto pinId = it->first; - it++; - if (frame->isEOS()) - { - // EOS Strategy - // should we send all frames at a shot or 1 by 1 ? - processEOS(pinId); - if (!eosEncountered) - { - sendEOS(); // propagating eosframe with every eos encountered - } - frames.erase(pinId); - eosEncountered = true; - continue; - } - - if (frame->isPropsChange()) - { - if (!handlePropsChange(frame)) - { - throw AIPException(AIP_FATAL, string("Handle PropsChange failed")); - } - frames.erase(pinId); - continue; - } - - if (frame->isEoP()) - { - handleStop(); - frames.erase(pinId); - continue; - } - - if (frame->isCommand()) - { - auto cmdType = NoneCommand::getCommandType(frame->data(), frame->size()); - handleCommand(cmdType, frame); - frames.erase(pinId); - continue; - } - - if (!bTriggerSOS) - { - // framemetadata is set. No action required - continue; - } - - // new framemetadata_sp can be created - example JPEGDecoderNVJPEG - mInputPinIdMetadataMap[pinId] = frame->getMetadata(); - if (!processSOS(frame)) - { - // remove frame from frames because it is still not ready to process frames - frames.erase(pinId); - } - else - { - if (myNature == TRANSFORM && !shouldTriggerSOS()) - { - // only if shouldTriggerSOS returns false - pair me; // map element - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - auto metadata = me.second->getFrameMetadata(); - if (!metadata->isSet()) - { - throw AIPException(AIP_FATAL, getId() + "<>Transform FrameFactory is constructed without metadata set"); - } - mOutputPinIdFrameFactoryMap[me.first].reset(new FrameFactory(metadata, mProps->maxConcurrentFrames)); - } - } - } - // bug: outputmetadata can also be updated ? give a set function - } - - return true; + auto bTriggerSOS = shouldTriggerSOS(); // donot calculate every time - store + // the state when condition changes + + bool eosEncountered = false; + auto it = frames.cbegin(); + while (it != frames.cend()) + { + // increase the iterator manually + + auto frame = it->second; + auto pinId = it->first; + it++; + if (frame->isEOS()) + { + // EOS Strategy + // should we send all frames at a shot or 1 by 1 ? + processEOS(pinId); + if (!eosEncountered) + { + sendEOS(); // propagating eosframe with every eos encountered + } + frames.erase(pinId); + eosEncountered = true; + continue; + } + + if (frame->isPropsChange()) + { + if (!handlePropsChange(frame)) + { + throw AIPException(AIP_FATAL, string("Handle PropsChange failed")); + } + frames.erase(pinId); + continue; + } + + if (frame->isEoP()) + { + handleStop(); + frames.erase(pinId); + continue; + } + + if (frame->isCommand()) + { + auto cmdType = NoneCommand::getCommandType(frame->data(), frame->size()); + handleCommand(cmdType, frame); + frames.erase(pinId); + continue; + } + + if (!bTriggerSOS) + { + // framemetadata is set. No action required + continue; + } + + // new framemetadata_sp can be created - example JPEGDecoderNVJPEG + mInputPinIdMetadataMap[pinId] = frame->getMetadata(); + if (!processSOS(frame)) + { + // remove frame from frames because it is still not ready to process + // frames + frames.erase(pinId); + } + else + { + if (myNature == TRANSFORM && !shouldTriggerSOS()) + { + // only if shouldTriggerSOS returns false + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + auto metadata = me.second->getFrameMetadata(); + if (!metadata->isSet()) + { + throw AIPException(AIP_FATAL, + getId() + "<>Transform FrameFactory is " + "constructed without metadata set"); + } + mOutputPinIdFrameFactoryMap[me.first].reset( + new FrameFactory(metadata, mProps->maxConcurrentFrames)); + } + } + } + // bug: outputmetadata can also be updated ? give a set function + } + + return true; } bool Module::stepNonSource(frame_container &frames) { - bool ret = true; - try - { - ret = process(frames); - } - catch (AIP_Exception &) - { - // assuming already logged - } - catch(const std::exception& exception) - { - LOG_FATAL << getId() << "<>" << exception.what(); - } - catch (...) - { - LOG_FATAL << getId() << "<> Unknown exception. Catching throw"; - } - - return ret; + bool ret = true; + try + { + ret = process(frames); + } + catch (AIP_Exception &) + { + // assuming already logged + } + catch (const std::exception &exception) + { + LOG_FATAL << getId() << "<>" << exception.what(); + } + catch (...) + { + LOG_FATAL << getId() << "<> Unknown exception. Catching throw"; + } + + return ret; } bool Module::addEoPFrame(frame_container &frames) { - pair me; // map element - BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) - { - auto frame = frame_sp(new EoPFrame()); - auto metadata = me.second->getFrameMetadata(); - frame->setMetadata(metadata); - frames.insert(make_pair(me.first, frame)); - } - - // if sieve is disabled for atleast one connection - send additional EOP frames - extra EOP frames downstream shouldn't matter - if (mIsSieveDisabledForAny) - { - pair me; // map element - BOOST_FOREACH(me, mInputPinIdMetadataMap) - { - auto frame = frame_sp(new EoPFrame()); - frame->setMetadata(me.second); - frames.insert(make_pair(me.first, frame)); - } - } - return true; + pair me; // map element + BOOST_FOREACH (me, mOutputPinIdFrameFactoryMap) + { + auto frame = frame_sp(new EoPFrame()); + auto metadata = me.second->getFrameMetadata(); + frame->setMetadata(metadata); + frames.insert(make_pair(me.first, frame)); + } + + // if sieve is disabled for atleast one connection - send additional EOP + // frames - extra EOP frames downstream shouldn't matter + if (mIsSieveDisabledForAny) + { + pair me; // map element + BOOST_FOREACH (me, mInputPinIdMetadataMap) + { + auto frame = frame_sp(new EoPFrame()); + frame->setMetadata(me.second); + frames.insert(make_pair(me.first, frame)); + } + } + return true; } bool Module::handleStop() { - // force stop is required - if (mRunning == false) - { - return true; - } - mStopCount++; - if (myNature != SOURCE && mStopCount != mForwardPins) - { - return true; - } - if (myNature != SINK) - { - sendEoPFrame(); - } - mRunning = false; - // if pull and not source - call term - if (mProps->frameFetchStrategy == ModuleProps::FrameFetchStrategy::PULL && myNature != SOURCE) - { - term(); - } - - return true; + // ctrl module - crash or ignore the command or send an CtrlErrorCommandFrame ? + // force stop is required + if (mRunning == false) + { + return true; + } + mStopCount++; + if (myNature != SOURCE && mStopCount != mForwardPins) + { + return true; + } + if (myNature != SINK) + { + sendEoPFrame(); + } + mRunning = false; + // if pull and not source - call term + if (mProps->frameFetchStrategy == ModuleProps::FrameFetchStrategy::PULL && myNature != SOURCE) + { + term(); + } + + return true; } void Module::sendEoPFrame() { - frame_container frames; - addEoPFrame(frames); + frame_container frames; + addEoPFrame(frames); - send(frames, true); + send(frames, true); } bool Module::stop() { - frame_container frames; - addEoPFrame(frames); + frame_container frames; + addEoPFrame(frames); - Module::push(frames); + Module::push(frames); - return true; + return true; } -void Module::adaptQueue(boost::shared_ptr queAdapter) +void Module::adaptQueue( + boost::shared_ptr queAdapter) { - queAdapter->adapt(mQue); - mQue = queAdapter; + queAdapter->adapt(mQue); + mQue = queAdapter; } void Module::ignore(int times) { - static int observed = 0; - observed++; - if (observed >= times && times > 0) - { - LOG_TRACE << "stopping due to step failure "; - observed = 0; - handleStop(); - } + static int observed = 0; + observed++; + if (observed >= times && times > 0) + { + LOG_TRACE << "stopping due to step failure "; + observed = 0; + handleStop(); + } } void Module::stop_onStepfail() { - LOG_ERROR << "Stopping " << myId << " due to step failure "; - handleStop(); + // ctrl - get and print the last command processed which might have caused the error + LOG_ERROR << "Stopping " << myId << " due to step failure "; + handleStop(); } void Module::emit_event(unsigned short eventID) { - if (!event_consumer.empty()) - { - event_consumer(this, eventID); - event_consumer.clear(); // we can only fire once. - } + if (!event_consumer.empty()) + { + event_consumer(this, eventID); + event_consumer.clear(); // we can only fire once. + } } void Module::emit_fatal(unsigned short eventID) { - if (!fatal_event_consumer.empty()) - { - //we have a handler... let's trigger it - fatal_event_consumer(this, eventID); - } - else - { - //we dont have a handler let's kill this thread - std::string msg("Fatal error in module "); - LOG_FATAL << "FATAL error in module : " << myName; - msg += myName; - msg += " Event ID "; - msg += std::to_string(eventID); - throw AIPException(AIP_FATAL, msg); - } + if (!fatal_event_consumer.empty()) + { + // we have a handler... let's trigger it + fatal_event_consumer(this, eventID); + } + else + { + // we dont have a handler let's kill this thread + std::string msg("Fatal error in module "); + LOG_FATAL << "FATAL error in module : " << myName; + msg += myName; + msg += " Event ID "; + msg += std::to_string(eventID); + throw AIPException(AIP_FATAL, msg); + } } -void Module::register_consumer(boost::function consumer, bool bFatal /*= false*/) +void Module::register_consumer( + boost::function consumer, + bool bFatal /*= false*/) { - (bFatal) ? (fatal_event_consumer = consumer) : (event_consumer = consumer); + (bFatal) ? (fatal_event_consumer = consumer) : (event_consumer = consumer); } bool Module::handlePropsChange(frame_sp &frame) { - throw AIPException(AIP_NOTIMPLEMENTED, "Props Change for not implemented"); + throw AIPException(AIP_NOTIMPLEMENTED, "Props Change for not implemented"); } bool Module::handleCommand(Command::CommandType type, frame_sp &frame) { - switch (type) - { - case Command::Relay: - { - RelayCommand cmd; - getCommand(cmd, frame); - - mRelay[cmd.nextModuleId] = cmd.open; - } - break; - case Command::Step: - { - // call step - mForceStepCount++; - } - break; - default: - throw AIPException(AIP_NOTIMPLEMENTED, "Command Handler for <" + to_string(type) + "> not implemented"); - } - - return true; + switch (type) + { + case Command::Relay: + { + RelayCommand cmd; + getCommand(cmd, frame); + + mRelay[cmd.nextModuleId] = cmd.open; + } + break; + case Command::Step: + { + // call step + mForceStepCount++; + } + break; + default: + throw AIPException(AIP_NOTIMPLEMENTED, "Command Handler for <" + + to_string(type) + + "> not implemented"); + } + + return true; } bool Module::shouldForceStep() { - auto forceStep = mForceStepCount > 0; - if (forceStep) - { - mForceStepCount--; - } + auto forceStep = mForceStepCount > 0; + if (forceStep) + { + mForceStepCount--; + } - return forceStep; + return forceStep; } bool Module::shouldSkip() { - if (mProps->skipN == 0) - { - return false; - } + if (mProps->skipN == 0) + { + return false; + } - if (mProps->skipN == mProps->skipD) - { - return true; - } + if (mProps->skipN == mProps->skipD) + { + return true; + } - auto skip = true; + auto skip = true; - if (mSkipIndex <= 0 || mSkipIndex > mProps->skipD) - { - mSkipIndex = mProps->skipD; - } + if (mSkipIndex <= 0 || mSkipIndex > mProps->skipD) + { + mSkipIndex = mProps->skipD; + } - if (mSkipIndex > mProps->skipN) - { - skip = false; - } + if (mSkipIndex > mProps->skipN) + { + skip = false; + } - mSkipIndex--; + mSkipIndex--; - return skip; + return skip; } \ No newline at end of file diff --git a/base/src/Overlay.cpp b/base/src/Overlay.cpp index f8ee8790f..3f459abf0 100644 --- a/base/src/Overlay.cpp +++ b/base/src/Overlay.cpp @@ -1,4 +1,3 @@ -#pragma once #include "Overlay.h" #include "OverlayFactory.h" diff --git a/base/src/PipeLine.cpp b/base/src/PipeLine.cpp index b41f01626..0bc0f9d79 100755 --- a/base/src/PipeLine.cpp +++ b/base/src/PipeLine.cpp @@ -36,7 +36,7 @@ bool PipeLine::addControlModule(boost::shared_ptr cModule) { for (int i = 0; i < modules.size(); i++) { - modules[i]->controlModule = cModule; + modules[i]->addControlModule(cModule); cModule->pipelineModules.push_back(modules[i]); } return true; diff --git a/base/src/SimpleControlModule.cpp b/base/src/SimpleControlModule.cpp new file mode 100644 index 000000000..0960c75c6 --- /dev/null +++ b/base/src/SimpleControlModule.cpp @@ -0,0 +1,29 @@ +#include "SimpleControlModule.h" + +void SimpleControlModule::sendEOS() +{ + return Module::sendEOS(); +} + +void SimpleControlModule::sendEOS(frame_sp& frame) +{ + return Module::sendEOS(frame); +} + +void SimpleControlModule::sendEOPFrame() +{ + return Module::sendEoPFrame(); +} + +// Right Now, Just Logging But Can be used to Do bunch of other things +void SimpleControlModule::handleError(const APErrorObject &error) +{ + LOG_ERROR << "Error in module " << error.getModuleName() << "Module Id" + << error.getModuleId() << " (Code " << error.getErrorCode() + << "): " << error.getErrorMessage(); +} + +void SimpleControlModule::handleHealthCallback(const APHealthObject &healthObj) +{ + LOG_ERROR << "Health Callback from module " << healthObj.getModuleId(); +} diff --git a/base/test/ImageEncodeCV_tests.cpp b/base/test/ImageEncodeCV_tests.cpp index e7f7500f6..f8cec1b93 100755 --- a/base/test/ImageEncodeCV_tests.cpp +++ b/base/test/ImageEncodeCV_tests.cpp @@ -13,6 +13,7 @@ #include "ImageEncoderCV.h" #include "PipeLine.h" #include "StatSink.h" +#include "SimpleControlModule.h" BOOST_AUTO_TEST_SUITE(ImageEncodeCV_tests) @@ -147,7 +148,7 @@ BOOST_AUTO_TEST_CASE(MONO_profile, *boost::unit_test::disabled()) BOOST_AUTO_TEST_CASE(RGB_profile, *boost::unit_test::disabled()) { LoggerProps logprops; - logprops.logLevel = boost::log::trivial::severity_level::info; + logprops.logLevel = boost::log::trivial::severity_level::error; Logger::initLogger(logprops); @@ -159,23 +160,31 @@ BOOST_AUTO_TEST_CASE(RGB_profile, *boost::unit_test::disabled()) auto rawImagePin = fileReader->addOutputPin(metadata); - - auto m2 = boost::shared_ptr(new ImageEncoderCV(ImageEncoderCVProps())); + ImageEncoderCVProps encoderProps; + encoderProps.enableHealthCallBack = true; + encoderProps.healthUpdateIntervalInSec = 10; + auto m2 = boost::shared_ptr(new ImageEncoderCV(encoderProps)); fileReader->setNext(m2); auto outputPinId = m2->getAllOutputPinsByType(FrameMetadata::ENCODED_IMAGE)[0]; - + + auto controlProps = SimpleControlModuleProps(); + boost::shared_ptr mControl = boost::shared_ptr(new SimpleControlModule(controlProps)); StatSinkProps statSinkProps; - statSinkProps.logHealth = true; - statSinkProps.logHealthFrequency = 10; + // statSinkProps.logHealth = true; + // statSinkProps.logHealthFrequency = 10; auto statSink = boost::shared_ptr(new StatSink(statSinkProps)); m2->setNext(statSink); auto p = boost::shared_ptr(new PipeLine("test")); p->appendModule(fileReader); + p->addControlModule(mControl); p->init(); + mControl->init(); + // If you want error callbackand health callback to work with a module, registering it with control is mandatory. + mControl->enrollModule("Encode", m2); p->run_all_threaded(); boost::this_thread::sleep_for(boost::chrono::seconds(3000)); p->stop(); diff --git a/base/test/gtkglrenderer_tests.cpp b/base/test/gtkglrenderer_tests.cpp index ed3281624..c60f4f465 100644 --- a/base/test/gtkglrenderer_tests.cpp +++ b/base/test/gtkglrenderer_tests.cpp @@ -25,6 +25,7 @@ #include "RTSPClientSrc.h" #include "StatSink.h" #include "VirtualCameraSink.h" +#include "SimpleControlModule.h" #include @@ -66,6 +67,34 @@ void secondPipeline() { p.init(); p.run_all_threaded(); } + +// Below Test is added to Give an Idea about How Error Callbacks can be used +boost::shared_ptr launchErrorCallPipeline() { + auto fileReaderProps = FileReaderModuleProps("./data/mono_200x200.raw", 0, -1); + fileReaderProps.readLoop = true; + fileReaderProps.fps = 300; + auto fileReader = boost::shared_ptr(new FileReaderModule(fileReaderProps)); + auto metadata = framemetadata_sp(new RawImageMetadata(200, 200, ImageMetadata::ImageType::MONO, CV_8UC1, 0, + CV_8U, FrameMetadata::HOST, true)); + auto rawImagePin = fileReader->addOutputPin(metadata); + + GtkGlRendererProps gtkglsinkProps(glarea, 1, 1); + GtkGl = boost::shared_ptr(new GtkGlRenderer(gtkglsinkProps)); + fileReader->setNext(GtkGl); + + auto controlProps = SimpleControlModuleProps(); + boost::shared_ptr mControl = boost::shared_ptr(new SimpleControlModule(controlProps)); + + + p.appendModule(fileReader); + p.addControlModule(mControl); + p.init(); + mControl->init(); + mControl->enrollModule("Renderer", GtkGl); + p.run_all_threaded(); + return GtkGl; +} + boost::shared_ptr laucX86Pipeline() { auto fileReaderProps = FileReaderModuleProps("./data/frame_1280x720_rgb.raw", 0, -1); @@ -595,4 +624,44 @@ BOOST_AUTO_TEST_CASE(windowInit2, *boost::unit_test::disabled()) { p.wait_for_all(); } +BOOST_AUTO_TEST_CASE(getErrorCallback, *boost::unit_test::disabled()) { + if (!gtk_init_check(NULL, NULL)) { + fputs("Could not initialize GTK", stderr); + } + GtkBuilder *m_builder = gtk_builder_new(); + if (!m_builder) { + LOG_ERROR << "Builder not found"; + } + gtk_builder_add_from_file(m_builder, "./data/app_ui.glade", NULL); + + window = GTK_WIDGET(gtk_window_new(GTK_WINDOW_TOPLEVEL)); + gtk_window_set_decorated(GTK_WINDOW(window), FALSE); + g_object_ref(window); + gtk_window_set_default_size(GTK_WINDOW(window), 1920, 1080); + gtk_window_set_resizable(GTK_WINDOW(window), FALSE); + gtk_widget_set_app_paintable(window, TRUE); + + do { + gtk_main_iteration(); + } while (gtk_events_pending()); + + GtkWidget *mainFixed = + GTK_WIDGET(gtk_builder_get_object(m_builder, "A_liveScreen")); + gtk_container_add(GTK_CONTAINER(window), mainFixed); + + glarea = GTK_WIDGET(gtk_builder_get_object(m_builder, "glareadraw")); + glAreaSwitch = GTK_WIDGET(gtk_builder_get_object(m_builder, "glareadraw1")); + + g_signal_connect(window, "destroy", G_CALLBACK(gtk_main_quit), NULL); + + launchErrorCallPipeline(); + gtk_widget_show_all(window); + + gtk_main(); + + p.stop(); + p.term(); + p.wait_for_all(); +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/base/test/multimediaqueuexform_tests.cpp b/base/test/multimediaqueuexform_tests.cpp index 120c3152b..f4622a359 100644 --- a/base/test/multimediaqueuexform_tests.cpp +++ b/base/test/multimediaqueuexform_tests.cpp @@ -37,7 +37,9 @@ class SinkModule : public Module } protected: - bool process() {}; + bool process() { + return true; + }; bool validateOutputPins() { return true;