From b5e748f4dc106f96f561e2eac681727d4c411b1c Mon Sep 17 00:00:00 2001 From: seefs001 Date: Sat, 28 Sep 2024 03:28:19 +0800 Subject: [PATCH] feat: Add Axiom handler for xlog and update related components - Implement AxiomHandler in xlog_handlers/axiom_handler.go - Update xlog/xlog.go to support Axiom integration - Modify xhttpc/xhttpc.go for improved HTTP client functionality - Enhance examples/xlog_example/main.go with Axiom logging example - Update .gitignore to exclude Axiom-related configuration files --- .gitignore | 3 +- examples/xlog_example/main.go | 39 ++++++- xhttpc/xhttpc.go | 19 ++++ xlog/xlog.go | 110 ++++++++++++-------- xlog_handlers/axiom_handler.go | 180 +++++++++++++++++++++++++++++++++ 5 files changed, 304 insertions(+), 47 deletions(-) create mode 100644 xlog_handlers/axiom_handler.go diff --git a/.gitignore b/.gitignore index ba5f758..abf5057 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ go.work.sum .env .log .idea/ -.vscode/ \ No newline at end of file +.vscode/ +app.* \ No newline at end of file diff --git a/examples/xlog_example/main.go b/examples/xlog_example/main.go index b86638d..621ab50 100644 --- a/examples/xlog_example/main.go +++ b/examples/xlog_example/main.go @@ -1,10 +1,18 @@ package main import ( + "os" + "time" + + "github.com/seefs001/xox/xenv" + "github.com/seefs001/xox/xhttpc" "github.com/seefs001/xox/xlog" + "github.com/seefs001/xox/xlog_handlers" ) func main() { + xenv.Load() + // Use the default console logger xlog.Info("This is an info message") xlog.Warn("This is a warning message") @@ -21,12 +29,39 @@ func main() { xlog.Error("Failed to add file logger", "error", err) } - // Now logs will be output to both console and file without duplication - xlog.Info("This message will be logged to both console and file") + // Add Axiom handler if environment variables are set + axiomApiToken := os.Getenv("AXIOM_API_TOKEN") + axiomDataset := os.Getenv("AXIOM_DATASET") + if axiomApiToken != "" && axiomDataset != "" { + xlog.Info("Axiom handler adding") + axiomHandler := xlog_handlers.NewAxiomHandler(axiomApiToken, axiomDataset) + + // Enable debug mode for Axiom handler + axiomHandler.SetDebug(true) + axiomHandler.SetLogOptions(xhttpc.LogOptions{ + LogHeaders: true, + LogBody: true, + LogResponse: true, + MaxBodyLogSize: 1024, + HeaderKeysToLog: []string{"Content-Type", "Authorization"}, + }) + + xlog.Add(axiomHandler) + xlog.Info("Axiom handler added") + } + + // Now logs will be output to console, file, and Axiom (if configured) + xlog.Info("This message will be logged to all configured handlers") // Use the Catch function to wrap operations that may produce errors xlog.Catch(func() error { // Simulate an operation that may produce an error return nil // or return an error }) + + // Add a small delay to allow logs to be sent + time.Sleep(2 * time.Second) + + // Gracefully shutdown all handlers + xlog.Shutdown() } diff --git a/xhttpc/xhttpc.go b/xhttpc/xhttpc.go index bf5992e..dded578 100644 --- a/xhttpc/xhttpc.go +++ b/xhttpc/xhttpc.go @@ -172,6 +172,11 @@ func WithLogOptions(options LogOptions) ClientOption { } } +// SetLogOptions sets the logging options for debug mode +func (c *Client) SetLogOptions(options LogOptions) { + c.logOptions = options +} + // SetBaseURL sets the base URL for all requests func (c *Client) SetBaseURL(url string) *Client { c.baseURL = url @@ -647,3 +652,17 @@ func basicAuth(username, password string) string { auth := username + ":" + password return base64.StdEncoding.EncodeToString([]byte(auth)) } + +// WithBearerToken sets bearer auth token for all requests +func WithBearerToken(token string) ClientOption { + return func(c *Client) { + c.SetBearerToken(token) + } +} + +// WithBaseURL sets the base URL for all requests +func WithBaseURL(url string) ClientOption { + return func(c *Client) { + c.SetBaseURL(url) + } +} diff --git a/xlog/xlog.go b/xlog/xlog.go index 00017d4..205a905 100644 --- a/xlog/xlog.go +++ b/xlog/xlog.go @@ -20,6 +20,7 @@ var ( defaultHandler slog.Handler logConfig LogConfig defaultLevel slog.Level + handlers []slog.Handler // Add this line ) // LogConfig represents the configuration for logging. @@ -200,12 +201,67 @@ func (h *ColorConsoleHandler) WithGroup(name string) slog.Handler { return &newHandler } -// Add replaces the current handler with a new one +// Add adds a new handler to the existing handlers func Add(handler slog.Handler) { - defaultHandler = handler + handlers = append(handlers, handler) // Update this line + if mh, ok := defaultHandler.(*MultiHandler); ok { + // If defaultHandler is already a MultiHandler, add the new handler to it + mh.handlers = append(mh.handlers, handler) + } else { + // If not, create a new MultiHandler with both handlers + defaultHandler = NewMultiHandler(defaultHandler, handler) + } defaultLogger = slog.New(defaultHandler) } +// NewMultiHandler creates a new MultiHandler +func NewMultiHandler(handlers ...slog.Handler) *MultiHandler { + return &MultiHandler{handlers: handlers} +} + +// MultiHandler implements a handler that writes to multiple handlers +type MultiHandler struct { + handlers []slog.Handler +} + +// Enabled implements the Handler interface +func (h *MultiHandler) Enabled(ctx context.Context, level slog.Level) bool { + for _, handler := range h.handlers { + if handler.Enabled(ctx, level) { + return true + } + } + return false +} + +// Handle implements the Handler interface +func (h *MultiHandler) Handle(ctx context.Context, r slog.Record) error { + for _, handler := range h.handlers { + if err := handler.Handle(ctx, r); err != nil { + return err + } + } + return nil +} + +// WithAttrs implements the Handler interface +func (h *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithAttrs(attrs) + } + return NewMultiHandler(handlers...) +} + +// WithGroup implements the Handler interface +func (h *MultiHandler) WithGroup(name string) slog.Handler { + handlers := make([]slog.Handler, len(h.handlers)) + for i, handler := range h.handlers { + handlers[i] = handler.WithGroup(name) + } + return NewMultiHandler(handlers...) +} + // FileConfig represents the configuration for file logging. type FileConfig struct { Filename string @@ -330,50 +386,16 @@ func Catch(f func() error) { } } -// MultiHandler implements a multi-handler that writes to multiple handlers. -type MultiHandler struct { - handlers []slog.Handler +// Add this interface if it doesn't exist +type ShutdownHandler interface { + Shutdown() } -// NewMultiHandler creates a new MultiHandler. -func NewMultiHandler(handlers ...slog.Handler) *MultiHandler { - return &MultiHandler{handlers: handlers} -} - -// Enabled implements the slog.Handler interface. -func (h *MultiHandler) Enabled(ctx context.Context, level slog.Level) bool { - for _, handler := range h.handlers { - if handler.Enabled(ctx, level) { - return true +// Add a function to shutdown all handlers +func Shutdown() { + for _, handler := range handlers { + if sh, ok := handler.(ShutdownHandler); ok { + sh.Shutdown() } } - return false -} - -// Handle implements the slog.Handler interface. -func (h *MultiHandler) Handle(ctx context.Context, r slog.Record) error { - for _, handler := range h.handlers { - if err := handler.Handle(ctx, r); err != nil { - return err - } - } - return nil -} - -// WithAttrs implements the slog.Handler interface. -func (h *MultiHandler) WithAttrs(attrs []slog.Attr) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithAttrs(attrs) - } - return NewMultiHandler(handlers...) -} - -// WithGroup implements the slog.Handler interface. -func (h *MultiHandler) WithGroup(name string) slog.Handler { - handlers := make([]slog.Handler, len(h.handlers)) - for i, handler := range h.handlers { - handlers[i] = handler.WithGroup(name) - } - return NewMultiHandler(handlers...) } diff --git a/xlog_handlers/axiom_handler.go b/xlog_handlers/axiom_handler.go new file mode 100644 index 0000000..569dd5a --- /dev/null +++ b/xlog_handlers/axiom_handler.go @@ -0,0 +1,180 @@ +package xlog_handlers + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/seefs001/xox/x" + "github.com/seefs001/xox/xhttpc" +) + +type AxiomHandler struct { + client *xhttpc.Client + dataset string + debug bool + logOptions xhttpc.LogOptions + buffer []map[string]interface{} + bufferSize int + mu sync.Mutex + sending bool + shutdownCh chan struct{} + wg sync.WaitGroup +} + +func NewAxiomHandler(apiToken, dataset string) *AxiomHandler { + client := xhttpc.NewClient( + xhttpc.WithBearerToken(apiToken), + xhttpc.WithBaseURL("https://api.axiom.co"), + ) + return &AxiomHandler{ + client: client, + dataset: dataset, + buffer: make([]map[string]interface{}, 0), + bufferSize: 100, // Adjust this value as needed + shutdownCh: make(chan struct{}), + } +} + +func (h *AxiomHandler) Enabled(ctx context.Context, level slog.Level) bool { + return true +} + +func (h *AxiomHandler) Handle(ctx context.Context, r slog.Record) error { + data := make(map[string]interface{}) + data["level"] = r.Level.String() + data["message"] = r.Message + data["time"] = r.Time.Format(time.RFC3339) + + r.Attrs(func(a slog.Attr) bool { + data[a.Key] = a.Value.Any() + return true + }) + + h.mu.Lock() + h.buffer = append(h.buffer, data) + shouldSend := len(h.buffer) >= h.bufferSize && !h.sending + h.mu.Unlock() + + if shouldSend { + h.wg.Add(1) + go func() { + defer h.wg.Done() + h.sendLogs(ctx) + }() + } + + return nil +} + +func (h *AxiomHandler) sendLogs(ctx context.Context) { + h.mu.Lock() + if h.sending { + h.mu.Unlock() + return + } + h.sending = true + logs := h.buffer + h.buffer = make([]map[string]interface{}, 0) + h.mu.Unlock() + + select { + case <-ctx.Done(): + return + case <-h.shutdownCh: + return + default: + // Proceed with sending logs + } + + if len(logs) == 0 { + h.mu.Lock() + h.sending = false + h.mu.Unlock() + return + } + + fmt.Println("lllll" + x.MustToJSON(logs)) + jsonData, err := json.Marshal(logs) + if err != nil { + fmt.Printf("failed to marshal log data: %v\n", err) + h.mu.Lock() + h.sending = false + h.mu.Unlock() + return + } + + url := fmt.Sprintf("/v1/datasets/%s/ingest", h.dataset) + resp, err := h.client.Post(ctx, url, bytes.NewReader(jsonData)) + if err != nil { + fmt.Printf("failed to send log to Axiom: %v\n", err) + } else { + defer resp.Body.Close() + if resp.StatusCode >= 400 { + fmt.Printf("Axiom API error: %s\n", resp.Status) + } + } + + h.mu.Lock() + h.sending = false + h.mu.Unlock() +} + +func (h *AxiomHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return h +} + +func (h *AxiomHandler) WithGroup(name string) slog.Handler { + return h +} + +func (h *AxiomHandler) SetDebug(debug bool) { + h.debug = debug + h.client.SetDebug(debug) +} + +func (h *AxiomHandler) SetLogOptions(options xhttpc.LogOptions) { + h.logOptions = options + h.client.SetLogOptions(options) +} + +func (h *AxiomHandler) Shutdown() { + close(h.shutdownCh) + h.wg.Wait() + + // Flush any remaining logs + h.mu.Lock() + remainingLogs := h.buffer + h.buffer = nil + h.mu.Unlock() + + if len(remainingLogs) > 0 { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + h.sendLogsImmediate(ctx, remainingLogs) + } +} + +func (h *AxiomHandler) sendLogsImmediate(ctx context.Context, logs []map[string]interface{}) { + // Similar to sendLogs, but sends immediately without buffering + // jsonData, err := json.Marshal(logs) + // if err != nil { + // fmt.Printf("failed to marshal log data: %v\n", err) + // return + // } + + url := fmt.Sprintf("/v1/datasets/%s/ingest", h.dataset) + resp, err := h.client.PostJSON(ctx, url, logs) + if err != nil { + fmt.Printf("failed to send log to Axiom immediately: %v\n", err) + } else { + defer resp.Body.Close() + if resp.StatusCode >= 400 { + fmt.Printf("Axiom API error: %s\n", resp.Status) + } + } +}