-
Notifications
You must be signed in to change notification settings - Fork 21
/
response.go
202 lines (185 loc) · 7.65 KB
/
response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package gremtune
import (
"encoding/json"
"fmt"
)
// Status codes carried in Status.Code of a response from the gremlin server.
// detectError treats the first three as non-errors and turns every other
// value (including codes not listed here) into an error.
const (
statusSuccess = 200
statusNoContent = 204
statusPartialContent = 206 // saveResponse buffers the response but does not signal completion for this code
statusUnauthorized = 401
statusAuthenticate = 407 // handleResponse reacts to this code by calling c.authenticate
statusMalformedRequest = 498
statusInvalidRequestArguments = 499
statusServerError = 500
statusScriptEvaluationError = 597
statusServerTimeout = 598
statusServerSerializationError = 599
)
// Status holds the status section of a response returned by the gremlin server.
type Status struct {
// Message is the text that detectError embeds in the errors it builds.
Message string `json:"message"`
// Code is compared against the status* constants to classify the response.
Code int `json:"code"`
// Attributes is decoded from the "attributes" key without further interpretation here.
Attributes map[string]interface{} `json:"attributes"`
}
// Result holds the result section of a response returned by the gremlin server.
type Result struct {
// Data is the query response data, kept as raw JSON (json.RawMessage) rather than decoded here.
Data json.RawMessage `json:"data"`
// Meta is decoded from the "meta" key without further interpretation here.
Meta map[string]interface{} `json:"meta"`
}
// AsyncResponse is the item retrieveResponseAsync sends on its response channel:
// one response from the gremlin server plus an optional error message.
type AsyncResponse struct {
Response Response `json:"response"` // One (possibly partial) response object
ErrorMessage string `json:"errorMessage"` // Error text; retrieveResponseAsync sets it only on the last item it sends, and only when an error was reported
}
// Response holds one entire response message from the gremlin server.
type Response struct {
// RequestID is the key under which saveResponse buffers the response and looks up its notifier channels.
RequestID string `json:"requestId"`
// Status carries the status code and message; detectError inspects it.
Status Status `json:"status"`
// Result carries the raw response data and its metadata.
Result Result `json:"result"`
}
// ToString renders the response as a multi-line, human-readable string.
// The request id is printed with %v; Status and Result are printed in
// Go-syntax form (%#v).
func (r Response) ToString() string {
	const layout = "Response \nRequestID: %v, \nStatus: {%#v}, \nResult: {%#v}\n"
	return fmt.Sprintf(layout, r.RequestID, r.Status, r.Result)
}
// handleResponse decodes one raw message and routes it.
//
// A response whose status code is statusAuthenticate is answered by calling
// c.authenticate with the response's request id; the result of that call is
// returned and the response is not saved. Every other response, together with
// the error produced while decoding it (if any), is handed to saveResponse,
// and that same error is returned.
func (c *Client) handleResponse(msg []byte) (err error) {
	var resp Response
	resp, err = marshalResponse(msg)

	if resp.Status.Code != statusAuthenticate {
		c.saveResponse(resp, err)
		return err
	}

	// The server requests authentication.
	return c.authenticate(resp.RequestID)
}
// marshalResponse decodes msg (JSON) into a Response.
//
// If decoding fails, the JSON error is returned together with whatever was
// decoded so far. Otherwise the returned error is the outcome of
// resp.detectError, i.e. nil for non-error status codes.
func marshalResponse(msg []byte) (resp Response, err error) {
	if err = json.Unmarshal(msg, &resp); err != nil {
		return resp, err
	}
	return resp, resp.detectError()
}
// saveResponse buffers resp under its request id and wakes up whoever is
// waiting for it. The client lock (c.Lock) is held for the whole call.
//
// Steps, in order:
//  1. resp is appended to the list already stored for the request id (a
//     request may produce several responses).
//  2. The completion and status notifier channels for the id are fetched, or
//     created with a buffer of one if they do not exist yet.
//  3. The status code is pushed to the status notifier only if that channel
//     still has room, so this step never blocks.
//  4. Unless the response is a partial one (statusPartialContent), err is sent
//     on the completion notifier. This send blocks if that channel's buffer
//     is full.
func (c *Client) saveResponse(resp Response, err error) {
	c.Lock()
	defer c.Unlock()

	id := resp.RequestID

	// Extend the previously buffered responses, if any, with the new one.
	var buffered []interface{}
	if previous, found := c.results.Load(id); found {
		buffered = previous.([]interface{})
	}
	c.results.Store(id, append(buffered, resp))

	doneEntry, _ := c.responseNotifier.LoadOrStore(id, make(chan error, 1))
	statusEntry, _ := c.responseStatusNotifier.LoadOrStore(id, make(chan int, 1))

	// Only report the status when there is free buffer space; otherwise the
	// send would stall until the requester reads the pending status.
	statusCh := statusEntry.(chan int)
	if len(statusCh) < cap(statusCh) {
		statusCh <- resp.Status.Code
	}

	// A partial response is not the end of the request: keep the requester waiting.
	if resp.Status.Code == statusPartialContent {
		return
	}
	doneEntry.(chan error) <- err
}
// retrieveResponseAsync forwards the responses that saveResponse buffered for
// request id to responseChannel, one AsyncResponse per buffered response, and
// closes responseChannel when it is done.
//
// It wakes up once per value received from the status notifier of the request.
// On each wake-up it forwards every not-yet-forwarded response except the most
// recent one. When the completion notifier holds a value, it forwards all
// remaining responses, attaches the error text (if any) to the last one, and
// stops. Before returning it closes both notifier channels and removes the
// notifiers and the buffered responses for id.
//
// The notifiers for id must already be registered: the type assertions below
// panic if either Load returns nothing.
func (c *Client) retrieveResponseAsync(id string, responseChannel chan AsyncResponse) {
// Number of buffered responses already forwarded to responseChannel.
var responseProcessedIndex int
responseNotifier, _ := c.responseNotifier.Load(id)
responseStatusNotifier, _ := c.responseStatusNotifier.Load(id)
// Each received status is only a wake-up signal; its value is not used.
for status := range responseStatusNotifier.(chan int) {
_ = status
if dataI, ok := c.results.Load(id); ok {
d := dataI.([]interface{})
// Forward every buffered response that has not been sent yet, except the last one
// (presumably held back so the final pass below can attach an error message to it).
for i := responseProcessedIndex; i < len(d)-1; i++ {
responseProcessedIndex++
var asyncResponse AsyncResponse = AsyncResponse{}
asyncResponse.Response = d[i].(Response)
// Send the partial response object to the responseChannel
responseChannel <- asyncResponse
}
}
// A pending value on the completion notifier means saveResponse stored a non-partial response.
// NOTE(review): saveResponse sends the status before the completion value; if this check runs
// between those two sends, the loop goes back to waiting for a status that may never arrive — confirm.
if len(responseNotifier.(chan error)) > 0 {
// err is the error reported for the request, or nil when the final response was successful
err := <-responseNotifier.(chan error)
if dataI, ok := c.results.Load(id); ok {
d := dataI.([]interface{})
// Forward all buffered responses that have not been sent to responseChannel yet
for i := responseProcessedIndex; i < len(d); i++ {
responseProcessedIndex++
asyncResponse := AsyncResponse{}
asyncResponse.Response = d[i].(Response)
// The last response also carries the error message, if an error was reported
if responseProcessedIndex == len(d) && err != nil {
asyncResponse.ErrorMessage = err.Error()
}
// Send the response object to the responseChannel
responseChannel <- asyncResponse
}
}
// Every response, including the final one, has been sent to the responseChannel
break
}
}
// Everything has been forwarded: close both notifiers, remove them and the stored responses, then close responseChannel
close(responseStatusNotifier.(chan int))
close(responseNotifier.(chan error))
c.responseNotifier.Delete(id)
c.responseStatusNotifier.Delete(id)
c.deleteResponse(id)
close(responseChannel)
}
// retrieveResponse blocks until saveResponse signals that the request with
// the given id is complete and then returns the responses buffered for it.
//
// Parameters:
//   - id: the request id whose responses should be collected.
//
// Returns:
//   - data: every buffered response for id, in arrival order. It is nil when
//     the request finished with an error or nothing was buffered.
//   - err: the error reported for the request, or an error stating that no
//     completion notifier is registered for id.
//
// All per-request state (both notifier channels, their map entries and the
// buffered responses) is released before returning, on success and on error
// alike, so failed requests do not accumulate in the client.
func (c *Client) retrieveResponse(id string) (data []Response, err error) {
	notifier, _ := c.responseNotifier.Load(id)
	errCh, ok := notifier.(chan error)
	if !ok {
		// Nothing registered for this id: report it instead of panicking on
		// the type assertion of a nil value.
		return nil, fmt.Errorf("no response notifier registered for request id %q", id)
	}

	// Wait for the completion signal sent by saveResponse.
	err = <-errCh

	if err == nil {
		if stored, found := c.results.Load(id); found {
			items := stored.([]interface{})
			data = make([]Response, len(items))
			for i := range items {
				data[i] = items[i].(Response)
			}
		}
	}

	// Release the per-request state regardless of the outcome. The status
	// notifier is looked up only now because saveResponse creates it on
	// demand, and it is closed only if it is actually present.
	close(errCh)
	if statusNotifier, found := c.responseStatusNotifier.Load(id); found {
		if statusCh, isChan := statusNotifier.(chan int); isChan {
			close(statusCh)
		}
	}
	c.responseNotifier.Delete(id)
	c.responseStatusNotifier.Delete(id)
	c.deleteResponse(id)

	return data, err
}
// deleteResponse drops everything buffered in c.results for the given request
// id. It is the cleanup step run once a requester has consumed its responses.
func (c *Client) deleteResponse(id string) {
	c.results.Delete(id)
}
// detectError translates the status code of the response into an error.
//
// statusSuccess, statusNoContent and statusPartialContent yield nil. Every
// other code yields an error whose text starts with a label for the code
// ("UNKNOWN ERROR" for codes without a dedicated label) followed by the
// status message sent by the server.
func (r *Response) detectError() (err error) {
	var format string

	switch r.Status.Code {
	case statusSuccess, statusNoContent, statusPartialContent:
		// Not an error.
		return nil
	case statusUnauthorized:
		format = "UNAUTHORIZED - Response Message: %s"
	case statusAuthenticate:
		format = "AUTHENTICATE - Response Message: %s"
	case statusMalformedRequest:
		format = "MALFORMED REQUEST - Response Message: %s"
	case statusInvalidRequestArguments:
		format = "INVALID REQUEST ARGUMENTS - Response Message: %s"
	case statusServerError:
		format = "SERVER ERROR - Response Message: %s"
	case statusScriptEvaluationError:
		format = "SCRIPT EVALUATION ERROR - Response Message: %s"
	case statusServerTimeout:
		format = "SERVER TIMEOUT - Response Message: %s"
	case statusServerSerializationError:
		format = "SERVER SERIALIZATION ERROR - Response Message: %s"
	default:
		format = "UNKNOWN ERROR - Response Message: %s"
	}

	return fmt.Errorf(format, r.Status.Message)
}