diff --git a/announce/announce.go b/announce/announce.go index c04a077..918f911 100644 --- a/announce/announce.go +++ b/announce/announce.go @@ -6,7 +6,7 @@ import ( "net/url" "strconv" "strings" - "github.com/GrappigPanda/notorious/server/peerStore" + r "github.com/GrappigPanda/notorious/kvStoreInterfaces" ) func (a *AnnounceData) ParseAnnounceData(req *http.Request) (err error) { @@ -63,7 +63,7 @@ func (a *AnnounceData) ParseAnnounceData(req *http.Request) (err error) { a.Event = "started" } - a.RequestContext.redisClient = peerStore.OpenClient() + a.RequestContext.redisClient = r.OpenClient() return } @@ -110,8 +110,8 @@ func (a *AnnounceData) StartedEventHandler() (err error) { ipport = fmt.Sprintf("%s:%d", a.IP, a.Port) } - peerStore.RedisSetKeyVal(a.RequestContext.redisClient, keymember, ipport) - if peerStore.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) { + r.RedisSetKeyVal(a.RequestContext.redisClient, keymember, ipport) + if r.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) { fmt.Printf("Adding host %s to %s\n", ipport, keymember) } @@ -146,7 +146,7 @@ func (a *AnnounceData) CompletedEventHandler() { keymember := fmt.Sprintf("%s:complete", a.InfoHash) // TODO(ian): DRY! ipport := fmt.Sprintf("%s:%s", a.IP, a.Port) - if peerStore.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) { + if r.RedisSetKeyIfNotExists(a.RequestContext.redisClient, keymember, ipport) { fmt.Printf("Adding host %s to %s:complete\n", ipport, a.InfoHash) } } @@ -157,15 +157,15 @@ func (a *AnnounceData) removeFromKVStorage(subkey string) { keymember := fmt.Sprintf("%s:%s", a.InfoHash, subkey) fmt.Printf("Removing host %s from %v\n", ipport, keymember) - peerStore.RedisRemoveKeysValue(a.RequestContext.redisClient, keymember, ipport) + r.RedisRemoveKeysValue(a.RequestContext.redisClient, keymember, ipport) } func (a *AnnounceData) infoHashExists() bool { - return peerStore.RedisGetBoolKeyVal(a.InfoHash) + return r.RedisGetBoolKeyVal(a.InfoHash) } func (a *AnnounceData) createInfoHashKey() { - peerStore.CreateNewTorrentKey(a.InfoHash) + r.CreateNewTorrentKey(a.InfoHash) } // ParseInfoHash parses the encoded info hash. Such a simple solution for a diff --git a/kvStoreInterfaces/redis.go b/kvStoreInterfaces/redis.go new file mode 100644 index 0000000..83a1fe6 --- /dev/null +++ b/kvStoreInterfaces/redis.go @@ -0,0 +1,175 @@ +package kvStoreInterface + +import ( + "gopkg.in/redis.v3" + "bytes" + "fmt" + "strings" + "time" +) + +// EXPIRETIME signifies how long a peer will live under the specified info_hash +// until the reaper removes it. +var EXPIRETIME int64 = 5 * 60 + +// OpenClient opens a connection to redis. +func OpenClient() (client *redis.Client) { + client = redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + Password: "", + DB: 0, + }) + + return +} + +// RedisSetIPMember sets a key as a member of an infohash and sets a timeout. +func RedisSetIPMember(infoHash, ipPort string) (retval int) { + c := OpenClient() + + keymember := concatenateKeyMember(infoHash, "ip") + + currTime := int64(time.Now().UTC().AddDate(0, 0, 1).Unix()) + + key := fmt.Sprintf("%s:%v", ipPort, currTime) + + if err := c.SAdd(keymember, key).Err(); err != nil { + retval = 0 + panic("Failed to add key") + + } else { + retval = 1 + } + + return +} + +// RedisSetKeyVal Sets a key to the specified value. Used mostly with adding a +// peer into an info_hash +func RedisSetKeyVal(c *redis.Client, keymember string, value string) { + // RedisSetKeyVal sets a key:member's value to value. Returns nothing as of + // yet. + currTime := int64(time.Now().UTC().Unix()) + currTime += EXPIRETIME + value = fmt.Sprintf("%v:%v", value, currTime) + + if sz := strings.Split(value, ":"); len(sz) >= 1 { + // If the value being added can be converted to an int, it is a ip:port key + // and we can set an expiration on it. + c.SAdd(keymember, value) + } +} + +// RedisGetKeyVal Lookup a peer in the specified infohash at `key` +func RedisGetKeyVal(key string) []string { + c := OpenClient() + + // RedisGetKeyVal retrieves a value from the Redis store by looking up the + // provided key. If the key does not yet exist, we Create the key in the KV + // storage or if the value is empty, we add the current requester to the + // list. + keymember := concatenateKeyMember(key, "complete") + + val, err := c.SMembers(keymember).Result() + if err != nil { + // Fail because the key doesn't exist in the KV storage. + CreateNewTorrentKey(keymember) + } + + return val +} + +// RedisGetAllPeers fetches all peers from the info_hash at `key` +func RedisGetAllPeers(key string) []string { + c := OpenClient() + + keymember := concatenateKeyMember(key, "complete") + + val, err := c.SRandMemberN(keymember, 30).Result() + if err != nil { + // Fail because the key doesn't exist in the KV storage. + CreateNewTorrentKey(keymember) + } + + if len(val) == 30 { + return val + } + + keymember = concatenateKeyMember(key, "incomplete") + + val2, err := c.SRandMemberN(keymember, int64(30-len(val))).Result() + if err != nil { + panic("Failed to get incomplete peers for") + } else { + val = append(val, val2...) + } + + return val +} + +// RedisGetCount counts all of the peers at `info_hash` +func RedisGetCount(c *redis.Client, info_hash string, member string) (retval int, err error) { + // A generic function which is used to retrieve either the complete count + // or the incomplete count for a specified `info_hash`. + keymember := concatenateKeyMember(info_hash, member) + + x, err := c.SMembers(keymember).Result() + if err != nil { + // TODO(ian): Add actual error checking here. + err = fmt.Errorf("The info hash %s with member %s doesn't exist", info_hash, member) + } + + retval = len(x) + return +} + +// RedisGetBoolKeyVal Checks if a `key` exists +func RedisGetBoolKeyVal(key string) bool { + c := OpenClient() + ret, _ := c.Exists(key).Result() + + return ret +} + +// RedisSetKeyIfNotExists Set a key if it doesn't exist. +func RedisSetKeyIfNotExists(c *redis.Client, keymember string, value string) (rv bool) { + rv = RedisGetBoolKeyVal(keymember) + if !rv { + RedisSetKeyVal(c, keymember, value) + } + return +} + +// RedisRemoveKeysValue Remove a `value` from `key` in the redis kv storage. `key` is typically +// a keymember of info_hash:(in)complete and the value is typically the +// ip:port concatenated. +func RedisRemoveKeysValue(c *redis.Client, key string, value string) { + c.SRem(key, value) +} + +// CreateNewTorrentKey Creates a new key. By default, it adds a member +// ":ip". I don't think this ought to ever be generalized, as I just want +// Redis to function in one specific way in notorious. +func CreateNewTorrentKey(key string) { + c := OpenClient() + c.SAdd(key, "complete", "incomplete") + +} + +// concatenateKeyMember concatenates the key and the member delimited by the +// character ":" +func concatenateKeyMember(key string, member string) string { + var buffer bytes.Buffer + + buffer.WriteString(key) + buffer.WriteString(":") + buffer.WriteString(member) + + return buffer.String() +} + +// CreateIPPortPair Creates a string formatted ("%s:%s", value.ip, +// value.port) looking like so: "127.0.0.1:6886" and returns this value. +func createIPPortPair(ip, port string) string { + return fmt.Sprintf("%v:%v", ip, port) +} diff --git a/reaper/reaper.go b/reaper/reaper.go index e265724..eaae9bf 100644 --- a/reaper/reaper.go +++ b/reaper/reaper.go @@ -3,7 +3,7 @@ package reaper import ( "fmt" "github.com/GrappigPanda/notorious/database" - "github.com/GrappigPanda/notorious/server/peerStore" + r "github.com/GrappigPanda/notorious/kvStoreInterfaces" "gopkg.in/redis.v3" "strconv" "strings" @@ -72,7 +72,7 @@ func StartReapingScheduler(waitTime time.Duration) { go func() { for { // Handle any other cleanup or Notorious-related functions - c := peerStore.OpenClient() + c := r.OpenClient() _, err := c.Ping().Result() if err != nil { panic("No Redis instance detected. If deploying without Docker, install redis-server") @@ -86,7 +86,7 @@ func StartReapingScheduler(waitTime time.Duration) { x, err := db.GetWhitelistedTorrents() for x.Next() { x.Scan(infoHash, name, addedBy, dateAdded) - peerStore.CreateNewTorrentKey(*infoHash) + r.CreateNewTorrentKey(*infoHash) } // Start the actual peer reaper. diff --git a/server/announce_response.go b/server/announce_response.go index 070fe8f..d6e86eb 100644 --- a/server/announce_response.go +++ b/server/announce_response.go @@ -6,7 +6,7 @@ import ( "github.com/GrappigPanda/notorious/bencode" "github.com/GrappigPanda/notorious/database" . "github.com/GrappigPanda/notorious/announce" - "github.com/GrappigPanda/notorious/server/peerStore" + r "github.com/GrappigPanda/notorious/kvStoreInterfaces" "net" "strconv" "strings" @@ -72,8 +72,8 @@ func formatResponseData(ips []string, data *AnnounceData) string { // string that we respond with. func EncodeResponse(ipport []string, data *AnnounceData) (resp string) { ret := "" - completeCount := len(peerStore.RedisGetKeyVal(data.InfoHash)) - incompleteCount := len(peerStore.RedisGetKeyVal(data.InfoHash)) + completeCount := len(r.RedisGetKeyVal(data.InfoHash)) + incompleteCount := len(r.RedisGetKeyVal(data.InfoHash)) ret += bencode.EncodeKV("complete", bencode.EncodeInt(completeCount)) ret += bencode.EncodeKV("incomplete", bencode.EncodeInt(incompleteCount)) diff --git a/server/peerStore/peerstore.go b/server/peerStore/peerstore.go index a09be4f..5ada357 100644 --- a/server/peerStore/peerstore.go +++ b/server/peerStore/peerstore.go @@ -7,4 +7,5 @@ type PeerStore interface { KeyExists(string) bool GetKeyVal(string) []string GetAllPeers(string) []string + SetIPMember(string, string) int } diff --git a/server/peerStore/redisStore.go b/server/peerStore/redisStore.go index c3232c9..93b56f5 100644 --- a/server/peerStore/redisStore.go +++ b/server/peerStore/redisStore.go @@ -1,202 +1,41 @@ package peerStore import ( - "bytes" - "fmt" + r "github.com/GrappigPanda/notorious/kvStoreInterfaces" "gopkg.in/redis.v3" - "strings" - "time" ) type RedisStore struct { client *redis.Client } -// EXPIRETIME signifies how long a peer will live under the specified info_hash -// until the reaper removes it. -var EXPIRETIME int64 = 5 * 60 - -// OpenClient opens a connection to redis. -func OpenClient() (client *redis.Client) { - client = redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", - DB: 0, - }) - - return -} - func (p *RedisStore) SetKeyIfNotExists(key, value string) (retval bool) { - return redisSetKeyIfNotExists(p.client, key, value) + return r.RedisSetKeyIfNotExists(p.client, key, value) } func (p *RedisStore) SetKV(key, value string) { - redisSetKeyVal(p.client, key, value) + r.RedisSetKeyVal(p.client, key, value) } func (p *RedisStore) RemoveKV(key, value string) { // TODO(ian): Refactor this so we don't have to delete a value from a key if value != "" || value == "" { - redisRemoveKeysValue(p.client, key, value) + r.RedisRemoveKeysValue(p.client, key, value) } } func (p *RedisStore) KeyExists(key string) (retval bool) { - return redisGetBoolKeyVal(key) + return r.RedisGetBoolKeyVal(key) } func (p *RedisStore) GetKeyVal(key string) []string { - return redisGetKeyVal(key) -} - -// RedisSetIPMember sets a key as a member of an infohash and sets a timeout. -func redisSetIPMember(infoHash, ipPort string) (retval int) { - c := OpenClient() - - keymember := concatenateKeyMember(infoHash, "ip") - - currTime := int64(time.Now().UTC().AddDate(0, 0, 1).Unix()) - - key := fmt.Sprintf("%s:%v", ipPort, currTime) - - if err := c.SAdd(keymember, key).Err(); err != nil { - retval = 0 - panic("Failed to add key") - - } else { - retval = 1 - } - - return -} - -// RedisSetKeyVal Sets a key to the specified value. Used mostly with adding a -// peer into an info_hash -func redisSetKeyVal(c *redis.Client, keymember string, value string) { - // RedisSetKeyVal sets a key:member's value to value. Returns nothing as of - // yet. - currTime := int64(time.Now().UTC().Unix()) - currTime += EXPIRETIME - value = fmt.Sprintf("%v:%v", value, currTime) - - if sz := strings.Split(value, ":"); len(sz) >= 1 { - // If the value being added can be converted to an int, it is a ip:port key - // and we can set an expiration on it. - c.SAdd(keymember, value) - } -} - -// RedisGetKeyVal Lookup a peer in the specified infohash at `key` -func redisGetKeyVal(key string) []string { - c := OpenClient() - - // RedisGetKeyVal retrieves a value from the Redis store by looking up the - // provided key. If the key does not yet exist, we create the key in the KV - // storage or if the value is empty, we add the current requester to the - // list. - keymember := concatenateKeyMember(key, "complete") - - val, err := c.SMembers(keymember).Result() - if err != nil { - // Fail because the key doesn't exist in the KV storage. - createNewTorrentKey(keymember) - } - - return val -} - -// RedisGetAllPeers fetches all peers from the info_hash at `key` -func redisGetAllPeers(key string) []string { - c := OpenClient() - - keymember := concatenateKeyMember(key, "complete") - - val, err := c.SRandMemberN(keymember, 30).Result() - if err != nil { - // Fail because the key doesn't exist in the KV storage. - createNewTorrentKey(keymember) - } - - if len(val) == 30 { - return val - } - - keymember = concatenateKeyMember(key, "incomplete") - - val2, err := c.SRandMemberN(keymember, int64(30-len(val))).Result() - if err != nil { - panic("Failed to get incomplete peers for") - } else { - val = append(val, val2...) - } - - return val -} - -// RedisGetCount counts all of the peers at `info_hash` -func redisGetCount(c *redis.Client, info_hash string, member string) (retval int, err error) { - // A generic function which is used to retrieve either the complete count - // or the incomplete count for a specified `info_hash`. - keymember := concatenateKeyMember(info_hash, member) - - x, err := c.SMembers(keymember).Result() - if err != nil { - // TODO(ian): Add actual error checking here. - err = fmt.Errorf("The info hash %s with member %s doesn't exist", info_hash, member) - } - - retval = len(x) - return -} - -// RedisGetBoolKeyVal Checks if a `key` exists -func redisGetBoolKeyVal(key string) bool { - c := OpenClient() - ret, _ := c.Exists(key).Result() - - return ret -} - -// RedisSetKeyIfNotExists Set a key if it doesn't exist. -func redisSetKeyIfNotExists(c *redis.Client, keymember string, value string) (rv bool) { - rv = redisGetBoolKeyVal(keymember) - if !rv { - redisSetKeyVal(c, keymember, value) - } - return -} - -// RedisRemoveKeysValue Remove a `value` from `key` in the redis kv storage. `key` is typically -// a keymember of info_hash:(in)complete and the value is typically the -// ip:port concatenated. -func redisRemoveKeysValue(c *redis.Client, key string, value string) { - c.SRem(key, value) + return r.RedisGetKeyVal(key) } -// CreateNewTorrentKey creates a new key. By default, it adds a member -// ":ip". I don't think this ought to ever be generalized, as I just want -// Redis to function in one specific way in notorious. -func createNewTorrentKey(key string) { - c := OpenClient() - c.SAdd(key, "complete", "incomplete") - -} - -// concatenateKeyMember concatenates the key and the member delimited by the -// character ":" -func concatenateKeyMember(key string, member string) string { - var buffer bytes.Buffer - - buffer.WriteString(key) - buffer.WriteString(":") - buffer.WriteString(member) - - return buffer.String() +func (p *RedisStore) GetAllPeers(key string) []string { + return r.RedisGetAllPeers(key) } -// createIPPortPair creates a string formatted ("%s:%s", value.ip, -// value.port) looking like so: "127.0.0.1:6886" and returns this value. -func createIPPortPair(ip, port string) string { - return fmt.Sprintf("%v:%v", ip, port) +func (p *RedisStore) SetIPMember(infoHash, ipPort string) (retval int) { + return r.RedisSetIPMember(infoHash, ipPort) } diff --git a/server/server.go b/server/server.go index c30e680..3bb84de 100644 --- a/server/server.go +++ b/server/server.go @@ -6,6 +6,7 @@ import ( "github.com/GrappigPanda/notorious/database" . "github.com/GrappigPanda/notorious/announce" "github.com/GrappigPanda/notorious/server/peerStore" + r "github.com/GrappigPanda/notorious/kvStoreInterfaces" "net/http" ) @@ -14,7 +15,7 @@ import ( type applicationContext struct { config config.ConfigStruct trackerLevel int - peerStoreClient *peerStore.PeerStore + peerStoreClient peerStore.PeerStore } type scrapeData struct { @@ -49,14 +50,14 @@ var FIELDS = []string{"port", "uploaded", "downloaded", "left", "event", "compac func (app *applicationContext) worker(data *AnnounceData) []string { if app.peerStoreClient.KeyExists(data.InfoHash) { - x := peerStore.GetKeyVal(data.InfoHash) + x := app.peerStoreClient.GetKeyVal(data.InfoHash) - app.peerStoreClient.RedisSetIPMember(data.InfoHash, fmt.Sprintf("%s:%s", data.IP, data.Port)) + app.peerStoreClient.SetIPMember(data.InfoHash, fmt.Sprintf("%s:%s", data.IP, data.Port)) return x } else { - app.peerStoreClient.SetKV(data.InfoHash) + r.CreateNewTorrentKey(data.InfoHash) } return app.worker(data) @@ -158,12 +159,10 @@ func writeResponse(w http.ResponseWriter, values string) { // RunServer spins up the server and muxes the routes. func RunServer() { - peerStore := new(peerStore.RedisStore) - app := applicationContext{ config: config.LoadConfig(), trackerLevel: RATIOLESS, - peerStoreClient: peerStore.redisStore, + peerStoreClient: new(peerStore.RedisStore), } mux := http.NewServeMux()