Skip to content

Commit

Permalink
Tried to implement go-auto-reloading.
Browse files Browse the repository at this point in the history
Whole day of my life wasted: golang/go#20461.

Fanfuckingtastic.

Fuck you go.
  • Loading branch information
kevin-matthew committed Jan 29, 2024
1 parent 0a79837 commit 60aff86
Show file tree
Hide file tree
Showing 8 changed files with 493 additions and 110 deletions.
3 changes: 2 additions & 1 deletion doc/manual.org
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ use the Vorlage, not develop for it.
of the life cycle for the document.
5. *Shutdown Phase*: Vorlage unloads all the processors and deloads
all supporting libraries. All connections and requests will be
terminated.
terminated. The processors must shut everything down, and must free all
memory allocated.

* Documents
Documents are UTF-8 encoded files. The text is not canonicalized, so a
Expand Down
292 changes: 262 additions & 30 deletions doccomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,154 @@ import (
vorlageproc "ellem.so/vorlageproc"
"fmt"
"io"
"reflect"
"regexp"
"sync"
"sync/atomic"
)

var validProcessorName = regexp.MustCompile(`^[a-z0-9_\-]+$`)

// will also delete any processors in any list that have nil
func (c *Compiler) rebuildProcessors() (err error) {

// completely rebuild the processor list
newlist := make([]vorlageproc.Processor, 0, len(c.cprocessors)+len(c.goprocessors))
newlistinfo := make([]vorlageproc.ProcessorInfo, 0, len(c.cprocessors)+len(c.goprocessors))

// testarr will be a list of pointers to structures that implement vorlage.ProcessorInfo
var testarr = make([]interface{}, 0, len(newlist))
addProc := func(arr interface{}) {
lenr := reflect.ValueOf(arr).Len()
for i := 0; i < lenr; i++ {
v := reflect.ValueOf(arr).Index(i)
testarr = append(testarr, v.Interface())
}
}

// arrptr must be a pointer to an array to pointers
removenulls := func(arrptr interface{}) {
arreml := reflect.ValueOf(arrptr).Elem()
lenr := arreml.Len()
replacement := reflect.MakeSlice(arreml.Type(), 0, lenr)
for i := 0; i < lenr; i++ {
v := arreml.Index(i)
if !v.IsNil() {
replacement = reflect.Append(replacement, v)
}
}
t := replacement.Interface()
a := arrptr
c := reflect.ValueOf(arrptr).Elem()
reflect.ValueOf(arrptr).Elem().Set(replacement)
_ = a
_ = c
_ = t
}

// remove null values to clean up
removenulls(&c.cprocessors)
removenulls(&c.goprocessors)

// copy each type of processor into testarr.
// clang / shared object
addProc(c.cprocessors)
addProc(c.goprocessors)

// find any processors that are no longer in testarr but remain in
// c.processors, those need to be removed.
for i := range c.processors {
var j int
for j = 0; j < len(testarr); j++ {
ptr := reflect.ValueOf(c.processors[i]).Pointer()
ptr2 := reflect.ValueOf(testarr[j]).Pointer()
if ptr == ptr2 {
// we still need this one
break
}
}
if j == len(testarr) {
// this address in processors was not found in the upstream copies.
// thus this processor is no longer needed. Shut it down.
ptr := reflect.ValueOf(c.processors[i]).Pointer()
Logger.Alertf("%s (@ %x) is no longer needed, shutting down", c.processorInfos[i].Name, ptr)
err = c.processors[i].Shutdown()
if err != nil {
Logger.Alertf("error returned from shutdown.. this shouldn't happen as it will be ignored: %s", err)
}
}
}

// now carry over old processors and add new ones.
for i := range testarr {
// is this processor loaed in c.processors?
var j int
for j = 0; j < len(c.processors); j++ {
ptr := reflect.ValueOf(c.processors[j]).Pointer()
ptr2 := reflect.ValueOf(testarr[i]).Pointer()
if ptr == ptr2 {
// yup. loaded already carrie it over
newlist = append(newlist, c.processors[j])
newlistinfo = append(newlistinfo, c.processorInfos[j])
break
}
}
if j == len(c.processors) {
// this processor's address was not found in c.processors, put it in
newlist = append(newlist, testarr[i].(vorlageproc.Processor))
info, err := startupproc(newlist[len(newlist)-1])
if err != nil {
return err
}
newlistinfo = append(newlistinfo, info)
}
}

c.processors = newlist
c.processorInfos = newlistinfo

return nil
}

