Skip to content

Commit

Permalink
live log streaming works
Browse files Browse the repository at this point in the history
  • Loading branch information
AngieCris committed Sep 27, 2017
1 parent 6e38a9b commit 709af07
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 1 deletion.
7 changes: 7 additions & 0 deletions cmd/vic-machine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/vmware/vic/cmd/vic-machine/list"
"github.com/vmware/vic/cmd/vic-machine/update"
"github.com/vmware/vic/cmd/vic-machine/upgrade"
"github.com/vmware/vic/lib/install/vchlog"
viclog "github.com/vmware/vic/pkg/log"
"github.com/vmware/vic/pkg/version"
)
Expand Down Expand Up @@ -139,6 +140,12 @@ func main() {
logs = append(logs, f)
}

// create the logger for streaming VCH log messages
vchlog.Init()
logs = append(logs, vchlog.GetPipe())
go vchlog.Run()
defer vchlog.Close() // close the logger pipe when done

// Initiliaze logger with default TextFormatter
log.SetFormatter(viclog.NewTextFormatter())
// SetOutput to io.MultiWriter so that we can log to stdout and a file
Expand Down
14 changes: 13 additions & 1 deletion lib/install/management/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/vmware/govmomi/object"
"github.com/vmware/vic/lib/config"
"github.com/vmware/vic/lib/install/data"
"github.com/vmware/vic/lib/install/vchlog"
"github.com/vmware/vic/pkg/errors"
"github.com/vmware/vic/pkg/retry"
"github.com/vmware/vic/pkg/trace"
Expand All @@ -37,6 +38,7 @@ const (
uploadMaxElapsedTime = 30 * time.Minute
uploadMaxInterval = 1 * time.Minute
uploadInitialInterval = 10 * time.Second
timeFormat = "2006-01-02T15:04:05-0700"
)

func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, settings *data.InstallerData) error {
Expand All @@ -60,6 +62,16 @@ func (d *Dispatcher) CreateVCH(conf *config.VirtualContainerHostConfigSpec, sett
return errors.Errorf("Creating the appliance failed with %s. Exiting...", err)
}

// send the signal to VCH logger to indicate VCH datastore path is ready
datastoreReadySignal := vchlog.DatastoreReadySignal{
Datastore: d.session.Datastore,
LogFileName: "vic-machine-create",
Operation: trace.NewOperation(d.ctx, "vic-machine create"),
VMPathName: d.vmPathName,
Timestamp: time.Now().UTC().Format(timeFormat),
}
vchlog.Signal(datastoreReadySignal)

if err = d.uploadImages(settings.ImageFiles); err != nil {
return errors.Errorf("Uploading images failed with %s. Exiting...", err)
}
Expand Down Expand Up @@ -139,7 +151,7 @@ func (d *Dispatcher) uploadImages(files map[string]string) error {
switch err.(type) {
// if not found, do nothing
case object.DatastoreNoSuchFileError:
// otherwise force delete
// otherwise force delete
default:
log.Debugf("target delete path = %s", isoTargetPath)
err := fm.Delete(d.ctx, isoTargetPath)
Expand Down
82 changes: 82 additions & 0 deletions lib/install/vchlog/bufferedPipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
"bytes"
"sync"
"io"
)

// BufferedPipe struct implements a pipe readwriter with buffer
// buffer: the internal buffer to hold data
// c: the sync locker to manage concurrent reads and writes
// closed: boolean indicating if the stream is closed
type BufferedPipe struct {
buffer *bytes.Buffer
c *sync.Cond
closed bool
}

// NewBufferedPipe returns a new buffered pipe instance
// the internal buffer is initialized to default size.
// Since internal memory is used, need to make sure that buffered data is bounded.
func NewBufferedPipe() *BufferedPipe {
var m sync.Mutex
c := sync.NewCond(&m)
return &BufferedPipe{
buffer: bytes.NewBuffer(nil),
c: c,
closed: false,
}
}

// Read is blocked until a writer in the queue is done writing (until data is available)
func (bp *BufferedPipe) Read(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()

// pipe closed, drop all left-over data
if bp.closed {
return 0, io.EOF
}
for bp.buffer.Len() == 0 && !bp.closed {
bp.c.Wait()
}

return bp.buffer.Read(data)
}

// Write writes to the internal buffer, and signals one of the reader in queue to start reading.
func (bp *BufferedPipe) Write(data []byte) (n int, err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()

if bp.closed {
return 0, io.ErrUnexpectedEOF
}

return bp.buffer.Write(data)
}

// Close closes the pipe.
func (bp *BufferedPipe) Close() (err error) {
bp.c.L.Lock()
defer bp.c.L.Unlock()
defer bp.c.Signal()
bp.closed = true
return nil
}
70 changes: 70 additions & 0 deletions lib/install/vchlog/vchlogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2016-2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package vchlog

import (
"path"

"github.com/vmware/govmomi/object"
"github.com/vmware/vic/pkg/trace"
)

// DatastoreReadySignal serves as a signal struct indicating datastore folder path is available
// Datastore: the govmomi datastore object
// LogFileName: the filename of the destination path on datastore
// Context: the caller context when sending the signal
// VMPathName: the datastore path
type DatastoreReadySignal struct {
Datastore *object.Datastore
LogFileName string
Operation trace.Operation
VMPathName string
Timestamp string
}

// pipe: the streaming readwriter pipe to hold log messages
var pipe *BufferedPipe

// signalChan: channel for signaling when datastore folder is ready
var signalChan chan DatastoreReadySignal

// Init initializes the logger, creates the streaming pipe and makes the singaling channel.
func Init() {
pipe = NewBufferedPipe()
signalChan = make(chan DatastoreReadySignal)
}

// Run waits until the signal arrives and uploads the streaming pipe to datastore
func Run() {
sig := <-signalChan
// suffix the log file name with caller operation ID and timestamp
logFileName := sig.LogFileName + "_time_" + sig.Timestamp + "_op_" + sig.Operation.ID()
sig.Datastore.Upload(sig.Operation.Context, pipe, path.Join(sig.VMPathName, logFileName), nil)
}

// GetPipe returns the streaming pipe of the vch logger
func GetPipe() *BufferedPipe {
return pipe
}

// Signal signals the logger that the datastore folder is ready
func Signal(sig DatastoreReadySignal) {
signalChan <- sig
}

// Close stops the logger by closing the underlying pipe
func Close() {
pipe.Close()
}

0 comments on commit 709af07

Please sign in to comment.