diff --git a/fhevm-engine/fhevm-go-native/fhevm/api.go b/fhevm-engine/fhevm-go-native/fhevm/api.go index 76f18e7..5a2c4b6 100644 --- a/fhevm-engine/fhevm-go-native/fhevm/api.go +++ b/fhevm-engine/fhevm-go-native/fhevm/api.go @@ -8,11 +8,13 @@ import ( "math/big" "os" "sort" + "strings" "sync" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + _ "github.com/mattn/go-sqlite3" grpc "google.golang.org/grpc" ) @@ -116,6 +118,10 @@ type ChainStorageApi interface { } type ExecutorApi interface { + // Initialize the executor with the host logger + // HostLogger is an implementation of FHELogger from the host chain, + // used to delegate logging. If set to nil, logging will be disabled. + InitLogger(hostLogger FHELogger, ctx string) // Create a session for a single transaction to capture all fhe // operations inside the state. We also schedule asynchronous // compute in background to have operations inside @@ -134,6 +140,11 @@ type ExtraData struct { FheRandSeed [32]byte } +// Implement String method for ExtraData +func (ed ExtraData) String() string { + return fmt.Sprintf("ExtraData {FheRandSeed: %s}", common.BytesToHash(ed.FheRandSeed[:]).TerminalString()) +} + type ExecutorSession interface { Execute(input []byte, ed ExtraData, output []byte) error ContractAddress() common.Address @@ -157,6 +168,20 @@ type CacheBlockData struct { materializedCiphertexts map[string][]byte } +// Implement the fmt.Stringer interface +func (c CacheBlockData) String() string { + if len(c.materializedCiphertexts) == 0 { + return "MaterializedCiphertexts: none" + } + var sb strings.Builder + sb.WriteString("MaterializedCiphertexts: ") + for key, value := range c.materializedCiphertexts { + sb.WriteString(fmt.Sprintf(" %s: %s, ", key, common.BytesToHash(value).TerminalString())) + } + + return sb.String() +} + type BlockCiphertextQueue struct { queue []*ComputationToInsert // filter duplicates @@ -178,6 +203,7 @@ type ApiImpl struct { executorUrl string contractStorageAddress common.Address cache *CiphertextCache + logger ProxyLogger } type SessionImpl struct { @@ -194,6 +220,17 @@ type ComputationOperand struct { FheUintType FheUintType } +// Implement the fmt.Stringer interface +func (c ComputationOperand) String() string { + return fmt.Sprintf( + "ComputationOperand {IsScalar: %t, Handle: %s, CompressedCiphertext len: %d, FheUintType: %s}", + c.IsScalar, + common.BytesToHash(c.Handle).TerminalString(), + len(c.CompressedCiphertext), + c.FheUintType, + ) +} + type ComputationToInsert struct { segmentId SegmentId Operation FheOp @@ -202,6 +239,23 @@ type ComputationToInsert struct { CommitBlockId int64 } +// Return Handle as TerminalString +func (c ComputationToInsert) Handle() string { + return common.BytesToHash(c.OutputHandle).TerminalString() +} + +// Implement the fmt.Stringer interface +func (c ComputationToInsert) String() string { + return fmt.Sprintf( + "ComputationToInsert { SegmentId: %d, Operation: %s, OutputHandle: %s, Operands: %v, CommitBlockId: %d}", + c.segmentId, + c.Operation, + c.Handle(), + c.Operands, + c.CommitBlockId, + ) +} + type SessionComputationStore struct { insertedHandles map[string]int invalidatedSegments map[SegmentId]bool @@ -210,12 +264,18 @@ type SessionComputationStore struct { blockNumber int64 cache *CiphertextCache contractStorageAddress common.Address + logger ProxyLogger } type EvmStorageComputationStore struct { currentBlockNumber int64 contractStorageAddress common.Address cache *CiphertextCache + logger ProxyLogger +} + +func (executorApi *ApiImpl) InitLogger(hostLogger FHELogger, ctx string) { + executorApi.logger = log(hostLogger, ctx) } func (executorApi *ApiImpl) CreateSession(blockNumber int64) ExecutorSession { @@ -229,12 +289,16 @@ func (executorApi *ApiImpl) CreateSession(blockNumber int64) ExecutorSession { blockNumber: blockNumber, cache: executorApi.cache, contractStorageAddress: executorApi.contractStorageAddress, + logger: executorApi.logger, }, } } func (executorApi *ApiImpl) PreloadCiphertexts(blockNumber int64, api ChainStorageApi) error { + log := log(&executorApi.logger, "preload") + computations := executorApi.loadComputationsFromStateToCache(blockNumber, api) + log.Info("Preload ciphertexts", "block", blockNumber, "length", computations) if computations > 0 { return executorProcessPendingComputations(executorApi) } @@ -246,8 +310,9 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in loadStartTime := time.Now() computations := 0 defer func() { + log := log(&executorApi.logger, "preload") duration := time.Since(loadStartTime) - fmt.Printf("ciphertext cache preloaded with %d ciphertexts in %dms\n", computations, duration.Milliseconds()) + log.Info("Preload done", "computations", computations, "duration", duration) }() // TODO: figure out the limit how long in future blocks we should preload @@ -342,8 +407,12 @@ func (executorApi *ApiImpl) loadComputationsFromStateToCache(startBlockNumber in } func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi) error { + log := log(&sessionApi.apiImpl.logger, "commit") + + log.Debug("Session store ciphertexts", "block", blockNumber) err := sessionApi.sessionStore.Commit(storage) if err != nil { + log.Error("Commit failed", "block", blockNumber, "error", err) return err } @@ -351,13 +420,16 @@ func (sessionApi *SessionImpl) Commit(blockNumber int64, storage ChainStorageApi if err != nil { return err } - return nil } func (sessionApi *SessionImpl) Execute(dataOrig []byte, ed ExtraData, outputOrig []byte) error { + log := log(&sessionApi.apiImpl.logger, "session::execute") + if len(dataOrig) < 4 { - return fmt.Errorf("input data must be at least 4 bytes for signature, got %d", len(dataOrig)) + err := fmt.Errorf("input data must be at least 4 bytes for signature, got %d", len(dataOrig)) + log.Error("Execute failed", "error", err) + return err } // make copies so we could assume array is immutable later @@ -374,12 +446,26 @@ func (sessionApi *SessionImpl) Execute(dataOrig []byte, ed ExtraData, outputOrig if len(output) >= 32 { // where to get output handle from? outputHandle := output[0:32] - return method.runFunction(sessionApi, callData, ed, outputHandle) + handle := common.BytesToHash(outputHandle).TerminalString() + + log.Debug("Call", "method", *method, "calldata len", len(callData), + "extra data", ed, "handle", handle) + + err := method.runFunction(sessionApi, callData, ed, outputHandle) + if err != nil { + log.Error("Computation not inserted", method, "handle", handle, "error", err) + } + + return err } else { - return errors.New("no output data provided") + err := errors.New("no output data provided") + log.Error("Execute failed", "error", err) + return err } } else { - return fmt.Errorf("signature %d not recognized", signature) + err := fmt.Errorf("signature %d not recognized", signature) + log.Error("Execute failed", "error", err) + return err } } @@ -417,6 +503,8 @@ func (dbApi *SessionComputationStore) InsertComputationBatch(computations []Comp } func (dbApi *SessionComputationStore) InsertComputation(computation ComputationToInsert) error { + log := log(&dbApi.logger, "session::execute") + _, found := dbApi.insertedHandles[string(computation.OutputHandle)] if !found { // preserve insertion order @@ -427,6 +515,8 @@ func (dbApi *SessionComputationStore) InsertComputation(computation ComputationT // he can have faster commit computation.CommitBlockId = dbApi.blockNumber + 5 dbApi.inserts = append(dbApi.inserts, computation) + log.Info("Insert computation", + "inserts count", len(dbApi.inserts), "computation", computation) } return nil @@ -445,8 +535,6 @@ func (dbApi *SessionComputationStore) Commit(storage ChainStorageApi) error { dbApi.invalidatedSegments = make(map[SegmentId]bool) dbApi.segmentCount = 0 - fmt.Printf("Inserting %d computations into the cache\n", len(finalInserts)) - evmInserter := EvmStorageComputationStore{ currentBlockNumber: dbApi.blockNumber, contractStorageAddress: dbApi.contractStorageAddress, @@ -574,6 +662,10 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain // in 5 or 10 blocks from current block, depending on how much they pay. // We create buckets, how many blocks in the future user wants // his ciphertexts to be evaluated + + log := log(&dbApi.logger, "evm_store") + log.Info("Processing computations", "count", len(computations)) + buckets := make(map[int64][]*ComputationToInsert) // index the buckets for ind, comp := range computations { @@ -583,6 +675,10 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain buckets[comp.CommitBlockId] = append(buckets[comp.CommitBlockId], &computations[ind]) } + if len(buckets) != 0 { + log.Debug("New buckets added", "buckets", len(buckets)) + } + // collect all their keys and sort because golang doesn't traverse map // in deterministic order allKeys := make([]int, 0) @@ -603,6 +699,13 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain for idx, comp := range bucket { layout := blockQueueStorageLayout(queueBlockNumber, int64(idx)) ciphertextsInBlock = ciphertextsInBlock.Add(ciphertextsInBlock, one) + + log.Info("Persist computation to LateCommit queue", + "handle", comp.Handle(), + "commit block", queueBlockNumber, + "count addr", countAddress.TerminalString(), + "ciphertextsInBlock", ciphertextsInBlock.Int64()) + metadata := computationMetadata(*comp) evmStorage.SetState(dbApi.contractStorageAddress, layout.metadata, metadata) evmStorage.SetState(dbApi.contractStorageAddress, layout.outputHandle, common.BytesToHash(comp.OutputHandle)) @@ -632,6 +735,7 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain queueBlockNumber := int64(key) bucket := buckets[queueBlockNumber] ctsStorage := dbApi.cache.ciphertextsToCompute[queueBlockNumber] + if ctsStorage == nil { ctsStorage = &BlockCiphertextQueue{ queue: make([]*ComputationToInsert, 0), @@ -649,8 +753,14 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain dbApi.hydrateComputationFromEvmState(evmStorage, comp) ctsStorage.queue = append(ctsStorage.queue, comp) ctsStorage.enqueuedCiphertext[string(comp.OutputHandle)] = true + + log.Debug("Add computation to Cache", + "commit block", queueBlockNumber, + "handle", comp.Handle(), + "cache length", len(ctsStorage.queue)) } } + } // notify about work available @@ -663,7 +773,9 @@ func (dbApi *EvmStorageComputationStore) InsertComputationBatch(evmStorage Chain } func (dbApi *EvmStorageComputationStore) hydrateComputationFromEvmState(evmStorage ChainStorageApi, comp *ComputationToInsert) error { + log := log(&dbApi.logger, "evm_store") + // hydrate operands from storage for idx := range comp.Operands { if !comp.Operands[idx].IsScalar { if len(comp.Operands[idx].Handle) != 32 { @@ -672,6 +784,9 @@ func (dbApi *EvmStorageComputationStore) hydrateComputationFromEvmState(evmStora hash := common.BytesToHash(comp.Operands[idx].Handle) resultCt := ReadBytesToAddress(evmStorage, dbApi.contractStorageAddress, hash) comp.Operands[idx].CompressedCiphertext = resultCt + + log.Info("Hydrate computation", "handle", comp.Handle(), + "operand_handle", hash.TerminalString(), "ciphertext len", len(resultCt)) } } @@ -745,11 +860,15 @@ func ReadBytesToAddress(api ChainStorageApi, contractAddress common.Address, add } func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainStorageApi) error { + log := log(&executorApi.logger, "flush") + // cleanup the queue for the block number countAddress := blockNumberToQueueItemCountAddress(blockNumber) ciphertextsInBlock := api.GetState(executorApi.contractStorageAddress, countAddress).Big() ctCount := ciphertextsInBlock.Int64() + log.Debug("Flush ciphertexts", "block number", blockNumber, "count addr", countAddress.TerminalString(), "count", ctCount) + zero := common.BigToHash(big.NewInt(0)) one := big.NewInt(1) @@ -762,6 +881,10 @@ func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainS ctAddr := blockQueueStorageLayout(blockNumber, int64(i)) metadata := bytesToMetadata(api.GetState(executorApi.contractStorageAddress, ctAddr.metadata)) outputHandle := api.GetState(executorApi.contractStorageAddress, ctAddr.outputHandle) + + log.Debug("Reset computation LateCommit queue", "block number", blockNumber, + "handle", outputHandle.TerminalString()) + handlesToMaterialize = append(handlesToMaterialize, outputHandle) api.SetState(executorApi.contractStorageAddress, ctAddr.metadata, zero) api.SetState(executorApi.contractStorageAddress, ctAddr.outputHandle, zero) @@ -781,6 +904,10 @@ func (executorApi *ApiImpl) flushFheResultsToState(blockNumber int64, api ChainS // set 0 as count if ctCount > 0 { api.SetState(executorApi.contractStorageAddress, countAddress, zero) + + log.Debug("Reset count addr", + "block number", blockNumber, + "count addr", countAddress.TerminalString(), "count", ctCount) } // materialize handles in storage assuming they exist in the cache @@ -798,6 +925,8 @@ func (executorApi *ApiImpl) materializeHandlesInStorage(blockNumber int64, handl executorApi.cache.lock.Unlock() }() + log := log(&executorApi.logger, "materialize") + executorApi.cache.latestBlockFlushed = blockNumber contractAddr := executorApi.contractStorageAddress @@ -814,6 +943,9 @@ func (executorApi *ApiImpl) materializeHandlesInStorage(blockNumber int64, handl return errors.New("ciphertext not found in cache") } + log.Info("Persist ciphertext to state", "block number", blockNumber, "handle", + handle.TerminalString(), "ciphertext length", len(ciphertext)) + putBytesToAddress(api, contractAddr, handle, ciphertext) } @@ -855,7 +987,9 @@ func ciphertextCacheGc(cache *CiphertextCache) { cache.lastCacheGc = time.Now() } -func InitExecutor() (ExecutorApi, error) { +func InitExecutor(hostLogger FHELogger) (ExecutorApi, error) { + log := log(hostLogger, "module::fhevm") + executorUrl, hasUrl := os.LookupEnv("FHEVM_EXECUTOR_URL") if !hasUrl { return nil, errors.New("FHEVM_EXECUTOR_URL is not configured") @@ -865,9 +999,8 @@ func InitExecutor() (ExecutorApi, error) { if !hasAddr { return nil, errors.New("FHEVM_EXECUTOR_URL is set but FHEVM_CONTRACT_ADDRESS is not set") } - fhevmContractAddress := common.HexToAddress(contractAddr) - fmt.Printf("Coprocessor contract address: %s\n", fhevmContractAddress) + fhevmContractAddress := common.HexToAddress(contractAddr) aclContractAddressHex := os.Getenv("ACL_CONTRACT_ADDRESS") if !common.IsHexAddress(aclContractAddressHex) { return nil, fmt.Errorf("bad or missing ACL_CONTRACT_ADDRESS: %s", aclContractAddressHex) @@ -877,6 +1010,12 @@ func InitExecutor() (ExecutorApi, error) { // pick hardcoded value in the beginning, we can change later storageAddress := common.HexToAddress("0x0000000000000000000000000000000000000070") + log.Info("FHEVM initialized", + "Executor addr", executorUrl, + "FHEVM contract", contractAddr, + "ACL contract", aclContractAddressHex, + "Storage contract", storageAddress.Hex()) + workAvailableChan := make(chan bool, 10) cache := &CiphertextCache{ @@ -902,6 +1041,8 @@ func InitExecutor() (ExecutorApi, error) { } func executorWorkerThread(impl *ApiImpl) { + log := log(&impl.logger, "worker") + for { // try reading notification from channel <-impl.cache.workAvailableChan @@ -912,13 +1053,14 @@ func executorWorkerThread(impl *ApiImpl) { err := executorProcessPendingComputations(impl) if err != nil { - fmt.Printf("executor error while processing pending computations: %s\n", err) + log.Error("Failed to compute", "error", err.Error()) } } } func executorProcessPendingComputations(impl *ApiImpl) error { - startTime := time.Now() + log := log(&impl.logger, "sync_compute") + impl.cache.lock.Lock() defer func() { impl.cache.lock.Unlock() @@ -926,13 +1068,6 @@ func executorProcessPendingComputations(impl *ApiImpl) error { availableCts := len(impl.cache.ciphertextsToCompute) - defer func() { - if availableCts > 0 { - duration := time.Since(startTime).Milliseconds() - fmt.Printf("executor computations completed in %dms\n", duration) - } - }() - // empty channel from multiple notifications before processing for len(impl.cache.workAvailableChan) > 0 { <-impl.cache.workAvailableChan @@ -959,6 +1094,9 @@ func executorProcessPendingComputations(impl *ApiImpl) error { ctToBlockIndex := make(map[string]int64) for block, compute := range impl.cache.ciphertextsToCompute { + log.Debug("Processing block", + "commit block", block, "computations", len(compute.queue)) + for _, ct := range compute.queue { syncInputs := make([]*SyncInput, 0, len(ct.Operands)) resultHandles := make([][]byte, 0, 1) @@ -984,29 +1122,49 @@ func executorProcessPendingComputations(impl *ApiImpl) error { } } - request.Computations = append(request.Computations, &SyncComputation{ + comp := &SyncComputation{ Operation: FheOperation(ct.Operation), Inputs: syncInputs, ResultHandles: resultHandles, - }) + } + + request.Computations = append(request.Computations, comp) + log.Debug("Add operation", "op", comp.Operation, "handle", ct.Handle()) + ctToBlockIndex[string(ct.OutputHandle)] = block } } - fmt.Printf("sending grpc request with %d computations and %d ciphertexts\n", len(request.Computations), len(request.CompressedCiphertexts)) + log.Info("Sending grpc request", + "computations", len(request.Computations), + "compressed ciphertexts", len(request.CompressedCiphertexts)) + if len(request.Computations) != 0 { + for _, compCt := range request.CompressedCiphertexts { + log.Debug("Request with compressed ciphertext", "handle", common.BytesToHash(compCt.Handle).TerminalString(), + "compCt len", len(compCt.Serialization)) + } + } + + startTime := time.Now() client := NewFhevmExecutorClient(conn) response, err := client.SyncCompute(context.Background(), &request) if err != nil { return err } + ciphertexts := response.GetResultCiphertexts() if ciphertexts == nil { return errors.New(response.GetError().String()) } + if availableCts > 0 { + log.Debug("Computations completed", "duration", time.Since(startTime)) + } + + log.Info("Response", "ciphertexts count", len(ciphertexts.Ciphertexts)) + outCts := ciphertexts.Ciphertexts - fmt.Printf("got %d ciphertext responses from the executor\n", len(outCts)) for _, ct := range outCts { theBlock, exists := ctToBlockIndex[string(ct.Handle)] if !exists { @@ -1022,6 +1180,8 @@ func executorProcessPendingComputations(impl *ApiImpl) error { } blockData.materializedCiphertexts[string(ct.Handle)] = ct.Serialization + + log.Debug("Response ciphertext", "handle", common.BytesToHash(ct.Handle).TerminalString(), "len", len(ct.Serialization)) } // reset map of the queue diff --git a/fhevm-engine/fhevm-go-native/fhevm/fhelib.go b/fhevm-engine/fhevm-go-native/fhevm/fhelib.go index fd52374..5846f74 100644 --- a/fhevm-engine/fhevm-go-native/fhevm/fhelib.go +++ b/fhevm-engine/fhevm-go-native/fhevm/fhelib.go @@ -17,6 +17,13 @@ type FheLibMethod struct { NonScalarDisabled bool } +func (m FheLibMethod) String() string { + return fmt.Sprintf( + "FheLibMethod(Name: %s, ArgTypes: %s, ScalarSupport: %t, NonScalarDisabled: %t)", + m.Name, m.ArgTypes, m.ScalarSupport, m.NonScalarDisabled, + ) +} + var signatureToFheLibMethod = map[uint32]*FheLibMethod{} func FheLibMethods() []*FheLibMethod { diff --git a/fhevm-engine/fhevm-go-native/fhevm/logger.go b/fhevm-engine/fhevm-go-native/fhevm/logger.go new file mode 100644 index 0000000..8cbd32b --- /dev/null +++ b/fhevm-engine/fhevm-go-native/fhevm/logger.go @@ -0,0 +1,128 @@ +package fhevm + +import ( + "fmt" + "os" +) + +// Map FheOp to its string representations +var fheOpNames = map[FheOp]string{ + FheAdd: "FheAdd", + FheSub: "FheSub", + FheMul: "FheMul", + FheDiv: "FheDiv", + FheRem: "FheRem", + FheBitAnd: "FheBitAnd", + FheBitOr: "FheBitOr", + FheBitXor: "FheBitXor", + FheShl: "FheShl", + FheShr: "FheShr", + FheRotl: "FheRotl", + FheRotr: "FheRotr", + FheEq: "FheEq", + FheNe: "FheNe", + FheGe: "FheGe", + FheGt: "FheGt", + FheLe: "FheLe", + FheLt: "FheLt", + FheMin: "FheMin", + FheMax: "FheMax", + FheNeg: "FheNeg", + FheNot: "FheNot", + FheCast: "FheCast", + TrivialEncrypt: "TrivialEncrypt", + FheIfThenElse: "FheIfThenElse", + FheRand: "FheRand", + FheRandBounded: "FheRandBounded", +} + +// String implements the fmt.Stringer interface for FheOp +func (op FheOp) String() string { + if name, ok := fheOpNames[op]; ok { + return name + } + return fmt.Sprintf("UnknownFheOp(%d)", op) +} + +// A FHELogger writes key/value pairs to a Handler +type FHELogger interface { + Trace(msg string, ctx ...interface{}) + Debug(msg string, ctx ...interface{}) + Info(msg string, ctx ...interface{}) + Warn(msg string, ctx ...interface{}) + Error(msg string, ctx ...interface{}) + Crit(msg string, ctx ...interface{}) +} + +// ProxyLogger is a concrete implementation of FHELogger that adds extra context to all calls. +type ProxyLogger struct { + // logger is the underlying logger that ProxyLogger delegates to. + // This should be the concrete logger implementation of the Host + logger FHELogger + ctx []interface{} +} + +// log creates a new ProxyLogger instance with the given logger and context. +func log(logger FHELogger, ctx ...interface{}) ProxyLogger { + return ProxyLogger{ + logger: logger, + ctx: ctx, + } +} + +// Trace adds extra context and delegates the call. +func (p *ProxyLogger) Trace(msg string, ctx ...interface{}) { + if p.logger == nil { + return + } + + p.logger.Trace(msg, append(p.ctx, ctx...)...) +} + +// Debug adds extra context and delegates the call. +func (p *ProxyLogger) Debug(msg string, ctx ...interface{}) { + if p.logger == nil { + return + } + + p.logger.Debug(msg, append(p.ctx, ctx...)...) +} + +// Info adds extra context and delegates the call. +func (p *ProxyLogger) Info(msg string, ctx ...interface{}) { + if p.logger == nil { + return + } + + p.logger.Info(msg, append(p.ctx, ctx...)...) +} + +// Warn adds extra context and delegates the call. +func (p *ProxyLogger) Warn(msg string, ctx ...interface{}) { + if p.logger == nil { + return + } + + p.logger.Warn(msg, append(p.ctx, ctx...)...) +} + +// Error adds extra context and delegates the call. +func (p *ProxyLogger) Error(msg string, ctx ...interface{}) { + if p.logger == nil { + return + } + + p.logger.Error(msg, append(p.ctx, ctx...)...) +} + +// Crit adds extra context and delegates the call. +// It terminates the process after logging the message. +// This is useful for fatal errors. +func (p *ProxyLogger) Crit(msg string, ctx ...interface{}) { + if p.logger != nil { + p.logger.Crit(msg, append(p.ctx, ctx...)...) + } + + // Exit the process + os.Exit(1) +}