Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC subscription bug fix. #24

Merged
merged 3 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client/client-1.0/grpc_client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func initCommandList() {

commandList[0] = `{"action":"get","path":"Vehicle/Speed","requestId":"232"}`
commandList[1] = `{"action":"set", "path":"Vehicle/Body/Lights/IsLeftIndicatorOn", "value":"true", "requestId":"245"}`
commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`
commandList[2] = `{"action":"subscribe","path":"Vehicle.Speed","filter":{"type":"curvelog","parameter":{"maxerr":"2","bufsize":"15"}},"requestId":"285"}`
/* commandList[2] = `{"action":"subscribe","path":"Vehicle","filter":[{"type":"paths","parameter":["Speed", "Chassis.Accelerator.PedalPosition"]},{"type":"timebased","parameter": {"period":"5000"}}],"requestId":"246"}`*/
commandList[3] = `{"action":"unsubscribe","subscriptionId":"X","requestId":"240"}` // X is replaced according to input

/* different variants
Expand Down
2 changes: 1 addition & 1 deletion feeder/feeder-template/feederv1/feederv1.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func statestorageSet(path string, val string, ts string) int {
}
return 0
case "redis":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := redisClient.Set(path, dp, time.Duration(0)).Err()
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
Expand Down
4 changes: 2 additions & 2 deletions feeder/feeder-template/feederv2/feederv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,15 @@ func statestorageSet(path string, val string, ts string) int {
}
return 0
case "redis":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := redisClient.Set(path, dp, time.Duration(0)).Err()
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
return -1
}
return 0
case "memcache":
dp := `{"val":"` + val + `", "ts":"` + ts + `"}`
dp := `{"value":"` + val + `", "ts":"` + ts + `"}`
err := memcacheClient.Set(&memcache.Item{Key: path, Value: []byte(dp)})
if err != nil {
utils.Error.Printf("Job failed. Err=%s", err)
Expand Down
15 changes: 8 additions & 7 deletions server/vissv2server/serviceMgr/curvelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize
}
mcloseClSubId.Unlock()
dp := getVehicleData(path)
utils.Info.Printf("dp=%s", dp)
utils.MapRequest(dp, &dpMap)
_, ts := readRing(&aRingBuffer, 0) // read latest written
if ts != dpMap["ts"].(string) {
Expand All @@ -442,11 +443,11 @@ func clCapture1dim(clChan chan CLPack, subscriptionId int, path string, bufSize
var clPack CLPack
clPack.SubscriptionId = subscriptionId
if len(extraData) > 0 {
clPack.DataPack = `{"path":"` + path + `","data":` + extraData + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + extraData + "}"
clChan <- clPack
}
if lastSelected > 0 {
clPack.DataPack = `{"path":"` + path + `","data":` + data + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + data + "}"
clChan <- clPack
}
setRingTail(&aRingBuffer, lastSelected) // update tail pointer
Expand Down Expand Up @@ -628,7 +629,7 @@ func transformDataPoint(aRingBuffer *RingBuffer, index int, tsBase time.Time) (C
func returnSingleDp(clChan chan CLPack, subscriptionId int, path string) {
dp := getVehicleData(path)
var clPack CLPack
clPack.DataPack = `{"path":"` + path + `","data":` + dp + "}"
clPack.DataPack = `{"path":"` + path + `","dp":` + dp + "}"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand All @@ -637,7 +638,7 @@ func returnSingleDp2(clChan chan CLPack, subscriptionId int, paths Dim2Elem) {
dp1 := getVehicleData(paths.Path1)
dp2 := getVehicleData(paths.Path2)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + dp1 + "}," + `{"path":"` + paths.Path2 + `","data":` + dp2 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + dp1 + "}," + `{"path":"` + paths.Path2 + `","dp":` + dp2 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand All @@ -647,7 +648,7 @@ func returnSingleDp3(clChan chan CLPack, subscriptionId int, paths Dim3Elem) {
dp2 := getVehicleData(paths.Path2)
dp3 := getVehicleData(paths.Path3)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + dp1 + `},{"path":"` + paths.Path2 + `","data":` + dp2 + `},{"path":"` + paths.Path3 + `","data":` + dp3 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + dp1 + `},{"path":"` + paths.Path2 + `","dp":` + dp2 + `},{"path":"` + paths.Path3 + `","dp":` + dp3 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
}
Expand Down Expand Up @@ -687,7 +688,7 @@ func clCapture2dim(clChan chan CLPack, subscriptionId int, paths Dim2Elem, bufSi
if (currentBufSize == bufSize) || (closeClSession == true) {
data1, data2, updatedTail := clAnalyze2dim(&aRingBuffer1, &aRingBuffer2, currentBufSize, maxError)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + data1 + "}," + `{"path":"` + paths.Path2 + `","data":` + data2 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + data1 + "}," + `{"path":"` + paths.Path2 + `","dp":` + data2 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
setRingTail(&aRingBuffer1, updatedTail)
Expand Down Expand Up @@ -812,7 +813,7 @@ func clCapture3dim(clChan chan CLPack, subscriptionId int, paths Dim3Elem, bufSi
if (currentBufSize == bufSize) || (closeClSession == true) {
data1, data2, data3, updatedTail := clAnalyze3dim(&aRingBuffer1, &aRingBuffer2, &aRingBuffer3, currentBufSize, maxError)
var clPack CLPack
clPack.DataPack = `[{"path":"` + paths.Path1 + `","data":` + data1 + `},{"path":"` + paths.Path2 + `","data":` + data2 + `},{"path":"` + paths.Path3 + `","data":` + data3 + "}]"
clPack.DataPack = `[{"path":"` + paths.Path1 + `","dp":` + data1 + `},{"path":"` + paths.Path2 + `","dp":` + data2 + `},{"path":"` + paths.Path3 + `","dp":` + data3 + "}]"
clPack.SubscriptionId = subscriptionId
clChan <- clPack
setRingTail(&aRingBuffer1, updatedTail)
Expand Down
3 changes: 2 additions & 1 deletion utils/grcputils.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,7 @@ func createJSON(value string, key string) string {

func createJsonData(dataPack []*pb.DataPackages_DataPackage) string {
data := ""

Info.Printf("createJsonData:len(dataPack)=%d", len(dataPack))
if len(dataPack) > 1 {
data += "["
}
Expand All @@ -835,6 +835,7 @@ func createJsonData(dataPack []*pb.DataPackages_DataPackage) string {
} else {
path = DecompressPath(dataPack[i].GetPathC())
}
Info.Printf("createJsonData:path=%s", path)
dp := getJsonDp(dataPack[i])
data += `{"path":"` + path + `","dp":` + dp + `},`
}
Expand Down