diff --git a/README.md b/README.md index 1cb8aa9..77d6f9a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ content blocking to the go-ipfs stack and particularly to Kubo. ## Content-blocking in Kubo 1. Grab a plugin release from the [releases](https://github.com/ipfs-shipyard/nopfs/releases) section matching your Kubo version and install the plugin file in `~/.ipfs/plugins`. - 2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download the [BadBits denylist](https://badbits.dwebops.pub/badbits.deny) and place them in `~/.config/ipfs/denylists/`. + 2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download one of the supported denylists from [Denyli.st](https://denyli.st) and place them in `~/.config/ipfs/denylists/` (ensure `.deny` extension). 3. Start Kubo (`ipfs daemon`). The plugin should be loaded automatically and existing denylists tracked for updates from that point (no restarts required). ## Denylist syntax @@ -82,6 +82,22 @@ hints: //QmbK7LDv5NNBvYQzNfm2eED17SNLt1yNMapcUhSuNLgkqz ``` +You can create double-hashes by hand with the following command: + +``` +printf "QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768/my/path" \ + | ipfs add --raw-leaves --only-hash --quiet \ + | ipfs cid format -f '%M' -b base58btc +``` + +where: + - `QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768` must always be a + CidV0. If you have a CIDv1 you need to convert it to CIDv0 first. i.e + `ipfs cid format -v0 bafybeihrw75yfhdx5qsqgesdnxejtjybscwuclpusvxkuttep6h7pkgmze` + - `/my/path` is optional depending on whether you want to block a specific path. No wildcards supported here! + - The command above should give `QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8`. Use it as `//QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8` on the denylist. 
+ + ## Kubo plugin NOpfs Kubo plugin pre-built binary releases are available in the diff --git a/cmd/httpsubs/main.go b/cmd/httpsubs/main.go new file mode 100644 index 0000000..158c88e --- /dev/null +++ b/cmd/httpsubs/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "os" + "os/signal" + "time" + + "github.com/ipfs-shipyard/nopfs" +) + +func main() { + if len(os.Args) != 3 { + fmt.Fprintln(os.Stderr, "Usage: program ") + os.Exit(1) + } + + local := os.Args[1] + remote := os.Args[2] + + fmt.Printf("%s: subscribed to %s. CTRL-C to stop\n", local, remote) + + subscriber, err := nopfs.NewHTTPSubscriber(remote, local, 1*time.Minute) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + fmt.Println("Stopping") + subscriber.Stop() +} diff --git a/denylist.go b/denylist.go index b6ba739..7d49926 100644 --- a/denylist.go +++ b/denylist.go @@ -194,44 +194,19 @@ func (dl *Denylist) parseAndFollow(follow bool) error { return err } - // we will update N as we go after every line. - // Fixme: this is going to play weird as the buffered reader will - // read-ahead and consume N. + // we will update N as we go after every line. Fixme: this is + // going to play weird as the buffered reader will read-ahead + // and consume N. limRdr := &io.LimitedReader{ R: dl.f, N: maxLineSize, } r := bufio.NewReader(limRdr) - lineNumber := dl.Header.headerLines - for { - line, err := r.ReadString('\n') - // limit reader exhausted - if err == io.EOF && len(line) >= maxLineSize { - err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) - logger.Error(err) - dl.Close() - return err - } - // keep waiting for data - if err == io.EOF { - break - } - if err != nil { - logger.Error(err) - return err - } - lineNumber++ - if err := dl.parseLine(line, lineNumber); err != nil { - logger.Error(err) - } - limRdr.N = maxLineSize // reset - - } // we finished reading the file as it EOF'ed. 
if !follow { - return nil + return dl.followLines(r, limRdr, lineNumber, nil) } // We now wait for new lines. @@ -261,55 +236,70 @@ func (dl *Denylist) parseAndFollow(follow bool) error { } } - // Is this the right way of tailing a file? Pretty sure there are a - // bunch of gotchas. It seems to work when saving on top of a file - // though. Also, important that the limitedReader is there to avoid - // parsing a huge lines. Also, this could be done by just having - // watchers on the folder, but requires a small refactoring. - go func() { - line := "" - limRdr.N = maxLineSize // reset + go dl.followLines(r, limRdr, lineNumber, waitForWrite) + return nil +} - for { - partialLine, err := r.ReadString('\n') - line += partialLine - - // limit reader exhausted - if err == io.EOF && limRdr.N == 0 { - err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1) - logger.Error(err) - dl.Close() - return - } - // keep waiting for data - if err == io.EOF { - err := waitForWrite() +// followLines reads lines from a buffered reader on top of a limited reader, +// that we reset on every line. This enforces line-length limits. +// If we pass a waitWrite() function, then it waits when finding EOF. +// +// Is this the right way of tailing a file? Pretty sure there are a +// bunch of gotchas. It seems to work when saving on top of a file +// though. Also, important that the limitedReader is there to avoid +// parsing a huge lines. Also, this could be done by just having +// watchers on the folder, but requires a small refactoring. +func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) error { + line := "" + limRdr.N = maxLineSize // reset + + for { + partialLine, err := r.ReadString('\n') + + // limit reader exhausted + if err == io.EOF && limRdr.N == 0 { + err = fmt.Errorf("line too long. 
%s:%d", dl.Filename, lineNumber+1) + logger.Error(err) + dl.Close() + return err + } + + // Record how much of a line we have + line += partialLine + + if err == io.EOF { + if waitWrite != nil { // keep waiting + err := waitWrite() if err != nil { logger.Error(err) dl.Close() - return + return err } continue + } else { // Finished + return nil } - if err != nil { - logger.Error(err) - dl.Close() - return - } + } + if err != nil { + logger.Error(err) + dl.Close() + return err + } - lineNumber++ - // we have read up to \n - if err := dl.parseLine(line, lineNumber); err != nil { - logger.Error(err) - // log error and continue with next line + // if we are here, no EOF, no error and ReadString() + // found an \n so we have a full line. + + lineNumber++ + // we have read up to \n + if err := dl.parseLine(line, lineNumber); err != nil { + logger.Error(err) + // log error and continue with next line - } - // reset for next line - line = "" - limRdr.N = 2 << 20 // reset } - }() - return nil + // reset for next line + line = "" + limRdr.N = maxLineSize // reset + } } // parseLine processes every full-line read and puts it into the BlocksDB etc. 
diff --git a/go.sum b/go.sum index 6efff9f..b80ea0e 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI= -github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk= github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3 h1:sgrhALL6mBoZsNvJ2zUcITcN6IW3y14ej6w7gv5RcOI= github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk= github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s= diff --git a/subscription.go b/subscription.go new file mode 100644 index 0000000..b2ccea1 --- /dev/null +++ b/subscription.go @@ -0,0 +1,124 @@ +package nopfs + +import ( + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "os" + "time" +) + +// HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file. +type HTTPSubscriber struct { + remoteURL string + localFile string + interval time.Duration + stopChannel chan struct{} +} + +// NewHTTPSubscriber creates a new Subscriber instance with the given parameters. +func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) { + logger.Infof("Subscribing to remote denylist: %s", remoteURL) + + sub := HTTPSubscriber{ + remoteURL: remoteURL, + localFile: localFile, + interval: interval, + stopChannel: make(chan struct{}, 1), + } + + _, err := os.Stat(localFile) + // if not found, we perform a first sync before returning. 
+ // this is necessary as otherwise the Blocker does not find much + // of the file + if err != nil && errors.Is(err, fs.ErrNotExist) { + logger.Infof("Performing first sync on: %s", localFile) + err := sub.downloadAndAppend() + if err != nil { + return nil, err + } + } else if err != nil { + return nil, err + } + + go sub.subscribe() + + return &sub, nil +} + +// subscribe starts the subscription process. +func (s *HTTPSubscriber) subscribe() { + timer := time.NewTimer(0) + + for { + select { + case <-s.stopChannel: + logger.Infof("Stopping subscription on: %s", s.localFile) + if !timer.Stop() { + <-timer.C + } + return + case <-timer.C: + err := s.downloadAndAppend() + if err != nil { + logger.Error(err) + } + timer.Reset(s.interval) + } + } +} + +// Stop stops the subscription process. +func (s *HTTPSubscriber) Stop() { + close(s.stopChannel) +} + +func (s *HTTPSubscriber) downloadAndAppend() error { + localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + defer localFile.Close() + + // Get the file size of the local file + localFileInfo, err := localFile.Stat() + if err != nil { + return err + } + + localFileSize := localFileInfo.Size() + + // Create a HTTP GET request with the Range header to download only the missing bytes + req, err := http.NewRequest("GET", s.remoteURL, nil) + if err != nil { + return err + } + + rangeHeader := fmt.Sprintf("bytes=%d-", localFileSize) + req.Header.Set("Range", rangeHeader) + + logger.Debugf("%s: requesting bytes from %d: %s", s.localFile, localFileSize, req.URL) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + switch { + case resp.StatusCode == http.StatusPartialContent: + _, err = io.Copy(localFile, resp.Body) + if err != nil { + return err + } + logger.Infof("%s: appended %d bytes", s.localFile, resp.ContentLength) + case (resp.StatusCode >= http.StatusBadRequest && + resp.StatusCode != 
http.StatusRequestedRangeNotSatisfiable) || + resp.StatusCode >= http.StatusInternalServerError: + return fmt.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode) + // error is ignored, we continued subscribed + } + return nil +} diff --git a/subscription_test.go b/subscription_test.go new file mode 100644 index 0000000..f8ad6dd --- /dev/null +++ b/subscription_test.go @@ -0,0 +1,87 @@ +package nopfs + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +func createTestServer() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulate serving a file from the server + data := []byte("This is the remote content.") + contentLength := len(data) + + // Get the "Range" header from the request + rangeHeader := r.Header.Get("Range") + + if rangeHeader != "" { + var start, end int + + // Check for a range request in the format "bytes %d-" + if n, _ := fmt.Sscanf(rangeHeader, "bytes=%d-", &start); n == 1 { + // Handle open end range requests + if start >= contentLength { + http.Error(w, "Invalid Range header", http.StatusRequestedRangeNotSatisfiable) + return + } + end = contentLength - 1 + } else if n, _ := fmt.Sscanf(rangeHeader, "bytes=%d-%d", &start, &end); n == 2 { + // Check for valid byte range + if start < 0 || end >= contentLength || start > end { + http.Error(w, "Invalid Range header", http.StatusRequestedRangeNotSatisfiable) + return + } + } else { + http.Error(w, "Invalid Range header", http.StatusBadRequest) + return + } + + // Calculate the content range and length for the response + contentRange := fmt.Sprintf("bytes %d-%d/%d", start, end, contentLength) + w.Header().Set("Content-Range", contentRange) + w.Header().Set("Content-Length", fmt.Sprint(end-start+1)) + w.WriteHeader(http.StatusPartialContent) + + // Write the selected byte range to the response + _, _ = w.Write(data[start : end+1]) + } else { + // If no "Range" header, 
serve the entire content + w.Header().Set("Content-Range", fmt.Sprintf("bytes 0-%d/%d", contentLength-1, contentLength)) + w.Header().Set("Content-Length", fmt.Sprint(contentLength)) + w.WriteHeader(http.StatusPartialContent) + + _, _ = w.Write(data) + } + })) +} + +func TestHTTPSubscriber(t *testing.T) { + remoteServer := createTestServer() + defer remoteServer.Close() + + localFile := "test-local-file.txt" + defer os.Remove(localFile) + + subscriber, err := NewHTTPSubscriber(remoteServer.URL, localFile, 500*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + // Allow some time for subscription to run + time.Sleep(2 * time.Second) + subscriber.Stop() + + localFileContent, err := os.ReadFile(localFile) + if err != nil { + t.Errorf("Error reading local file: %v", err) + } + + expectedContent := "This is the remote content." + if string(localFileContent) != expectedContent { + t.Errorf("Local file content is incorrect. Got: %s, Expected: %s", string(localFileContent), expectedContent) + } +}