This repository has been archived by the owner on Mar 11, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 10
/
connection.js
196 lines (165 loc) · 5.73 KB
/
connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
'use strict'
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const withIs = require('class-is')
const assert = require('assert')
const errCode = require('err-code')
/**
* An implementation of the js-libp2p connection.
* Any libp2p transport should use an upgrader to return this connection.
*/
class Connection {
/**
* Creates an instance of Connection.
* @param {object} properties properties of the connection.
* @param {multiaddr} properties.localAddr local multiaddr of the connection.
* @param {multiaddr} properties.remoteAddr remote multiaddr of the connection.
* @param {PeerId} properties.localPeer local peer-id.
* @param {PeerId} properties.remotePeer remote peer-id.
* @param {function} properties.newStream new stream muxer function.
* @param {function} properties.close close raw connection function.
* @param {function} properties.getStreams get streams from muxer function.
* @param {object} properties.stat metadata of the connection.
* @param {string} properties.stat.direction connection establishment direction ("inbound" or "outbound").
* @param {object} properties.stat.timeline connection relevant events timestamp.
* @param {string} properties.stat.timeline.open connection opening timestamp.
* @param {string} properties.stat.timeline.upgraded connection upgraded timestamp.
* @param {string} [properties.stat.multiplexer] connection multiplexing identifier.
* @param {string} [properties.stat.encryption] connection encryption method identifier.
*/
constructor ({ localAddr, remoteAddr, localPeer, remotePeer, newStream, close, getStreams, stat }) {
assert(multiaddr.isMultiaddr(localAddr), 'localAddr must be an instance of multiaddr')
assert(multiaddr.isMultiaddr(remoteAddr), 'remoteAddr must be an instance of multiaddr')
assert(PeerId.isPeerId(localPeer), 'localPeer must be an instance of peer-id')
assert(PeerId.isPeerId(remotePeer), 'remotePeer must be an instance of peer-id')
assert(typeof newStream === 'function', 'new stream must be a function')
assert(typeof close === 'function', 'close must be a function')
assert(typeof getStreams === 'function', 'getStreams must be a function')
assert(stat, 'connection metadata object must be provided')
assert(stat.direction === 'inbound' || stat.direction === 'outbound', 'direction must be "inbound" or "outbound"')
assert(stat.timeline, 'connection timeline object must be provided in the stat object')
assert(stat.timeline.open, 'connection open timestamp must be provided')
assert(stat.timeline.upgraded, 'connection upgraded timestamp must be provided')
/**
* Connection identifier.
*/
this.id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
/**
* Observed multiaddr of the local peer
*/
this.localAddr = localAddr
/**
* Observed multiaddr of the remote peer
*/
this.remoteAddr = remoteAddr
/**
* Local peer id.
*/
this.localPeer = localPeer
/**
* Remote peer id.
*/
this.remotePeer = remotePeer
/**
* Connection metadata.
*/
this._stat = {
...stat,
status: 'open'
}
/**
* Reference to the new stream function of the multiplexer
*/
this._newStream = newStream
/**
* Reference to the close function of the raw connection
*/
this._close = close
/**
* Reference to the getStreams function of the muxer
*/
this._getStreams = getStreams
/**
* Connection streams registry
*/
this.registry = new Map()
/**
* User provided tags
*/
this.tags = []
}
/**
* Get connection metadata
* @return {Object}
*/
get stat () {
return this._stat
}
/**
* Get all the streams of the muxer.
* @return {Array<*>}
*/
get streams () {
return this._getStreams()
}
/**
* Create a new stream from this connection
* @param {string[]} protocols intended protocol for the stream
* @return {Promise<object>} with muxed+multistream-selected stream and selected protocol
*/
async newStream (protocols) {
if (this.stat.status === 'closing') {
throw errCode(new Error('the connection is being closed'), 'ERR_CONNECTION_BEING_CLOSED')
}
if (this.stat.status === 'closed') {
throw errCode(new Error('the connection is closed'), 'ERR_CONNECTION_CLOSED')
}
if (!Array.isArray(protocols)) protocols = [protocols]
const { stream, protocol } = await this._newStream(protocols)
this.addStream(stream, protocol)
return {
stream,
protocol
}
}
/**
* Add a stream when it is opened to the registry.
* @param {*} muxedStream a muxed stream
* @param {object} properties the stream properties to be registered
* @param {string} properties.protocol the protocol used by the stream
* @param {object} properties.metadata metadata of the stream
* @return {void}
*/
addStream (muxedStream, { protocol, metadata = {} }) {
// Add metadata for the stream
this.registry.set(muxedStream.id, {
protocol,
...metadata
})
}
/**
* Remove stream registry after it is closed.
* @param {string} id identifier of the stream
*/
removeStream (id) {
this.registry.delete(id)
}
/**
* Close the connection.
* @return {Promise}
*/
async close () {
if (this.stat.status === 'closed') {
return
}
if (this._closing) {
return this._closing
}
this.stat.status = 'closing'
// Close raw connection
this._closing = await this._close()
this._stat.timeline.close = Date.now()
this.stat.status = 'closed'
}
}
module.exports = withIs(Connection, { className: 'Connection', symbolName: '@libp2p/interface-connection/connection' })