// everything we'd see in both doccomp-http and doccomp-cli and doccomp-pdf
type Compiler struct {

// processors is technically a list of pointers to items found in cprocessors
// and goprocessors.
// If you want to change processors, you must update cprocessors / goprocessors
// and then run updateProcessors()...
// these two arrays are associative
processors []vorlageproc.Processor
processorInfos []vorlageproc.ProcessorInfo

// used for thread safety of shutdown.
// if you change these, you need to run rebuildProcessors to take effect.
// if you set the pointers to nil, they will be marked for deletion.
// if you change the address of the pointer, they will be marked for reload.
cprocessors []*cProc
goprocessors []*goProc

// access these via the atomic.Load... funcitons
concurrentCompiles int64
concurrentReaders int32

// set to anything but 0 to have Compile reject requests.
// 1 = fully shutdown.
// 2 = shutting down but waiting for other compilers to close
// 3 = shutting down but waiting for other readers to close
// 4 = new compiles are being stalled due to a restart/reload of a processor
// the stall will continue until unstall is fed something
//
// shutdownCompilers0 and shutdownReaders0 will be listened to
// by shutdown when shutdown is in 2 and 3 states respectively. will be
// written too. If shutdown not in process, they will be nil
// todo: rename these.
atomicShutdown int32
shutdownCompilers0 chan bool
shutdownReaders0 chan bool
unstall sync.Mutex

// used for watching go reloads if AutoReloadGoFiles
gowatcher *watcher
}

