-
Notifications
You must be signed in to change notification settings - Fork 406
/
msg_dispatcher.go
210 lines (185 loc) · 7.34 KB
/
msg_dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package keeper
import (
"fmt"
wasmvmtypes "github.com/CosmWasm/wasmvm/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/CosmWasm/wasmd/x/wasm/types"
)
// Messenger is an extension point for custom wasmd message handling
type Messenger interface {
// DispatchMsg encodes the wasmVM message and dispatches it.
DispatchMsg(ctx sdk.Context, contractAddr sdk.AccAddress, contractIBCPortID string, msg wasmvmtypes.CosmosMsg) (events []sdk.Event, data [][]byte, err error)
}
// replyer is a subset of keeper that can handle replies to submessages
type replyer interface {
reply(ctx sdk.Context, contractAddress sdk.AccAddress, reply wasmvmtypes.Reply) ([]byte, error)
}
// MessageDispatcher coordinates message sending and submessage reply/ state commits
type MessageDispatcher struct {
messenger Messenger
keeper replyer
}
// NewMessageDispatcher constructor
func NewMessageDispatcher(messenger Messenger, keeper replyer) *MessageDispatcher {
return &MessageDispatcher{messenger: messenger, keeper: keeper}
}
// DispatchMessages sends all messages.
func (d MessageDispatcher) DispatchMessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.CosmosMsg) error {
for _, msg := range msgs {
events, _, err := d.messenger.DispatchMsg(ctx, contractAddr, ibcPort, msg)
if err != nil {
return err
}
// redispatch all events, (type sdk.EventTypeMessage will be filtered out in the handler)
ctx.EventManager().EmitEvents(events)
}
return nil
}
// dispatchMsgWithGasLimit sends a message with gas limit applied
func (d MessageDispatcher) dispatchMsgWithGasLimit(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msg wasmvmtypes.CosmosMsg, gasLimit uint64) (events []sdk.Event, data [][]byte, err error) {
limitedMeter := sdk.NewGasMeter(gasLimit)
subCtx := ctx.WithGasMeter(limitedMeter)
// catch out of gas panic and just charge the entire gas limit
defer func() {
if r := recover(); r != nil {
// if it's not an OutOfGas error, raise it again
if _, ok := r.(sdk.ErrorOutOfGas); !ok {
// log it to get the original stack trace somewhere (as panic(r) keeps message but stacktrace to here
moduleLogger(ctx).Info("SubMsg rethrowing panic: %#v", r)
panic(r)
}
ctx.GasMeter().ConsumeGas(gasLimit, "Sub-Message OutOfGas panic")
err = sdkerrors.Wrap(sdkerrors.ErrOutOfGas, "SubMsg hit gas limit")
}
}()
events, data, err = d.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg)
// make sure we charge the parent what was spent
spent := subCtx.GasMeter().GasConsumed()
ctx.GasMeter().ConsumeGas(spent, "From limited Sub-Message")
return events, data, err
}
// DispatchSubmessages builds a sandbox to execute these messages and returns the execution result to the contract
// that dispatched them, both on success as well as failure
func (d MessageDispatcher) DispatchSubmessages(ctx sdk.Context, contractAddr sdk.AccAddress, ibcPort string, msgs []wasmvmtypes.SubMsg) ([]byte, error) {
var rsp []byte
for _, msg := range msgs {
switch msg.ReplyOn {
case wasmvmtypes.ReplySuccess, wasmvmtypes.ReplyError, wasmvmtypes.ReplyAlways, wasmvmtypes.ReplyNever:
default:
return nil, sdkerrors.Wrap(types.ErrInvalid, "replyOn value")
}
// first, we build a sub-context which we can use inside the submessages
subCtx, commit := ctx.CacheContext()
em := sdk.NewEventManager()
subCtx = subCtx.WithEventManager(em)
// check how much gas left locally, optionally wrap the gas meter
gasRemaining := ctx.GasMeter().Limit() - ctx.GasMeter().GasConsumed()
limitGas := msg.GasLimit != nil && (*msg.GasLimit < gasRemaining)
var err error
var events []sdk.Event
var data [][]byte
if limitGas {
events, data, err = d.dispatchMsgWithGasLimit(subCtx, contractAddr, ibcPort, msg.Msg, *msg.GasLimit)
} else {
events, data, err = d.messenger.DispatchMsg(subCtx, contractAddr, ibcPort, msg.Msg)
}
// if it succeeds, commit state changes from submessage, and pass on events to Event Manager
var filteredEvents []sdk.Event
if err == nil {
commit()
filteredEvents = filterEvents(append(em.Events(), events...))
ctx.EventManager().EmitEvents(filteredEvents)
} // on failure, revert state from sandbox, and ignore events (just skip doing the above)
// we only callback if requested. Short-circuit here the cases we don't want to
if (msg.ReplyOn == wasmvmtypes.ReplySuccess || msg.ReplyOn == wasmvmtypes.ReplyNever) && err != nil {
return nil, err
}
if msg.ReplyOn == wasmvmtypes.ReplyNever || (msg.ReplyOn == wasmvmtypes.ReplyError && err == nil) {
continue
}
// otherwise, we create a SubMsgResult and pass it into the calling contract
var result wasmvmtypes.SubMsgResult
if err == nil {
// just take the first one for now if there are multiple sub-sdk messages
// and safely return nothing if no data
var responseData []byte
if len(data) > 0 {
responseData = data[0]
}
result = wasmvmtypes.SubMsgResult{
Ok: &wasmvmtypes.SubMsgResponse{
Events: sdkEventsToWasmVMEvents(filteredEvents),
Data: responseData,
},
}
} else {
// Issue #759 - we don't return error string for worries of non-determinism
moduleLogger(ctx).Info("Redacting submessage error", "cause", err)
result = wasmvmtypes.SubMsgResult{
Err: redactError(err).Error(),
}
}
// now handle the reply, we use the parent context, and abort on error
reply := wasmvmtypes.Reply{
ID: msg.ID,
Result: result,
}
// we can ignore any result returned as there is nothing to do with the data
// and the events are already in the ctx.EventManager()
rspData, err := d.keeper.reply(ctx, contractAddr, reply)
switch {
case err != nil:
return nil, sdkerrors.Wrap(err, "reply")
case rspData != nil:
rsp = rspData
}
}
return rsp, nil
}
// Issue #759 - we don't return error string for worries of non-determinism
func redactError(err error) error {
// Do not redact system errors
// SystemErrors must be created in x/wasm and we can ensure determinism
if wasmvmtypes.ToSystemError(err) != nil {
return err
}
// FIXME: do we want to hardcode some constant string mappings here as well?
// Or better document them? (SDK error string may change on a patch release to fix wording)
// sdk/11 is out of gas
// sdk/5 is insufficient funds (on bank send)
// (we can theoretically redact less in the future, but this is a first step to safety)
codespace, code, _ := sdkerrors.ABCIInfo(err, false)
return fmt.Errorf("codespace: %s, code: %d", codespace, code)
}
func filterEvents(events []sdk.Event) []sdk.Event {
// pre-allocate space for efficiency
res := make([]sdk.Event, 0, len(events))
for _, ev := range events {
if ev.Type != "message" {
res = append(res, ev)
}
}
return res
}
func sdkEventsToWasmVMEvents(events []sdk.Event) []wasmvmtypes.Event {
res := make([]wasmvmtypes.Event, len(events))
for i, ev := range events {
res[i] = wasmvmtypes.Event{
Type: ev.Type,
Attributes: sdkAttributesToWasmVMAttributes(ev.Attributes),
}
}
return res
}
func sdkAttributesToWasmVMAttributes(attrs []abci.EventAttribute) []wasmvmtypes.EventAttribute {
res := make([]wasmvmtypes.EventAttribute, len(attrs))
for i, attr := range attrs {
res[i] = wasmvmtypes.EventAttribute{
Key: string(attr.Key),
Value: string(attr.Value),
}
}
return res
}