Skip to content

Commit

Permalink
[#7]: feat(API): API unification, move all SDK interfaces to the API
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Feb 15, 2022
2 parents cd52d66 + b1b14bd commit f8605e5
Show file tree
Hide file tree
Showing 27 changed files with 737 additions and 251 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
name: Linux

on:
push:
branches:
- master
- beta
- stable
tags-ignore:
- "**"
paths-ignore:
- "**.md"
- "**.yaml"
- "**.yml"
pull_request:
paths-ignore:
- "**.md"
- "**.yaml"
- "**.yml"

jobs:
golang:
name: Build (Go ${{ matrix.go }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
fail-fast: true
matrix:
go: ["1.17.7"]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v2 # action page: <https://github.com/actions/setup-go>
with:
go-version: ${{ matrix.go }}

- name: Check out code
uses: actions/checkout@v2

- name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-

- name: Install Go dependencies
run: go mod download

- name: Run golang tests with coverage
run: make test
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ SHELL = /bin/sh
test_coverage:
rm -rf coverage-ci
mkdir ./coverage-ci
go test -v -race -cover -tags=debug -coverpkg=./... -failfast -coverprofile=./coverage-ci/pipeline_jobs.out -covermode=atomic ./plugins/v2/jobs/pipeline
go test -v -race -cover -tags=debug -coverpkg=./... -failfast -coverprofile=./coverage-ci/pipeline_jobs.out -covermode=atomic ./plugins/jobs/pipeline
echo 'mode: atomic' > ./coverage-ci/summary.txt
tail -q -n +2 ./coverage-ci/*.out >> ./coverage-ci/summary.txt

test: ## Run application tests
go test -v -race -tags=debug ./plugins/v2/jobs/pipeline
go test -v -race -tags=debug ./plugins/jobs/pipeline

generate-proto:
protoc -I./proto/jobs/v1beta --go_out=./proto/jobs/v1beta jobs.proto
Expand Down
13 changes: 13 additions & 0 deletions bst/bst.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package bst

// Storage is general in-memory BST storage implementation
type Storage interface {
// Insert inserts to a vertex with topic ident connection uuid
Insert(uuid string, topic string)
// Remove removes uuid from topic, if the uuid is single for a topic, whole vertex will be removed
Remove(uuid, topic string)
// Get will return all connections associated with the topic
Get(topic string) map[string]struct{}
// Contains checks if the BST contains a topic
Contains(topic string) bool
}
20 changes: 20 additions & 0 deletions event_bus/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package event_bus //nolint:stylecheck

import (
"fmt"
)

type EventBus interface {
SubscribeAll(subID string, ch chan<- Event) error
SubscribeP(subID string, pattern string, ch chan<- Event) error
Unsubscribe(subID string)
UnsubscribeP(subID, pattern string)
Len() uint
Send(ev Event)
}

type Event interface {
Type() fmt.Stringer
Plugin() string
Message() string
}
12 changes: 1 addition & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/goccy/go-json v0.9.4
github.com/prometheus/client_golang v1.12.1
github.com/roadrunner-server/sdk/v2 v2.8.1
github.com/roadrunner-server/goridge/v3 v3.3.1
github.com/stretchr/testify v1.7.0
github.com/valyala/fasthttp v1.33.0
go.uber.org/zap v1.21.0
Expand All @@ -17,28 +17,18 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/kr/pretty v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/roadrunner-server/errors v1.1.1 // indirect
github.com/roadrunner-server/goridge/v3 v3.3.1 // indirect
github.com/roadrunner-server/tcplisten v1.1.1 // indirect
github.com/shirou/gopsutil v3.21.11+incompatible // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
)
24 changes: 1 addition & 23 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vb
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/goccy/go-json v0.9.4 h1:L8MLKG2mvVXiQu07qB6hmfqeSYQdOnqPot2GhsIwIaI=
github.com/goccy/go-json v0.9.4/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
Expand Down Expand Up @@ -129,8 +127,6 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down Expand Up @@ -195,33 +191,22 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/roadrunner-server/errors v1.1.1 h1:BzrB+xZE+iTISVfXSzIL9YbpPt6oHoRHgOBjlU/pigQ=
github.com/roadrunner-server/errors v1.1.1/go.mod h1:MzHjhRZIZc1ooMyYllUhNs0aTqRUbwcgUSO0TN7kCII=
github.com/roadrunner-server/goridge/v3 v3.3.1 h1:IYdm+smDfKl09AfFgKJeSNpSTp7KTgO3XfGPKrxs0vQ=
github.com/roadrunner-server/goridge/v3 v3.3.1/go.mod h1:f7SPSt9HUw5kbCc6Ofk4eEUU1xh2qHf/NznrTaW+aLA=
github.com/roadrunner-server/sdk/v2 v2.8.1 h1:QneTXD31gBmiEV5q+Cd4yDgoPCkKJZj+zrV1PLXp6dM=
github.com/roadrunner-server/sdk/v2 v2.8.1/go.mod h1:oqohHdPseV3P3woXk3H1XUnU8YeprC63O8wRmjCkP5Q=
github.com/roadrunner-server/tcplisten v1.1.1 h1:uVJVdg/zaasD2A4Mg+GyMlsUy2nLp9ADKec/REzql9Y=
github.com/roadrunner-server/tcplisten v1.1.1/go.mod h1:2MjzsggdgxCca4p2k3YJdWdo/QnQehiOTy0knlE226c=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.33.0 h1:mHBKd98J5NcXuBddgjvim1i3kWzlng1SzLhrnBOU9g8=
Expand All @@ -233,8 +218,6 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
Expand Down Expand Up @@ -335,7 +318,6 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand All @@ -349,7 +331,6 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -376,11 +357,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158 h1:rm+CHSpPEEW2IsXUib1ThaHIjuBVZjxNgSKmBLFfD4c=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
35 changes: 35 additions & 0 deletions internal/race_checker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//go:build race

package internal

import (
"crypto/sha512"
"fmt"
"runtime"
)

func SetChecker(b []byte) {
if len(b) == 0 {
return
}
c := checkIfConst(b)
go c.isStillConst()
runtime.SetFinalizer(c, (*constSlice).isStillConst)
}

type constSlice struct {
b []byte
checksum [64]byte
}

func checkIfConst(b []byte) *constSlice {
c := &constSlice{b: b}
c.checksum = sha512.Sum512(c.b)
return c
}

func (c *constSlice) isStillConst() {
if sha512.Sum512(c.b) != c.checksum {
panic(fmt.Sprintf("mutable access detected 0x%012x", &c.b[0]))
}
}
5 changes: 5 additions & 0 deletions internal/race_checker_unsafe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//go:build !race

package internal

func SetChecker(_ []byte) {}
38 changes: 38 additions & 0 deletions internal/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package internal

import (
"reflect"
"unsafe"
)

// AsBytes returns a slice that refers to the data backing the string s.
func AsBytes(s string) []byte {
// get the pointer to the data of the string
p := unsafe.Pointer((*reflect.StringHeader)(unsafe.Pointer(&s)).Data)

var b []byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&b))
hdr.Data = uintptr(p)
// we need to set the cap and len for the string to byte convert
// because string is shorter than []bytes
hdr.Cap = len(s)
hdr.Len = len(s)

// checker to check mutable access to the data
SetChecker(b)
return b
}

// AsString returns a string that refers to the data backing the slice s.
func AsString(b []byte) string {
p := unsafe.Pointer((*reflect.SliceHeader)(unsafe.Pointer(&b)).Data)

var s string
hdr := (*reflect.StringHeader)(unsafe.Pointer(&s))
hdr.Data = uintptr(p)
hdr.Len = len(b)

// checker to check mutable access to the data
SetChecker(b)
return s
}
20 changes: 20 additions & 0 deletions ipc/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ipc

import (
"context"
"os/exec"

"github.com/roadrunner-server/api/v2/worker"
)

// Factory is responsible for wrapping given command into tasks WorkerProcess.
type Factory interface {
// SpawnWorkerWithTimeout creates new WorkerProcess process based on given command with context.
// Process must not be started.
SpawnWorkerWithTimeout(context.Context, *exec.Cmd) (worker.BaseProcess, error)
// SpawnWorker creates new WorkerProcess process based on given command.
// Process must not be started.
SpawnWorker(*exec.Cmd) (worker.BaseProcess, error)
// Close the factory and underlying connections.
Close() error
}
23 changes: 23 additions & 0 deletions payload/payload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package payload

import (
"github.com/roadrunner-server/api/v2/internal"
)

// Payload carries binary header and body to stack and
// back to the server.
type Payload struct {
// Context represent payload context, might be omitted.
Context []byte

// body contains binary payload to be processed by WorkerProcess.
Body []byte

// Type of codec used to decode/encode payload
Codec byte
}

// String returns payload body as string
func (p *Payload) String() string {
return internal.AsString(p.Body)
}
2 changes: 1 addition & 1 deletion plugins/informer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/roadrunner-server/api/v2/plugins/jobs"
"github.com/roadrunner-server/sdk/v2/state/process"
"github.com/roadrunner-server/api/v2/state/process"
)

// Statistic interfaces ==============
Expand Down
6 changes: 3 additions & 3 deletions plugins/jobs/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"github.com/roadrunner-server/api/v2/plugins/jobs/pipeline"
priorityqueue "github.com/roadrunner-server/sdk/v2/priority_queue"
"github.com/roadrunner-server/api/v2/pq"
)

// Consumer represents a single jobs driver interface
Expand Down Expand Up @@ -38,6 +38,6 @@ type Acknowledger interface {

// Constructor constructs Consumer interface. Endure abstraction.
type Constructor interface {
ConsumerFromConfig(configKey string, queue priorityqueue.Queue) (Consumer, error)
ConsumerFromPipeline(pipe *pipeline.Pipeline, queue priorityqueue.Queue) (Consumer, error)
ConsumerFromConfig(configKey string, queue pq.Queue) (Consumer, error)
ConsumerFromPipeline(pipe *pipeline.Pipeline, queue pq.Queue) (Consumer, error)
}
Loading

0 comments on commit f8605e5

Please sign in to comment.