Skip to content

Commit

Permalink
Merge pull request #22 from ipfs-shipyard/subscription-http
Browse files Browse the repository at this point in the history
Add an HTTPSubscriber that allows subscribing to an append-only denylist
  • Loading branch information
hsanjuan authored Oct 18, 2023
2 parents d738993 + 9c413f3 commit 3b5bfd3
Show file tree
Hide file tree
Showing 6 changed files with 320 additions and 71 deletions.
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ content blocking to the go-ipfs stack and particularly to Kubo.
## Content-blocking in Kubo

1. Grab a plugin release from the [releases](https://github.com/ipfs-shipyard/nopfs/releases) section matching your Kubo version and install the plugin file in `~/.ipfs/plugins`.
2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download the [BadBits denylist](https://badbits.dwebops.pub/badbits.deny) and place them in `~/.config/ipfs/denylists/`.
2. Write a custom denylist file (see [syntax](#denylist-syntax) below) or simply download one of the supported denylists from [Denyli.st](https://denyli.st) and place them in `~/.config/ipfs/denylists/` (ensure `.deny` extension).
3. Start Kubo (`ipfs daemon`). The plugin should be loaded automatically and existing denylists tracked for updates from that point (no restarts required).

## Denylist syntax
Expand Down Expand Up @@ -82,6 +82,22 @@ hints:
//QmbK7LDv5NNBvYQzNfm2eED17SNLt1yNMapcUhSuNLgkqz
```

You can create double-hashes by hand with the following command:

```
printf "QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768/my/path" \
| ipfs add --raw-leaves --only-hash --quiet \
| ipfs cid format -f '%M' -b base58btc
```

where:
- `QmecDgNqCRirkc3Cjz9eoRBNwXGckJ9WvTdmY16HP88768` must always be a
  CIDv0. If you have a CIDv1 you need to convert it to CIDv0 first, e.g.
  `ipfs cid format -v0 bafybeihrw75yfhdx5qsqgesdnxejtjybscwuclpusvxkuttep6h7pkgmze`
- `/my/path` is optional depending on whether you want to block a specific path. No wildcards supported here!
- The command above should give `QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8`. Use it as `//QmSju6XPmYLG611rmK7rEeCMFVuL6EHpqyvmEU6oGx3GR8` on the denylist.


## Kubo plugin

NOpfs Kubo plugin pre-built binary releases are available in the
Expand Down
34 changes: 34 additions & 0 deletions cmd/httpsubs/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"fmt"
"os"
"os/signal"
"time"

"github.com/ipfs-shipyard/nopfs"
)

// main subscribes a local denylist file to a remote append-only source
// and keeps it synced until the process is interrupted (CTRL-C).
func main() {
	args := os.Args
	if len(args) != 3 {
		fmt.Fprintln(os.Stderr, "Usage: program <local_denylist> <source_URL>")
		os.Exit(1)
	}

	denylistPath, sourceURL := args[1], args[2]

	fmt.Printf("%s: subscribed to %s. CTRL-C to stop\n", denylistPath, sourceURL)

	// Poll the remote source once per minute for new appended entries.
	sub, err := nopfs.NewHTTPSubscriber(sourceURL, denylistPath, time.Minute)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	// Block until the user interrupts, then shut the subscriber down.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)
	<-interrupts

	fmt.Println("Stopping")
	sub.Stop()
}
126 changes: 58 additions & 68 deletions denylist.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,44 +194,19 @@ func (dl *Denylist) parseAndFollow(follow bool) error {
return err
}

// we will update N as we go after every line.
// Fixme: this is going to play weird as the buffered reader will
// read-ahead and consume N.
// we will update N as we go after every line. Fixme: this is
// going to play weird as the buffered reader will read-ahead
// and consume N.
limRdr := &io.LimitedReader{
R: dl.f,
N: maxLineSize,
}
r := bufio.NewReader(limRdr)

lineNumber := dl.Header.headerLines
for {
line, err := r.ReadString('\n')
// limit reader exhausted
if err == io.EOF && len(line) >= maxLineSize {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return err
}
// keep waiting for data
if err == io.EOF {
break
}
if err != nil {
logger.Error(err)
return err
}

lineNumber++
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
}
limRdr.N = maxLineSize // reset

}
// we finished reading the file as it EOF'ed.
if !follow {
return nil
return dl.followLines(r, limRdr, lineNumber, nil)
}
// We now wait for new lines.

Expand Down Expand Up @@ -261,55 +236,70 @@ func (dl *Denylist) parseAndFollow(follow bool) error {
}
}

// Is this the right way of tailing a file? Pretty sure there are a
// bunch of gotchas. It seems to work when saving on top of a file
// though. Also, important that the limitedReader is there to avoid
// parsing a huge lines. Also, this could be done by just having
// watchers on the folder, but requires a small refactoring.
go func() {
line := ""
limRdr.N = maxLineSize // reset
go dl.followLines(r, limRdr, lineNumber, waitForWrite)
return nil
}

for {
partialLine, err := r.ReadString('\n')
line += partialLine

// limit reader exhausted
if err == io.EOF && limRdr.N == 0 {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return
}
// keep waiting for data
if err == io.EOF {
err := waitForWrite()
// followLines reads lines from a buffered reader on top of a limited reader,
// that we reset on every line. This enforces line-length limits.
// If we pass a waitWrite() function, then it waits when finding EOF.
//
// Is this the right way of tailing a file? Pretty sure there are a
// bunch of gotchas. It seems to work when saving on top of a file
// though. Also, important that the limitedReader is there to avoid
// parsing a huge lines. Also, this could be done by just having
// watchers on the folder, but requires a small refactoring.
func (dl *Denylist) followLines(r *bufio.Reader, limRdr *io.LimitedReader, lineNumber uint64, waitWrite func() error) error {
line := ""
limRdr.N = maxLineSize // reset

for {
partialLine, err := r.ReadString('\n')

// limit reader exhausted
if err == io.EOF && limRdr.N == 0 {
err = fmt.Errorf("line too long. %s:%d", dl.Filename, lineNumber+1)
logger.Error(err)
dl.Close()
return err
}

// Record how much of a line we have
line += partialLine

if err == io.EOF {
if waitWrite != nil { // keep waiting
err := waitWrite()
if err != nil {
logger.Error(err)
dl.Close()
return
return err
}
continue
} else { // Finished
return nil
}
if err != nil {
logger.Error(err)
dl.Close()
return
}
}
if err != nil {
logger.Error(err)
dl.Close()
return err
}

lineNumber++
// we have read up to \n
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
// log error and continue with next line
// if we are here, no EOF, no error and ReadString()
// found an \n so we have a full line.

lineNumber++
// we have read up to \n
if err := dl.parseLine(line, lineNumber); err != nil {
logger.Error(err)
// log error and continue with next line

}
// reset for next line
line = ""
limRdr.N = 2 << 20 // reset
}
}()
return nil
// reset for next line
line = ""
limRdr.N = maxLineSize // reset
}
}

// parseLine processes every full-line read and puts it into the BlocksDB etc.
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/ipfs/boxo v0.13.1 h1:nQ5oQzcMZR3oL41REJDcTbrvDvuZh3J9ckc9+ILeRQI=
github.com/ipfs/boxo v0.13.1/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3 h1:sgrhALL6mBoZsNvJ2zUcITcN6IW3y14ej6w7gv5RcOI=
github.com/ipfs/boxo v0.13.2-0.20231012132507-6602207a8fa3/go.mod h1:btrtHy0lmO1ODMECbbEY1pxNtrLilvKSYLoGQt1yYCk=
github.com/ipfs/go-cid v0.4.1 h1:A/T3qGvxi4kpKWWcPC/PgbvDA2bjVLO7n4UeVwnbs/s=
Expand Down
124 changes: 124 additions & 0 deletions subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package nopfs

import (
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"time"
)

// HTTPSubscriber represents a type that subscribes to a remote URL and appends data to a local file.
type HTTPSubscriber struct {
	remoteURL   string        // URL of the remote append-only denylist.
	localFile   string        // path of the local copy that new bytes are appended to.
	interval    time.Duration // how often the remote is polled for new data.
	stopChannel chan struct{} // closed by Stop to terminate the polling goroutine.
}

// NewHTTPSubscriber creates a new Subscriber instance with the given parameters.
//
// If localFile does not exist yet, a first sync from remoteURL is
// performed before returning, so that consumers (e.g. the Blocker)
// find a populated denylist right away. A background goroutine then
// keeps polling every interval until Stop is called.
func NewHTTPSubscriber(remoteURL, localFile string, interval time.Duration) (*HTTPSubscriber, error) {
	logger.Infof("Subscribing to remote denylist: %s", remoteURL)

	sub := HTTPSubscriber{
		remoteURL: remoteURL,
		localFile: localFile,
		interval:  interval,
		// buffered so a send-based stop could never block; Stop
		// currently closes the channel instead.
		stopChannel: make(chan struct{}, 1),
	}

	// errors.Is(nil, ...) is false, so no separate nil check is needed.
	switch _, err := os.Stat(localFile); {
	case errors.Is(err, fs.ErrNotExist):
		// First sync: otherwise the Blocker would start from a
		// missing file and block very little.
		logger.Infof("Performing first sync on: %s", localFile)
		if err := sub.downloadAndAppend(); err != nil {
			return nil, err
		}
	case err != nil:
		return nil, err
	}

	go sub.subscribe()

	return &sub, nil
}

// subscribe starts the subscription process.
//
// It runs in its own goroutine (started by NewHTTPSubscriber) and
// performs downloadAndAppend on every timer tick until stopChannel is
// closed by Stop.
func (s *HTTPSubscriber) subscribe() {
	// A zero timer fires immediately, so the first sync attempt
	// happens right away; subsequent ones wait s.interval.
	timer := time.NewTimer(0)

	for {
		select {
		case <-s.stopChannel:
			logger.Infof("Stopping subscription on: %s", s.localFile)
			// Standard timer teardown: if Stop reports the timer
			// already fired, drain the pending value from its
			// channel. Safe here because this loop is the only
			// receiver of timer.C.
			if !timer.Stop() {
				<-timer.C
			}
			return
		case <-timer.C:
			// Sync errors are logged but do not end the
			// subscription; we retry on the next tick.
			err := s.downloadAndAppend()
			if err != nil {
				logger.Error(err)
			}
			timer.Reset(s.interval)
		}
	}
}

// Stop stops the subscription process.
//
// It signals the polling goroutine by closing stopChannel. It must be
// called at most once: closing an already-closed channel panics.
func (s *HTTPSubscriber) Stop() {
	close(s.stopChannel)
}

// downloadAndAppend fetches whatever remote content lies beyond the
// current size of the local file (via an HTTP Range request) and
// appends it. This works because the denylist format is append-only.
func (s *HTTPSubscriber) downloadAndAppend() error {
	localFile, err := os.OpenFile(s.localFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return err
	}
	defer localFile.Close()

	// The current local size tells us how many bytes we already have.
	localFileInfo, err := localFile.Stat()
	if err != nil {
		return err
	}
	localFileSize := localFileInfo.Size()

	// Create a HTTP GET request with the Range header to download only the missing bytes.
	req, err := http.NewRequest("GET", s.remoteURL, nil)
	if err != nil {
		return err
	}
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-", localFileSize))

	// Debugf, not Debug: this is a printf-style format string.
	logger.Debugf("%s: requesting bytes from %d: %s", s.localFile, localFileSize, req.URL)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch {
	case resp.StatusCode == http.StatusPartialContent:
		n, err := io.Copy(localFile, resp.Body)
		if err != nil {
			return err
		}
		// Log the bytes actually written: resp.ContentLength can be
		// -1 (unknown) for chunked responses.
		logger.Infof("%s: appended %d bytes", s.localFile, n)
	case resp.StatusCode >= http.StatusBadRequest &&
		resp.StatusCode != http.StatusRequestedRangeNotSatisfiable:
		// 416 (range not satisfiable) means nothing new to fetch and
		// is not an error: we continue subscribed.
		return fmt.Errorf("%s: server returned with unexpected code %d", s.localFile, resp.StatusCode)
	default:
		// Other statuses (e.g. 1xx/3xx, or 200 OK from a server
		// ignoring the Range header) are silently skipped and we
		// continue subscribed.
		// NOTE(review): a 200 OK full-body response is discarded
		// rather than appended, which avoids duplicating content but
		// means servers without Range support never sync — confirm
		// all used denylist servers honor Range requests.
	}
	return nil
}
Loading

0 comments on commit 3b5bfd3

Please sign in to comment.