Skip to content

Commit

Permalink
adding context + only add working streams
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricve committed Jun 8, 2023
1 parent 0bb31e0 commit f641d7c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 30 deletions.
54 changes: 28 additions & 26 deletions av/avutil/avutil.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package avutil

import (
"io"
"strings"
"fmt"
"bytes"
"github.com/kerberos-io/joy4/av"
"context"
"fmt"
"io"
"net/url"
"os"
"path"
"strings"

"github.com/kerberos-io/joy4/av"
)

type HandlerDemuxer struct {
Expand All @@ -22,7 +24,7 @@ func (self *HandlerDemuxer) Close() error {

type HandlerMuxer struct {
av.Muxer
w io.WriteCloser
w io.WriteCloser
stage int
}

Expand Down Expand Up @@ -54,18 +56,18 @@ func (self *HandlerMuxer) Close() (err error) {
}

type RegisterHandler struct {
Ext string
ReaderDemuxer func(io.Reader)av.Demuxer
WriterMuxer func(io.Writer)av.Muxer
UrlMuxer func(string)(bool,av.MuxCloser,error)
UrlDemuxer func(string)(bool,av.DemuxCloser,error)
UrlReader func(string)(bool,io.ReadCloser,error)
Probe func([]byte)bool
AudioEncoder func(av.CodecType)(av.AudioEncoder,error)
AudioDecoder func(av.AudioCodecData)(av.AudioDecoder,error)
ServerDemuxer func(string)(bool,av.DemuxCloser,error)
ServerMuxer func(string)(bool,av.MuxCloser,error)
CodecTypes []av.CodecType
Ext string
ReaderDemuxer func(io.Reader) av.Demuxer
WriterMuxer func(io.Writer) av.Muxer
UrlMuxer func(string) (bool, av.MuxCloser, error)
UrlDemuxer func(string) (bool, av.DemuxCloser, error)
UrlReader func(string) (bool, io.ReadCloser, error)
Probe func([]byte) bool
AudioEncoder func(av.CodecType) (av.AudioEncoder, error)
AudioDecoder func(av.AudioCodecData) (av.AudioDecoder, error)
ServerDemuxer func(string) (bool, av.DemuxCloser, error)
ServerMuxer func(string) (bool, av.MuxCloser, error)
CodecTypes []av.CodecType
}

type Handlers struct {
Expand All @@ -78,7 +80,7 @@ func (self *Handlers) Add(fn func(*RegisterHandler)) {
self.handlers = append(self.handlers, *handler)
}

func (self *Handlers) openUrl(u *url.URL, uri string) (r io.ReadCloser, err error) {
func (self *Handlers) openUrl(ctx context.Context, u *url.URL, uri string) (r io.ReadCloser, err error) {
if u != nil && u.Scheme != "" {
for _, handler := range self.handlers {
if handler.UrlReader != nil {
Expand Down Expand Up @@ -124,7 +126,7 @@ func (self *Handlers) NewAudioDecoder(codec av.AudioCodecData) (dec av.AudioDeco
return
}

func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
func (self *Handlers) Open(ctx context.Context, uri string) (demuxer av.DemuxCloser, err error) {
listen := false
if strings.HasPrefix(uri, "listen:") {
uri = uri[len("listen:"):]
Expand Down Expand Up @@ -162,12 +164,12 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
for _, handler := range self.handlers {
if handler.Ext == ext {
if handler.ReaderDemuxer != nil {
if r, err = self.openUrl(u, uri); err != nil {
if r, err = self.openUrl(ctx, u, uri); err != nil {
return
}
demuxer = &HandlerDemuxer{
Demuxer: handler.ReaderDemuxer(r),
r: r,
r: r,
}
return
}
Expand All @@ -176,7 +178,7 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
}

var probebuf [1024]byte
if r, err = self.openUrl(u, uri); err != nil {
if r, err = self.openUrl(ctx, u, uri); err != nil {
return
}
if _, err = io.ReadFull(r, probebuf[:]); err != nil {
Expand All @@ -196,7 +198,7 @@ func (self *Handlers) Open(uri string) (demuxer av.DemuxCloser, err error) {
}
demuxer = &HandlerDemuxer{
Demuxer: handler.ReaderDemuxer(_r),
r: r,
r: r,
}
return
}
Expand Down Expand Up @@ -254,7 +256,7 @@ func (self *Handlers) FindCreate(uri string) (handler RegisterHandler, muxer av.
}
muxer = &HandlerMuxer{
Muxer: handler.WriterMuxer(w),
w: w,
w: w,
}
return
}
Expand All @@ -267,8 +269,8 @@ func (self *Handlers) FindCreate(uri string) (handler RegisterHandler, muxer av.

var DefaultHandlers = &Handlers{}

func Open(url string) (demuxer av.DemuxCloser, err error) {
return DefaultHandlers.Open(url)
func Open(ctx context.Context, url string) (demuxer av.DemuxCloser, err error) {
return DefaultHandlers.Open(ctx, url)
}

func Create(url string) (muxer av.MuxCloser, err error) {
Expand Down
12 changes: 8 additions & 4 deletions format/rtsp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/kerberos-io/joy4/format/rtsp/sdp"
"github.com/kerberos-io/joy4/utils/bits/pio"

//"log"
"log"
"net"
"net/textproto"
"net/url"
Expand Down Expand Up @@ -706,9 +706,13 @@ func (self *Client) Describe() (streams []sdp.Media, err error) {
self.streams = []*Stream{}
for _, media := range medias {
stream := &Stream{Sdp: media, client: self}
stream.makeCodecData()
self.streams = append(self.streams, stream)
streams = append(streams, media)
err = stream.makeCodecData()
if err == nil {
self.streams = append(self.streams, stream)
streams = append(streams, media)
} else {
log.Print("Error: ", err.Error())
}
}
self.stage = stageDescribeDone
return
Expand Down

0 comments on commit f641d7c

Please sign in to comment.