diff --git a/machinery/go.mod b/machinery/go.mod index 71cc474..b6f5c2b 100644 --- a/machinery/go.mod +++ b/machinery/go.mod @@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery go 1.19 -//replace github.com/kerberos-io/joy4 v1.0.51 => ../../../../github.com/kerberos-io/joy4 +//replace github.com/kerberos-io/joy4 v1.0.53 => ../../../../github.com/kerberos-io/joy4 //replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif require ( @@ -20,7 +20,7 @@ require ( github.com/golang-module/carbon/v2 v2.2.3 github.com/gorilla/websocket v1.5.0 github.com/kellydunn/golang-geo v0.7.0 - github.com/kerberos-io/joy4 v1.0.53 + github.com/kerberos-io/joy4 v1.0.54 github.com/kerberos-io/onvif v0.0.5 github.com/minio/minio-go/v6 v6.0.57 github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e diff --git a/machinery/go.sum b/machinery/go.sum index 0bc8f4c..af85644 100644 --- a/machinery/go.sum +++ b/machinery/go.sum @@ -175,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg= github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU= -github.com/kerberos-io/joy4 v1.0.53 h1:DfVptCUzo/77xLUIwnp1/dbcVffmT0DKPDduQBcu26Y= -github.com/kerberos-io/joy4 v1.0.53/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU= +github.com/kerberos-io/joy4 v1.0.54 h1:Ct4G00sk/iLqm+wLV0gQWDxnKciAnLiTnuxF8hufcsc= +github.com/kerberos-io/joy4 v1.0.54/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU= github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w= github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0= github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs= diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index f0f8928..e0c55f9 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -84,6 +84,10 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi infile, streams, err := capture.OpenRTSP(rtspUrl) var queue *pubsub.Queue + var subQueue *pubsub.Queue + + var decoderMutex sync.Mutex + var subDecoderMutex sync.Mutex status := "not started" @@ -106,8 +110,6 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi // At some routines we will need to decode the image. // Make sure its properly locked as we only have a single decoder. - var decoderMutex sync.Mutex - var subDecoderMutex sync.Mutex decoder := capture.GetVideoDecoder(streams) var subDecoder *ffmpeg.VideoDecoder @@ -136,10 +138,10 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi queue.WriteHeader(streams) // We might have a substream, if so we'll create a seperate queue. - var subQueue *pubsub.Queue if subStreamEnabled { log.Log.Info("RunAgent: Creating sub stream queue with SetMaxGopCount set to " + strconv.Itoa(int(1))) subQueue = pubsub.NewQueue() + communication.SubQueue = subQueue subQueue.SetMaxGopCount(1) subQueue.WriteHeader(subStreams) } @@ -217,24 +219,33 @@ func RunAgent(configuration *models.Configuration, communication *models.Communi infile = nil queue.Close() queue = nil + communication.Queue = nil if subStreamEnabled { subInfile.Close() subInfile = nil subQueue.Close() subQueue = nil + communication.SubQueue = nil } close(communication.HandleONVIF) + communication.HandleONVIF = nil close(communication.HandleLiveHDHandshake) + communication.HandleLiveHDHandshake = nil close(communication.HandleMotion) + communication.HandleMotion = nil + + // Disconnect MQTT routers.DisconnectMQTT(mqttClient, &configuration.Config) // Wait a few seconds to stop the decoder. time.Sleep(time.Second * 3) decoder.Close() decoder = nil + communication.Decoder = nil if subStreamEnabled { subDecoder.Close() subDecoder = nil + communication.SubDecoder = nil } // Waiting for some seconds to make sure everything is properly closed. log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.") diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index 041c979..dfe5d8e 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -28,6 +28,7 @@ type Communication struct { HandleONVIF chan OnvifAction IsConfiguring *abool.AtomicBool Queue *pubsub.Queue + SubQueue *pubsub.Queue DecoderMutex *sync.Mutex SubDecoderMutex *sync.Mutex Decoder *ffmpeg.VideoDecoder