-
Notifications
You must be signed in to change notification settings - Fork 1
/
subscription.go
124 lines (106 loc) · 2.93 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package nopfs
import (
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"time"
)
// HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file.
type HTTPSubscriber struct {
remoteURL string
localFile string
interval time.Duration
stopChannel chan struct{}
}
// NewHTTPSubscriber creates a new Subscriber instance with the given parameters.
func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) {
logger.Infof("Subscribing to remote denylist: %s", remoteURL)
sub := HTTPSubscriber{
remoteURL: remoteURL,
localFile: localFile,
interval: interval,
stopChannel: make(chan struct{}, 1),
}
_, err := os.Stat(localFile)
// if not found, we perform a first sync before returning.
// this is necessary as otherwise the Blocker does not find much
// of the file
if err != nil && errors.Is(err, fs.ErrNotExist) {
logger.Infof("Performing first sync on: %s", localFile)
err := sub.downloadAndAppend()
if err != nil {
return nil, err
}
} else if err != nil {
return nil, err
}
go sub.subscribe()
return &sub, nil
}
// subscribe starts the subscription process.
func (s *HTTPSubscriber) subscribe() {
timer := time.NewTimer(0)
for {
select {
case <-s.stopChannel:
logger.Infof("Stopping subscription on: %s", s.localFile)
if !timer.Stop() {
<-timer.C
}
return
case <-timer.C:
err := s.downloadAndAppend()
if err != nil {
logger.Error(err)
}
timer.Reset(s.interval)
}
}
}
// Stop stops the subscription process.
func (s *HTTPSubscriber) Stop() {
close(s.stopChannel)
}
func (s *HTTPSubscriber) downloadAndAppend() error {
localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
defer localFile.Close()
// Get the file size of the local file
localFileInfo, err := localFile.Stat()
if err != nil {
return err
}
localFileSize := localFileInfo.Size()
// Create a HTTP GET request with the Range header to download only the missing bytes
req, err := http.NewRequest("GET", s.remoteURL, nil)
if err != nil {
return err
}
rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize)
req.Header.Set("Range", rangeHeader)
logger.Debug("%s: requesting bytes from %d: %s", s.localFile, localFileSize, req.URL)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
switch {
case resp.StatusCode == http.StatusPartialContent:
_, err = io.Copy(localFile, resp.Body)
if err != nil {
return err
}
logger.Infof("%s: appended %d bytes", s.localFile, resp.ContentLength)
case (resp.StatusCode >= http.StatusBadRequest &&
resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) ||
resp.StatusCode >= http.StatusInternalServerError:
return fmt.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode)
// error is ignored, we continued subscribed
}
return nil
}