From ab0256b12de4aa22f489521bcd92efe19abec5c3 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 12:36:10 +0200 Subject: [PATCH 01/15] Readme: explain how to double-hash by hand --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 1cb8aa9..5b38c9f 100644 --- a/README.md +++ b/README.md @@ -82,6 +82,22 @@ hints: //QmbK7LDv5NNBvYQzNfm2eED17SNLt1yNMapcUhSuNLgkqz ``` +You can create double-hashes by hand with the following command: + +``` +printf "QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768/my/path" \ + | ipfs add --raw-leaves --only-hash --quiet \ + | ipfs cid format -f '%M' -b base58btc +``` + +where: + - `QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768` must always be a + CIDv0. If you have a CIDv1 you need to convert it to CIDv0 first, e.g. with + `ipfs cid format -v0 bafybeihrw75yfhdx5qsqgesdnxejtjybscwuclpusvxkuttep6h7pkgmze` + - `/my/path` is optional depending on whether you want to block a specific path. No wildcards supported here! + - The command above should give `QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8`. Use it as `//QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8` on the denylist. + + ## Kubo plugin NOpfs Kubo plugin pre-built binary releases are available in the From 80a196c41d909c8f9bcff85cee0f49f935cd0884 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 13:28:36 +0200 Subject: [PATCH 02/15] denylist: refactor a bit with functions to readLines --- denylist.go | 152 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 87 insertions(+), 65 deletions(-) diff --git a/denylist.go b/denylist.go index b6ba739..0de7737 100644 --- a/denylist.go +++ b/denylist.go @@ -194,41 +194,21 @@ func (dl *Denylist) parseAndFollow(follow bool) error { return err } - // we will update N as we go after every line. - // Fixme: this is going to play weird as the buffered reader will - // read-ahead and consume N. + // we will update N as we go after every line.
Fixme: this is + // going to play weird as the buffered reader will read-ahead + // and consume N. limRdr := &io.LimitedReader{ R: dl.f, N: maxLineSize, } r := bufio.NewReader(limRdr) - lineNumber := dl.Header.headerLines - for { - line, err := r.ReadString('\n') - // limit reader exhausted - if err == io.EOF && len(line) >= maxLineSize { - err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) - logger.Error(err) - dl.Close() - return err - } - // keep waiting for data - if err == io.EOF { - break - } - if err != nil { - logger.Error(err) - return err - } - - lineNumber++ - if err := dl.parseLine(line, lineNumber); err != nil { - logger.Error(err) - } - limRdr.N = maxLineSize // reset + lineNumber, err := dl.readLines(r, limRdr, lineNumber) + if err != nil { + return err } + // we finished reading the file as it EOF'ed. if !follow { return nil @@ -261,55 +241,97 @@ func (dl *Denylist) parseAndFollow(follow bool) error { } } - // Is this the right way of tailing a file? Pretty sure there are a - // bunch of gotchas. It seems to work when saving on top of a file - // though. Also, important that the limitedReader is there to avoid - // parsing a huge lines. Also, this could be done by just having - // watchers on the folder, but requires a small refactoring. - go func() { - line := "" + go dl.followLines(r, limRdr, lineNumber, waitForWrite) + return nil +} + +// readLines reads lines from a buffered reader on top of a limited reader, +// that we reset on every line. This enforces line-length limits. +func (dl *Denylist) readLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64) (uint64, error) { + for { + line, err := r.ReadString('\n') + // limit reader exhausted + if err == io.EOF && len(line) >= maxLineSize { + err = fmt.Errorf("line too long. 
%s:%d", dl.Filename, lineNumber+1) + logger.Error(err) + dl.Close() + return lineNumber, err + } + // keep waiting for data + if err == io.EOF { + break + } + if err != nil { + logger.Error(err) + return lineNumber, err + } + + lineNumber++ + if err := dl.parseLine(line, lineNumber); err != nil { + logger.Error(err) + } limRdr.N = maxLineSize // reset - for { - partialLine, err := r.ReadString('\n') - line += partialLine + } + return lineNumber, nil +} - // limit reader exhausted - if err == io.EOF && limRdr.N == 0 { - err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) - logger.Error(err) - dl.Close() - return - } - // keep waiting for data - if err == io.EOF { - err := waitForWrite() - if err != nil { - logger.Error(err) - dl.Close() - return - } - continue - } +// Follow lines keeps reading lines as we get notified that there are new +// writes to the file. +// Is this the right way of tailing a file? Pretty sure there are a +// bunch of gotchas. It seems to work when saving on top of a file +// though. Also, important that the limitedReader is there to avoid +// parsing a huge lines. Also, this could be done by just having +// watchers on the folder, but requires a small refactoring. +func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) { + line := "" + limRdr.N = maxLineSize // reset + + for { + partialLine, err := r.ReadString('\n') + + // limit reader exhausted + if err == io.EOF && limRdr.N == 0 { + err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) + logger.Error(err) + dl.Close() + return + } + + // Record how much of a line we have + line += partialLine + + // keep waiting for data + if err == io.EOF { + // We read a full line or nothing and are waiting. 
+ err := waitWrite() if err != nil { logger.Error(err) dl.Close() return } + continue + } + if err != nil { + logger.Error(err) + dl.Close() + return + } - lineNumber++ - // we have read up to \n - if err := dl.parseLine(line, lineNumber); err != nil { - logger.Error(err) - // log error and continue with next line + // if we are here, no EOF, no error and ReadString() + // found an \n so we have a full line. + + lineNumber++ + // we have read up to \n + if err := dl.parseLine(line, lineNumber); err != nil { + logger.Error(err) + // log error and continue with next line - } - // reset for next line - line = "" - limRdr.N = 2 << 20 // reset } - }() - return nil + // reset for next line + line = "" + limRdr.N = 2 << 20 // reset + } } // parseLine processes every full-line read and puts it into the BlocksDB etc. From 1ae47f603e48a0d79d7d6e9ea374772b34af39a2 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 14:15:07 +0200 Subject: [PATCH 03/15] More refactoring: use single functions for lines-parsing --- denylist.go | 69 +++++++++++++++-------------------------------------- 1 file changed, 19 insertions(+), 50 deletions(-) diff --git a/denylist.go b/denylist.go index 0de7737..e92d2f7 100644 --- a/denylist.go +++ b/denylist.go @@ -204,14 +204,9 @@ func (dl *Denylist) parseAndFollow(follow bool) error { r := bufio.NewReader(limRdr) lineNumber := dl.Header.headerLines - lineNumber, err := dl.readLines(r, limRdr, lineNumber) - if err != nil { - return err - } - // we finished reading the file as it EOF'ed. if !follow { - return nil + return dl.followLines(r, limRdr, lineNumber, nil) } // We now wait for new lines. @@ -245,45 +240,16 @@ func (dl *Denylist) parseAndFollow(follow bool) error { return nil } -// readLines reads lines from a buffered reader on top of a limited reader, +// followLines reads lines from a buffered reader on top of a limited reader, // that we reset on every line. This enforces line-length limits. 
-func (dl *Denylist) readLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64) (uint64, error) { - for { - line, err := r.ReadString('\n') - // limit reader exhausted - if err == io.EOF && len(line) >= maxLineSize { - err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) - logger.Error(err) - dl.Close() - return lineNumber, err - } - // keep waiting for data - if err == io.EOF { - break - } - if err != nil { - logger.Error(err) - return lineNumber, err - } - - lineNumber++ - if err := dl.parseLine(line, lineNumber); err != nil { - logger.Error(err) - } - limRdr.N = maxLineSize // reset - - } - return lineNumber, nil -} - -// Follow lines keeps reading lines as we get notified that there are new -// writes to the file. +// If we pass a waitWrite() function, then it waits when finding EOF. +// // Is this the right way of tailing a file? Pretty sure there are a // bunch of gotchas. It seems to work when saving on top of a file // though. Also, important that the limitedReader is there to avoid // parsing a huge lines. Also, this could be done by just having // watchers on the folder, but requires a small refactoring. -func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) { +func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) error { line := "" limRdr.N = maxLineSize // reset @@ -295,27 +261,29 @@ func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineN err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) logger.Error(err) dl.Close() - return + return err } // Record how much of a line we have line += partialLine - // keep waiting for data if err == io.EOF { - // We read a full line or nothing and are waiting. 
- err := waitWrite() - if err != nil { - logger.Error(err) - dl.Close() - return + if waitWrite != nil { // keep waiting + err := waitWrite() + if err != nil { + logger.Error(err) + dl.Close() + return err + } + continue + } else { // Finished + return nil } - continue } if err != nil { logger.Error(err) dl.Close() - return + return err } // if we are here, no EOF, no error and ReadString() @@ -330,8 +298,9 @@ func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineN } // reset for next line line = "" - limRdr.N = 2 << 20 // reset + limRdr.N = maxLineSize // reset } + return nil } // parseLine processes every full-line read and puts it into the BlocksDB etc. From eeab36eb912ea51ef0c6c122c7fcffdd86b33eca Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 14:15:42 +0200 Subject: [PATCH 04/15] Add HTTPSubscription client which subscribes to a denylist. --- cmd/httpsubs/main.go | 31 +++++++++++++++ subscription.go | 95 ++++++++++++++++++++++++++++++++++++++++++++ subscription_test.go | 46 +++++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 cmd/httpsubs/main.go create mode 100644 subscription.go create mode 100644 subscription_test.go diff --git a/cmd/httpsubs/main.go b/cmd/httpsubs/main.go new file mode 100644 index 0000000..976748a --- /dev/null +++ b/cmd/httpsubs/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "time" + + "github.com/ipfs-shipyard/nopfs" +) + +func main() { + if len(os.Args) != 3 { + fmt.Fprintln(os.Stderr, "Usage: program ") + os.Exit(1) + } + + local := os.Args[1] + remote := os.Args[2] + + fmt.Printf("%s: subscribed to %s. 
CTRL-C to stop\n", local, remote) + + subscriber := nopfs.NewHTTPSubscriber(remote, local, 1*time.Minute) + go subscriber.Subscribe() + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + fmt.Println("Stopping") + subscriber.Stop() +} diff --git a/subscription.go b/subscription.go new file mode 100644 index 0000000..af5b8a0 --- /dev/null +++ b/subscription.go @@ -0,0 +1,95 @@ +package nopfs + +import ( + "fmt" + "io" + "net/http" + "os" + "time" +) + +// Subscriber represents a type that subscribes to a remote URL and appends data to a local file. +type HTTPSubscriber struct { + RemoteURL string + LocalFile string + Interval time.Duration + stopChannel chan struct{} +} + +// NewSubscriber creates a new Subscriber instance with the given parameters. +func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) *HTTPSubscriber { + return &HTTPSubscriber{ + RemoteURL: remoteURL, + LocalFile: localFile, + Interval: interval, + stopChannel: make(chan struct{}), + } +} + +// Subscribe starts the subscription process. +func (s *HTTPSubscriber) Subscribe() { + timer := time.NewTimer(0) + + for { + select { + case <-s.stopChannel: + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + s.downloadAndAppend() + timer.Reset(s.Interval) + } + } +} + +// Stop stops the subscription process. 
+func (s *HTTPSubscriber) Stop() { + s.stopChannel <- struct{}{} +} + +func (s *HTTPSubscriber) downloadAndAppend() { + localFile, err := os.OpenFile(s.LocalFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + logger.Error(err) + } + defer localFile.Close() + + // Get the file size of the local file + localFileInfo, err := localFile.Stat() + if err != nil { + logger.Error(err) + } + + localFileSize := localFileInfo.Size() + + // Create a HTTP GET request with the Range header to download only the missing bytes + req, err := http.NewRequest("GET", s.RemoteURL, nil) + if err != nil { + logger.Error(err) + } + + rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize) + req.Header.Set("Range", rangeHeader) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + logger.Error(err) + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusPartialContent: + _, err = io.Copy(localFile, resp.Body) + if err != nil { + logger.Error(err) + } + logger.Infof("%s: appended %d bytes", s.LocalFile, resp.ContentLength) + case (resp.StatusCode >= http.StatusBadRequest && + resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) || + resp.StatusCode >= http.StatusInternalServerError: + logger.Errorf("%s: server returned with unexpected code %s", resp.StatusCode) + // error is ignored, we continued subscribed + } +} diff --git a/subscription_test.go b/subscription_test.go new file mode 100644 index 0000000..4170001 --- /dev/null +++ b/subscription_test.go @@ -0,0 +1,46 @@ +package nopfs + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func createTestServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate serving a file from the server + data := []byte("This is the remote content.") + w.Header().Set("Content-Range", fmt.Sprintf("bytes 0-%d/%d", len(data)-1, len(data))) + w.Header().Set("Content-Length", 
fmt.Sprint(len(data))) + w.WriteHeader(http.StatusPartialContent) + _, _ = w.Write(data) + })) +} + +func TestHTTPSubscriber(t *testing.T) { + remoteServer := createTestServer() + defer remoteServer.Close() + + localFile := "test-local-file.txt" + defer os.Remove(localFile) + + subscriber := NewHTTPSubscriber(remoteServer.URL, localFile, 500*time.Millisecond) + go subscriber.Subscribe() + + // Allow some time for subscription to run + time.Sleep(time.Second) + subscriber.Stop() + + localFileContent, err := os.ReadFile(localFile) + if err != nil { + t.Errorf("Error reading local file: %v", err) + } + + expectedContent := "This is the remote content." + if string(localFileContent) != expectedContent { + t.Errorf("Local file content is incorrect. Got: %s, Expected: %s", string(localFileContent), expectedContent) + } +} From ac39a6d17365d4c20fc5daf3db989cf51bb08fe3 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 15:48:37 +0200 Subject: [PATCH 05/15] Make subscription tests more robust. 
Fix some issues --- subscription.go | 2 +- subscription_test.go | 49 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/subscription.go b/subscription.go index af5b8a0..6526b29 100644 --- a/subscription.go +++ b/subscription.go @@ -89,7 +89,7 @@ func (s *HTTPSubscriber) downloadAndAppend() { case (resp.StatusCode >= http.StatusBadRequest && resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) || resp.StatusCode >= http.StatusInternalServerError: - logger.Errorf("%s: server returned with unexpected code %s", resp.StatusCode) + logger.Errorf("%s: server returned with unexpected code %d", s.LocalFile, resp.StatusCode) // error is ignored, we continued subscribed } } diff --git a/subscription_test.go b/subscription_test.go index 4170001..a18e462 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -13,10 +13,49 @@ func createTestServer() *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Simulate serving a file from the server data := []byte("This is the remote content.") - w.Header().Set("Content-Range", fmt.Sprintf("bytes 0-%d/%d", len(data)-1, len(data))) - w.Header().Set("Content-Length", fmt.Sprint(len(data))) - w.WriteHeader(http.StatusPartialContent) - _, _ = w.Write(data) + contentLength := len(data) + + // Get the "Range" header from the request + rangeHeader := r.Header.Get("Range") + + if rangeHeader != "" { + var start, end int + + // Check for a range request in the format "bytes %d-" + if n, _ := fmt.Sscanf(rangeHeader, "bytes=%d-", &start); n == 1 { + // Handle open end range requests + if start >= contentLength { + http.Error(w, "Invalid Range header", http.StatusRequestedRangeNotSatisfiable) + return + } + end = contentLength - 1 + } else if n, _ := fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end); n == 2 { + // Check for valid byte range + if start < 0 || end >= contentLength || start > end { + http.Error(w, 
"Invalid Range header", http.StatusRequestedRangeNotSatisfiable) + return + } + } else { + http.Error(w, "Invalid Range header", http.StatusBadRequest) + return + } + + // Calculate the content range and length for the response + contentRange := fmt.Sprintf("bytes %d-%d/%d", start, end, contentLength) + w.Header().Set("Content-Range", contentRange) + w.Header().Set("Content-Length", fmt.Sprint(end-start+1)) + w.WriteHeader(http.StatusPartialContent) + + // Write the selected byte range to the response + _, _ = w.Write(data[start : end+1]) + } else { + // If no "Range" header, serve the entire content + w.Header().Set("Content-Range", fmt.Sprintf("bytes 0-%d/%d", contentLength-1, contentLength)) + w.Header().Set("Content-Length", fmt.Sprint(contentLength)) + w.WriteHeader(http.StatusPartialContent) + + _, _ = w.Write(data) + } })) } @@ -31,7 +70,7 @@ func TestHTTPSubscriber(t *testing.T) { go subscriber.Subscribe() // Allow some time for subscription to run - time.Sleep(time.Second) + time.Sleep(2 * time.Second) subscriber.Stop() localFileContent, err := os.ReadFile(localFile) From 5bd4fc22790391b0263486f3a97e1149731f5365 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 15:48:51 +0200 Subject: [PATCH 06/15] Link to denyli.st --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5b38c9f..77d6f9a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ content blocking to the go-ipfs stack and particularly to Kubo. ## Content-blocking in Kubo 1. Grab a plugin release from the [releases](https://github.com/ipfs-shipyard/nopfs/releases) section matching your Kubo version and install the plugin file in `~/.ipfs/plugins`. - 2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download the [BadBits denylist](https://badbits.dwebops.pub/badbits.deny) and place them in `~/.config/ipfs/denylists/`. + 2. 
Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download one of the supported denylists from [Denyli.st](https://denyli.st), and place it in `~/.config/ipfs/denylists/` (make sure the file has the `.deny` extension). 3. Start Kubo (`ipfs daemon`). The plugin should be loaded automatically and existing denylists tracked for updates from that point (no restarts required). ## Denylist syntax From 28abafa61de3b1e348c147ca7411e278bcb887ee Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 15:51:31 +0200 Subject: [PATCH 07/15] Fix godoc strings --- subscription.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subscription.go b/subscription.go index 6526b29..7be1dfe 100644 --- a/subscription.go +++ b/subscription.go @@ -8,7 +8,7 @@ import ( "time" ) -// Subscriber represents a type that subscribes to a remote URL and appends data to a local file. +// HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file. type HTTPSubscriber struct { RemoteURL string LocalFile string @@ -16,7 +16,7 @@ type HTTPSubscriber struct { stopChannel chan struct{} } -// NewSubscriber creates a new Subscriber instance with the given parameters. +// NewHTTPSubscriber creates a new Subscriber instance with the given parameters.
func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) *HTTPSubscriber { return &HTTPSubscriber{ RemoteURL: remoteURL, From 853ce79d9b97343db0d7c09456900d097585c551 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 16 Oct 2023 16:59:30 +0200 Subject: [PATCH 08/15] Allow to stop subscription --- subscription.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subscription.go b/subscription.go index 7be1dfe..4cbdb06 100644 --- a/subscription.go +++ b/subscription.go @@ -22,7 +22,7 @@ func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) *HTT RemoteURL: remoteURL, LocalFile: localFile, Interval: interval, - stopChannel: make(chan struct{}), + stopChannel: make(chan struct{}, 1), } } From 8e447c27631ba7875b73c8507bd876a0cff27189 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 17 Oct 2023 20:00:02 +0200 Subject: [PATCH 09/15] Abort subscriber early if file cannot be created or written to --- go.sum | 2 -- subscription.go | 11 +++++++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/go.sum b/go.sum index 6efff9f..b80ea0e 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI= -github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk= github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3 h1:sgrhALL6mBoZsNvJ2zUcITcN6IW3y14ej6w7gv5RcOI= github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= diff --git 
a/subscription.go b/subscription.go index 4cbdb06..aea6ea6 100644 --- a/subscription.go +++ b/subscription.go @@ -17,13 +17,20 @@ type HTTPSubscriber struct { } // NewHTTPSubscriber creates a new Subscriber instance with the given parameters. -func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) *HTTPSubscriber { +func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) { + logger.Infof("Subscribing to remote denylist: %s", remoteURL) + f, err := os.OpenFile(localFile, os.O_CREATE, 0644) + if err != nil { + return nil, err + } + defer f.Close() + return &HTTPSubscriber{ RemoteURL: remoteURL, LocalFile: localFile, Interval: interval, stopChannel: make(chan struct{}, 1), - } + }, nil } // Subscribe starts the subscription process. From 074a4f2b8774640183493b270ebd9f2036939f40 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 17 Oct 2023 20:11:00 +0200 Subject: [PATCH 10/15] subs: Be more verbose. Reduce exported surface --- cmd/httpsubs/main.go | 6 +++++- subscription.go | 23 ++++++++++++----------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/cmd/httpsubs/main.go b/cmd/httpsubs/main.go index 976748a..74b9d52 100644 --- a/cmd/httpsubs/main.go +++ b/cmd/httpsubs/main.go @@ -20,7 +20,11 @@ func main() { fmt.Printf("%s: subscribed to %s. CTRL-C to stop\n", local, remote) - subscriber := nopfs.NewHTTPSubscriber(remote, local, 1*time.Minute) + subscriber, err := nopfs.NewHTTPSubscriber(remote, local, 1*time.Minute) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } go subscriber.Subscribe() c := make(chan os.Signal, 1) diff --git a/subscription.go b/subscription.go index aea6ea6..1d565a8 100644 --- a/subscription.go +++ b/subscription.go @@ -10,9 +10,9 @@ import ( // HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file. 
type HTTPSubscriber struct { - RemoteURL string - LocalFile string - Interval time.Duration + remoteURL string + localFile string + interval time.Duration stopChannel chan struct{} } @@ -26,9 +26,9 @@ func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HT defer f.Close() return &HTTPSubscriber{ - RemoteURL: remoteURL, - LocalFile: localFile, - Interval: interval, + remoteURL: remoteURL, + localFile: localFile, + interval: interval, stopChannel: make(chan struct{}, 1), }, nil } @@ -40,13 +40,14 @@ func (s *HTTPSubscriber) Subscribe() { for { select { case <-s.stopChannel: + logger.Infof("Stopping subscription on: %s", s.localFile) if !timer.Stop() { <-timer.C } return case <-timer.C: s.downloadAndAppend() - timer.Reset(s.Interval) + timer.Reset(s.interval) } } } @@ -57,7 +58,7 @@ func (s *HTTPSubscriber) Stop() { } func (s *HTTPSubscriber) downloadAndAppend() { - localFile, err := os.OpenFile(s.LocalFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { logger.Error(err) } @@ -72,7 +73,7 @@ func (s *HTTPSubscriber) downloadAndAppend() { localFileSize := localFileInfo.Size() // Create a HTTP GET request with the Range header to download only the missing bytes - req, err := http.NewRequest("GET", s.RemoteURL, nil) + req, err := http.NewRequest("GET", s.remoteURL, nil) if err != nil { logger.Error(err) } @@ -92,11 +93,11 @@ func (s *HTTPSubscriber) downloadAndAppend() { if err != nil { logger.Error(err) } - logger.Infof("%s: appended %d bytes", s.LocalFile, resp.ContentLength) + logger.Infof("%s: appended %d bytes", s.localFile, resp.ContentLength) case (resp.StatusCode >= http.StatusBadRequest && resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) || resp.StatusCode >= http.StatusInternalServerError: - logger.Errorf("%s: server returned with unexpected code %d", s.LocalFile, resp.StatusCode) + logger.Errorf("%s: server returned with 
unexpected code %d", s.localFile, resp.StatusCode) // error is ignored, we continued subscribed } } From 5b5f00fdedaafc4d1e1321d617b359af01809c89 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 17 Oct 2023 20:21:08 +0200 Subject: [PATCH 11/15] Download files first on subscribe when they don't exist --- subscription.go | 48 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 15 deletions(-) diff --git a/subscription.go b/subscription.go index 1d565a8..2bc0097 100644 --- a/subscription.go +++ b/subscription.go @@ -1,8 +1,10 @@ package nopfs import ( + "errors" "fmt" "io" + "io/fs" "net/http" "os" "time" @@ -19,18 +21,30 @@ type HTTPSubscriber struct { // NewHTTPSubscriber creates a new Subscriber instance with the given parameters. func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) { logger.Infof("Subscribing to remote denylist: %s", remoteURL) - f, err := os.OpenFile(localFile, os.O_CREATE, 0644) - if err != nil { - return nil, err - } - defer f.Close() - return &HTTPSubscriber{ + sub := HTTPSubscriber{ remoteURL: remoteURL, localFile: localFile, interval: interval, stopChannel: make(chan struct{}, 1), - }, nil + } + + _, err := os.Stat(localFile) + // if not found, we perform a first sync before returning. + // this is necessary as otherwise the Blocker does not find much + // of the file + if err != nil && errors.Is(err, fs.ErrNotExist) { + logger.Infof("Performing first sync on: %s", localFile) + err := sub.downloadAndAppend() + if err != nil { + return nil, err + } + } + if err != nil { + return nil, err + } + + return &sub, nil } // Subscribe starts the subscription process. 
@@ -46,7 +60,10 @@ func (s *HTTPSubscriber) Subscribe() { } return case <-timer.C: - s.downloadAndAppend() + err := s.downloadAndAppend() + if err != nil { + logger.Error(err) + } timer.Reset(s.interval) } } @@ -57,17 +74,17 @@ func (s *HTTPSubscriber) Stop() { s.stopChannel <- struct{}{} } -func (s *HTTPSubscriber) downloadAndAppend() { +func (s *HTTPSubscriber) downloadAndAppend() error { localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { - logger.Error(err) + return err } defer localFile.Close() // Get the file size of the local file localFileInfo, err := localFile.Stat() if err != nil { - logger.Error(err) + return err } localFileSize := localFileInfo.Size() @@ -75,7 +92,7 @@ func (s *HTTPSubscriber) downloadAndAppend() { // Create a HTTP GET request with the Range header to download only the missing bytes req, err := http.NewRequest("GET", s.remoteURL, nil) if err != nil { - logger.Error(err) + return err } rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize) @@ -83,7 +100,7 @@ func (s *HTTPSubscriber) downloadAndAppend() { resp, err := http.DefaultClient.Do(req) if err != nil { - logger.Error(err) + return err } defer resp.Body.Close() @@ -91,13 +108,14 @@ func (s *HTTPSubscriber) downloadAndAppend() { case resp.StatusCode == http.StatusPartialContent: _, err = io.Copy(localFile, resp.Body) if err != nil { - logger.Error(err) + return err } logger.Infof("%s: appended %d bytes", s.localFile, resp.ContentLength) case (resp.StatusCode >= http.StatusBadRequest && resp.StatusCode != http.StatusRequestedRangeNotSatisfiable) || resp.StatusCode >= http.StatusInternalServerError: - logger.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode) + return fmt.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode) // error is ignored, we continued subscribed } + return nil } From ec23b9797554e37811218aa3446781b4a92ca3f3 Mon Sep 17 00:00:00 2001 From: Hector 
Sanjuan Date: Tue, 17 Oct 2023 20:26:16 +0200 Subject: [PATCH 12/15] Fix subscriber logic --- subscription.go | 3 +-- subscription_test.go | 5 ++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/subscription.go b/subscription.go index 2bc0097..184c331 100644 --- a/subscription.go +++ b/subscription.go @@ -39,8 +39,7 @@ func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HT if err != nil { return nil, err } - } - if err != nil { + } else if err != nil { return nil, err } diff --git a/subscription_test.go b/subscription_test.go index a18e462..e3b24ca 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -66,7 +66,10 @@ func TestHTTPSubscriber(t *testing.T) { localFile := "test-local-file.txt" defer os.Remove(localFile) - subscriber := NewHTTPSubscriber(remoteServer.URL, localFile, 500*time.Millisecond) + subscriber, err := NewHTTPSubscriber(remoteServer.URL, localFile, 500*time.Millisecond) + if err != nil { + t.Fatal(err) + } go subscriber.Subscribe() // Allow some time for subscription to run From 752624cf82bef14043e61c08ac1352e1dd6b59d7 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 17 Oct 2023 23:01:46 +0200 Subject: [PATCH 13/15] More logging --- subscription.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/subscription.go b/subscription.go index 184c331..e226d3b 100644 --- a/subscription.go +++ b/subscription.go @@ -70,7 +70,7 @@ func (s *HTTPSubscriber) Subscribe() { // Stop stops the subscription process. 
func (s *HTTPSubscriber) Stop() { - s.stopChannel <- struct{}{} + close(s.stopChannel) } func (s *HTTPSubscriber) downloadAndAppend() error { @@ -97,6 +97,8 @@ func (s *HTTPSubscriber) downloadAndAppend() error { rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize) req.Header.Set("Range", rangeHeader) + logger.Debug("%s: requesting bytes from %d: %s", s.localFile, localFileSize, req.URL) + resp, err := http.DefaultClient.Do(req) if err != nil { return err From 6dcea67b3d0ada2b6ec1dc799ff761eedfd71892 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Tue, 17 Oct 2023 23:18:57 +0200 Subject: [PATCH 14/15] Subscribe automatically --- cmd/httpsubs/main.go | 1 - subscription.go | 6 ++++-- subscription_test.go | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/httpsubs/main.go b/cmd/httpsubs/main.go index 74b9d52..158c88e 100644 --- a/cmd/httpsubs/main.go +++ b/cmd/httpsubs/main.go @@ -25,7 +25,6 @@ func main() { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - go subscriber.Subscribe() c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) diff --git a/subscription.go b/subscription.go index e226d3b..b2ccea1 100644 --- a/subscription.go +++ b/subscription.go @@ -43,11 +43,13 @@ func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HT return nil, err } + go sub.subscribe() + return &sub, nil } -// Subscribe starts the subscription process. -func (s *HTTPSubscriber) Subscribe() { +// subscribe starts the subscription process. 
+func (s *HTTPSubscriber) subscribe() { timer := time.NewTimer(0) for { diff --git a/subscription_test.go b/subscription_test.go index e3b24ca..f8ad6dd 100644 --- a/subscription_test.go +++ b/subscription_test.go @@ -70,7 +70,6 @@ func TestHTTPSubscriber(t *testing.T) { if err != nil { t.Fatal(err) } - go subscriber.Subscribe() // Allow some time for subscription to run time.Sleep(2 * time.Second) From 9c413f3817a3b40667c27c7e2d170ebfe080bc0b Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Wed, 18 Oct 2023 19:43:15 +0200 Subject: [PATCH 15/15] Remove unreachable code --- denylist.go | 1 - 1 file changed, 1 deletion(-) diff --git a/denylist.go b/denylist.go index e92d2f7..7d49926 100644 --- a/denylist.go +++ b/denylist.go @@ -300,7 +300,6 @@ func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineN line = "" limRdr.N = maxLineSize // reset } - return nil } // parseLine processes every full-line read and puts it into the BlocksDB etc.