forked from qasaur/gremgo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
response.go
120 lines (108 loc) · 3.06 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package gremgo
import (
"encoding/json"
"errors"
)
type response struct {
data interface{}
requestId string
code int
}
func (c *Client) handleResponse(msg []byte) (err error) {
resp, err := marshalResponse(msg)
if err != nil {
return
}
if resp.code == 407 { //Server request authentication
return c.authenticate(resp.requestId)
}
c.saveResponse(resp)
return
}
// marshalResponse creates a response struct for every incoming response for further manipulation
func marshalResponse(msg []byte) (resp response, err error) {
var j map[string]interface{}
err = json.Unmarshal(msg, &j)
if err != nil {
return
}
status := j["status"].(map[string]interface{})
result := j["result"].(map[string]interface{})
code := status["code"].(float64)
resp.code = int(code)
err = responseDetectError(resp.code)
if err != nil {
resp.data = err // Modify response vehicle to have error (if exists) as data
} else {
resp.data = result["data"]
}
err = nil
resp.requestId = j["requestId"].(string)
return
}
// saveResponse makes the response available for retrieval by the requester. Mutexes are used for thread safety.
func (c *Client) saveResponse(resp response) {
c.respMutex.Lock()
var container []interface{}
existingData, ok := c.results.Load(resp.requestId) // Retrieve old data container (for requests with multiple responses)
if ok {
container = existingData.([]interface{})
}
newdata := append(container, resp.data) // Create new data container with new data
c.results.Store(resp.requestId, newdata) // Add new data to buffer for future retrieval
respNotifier, load := c.responseNotifier.LoadOrStore(resp.requestId, make(chan int, 1))
_=load
if resp.code != 206 {
respNotifier.(chan int) <- 1
}
c.respMutex.Unlock()
}
// retrieveResponse retrieves the response saved by saveResponse.
func (c *Client) retrieveResponse(id string) (data []interface{}) {
resp, _ := c.responseNotifier.Load(id)
n := <-resp.(chan int)
if n == 1 {
if dataI, ok := c.results.Load(id); ok {
data = dataI.([]interface{})
close(resp.(chan int))
c.responseNotifier.Delete(id)
c.deleteResponse(id)
}
}
return
}
// deleteRespones deletes the response from the container. Used for cleanup purposes by requester.
func (c *Client) deleteResponse(id string) {
c.results.Delete(id)
return
}
// responseDetectError detects any possible errors in responses from Gremlin Server and generates an error for each code
func responseDetectError(code int) (err error) {
switch {
case code == 200:
break
case code == 204:
break
case code == 206:
break
case code == 401:
err = errors.New("UNAUTHORIZED")
case code == 407:
err = errors.New("AUTHENTICATE")
case code == 498:
err = errors.New("MALFORMED REQUEST")
case code == 499:
err = errors.New("INVALID REQUEST ARGUMENTS")
case code == 500:
err = errors.New("SERVER ERROR")
case code == 597:
err = errors.New("SCRIPT EVALUATION ERROR")
case code == 598:
err = errors.New("SERVER TIMEOUT")
case code == 599:
err = errors.New("SERVER SERIALIZATION ERROR")
default:
err = errors.New("UNKNOWN ERROR")
}
return
}