Skip to content

Commit

Permalink
Issue #150
Browse files Browse the repository at this point in the history
CHANGE:
  - kvStoreInterfaces/redis.go Now houses common redis calls used
    **throughout** the application. The `server/peerStore/redisStore.go` file
    just implements a wrapper around these.
  • Loading branch information
Ianleeclark committed Nov 25, 2016
1 parent b47e4b5 commit 790826d
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 192 deletions.
16 changes: 8 additions & 8 deletions announce/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net/url"
"strconv"
"strings"
"github.com/GrappigPanda/notorious/server/peerStore"
r "github.com/GrappigPanda/notorious/kvStoreInterfaces"
)

func (a *AnnounceData) ParseAnnounceData(req *http.Request) (err error) {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (a *AnnounceData) ParseAnnounceData(req *http.Request) (err error) {
a.Event = "started"
}

a.RequestContext.redisClient = peerStore.OpenClient()
a.RequestContext.redisClient = r.OpenClient()

return
}
Expand Down Expand Up @@ -110,8 +110,8 @@ func (a *AnnounceData) StartedEventHandler() (err error) {
ipport = fmt.Sprintf("%s:%d", a.IP, a.Port)
}

peerStore.RedisSetKeyVal(a.RequestContext.redisClient, keymember, ipport)
if peerStore.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) {
r.RedisSetKeyVal(a.RequestContext.redisClient, keymember, ipport)
if r.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) {
fmt.Printf("Adding host %s to %s\n", ipport, keymember)
}

Expand Down Expand Up @@ -146,7 +146,7 @@ func (a *AnnounceData) CompletedEventHandler() {
keymember := fmt.Sprintf("%s:complete", a.InfoHash)
// TODO(ian): DRY!
ipport := fmt.Sprintf("%s:%s", a.IP, a.Port)
if peerStore.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) {
if r.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) {
fmt.Printf("Adding host %s to %s:complete\n", ipport, a.InfoHash)
}
}
Expand All @@ -157,15 +157,15 @@ func (a *AnnounceData) removeFromKVStorage(subkey string) {
keymember := fmt.Sprintf("%s:%s", a.InfoHash, subkey)

fmt.Printf("Removing host %s from %v\n", ipport, keymember)
peerStore.RedisRemoveKeysValue(a.RequestContext.redisClient, keymember, ipport)
r.RedisRemoveKeysValue(a.RequestContext.redisClient, keymember, ipport)
}

func (a *AnnounceData) infoHashExists() bool {
return peerStore.RedisGetBoolKeyVal(a.InfoHash)
return r.RedisGetBoolKeyVal(a.InfoHash)
}

func (a *AnnounceData) createInfoHashKey() {
peerStore.CreateNewTorrentKey(a.InfoHash)
r.CreateNewTorrentKey(a.InfoHash)
}

// ParseInfoHash parses the encoded info hash. Such a simple solution for a
Expand Down
175 changes: 175 additions & 0 deletions kvStoreInterfaces/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package kvStoreInterface

import (
"gopkg.in/redis.v3"
"bytes"
"fmt"
"strings"
"time"
)

// EXPIRETIME signifies how long a peer will live under the specified info_hash
// until the reaper removes it.
var EXPIRETIME int64 = 5 * 60

// OpenClient opens a connection to redis.
func OpenClient() (client *redis.Client) {
client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})

return
}

// RedisSetIPMember sets a key as a member of an infohash and sets a timeout.
func RedisSetIPMember(infoHash, ipPort string) (retval int) {
c := OpenClient()

keymember := concatenateKeyMember(infoHash, "ip")

currTime := int64(time.Now().UTC().AddDate(0, 0, 1).Unix())

key := fmt.Sprintf("%s:%v", ipPort, currTime)

if err := c.SAdd(keymember, key).Err(); err != nil {
retval = 0
panic("Failed to add key")

} else {
retval = 1
}

return
}

// RedisSetKeyVal Sets a key to the specified value. Used mostly with adding a
// peer into an info_hash
func RedisSetKeyVal(c *redis.Client, keymember string, value string) {
// RedisSetKeyVal sets a key:member's value to value. Returns nothing as of
// yet.
currTime := int64(time.Now().UTC().Unix())
currTime += EXPIRETIME
value = fmt.Sprintf("%v:%v", value, currTime)

if sz := strings.Split(value, ":"); len(sz) >= 1 {
// If the value being added can be converted to an int, it is a ip:port key
// and we can set an expiration on it.
c.SAdd(keymember, value)
}
}

// RedisGetKeyVal Lookup a peer in the specified infohash at `key`
func RedisGetKeyVal(key string) []string {
c := OpenClient()

// RedisGetKeyVal retrieves a value from the Redis store by looking up the
// provided key. If the key does not yet exist, we Create the key in the KV
// storage or if the value is empty, we add the current requester to the
// list.
keymember := concatenateKeyMember(key, "complete")

val, err := c.SMembers(keymember).Result()
if err != nil {
// Fail because the key doesn't exist in the KV storage.
CreateNewTorrentKey(keymember)
}

return val
}

