-
Notifications
You must be signed in to change notification settings - Fork 0
/
attach.ts
201 lines (180 loc) · 7.19 KB
/
attach.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import { Packr, UnpackrStream, addExtension, unpack } from "msgpackr";
import { EventEmitter } from "node:events";
import { createLogger, prettyRPCMessage } from "./logger.ts";
import {
MessageType,
type AttachParams,
type BaseEvents,
type EventHandler,
type Nvim,
type RPCMessage,
type RPCNotification,
type RPCRequest,
type RPCResponse,
} from "./types.ts";
const packr = new Packr({ useRecords: false });
const unpackrStream = new UnpackrStream({ useRecords: false });
[0, 1, 2].forEach((type) => {
// https://neovim.io/doc/user/api.html#api-definitions
// decode Buffer, Window, and Tabpage as numbers
// Buffer id: 0 prefix: nvim_buf_
// Window id: 1 prefix: nvim_win_
// Tabpage id: 2 prefix: nvim_tabpage_
addExtension({ type, unpack: (buffer) => unpack(buffer) as number });
});
export async function attach<ApiInfo extends BaseEvents = BaseEvents>({
socket,
client,
logging,
}: AttachParams): Promise<Nvim<ApiInfo>> {
const logger = createLogger(client, logging);
const messageOutQueue: RPCMessage[] = [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const notificationHandlers = new Map<string, Record<string, EventHandler<any, unknown>>>();
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const requestHandlers = new Map<string, EventHandler<any, unknown>>();
const emitter = new EventEmitter({ captureRejections: true });
let lastReqId = 0;
let handlerId = 0;
const nvimSocket = await Bun.connect({
unix: socket,
socket: {
binaryType: "uint8array",
data(_, data) {
// Sometimes RPC messages are split into multiple socket messages.
// `unpackrStream` handles collecting all socket messages if the RPC message
// is split and decoding it.
unpackrStream.write(data);
},
error(_, error) {
logger?.error("socket error", error);
},
end() {
logger?.debug("connection closed by neovim");
},
close() {
logger?.debug("connection closed by bunvim");
},
},
});
function processMessageOutQueue() {
// All writing to neovim happens through this function.
// Outgoing RPC messages are added to the `messageOutQueue` and sent ASAP
if (!messageOutQueue.length) return;
const message = messageOutQueue.shift();
if (!message) {
logger?.error("Cannot process undefined message");
return;
}
logger?.debug(prettyRPCMessage(message, "out"));
nvimSocket.write(packr.pack(message) as unknown as Uint8Array);
processMessageOutQueue();
}
function runNotificationHandlers(message: RPCNotification) {
// message[1] notification name
// message[2] args
const handlers = notificationHandlers.get(message[1]);
if (!handlers) return;
// eslint-disable-next-line @typescript-eslint/no-misused-promises
Object.entries(handlers).forEach(async ([id, handler]) => {
const result = await handler(message[2]);
// remove notification handler if it returns specifically `true`
// other truthy values won't trigger the removal
// eslint-disable-next-line
if (result === true) delete handlers[id];
});
}
unpackrStream.on("data", (message: RPCMessage) => {
(async () => {
logger?.debug(prettyRPCMessage(message, "in"));
if (message[0] === MessageType.NOTIFY) {
// asynchronously run notification handlers.
// RPCNotifications don't need a response
runNotificationHandlers(message);
}
if (message[0] === MessageType.RESPONSE) {
// message[1] reqId
// message[2] error
// message[3] result
emitter.emit(`response-${message[1]}`, message[2], message[3]);
}
if (message[0] === MessageType.REQUEST) {
// message[1] reqId
// message[2] method name
// message[3] args
const handler = requestHandlers.get(message[2]);
// RPCRequests block neovim until a response is received.
// RPCResponse is added to beginning of queue to be sent ASAP.
if (!handler) {
const notFound: RPCResponse = [
MessageType.RESPONSE,
message[1],
`no handler for method ${message[2]} found`,
null,
];
messageOutQueue.unshift(notFound);
} else {
try {
const result = await handler(message[3]);
const response: RPCResponse = [
MessageType.RESPONSE,
message[1],
null,
result,
];
messageOutQueue.unshift(response);
} catch (err) {
const response: RPCResponse = [
MessageType.RESPONSE,
message[1],
String(err),
null,
];
messageOutQueue.unshift(response);
}
}
}
// Continue processing queue
processMessageOutQueue();
})().catch((err: unknown) => logger?.error("unpackrStream error", err));
});
const call: Nvim["call"] = (func, args) => {
const reqId = ++lastReqId;
const request: RPCRequest = [MessageType.REQUEST, reqId, func as string, args];
return new Promise((resolve, reject) => {
// Register response listener before adding request to queue to avoid
// response coming in before listener was set up.
emitter.once(`response-${reqId}`, (error, result) => {
if (error) reject(error as Error);
resolve(result as unknown);
});
messageOutQueue.push(request);
// Start processing queue if we're not already
processMessageOutQueue();
});
};
await call("nvim_set_client_info", [
client.name,
client.version ?? {},
client.type ?? "msgpack-rpc",
client.methods ?? {},
client.attributes ?? {},
]);
const channelId = (await call("nvim_get_api_info", []))[0] as number;
return {
call,
channelId,
logger: logger,
onNotification(notification, callback) {
const handlers = notificationHandlers.get(notification as string) ?? {};
handlers[++handlerId] = callback;
notificationHandlers.set(notification as string, handlers);
},
onRequest(method, callback) {
requestHandlers.set(method as string, callback);
},
detach() {
nvimSocket.end();
},
};
}