type compileRequest struct {
Expand Down Expand Up @@ -81,28 +214,51 @@ func (c compileRequest) String() string {
return str
}

// see https://github.com/golang/go/issues/20461
var AutoReloadGoFiles bool = false

// will return an error if a processor failed to start and/or is invalid
func NewCompiler(proc []vorlageproc.Processor) (c Compiler, err error) {
c.processors = proc
func NewCompiler() (c *Compiler, err error) {

// load all the infos
c.processorInfos = make([]vorlageproc.ProcessorInfo, len(proc))
for i := range c.processors {
c.processorInfos[i], err = c.processors[i].Startup()
Logger.Debugf("starting %s...", c.processorInfos[i].Name)
if err != nil {
Logger.Alertf("processor %s failed to start: %s", c.processorInfos[i].Name, err)
return c, err
}
err = validate(&(c.processorInfos[i]))
if err != nil {
return c, err
// structure set up
c = new(Compiler)

// load the go processors
c.goprocessors, err = loadGoProcessors(GoPluginLoadPath)
if err != nil {
return c, err
}
defer func() {
if AutoReloadGoFiles {
go c.watchGoPath(GoPluginLoadPath)
}
Logger.Infof("loaded processor %s", c.processorInfos[i].Name)
Logger.Debugf("%s information:\n%s", c.processorInfos[i].Name, c.processorInfos[i])
}()

// load the c processors
c.cprocessors, err = loadCProcessors(CLoadPath)
if err != nil {
return c, err
}

return c, nil
return c, c.rebuildProcessors()
}

// helper to rebuildProcessors
func startupproc(proc vorlageproc.Processor) (info vorlageproc.ProcessorInfo, err error) {
ptr := reflect.ValueOf(proc).Pointer()
info, err = proc.Startup()
Logger.Debugf("starting %s (@ %x)...", info.Name, ptr)
if err != nil {
Logger.Alertf("processor %s (@ %x) failed to start: %s", info.Name, ptr, err)
return info, err
}
err = validate(&(info))
if err != nil {
return info, err
}
Logger.Infof("successfully loaded processor %s (@ %x)", info.Name, ptr)
Logger.Debugf("%s information:\n%s", info.Name, info)
return info, err
}

func validate(info *vorlageproc.ProcessorInfo) error {
Expand Down Expand Up @@ -180,10 +336,42 @@ type ActionHandler interface {
* Do not attempt to use the streams pointed to by req... they'll be read
* when the docstream is read.
*/
func (comp *Compiler) Compile(filepath string, allInput map[string]string, allStreams map[string]vorlageproc.StreamInput, actionsHandler ActionHandler) (docstream io.ReadCloser, err CompileStatus) {
func (comp *Compiler) Compile(filepath string, allInput map[string]string,
allStreams map[string]vorlageproc.StreamInput, actionsHandler ActionHandler) (docstream io.ReadCloser, err CompileStatus) {

for i := range comp.processors {
Logger.Errorf("%v", comp.processors[i])
}

if shutdowncode := atomic.LoadInt32(&comp.atomicShutdown); shutdowncode != 0 {
var erro error
switch shutdowncode {
case 1:
erro = NewError("compiler has shutdown")
return nil, CompileStatus{erro, false}
case 2:
erro = NewError("compiler is shutting down, waiting on other compiliations to finish")
return nil, CompileStatus{erro, false}
case 3:
erro = NewError("compiler is shutting down, waiting on other readers to finish")
return nil, CompileStatus{erro, false}
case 4:
// if 4, then we will try to lock unstall. which will lock this thread
// until Compiler.cont is called
comp.unstall.Lock()
comp.unstall.Unlock()
}
}

atomic.AddUint64(&nextRid, 1)
atomic.AddInt64(&comp.concurrentCompiles, 1)
defer atomic.AddInt64(&comp.concurrentCompiles, -1)
defer func() {
newi := atomic.AddInt64(&comp.concurrentCompiles, -1)
shutdowncode := atomic.LoadInt32(&comp.atomicShutdown)
if newi == 0 && shutdowncode == 2 {
comp.shutdownCompilers0 <- true
}
}()
compReq := compileRequest{
compiler: comp,
filepath: filepath,
Expand Down Expand Up @@ -266,27 +454,71 @@ func (comp *Compiler) Compile(filepath string, allInput map[string]string, allSt
return docstream, CompileStatus{erro, false}
}

return &doc, CompileStatus{}
return doc, CompileStatus{}
}

/*
* Returns all errors that occour when shutting down each processor.
* If there is at least 1 Compile function that has not returned, Shutdown
* will return an error
*/
func (comp *Compiler) Shutdown() []error {
compiles := atomic.LoadInt64(&comp.concurrentCompiles)
if compiles != 0 {
erro := NewError("compiles still running")
erro.SetSubjectf("%d compile compRequest still processing", compiles)
return []error{erro}
func (comp *Compiler) Shutdown() {
if comp.isshutdown() {
return
}
var ret []error
if comp.gowatcher != nil {
comp.gowatcher.close()
}
comp.makestall(1)

// at this point, all readers and compilers are done.
for i := range comp.processors {
err := comp.processors[i].Shutdown()
if err != nil {
ret = append(ret, err)
Logger.Alertf("error returned from shutdown.. this shouldn't happen as it will be ignored: %s", err)
}
}
return ret
}

func (comp *Compiler) isshutdown() bool {
return atomic.LoadInt32(&comp.atomicShutdown) == 1
}

// will wait until all readers and compiles on all threads are complete.
// if code is 1, will cause a full shutdown.
// if code is 4, will stall all Compile calls until cont is called. While stalled,
// you can make changes to the processor
func (comp *Compiler) makestall(code int) {
if comp.isshutdown() {
return
}
comp.shutdownCompilers0 = make(chan bool)
comp.shutdownReaders0 = make(chan bool)
if code == 4 {
Logger.Infof("blocking compiles")
comp.unstall.Lock()
}
defer atomic.StoreInt32(&comp.atomicShutdown, int32(code))

atomic.StoreInt32(&comp.atomicShutdown, 2)
compiles := atomic.LoadInt64(&comp.concurrentCompiles)
if compiles != 0 {
Logger.Infof("waiting for %d compiles to complete...", compiles)
<-comp.shutdownCompilers0
}

atomic.StoreInt32(&comp.atomicShutdown, 3)
readers := atomic.LoadInt32(&comp.concurrentReaders)
if readers != 0 {
Logger.Infof("waiting for %d readers to close...", readers)
<-comp.shutdownReaders0
}

}

// will undo the set-limbo state that makestall made
func (comp *Compiler) cont() {
Logger.Infof("unblocking compiles")
atomic.StoreInt32(&comp.atomicShutdown, 0)
comp.unstall.Unlock()
}
Loading

0 comments on commit 60aff86

Please sign in to comment.