Skip to content

Commit

Permalink
Merge pull request #95 from rod-hynes/master
Browse files Browse the repository at this point in the history
New dataStore implementation
  • Loading branch information
rod-hynes committed Jun 2, 2015
2 parents 85d64b0 + 34b092e commit 8b5cbce
Show file tree
Hide file tree
Showing 8 changed files with 897 additions and 47 deletions.
3 changes: 2 additions & 1 deletion psiphon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ const (
TUNNEL_SSH_KEEP_ALIVE_PERIOD_MIN = 60 * time.Second
TUNNEL_SSH_KEEP_ALIVE_PERIOD_MAX = 120 * time.Second
ESTABLISH_TUNNEL_TIMEOUT_SECONDS = 300
ESTABLISH_TUNNEL_PAUSE_PERIOD = 10 * time.Second
ESTABLISH_TUNNEL_WORK_TIME_SECONDS = 60 * time.Second
ESTABLISH_TUNNEL_PAUSE_PERIOD = 5 * time.Second
PORT_FORWARD_FAILURE_THRESHOLD = 10
HTTP_PROXY_ORIGIN_SERVER_TIMEOUT = 15 * time.Second
HTTP_PROXY_MAX_IDLE_CONNECTIONS_PER_HOST = 50
Expand Down
136 changes: 92 additions & 44 deletions psiphon/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,27 @@ import (
// connect to; establishes and monitors tunnels; and runs local proxies which
// route traffic through the tunnels.
type Controller struct {
config *Config
sessionId string
componentFailureSignal chan struct{}
shutdownBroadcast chan struct{}
runWaitGroup *sync.WaitGroup
establishedTunnels chan *Tunnel
failedTunnels chan *Tunnel
tunnelMutex sync.Mutex
establishedOnce bool
tunnels []*Tunnel
nextTunnel int
startedConnectedReporter bool
isEstablishing bool
establishWaitGroup *sync.WaitGroup
stopEstablishingBroadcast chan struct{}
candidateServerEntries chan *ServerEntry
establishPendingConns *Conns
untunneledPendingConns *Conns
untunneledDialConfig *DialConfig
splitTunnelClassifier *SplitTunnelClassifier
config *Config
sessionId string
componentFailureSignal chan struct{}
shutdownBroadcast chan struct{}
runWaitGroup *sync.WaitGroup
establishedTunnels chan *Tunnel
failedTunnels chan *Tunnel
tunnelMutex sync.Mutex
establishedOnce bool
tunnels []*Tunnel
nextTunnel int
startedConnectedReporter bool
isEstablishing bool
establishWaitGroup *sync.WaitGroup
stopEstablishingBroadcast chan struct{}
candidateServerEntries chan *ServerEntry
establishPendingConns *Conns
untunneledPendingConns *Conns
untunneledDialConfig *DialConfig
splitTunnelClassifier *SplitTunnelClassifier
signalFetchRemoteServerList chan struct{}
}

// NewController initializes a new controller.
Expand Down Expand Up @@ -97,6 +98,9 @@ func NewController(config *Config) (controller *Controller, err error) {
establishPendingConns: new(Conns),
untunneledPendingConns: untunneledPendingConns,
untunneledDialConfig: untunneledDialConfig,
// A buffer allows at least one signal to be sent even when the receiver is
// not listening. Senders should not block.
signalFetchRemoteServerList: make(chan struct{}, 1),
}

controller.splitTunnelClassifier = NewSplitTunnelClassifier(config, controller)
Expand Down Expand Up @@ -180,35 +184,54 @@ func (controller *Controller) SignalComponentFailure() {
}

// remoteServerListFetcher fetches an out-of-band list of server entries
// for more tunnel candidates. It fetches immediately, retries after failure
// with a wait period, and refetches after success with a longer wait period.
// for more tunnel candidates. It fetches when signalled, with retries
// on failure.
func (controller *Controller) remoteServerListFetcher() {
defer controller.runWaitGroup.Done()

loop:
var lastFetchTime time.Time

fetcherLoop:
for {
if !WaitForNetworkConnectivity(
controller.config.NetworkConnectivityChecker,
controller.shutdownBroadcast) {
break
// Wait for a signal before fetching
select {
case <-controller.signalFetchRemoteServerList:
case <-controller.shutdownBroadcast:
break fetcherLoop
}

err := FetchRemoteServerList(
controller.config, controller.untunneledDialConfig)
// Skip fetch entirely (i.e., send no request at all, even when ETag would save
// on response size) when a recent fetch was successful
if time.Now().Before(lastFetchTime.Add(FETCH_REMOTE_SERVER_LIST_STALE_PERIOD)) {
continue
}

retryLoop:
for {
// Don't attempt to fetch while there is no network connectivity,
// to avoid alert notice noise.
if !WaitForNetworkConnectivity(
controller.config.NetworkConnectivityChecker,
controller.shutdownBroadcast) {
break fetcherLoop
}

err := FetchRemoteServerList(
controller.config, controller.untunneledDialConfig)

if err == nil {
lastFetchTime = time.Now()
break retryLoop
}

var duration time.Duration
if err != nil {
NoticeAlert("failed to fetch remote server list: %s", err)
duration = FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD
} else {
duration = FETCH_REMOTE_SERVER_LIST_STALE_PERIOD
}
timeout := time.After(duration)
select {
case <-timeout:
// Fetch again
case <-controller.shutdownBroadcast:
break loop

timeout := time.After(FETCH_REMOTE_SERVER_LIST_RETRY_PERIOD)
select {
case <-timeout:
case <-controller.shutdownBroadcast:
break fetcherLoop
}
}
}

Expand Down Expand Up @@ -619,7 +642,9 @@ func (controller *Controller) establishCandidateGenerator() {
loop:
// Repeat until stopped
for {
// Yield each server entry returned by the iterator

// Send each iterator server entry to the establish workers
startTime := time.Now()
for {
serverEntry, err := iterator.Next()
if err != nil {
Expand All @@ -642,12 +667,33 @@ loop:
case <-controller.shutdownBroadcast:
break loop
}

if time.Now().After(startTime.Add(ESTABLISH_TUNNEL_WORK_TIME_SECONDS)) {
// Start over, after a brief pause, with a new shuffle of the server
// entries, and potentially some newly fetched server entries.
break
}
}
// Free up resources now, but don't reset until after the pause.
iterator.Close()

// Trigger a fetch remote server list, since we may have failed to
// connect with all known servers. Don't block sending signal, since
// this signal may have already been sent.
// Don't wait for fetch remote to succeed, since it may fail and
// enter a retry loop and we're better off trying more known servers.
// TODO: synchronize the fetch response, so it can be incorporated
// into the server entry iterator as soon as available.
select {
case controller.signalFetchRemoteServerList <- *new(struct{}):
default:
}
iterator.Reset()

// After a complete iteration of candidate servers, pause before iterating again.
// This helps avoid some busy wait loop conditions, and also allows some time for
// network conditions to change.
// network conditions to change. Also allows for fetch remote to complete,
// in typical conditions (it isn't strictly necessary to wait for this, there will
// be more rounds if required).
timeout := time.After(ESTABLISH_TUNNEL_PAUSE_PERIOD)
select {
case <-timeout:
Expand All @@ -657,6 +703,8 @@ loop:
case <-controller.shutdownBroadcast:
break loop
}

iterator.Reset()
}

NoticeInfo("stopped candidate generator")
Expand Down
10 changes: 10 additions & 0 deletions psiphon/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ func controllerRun(t *testing.T, protocol string) {
}
case "ListeningHttpProxyPort":
httpProxyPort = int(payload["port"].(float64))
case "ConnectingServer":
serverProtocol := payload["protocol"]
if serverProtocol != protocol {
t.Errorf("wrong protocol selected: %s", serverProtocol)
t.FailNow()
}
}
}))

Expand All @@ -118,6 +124,10 @@ func controllerRun(t *testing.T, protocol string) {

select {
case <-tunnelEstablished:

// Allow for known race condition described in NewHttpProxy():
time.Sleep(1 * time.Second)

// Test: fetch website through tunnel
fetchWebsite(t, httpProxyPort)

Expand Down
37 changes: 37 additions & 0 deletions psiphon/dataStore.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build windows

/*
* Copyright (c) 2015, Psiphon Inc.
* All rights reserved.
Expand Down Expand Up @@ -88,6 +90,9 @@ func InitDataStore(config *Config) (err error) {
(region text not null primary key,
etag text not null,
data blob not null);
create table if not exists urlETags
(url text not null primary key,
etag text not null);
create table if not exists keyValue
(key text not null primary key,
value text not null);
Expand Down Expand Up @@ -627,6 +632,38 @@ func GetSplitTunnelRoutesData(region string) (data []byte, err error) {
return data, nil
}

// SetUrlETag stores an ETag for the specfied URL.
// Note: input URL is treated as a string, and is not
// encoded or decoded or otherwise canonicalized.
func SetUrlETag(url, etag string) error {
return transactionWithRetry(func(transaction *sql.Tx) error {
_, err := transaction.Exec(`
insert or replace into urlETags (url, etag)
values (?, ?);
`, url, etag)
if err != nil {
// Note: ContextError() would break canRetry()
return err
}
return nil
})
}

// GetUrlETag retrieves a previously stored an ETag for the
// specfied URL. If not found, it returns an empty string value.
func GetUrlETag(url string) (etag string, err error) {
checkInitDataStore()
rows := singleton.db.QueryRow("select etag from urlETags where url = ?;", url)
err = rows.Scan(&etag)
if err == sql.ErrNoRows {
return "", nil
}
if err != nil {
return "", ContextError(err)
}
return etag, nil
}

// SetKeyValue stores a key/value pair.
func SetKeyValue(key, value string) error {
return transactionWithRetry(func(transaction *sql.Tx) error {
Expand Down
Loading

0 comments on commit 8b5cbce

Please sign in to comment.