This repository has been archived by the owner on Jan 15, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
controlpoint.go
190 lines (176 loc) · 4.7 KB
/
controlpoint.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// Copyright 2014 Mikio Hara. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package ssdp
import (
"bytes"
"errors"
"log"
"net"
"net/http"
"runtime"
"sync"
"time"
)
// A ControlPoint represents an SSDP control point. It embeds the
// underlying network connection and keeps track of the joined group,
// the joined multicast interfaces and the channels on which pending
// searches wait for unicast responses.
type ControlPoint struct {
// ErrorLog specifies an optional logger for errors. If it is
// nil, logging goes to os.Stderr via the log package's
// standard logger.
ErrorLog *log.Logger
conn // network connection endpoint (embedded)
group *net.UDPAddr // group address
unicast func(net.IP) bool // unicast address filter
mifs []net.Interface // multicast network interfaces
// muxmu guards mux: Serve takes the read lock while forwarding
// responses, register and deregister take the write lock.
muxmu sync.RWMutex
mux map[*http.Request]chan *http.Response // unicast message mux, keyed by the registered request
}
// ListenControlPoint listens on the UDP network Listener.Group and
// Listener.Port, joins the group on the given multicast network
// interfaces and returns the resulting control point. If mifs is
// nil, it tries to listen on all available multicast network
// interfaces.
//
// If joining the group fails, the freshly opened control point is
// closed again before the error is returned.
func (ln *Listener) ListenControlPoint(mifs []net.Interface) (*ControlPoint, error) {
	cp := &ControlPoint{
		mux: make(map[*http.Request]chan *http.Response),
	}

	var err error
	cp.conn, cp.group, err = ln.listen()
	if err != nil {
		return nil, err
	}

	// The unicast address filter follows the address family of the
	// group: IPv4 when the group address has a 4-byte form, IPv6
	// otherwise.
	cp.unicast = ipv6Unicast
	if cp.group.IP.To4() != nil {
		cp.unicast = ipv4Unicast
	}

	cp.mifs, err = joinGroup(cp.conn, cp.group, mifs, cp.unicast)
	if err != nil {
		cp.Close()
		return nil, err
	}
	return cp, nil
}
// Serve starts to handle incoming SSDP messages from SSDP devices.
// The handler must not be nil. Serve loops until a read fails with
// an error that is not a temporary net.Error and returns that error;
// temporary read errors are logged and the read is retried.
//
// Each received message is dispatched by its destination address:
//
//   - A non-multicast destination means a unicast response. It is
//     parsed and sent to every channel currently present in the
//     unicast message mux. The send happens while the mux read lock
//     is held and blocks until each channel accepts the response.
//   - A multicast destination other than the joined group address is
//     logged and dropped.
//   - Anything else is parsed as an advertisement. Advertisements
//     whose method is not notifyMethod are ignored; the rest are
//     passed to hdlr in a new goroutine. A panic in the handler is
//     recovered and logged together with a stack trace.
func (cp *ControlPoint) Serve(hdlr http.Handler) error {
	if hdlr == nil {
		return errors.New("invalid http handler")
	}

	// One receive buffer is reused for every read.
	buf := make([]byte, 1280)
	for {
		n, path, err := cp.readFrom(buf)
		if err != nil {
			if nerr, ok := err.(net.Error); !ok || !nerr.Temporary() {
				return err
			}
			cp.logf("read failed: %v", err)
			continue
		}

		switch {
		case !path.dst.IP.IsMulticast():
			resp, err := parseResponse(buf[:n])
			if err != nil {
				cp.logf("parse response failed: %v", err)
				continue
			}
			// Fan the response out to every registered listener.
			cp.muxmu.RLock()
			for _, listener := range cp.mux {
				listener <- resp
			}
			cp.muxmu.RUnlock()

		case !path.dst.IP.Equal(cp.group.IP):
			cp.logf("unknown destination address: %v on %v", path.dst, interfaceByIndex(cp.mifs, path.ifIndex).Name)

		default:
			req, err := parseAdvert(buf[:n])
			if err != nil {
				cp.logf("parse advert failed: %v", err)
				continue
			}
			if req.Method != notifyMethod {
				continue
			}
			resp := newResponseWriter(cp.conn, cp.mifs, cp.group, path, req)
			go func() {
				defer func() {
					r := recover()
					if r == nil {
						return
					}
					// Capture this goroutine's stack only.
					stack := make([]byte, 64<<10)
					stack = stack[:runtime.Stack(stack, false)]
					cp.logf("panic serving %v: %v\n%s", resp.path.src, r, stack)
				}()
				hdlr.ServeHTTP(resp, req)
			}()
		}
	}
}
// GroupAddr returns the network address of the group the control
// point has joined. The returned pointer is the control point's own
// value, not a copy.
func (cp *ControlPoint) GroupAddr() *net.UDPAddr {
	return cp.group
}
// Close closes the control point. It first leaves the group on every
// joined multicast network interface, ignoring any errors from doing
// so, and then closes the underlying connection, whose error is
// returned.
func (cp *ControlPoint) Close() error {
	for i := range cp.mifs {
		ifi := cp.mifs[i]
		cp.LeaveGroup(&ifi, cp.group)
	}
	return cp.conn.Close()
}
// Interfaces returns the list of multicast network interfaces on
// which the control point has joined the group. The returned slice
// is the control point's own slice, not a copy.
func (cp *ControlPoint) Interfaces() []net.Interface {
	return cp.mifs
}
// MSearch issues a M-SEARCH SSDP message, takes a timeout and returns
// a list of responses. Callers should close each http.Response.Body
// when done reading from it. If mifs is nil, it tries to use all
// available multicast network interfaces.
//
// Responses reach MSearch through Serve, which forwards every
// unicast response it parses to all pending searches, so Serve must
// be running for any responses to be collected. MSearch gathers
// whatever arrives until tmo has elapsed after the message was sent
// and then returns; reaching the timeout is not an error.
func (cp *ControlPoint) MSearch(hdr http.Header, mifs []net.Interface, tmo time.Duration) ([]*http.Response, error) {
	req := newAdvert(msearchMethod, cp.group.String(), hdr)
	var buf bytes.Buffer
	if err := marshalAdvert(&buf, req); err != nil {
		return nil, err
	}
	mifs, err := interfaces(mifs, cp.unicast)
	if err != nil {
		return nil, err
	}

	// Register before transmitting: a response that arrives while
	// the message is still being written to the remaining interfaces
	// would otherwise find no listener and be lost.
	respCh := cp.register(req)
	defer func() {
		// Serve sends on respCh while holding the mux read lock and
		// deregister needs the write lock. Keep receiving until
		// deregister has closed the channel so that a send blocked
		// on a full channel cannot deadlock the two.
		go func() {
			for range respCh {
			}
		}()
		cp.deregister(req)
	}()

	if _, err := cp.writeToMulti(buf.Bytes(), cp.group, mifs); err != nil {
		return nil, err
	}

	t := time.NewTimer(tmo)
	defer t.Stop()
	var resps []*http.Response
	for {
		select {
		case <-t.C:
			return resps, nil
		case resp := <-respCh:
			resps = append(resps, resp)
		}
	}
}
// register returns the channel on which unicast responses are
// delivered for req. If req is not yet present in the mux, a new
// channel with a buffer of one response is created and stored under
// req; otherwise the existing channel is returned.
func (cp *ControlPoint) register(req *http.Request) chan *http.Response {
	cp.muxmu.Lock()
	defer cp.muxmu.Unlock()

	ch, found := cp.mux[req]
	if !found {
		ch = make(chan *http.Response, 1)
		cp.mux[req] = ch
	}
	return ch
}
// deregister removes req from the mux and closes its response
// channel. It does nothing if req is not registered. It takes the
// mux write lock, so it waits for any holder of the read lock to
// release it first.
func (cp *ControlPoint) deregister(req *http.Request) {
	cp.muxmu.Lock()
	defer cp.muxmu.Unlock()

	ch, found := cp.mux[req]
	if !found {
		return
	}
	close(ch)
	delete(cp.mux, req)
}
// logf writes a formatted message to ErrorLog, or to the log
// package's standard logger when ErrorLog is nil.
func (cp *ControlPoint) logf(format string, args ...interface{}) {
	if cp.ErrorLog == nil {
		log.Printf(format, args...)
		return
	}
	cp.ErrorLog.Printf(format, args...)
}