Skip to content

Commit

Permalink
Continuous Learning (#146)
Browse files Browse the repository at this point in the history
* Implement continuous learning
* Referen to tn006_continuous_learning.md for details on the design
* Refactor the app class to handle wiring together all the different
components
* Now that we have a streaming architecture in which Analyzer sends
events to the Learner which sends events to InMemoryDB we need create
all these structs and then wire them up
   
Fix #84
  • Loading branch information
jlewi authored Jun 18, 2024
1 parent 8ca17d7 commit c869ca7
Show file tree
Hide file tree
Showing 17 changed files with 738 additions and 360 deletions.
56 changes: 0 additions & 56 deletions app/cmd/learn.go

This file was deleted.

1 change: 0 additions & 1 deletion app/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ func NewRootCmd() *cobra.Command {
rootCmd.AddCommand(NewConvertCmd())
rootCmd.AddCommand(NewVersionCmd(appName, os.Stdout))
rootCmd.AddCommand(NewServeCmd())
rootCmd.AddCommand(NewLearnCmd())
rootCmd.AddCommand(NewConfigCmd())
rootCmd.AddCommand(NewLogsCmd())
rootCmd.AddCommand(NewApplyCmd())
Expand Down
35 changes: 4 additions & 31 deletions app/cmd/serve.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package cmd

import (
"context"
"fmt"
"os"

"github.com/jlewi/monogo/helpers"

"github.com/jlewi/foyle/app/pkg/application"

"github.com/jlewi/monogo/helpers"
"github.com/spf13/cobra"
)

Expand All @@ -30,36 +30,9 @@ func NewServeCmd() *cobra.Command {
if err := app.OpenDBs(); err != nil {
return err
}

logDirs := make([]string, 0, 2)
logDirs = append(logDirs, app.Config.GetRawLogDir())

if app.Config.Learner != nil {
logDirs = append(logDirs, app.Config.Learner.LogDirs...)
}

analyzer, err := app.SetupAnalyzer()
if err != nil {
return err
}

if err := analyzer.Run(context.Background(), logDirs); err != nil {
return err
}
s, err := app.SetupServer()
if err != nil {
return err
}
defer helpers.DeferIgnoreError(app.Shutdown)

// Analyzer needs to be shutdown before the app because the app will close the database
defer helpers.DeferIgnoreError(func() error {
return analyzer.Shutdown(context.Background())
})

logVersion()
return s.Run()

defer helpers.DeferIgnoreError(app.Shutdown)
return app.Serve()
}()

if err != nil {
Expand Down
23 changes: 11 additions & 12 deletions app/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,29 @@ type Agent struct {
db *learn.InMemoryExampleDB
}

func NewAgent(cfg config.Config, client *openai.Client) (*Agent, error) {
func NewAgent(cfg config.Config, client *openai.Client, inMemoryExampleDB *learn.InMemoryExampleDB) (*Agent, error) {
if cfg.Agent == nil {
return nil, errors.New("Configuration is missing AgentConfig; configuration must define the agent field.")
}
log := zapr.NewLogger(zap.L())
log.Info("Creating agent", "config", cfg.Agent)
var db *learn.InMemoryExampleDB
if cfg.Agent.RAG != nil && cfg.Agent.RAG.Enabled {
log.Info("RAG is enabled; loading data")

if client == nil {
return nil, errors.New("OpenAI client is required for RAG")
}
var err error
db, err = learn.NewInMemoryExampleDB(cfg, client)
if err != nil {
return nil, errors.Wrap(err, "Failed to create InMemoryExampleDB")
if client == nil {
return nil, errors.New("OpenAI client is required")
}
if cfg.Agent.RAG != nil && cfg.Agent.RAG.Enabled {
if inMemoryExampleDB == nil {
return nil, errors.New("RAG is enabled but learn is nil; learn must be set to use RAG")
}
log.Info("RAG is enabled")
} else {
inMemoryExampleDB = nil
}

return &Agent{
client: client,
config: cfg,
db: db,
db: inMemoryExampleDB,
}, nil
}

Expand Down
11 changes: 9 additions & 2 deletions app/pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"os"
"testing"

"github.com/jlewi/foyle/app/pkg/learn"

"github.com/jlewi/foyle/app/api"

"github.com/jlewi/foyle/app/pkg/config"
Expand Down Expand Up @@ -77,15 +79,20 @@ func Test_Generate(t *testing.T) {
MaxResults: 3,
}
cfg.Agent.RAG.Enabled = true
agentWithRag, err := NewAgent(*cfg, client)

inMemoryDB, err := learn.NewInMemoryExampleDB(*cfg, client)
if err != nil {
t.Fatalf("Error creating in memory DB; %v", err)
}
agentWithRag, err := NewAgent(*cfg, client, inMemoryDB)

if err != nil {
t.Fatalf("Error creating agent; %v", err)
}

cfgNoRag := cfg.DeepCopy()
cfgNoRag.Agent.RAG.Enabled = false
agentNoRag, err := NewAgent(cfgNoRag, client)
agentNoRag, err := NewAgent(cfgNoRag, client, nil)

if err != nil {
t.Fatalf("Error creating agent; %v", err)
Expand Down
15 changes: 14 additions & 1 deletion app/pkg/analyze/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type Analyzer struct {

watcher *fsnotify.Watcher

blockNotifier PostBlockEvent

handleLogFileIsDone sync.WaitGroup
handleBlocksIsDone sync.WaitGroup
logFileOffsets map[string]int64
Expand Down Expand Up @@ -129,8 +131,14 @@ type blockItem struct {
id string
}

// PostBlockEvent interface for functions to post block events.
type PostBlockEvent func(id string) error

// Run runs the analyzer; continually processing logs.
func (a *Analyzer) Run(ctx context.Context, logDirs []string) error {
// blockNotifier is an optional function that will be called when a block is updated.
// This should be non blocking.
func (a *Analyzer) Run(ctx context.Context, logDirs []string, blockNotifier PostBlockEvent) error {
a.blockNotifier = blockNotifier
// Find all the current files
jsonFiles, err := findLogFilesInDirs(ctx, logDirs)
if err != nil {
Expand Down Expand Up @@ -536,6 +544,11 @@ func (a *Analyzer) handleBlockEvents(ctx context.Context) {
if err != nil {
log.Error(err, "Error processing block", "block", blockItem.id)
}
if a.blockNotifier != nil {
if err := a.blockNotifier(blockItem.id); err != nil {
log.Error(err, "Error notifying block event", "block", blockItem.id)
}
}
if a.signalBlockDone != nil {
a.signalBlockDone <- blockItem.id
}
Expand Down
2 changes: 1 addition & 1 deletion app/pkg/analyze/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func Test_Analyzer(t *testing.T) {
a.signalFileDone = fileProcessed
a.signalBlockDone = blockProccessed

if err := a.Run(context.Background(), []string{rawDir}); err != nil {
if err := a.Run(context.Background(), []string{rawDir}, nil); err != nil {
t.Fatalf("Analyze failed: %v", err)
}

Expand Down
Loading

0 comments on commit c869ca7

Please sign in to comment.