// RedisGetAllPeers fetches all peers from the info_hash at `key`
func RedisGetAllPeers(key string) []string {
c := OpenClient()

keymember := concatenateKeyMember(key, "complete")

val, err := c.SRandMemberN(keymember, 30).Result()
if err != nil {
// Fail because the key doesn't exist in the KV storage.
CreateNewTorrentKey(keymember)
}

if len(val) == 30 {
return val
}

keymember = concatenateKeyMember(key, "incomplete")

val2, err := c.SRandMemberN(keymember, int64(30-len(val))).Result()
if err != nil {
panic("Failed to get incomplete peers for")
} else {
val = append(val, val2...)
}

return val
}

// RedisGetCount counts all of the peers at `info_hash`
func RedisGetCount(c *redis.Client, info_hash string, member string) (retval int, err error) {
// A generic function which is used to retrieve either the complete count
// or the incomplete count for a specified `info_hash`.
keymember := concatenateKeyMember(info_hash, member)

x, err := c.SMembers(keymember).Result()
if err != nil {
// TODO(ian): Add actual error checking here.
err = fmt.Errorf("The info hash %s with member %s doesn't exist", info_hash, member)
}

retval = len(x)
return
}

// RedisGetBoolKeyVal Checks if a `key` exists
func RedisGetBoolKeyVal(key string) bool {
c := OpenClient()
ret, _ := c.Exists(key).Result()

return ret
}

// RedisSetKeyIfNotExists Set a key if it doesn't exist.
func RedisSetKeyIfNotExists(c *redis.Client, keymember string, value string) (rv bool) {
rv = RedisGetBoolKeyVal(keymember)
if !rv {
RedisSetKeyVal(c, keymember, value)
}
return
}

// RedisRemoveKeysValue Remove a `value` from `key` in the redis kv storage. `key` is typically
// a keymember of info_hash:(in)complete and the value is typically the
// ip:port concatenated.
func RedisRemoveKeysValue(c *redis.Client, key string, value string) {
c.SRem(key, value)
}

// CreateNewTorrentKey Creates a new key. By default, it adds a member
// ":ip". I don't think this ought to ever be generalized, as I just want
// Redis to function in one specific way in notorious.
func CreateNewTorrentKey(key string) {
c := OpenClient()
c.SAdd(key, "complete", "incomplete")

}

// concatenateKeyMember concatenates the key and the member delimited by the
// character ":"
func concatenateKeyMember(key string, member string) string {
var buffer bytes.Buffer

buffer.WriteString(key)
buffer.WriteString(":")
buffer.WriteString(member)

return buffer.String()
}

// CreateIPPortPair Creates a string formatted ("%s:%s", value.ip,
// value.port) looking like so: "127.0.0.1:6886" and returns this value.
func createIPPortPair(ip, port string) string {
return fmt.Sprintf("%v:%v", ip, port)
}
6 changes: 3 additions & 3 deletions reaper/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package reaper
import (
"fmt"
"github.com/GrappigPanda/notorious/database"
"github.com/GrappigPanda/notorious/server/peerStore"
r "github.com/GrappigPanda/notorious/kvStoreInterfaces"
"gopkg.in/redis.v3"
"strconv"
"strings"
Expand Down Expand Up @@ -72,7 +72,7 @@ func StartReapingScheduler(waitTime time.Duration) {
go func() {
for {
// Handle any other cleanup or Notorious-related functions
c := peerStore.OpenClient()
c := r.OpenClient()
_, err := c.Ping().Result()
if err != nil {
panic("No Redis instance detected. If deploying without Docker, install redis-server")
Expand All @@ -86,7 +86,7 @@ func StartReapingScheduler(waitTime time.Duration) {
x, err := db.GetWhitelistedTorrents()
for x.Next() {
x.Scan(infoHash, name, addedBy, dateAdded)
peerStore.CreateNewTorrentKey(*infoHash)
r.CreateNewTorrentKey(*infoHash)
}

// Start the actual peer reaper.
Expand Down
6 changes: 3 additions & 3 deletions server/announce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/GrappigPanda/notorious/bencode"
"github.com/GrappigPanda/notorious/database"
. "github.com/GrappigPanda/notorious/announce"
"github.com/GrappigPanda/notorious/server/peerStore"
r "github.com/GrappigPanda/notorious/kvStoreInterfaces"
"net"
"strconv"
"strings"
Expand Down Expand Up @@ -72,8 +72,8 @@ func formatResponseData(ips []string, data *AnnounceData) string {
// string that we respond with.
func EncodeResponse(ipport []string, data *AnnounceData) (resp string) {
ret := ""
completeCount := len(peerStore.RedisGetKeyVal(data.InfoHash))
incompleteCount := len(peerStore.RedisGetKeyVal(data.InfoHash))
completeCount := len(r.RedisGetKeyVal(data.InfoHash))
incompleteCount := len(r.RedisGetKeyVal(data.InfoHash))
ret += bencode.EncodeKV("complete", bencode.EncodeInt(completeCount))

ret += bencode.EncodeKV("incomplete", bencode.EncodeInt(incompleteCount))
Expand Down
1 change: 1 addition & 0 deletions server/peerStore/peerstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ type PeerStore interface {
KeyExists(string) bool
GetKeyVal(string) []string
GetAllPeers(string) []string
SetIPMember(string, string) int
}
Loading

0 comments on commit 790826d

Please sign in to comment.