From c3b1aa46aa415ee675f5182b58c230a4d8d1e7c0 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Fri, 8 Jul 2022 12:28:25 +0530 Subject: [PATCH] WebRTC transport implementation --- go.mod | 29 +- go.sum | 85 +++- p2p/transport/webrtc/connection.go | 250 +++++++++++ p2p/transport/webrtc/datachannel.go | 340 +++++++++++++++ p2p/transport/webrtc/deadline.go | 77 ++++ p2p/transport/webrtc/deadline_test.go | 56 +++ p2p/transport/webrtc/errors.go | 49 +++ p2p/transport/webrtc/fetch_ip_linux_test.go | 10 + p2p/transport/webrtc/fetch_ip_test.go | 40 ++ p2p/transport/webrtc/listener.go | 309 ++++++++++++++ p2p/transport/webrtc/pb/Makefile | 9 + p2p/transport/webrtc/pb/message.pb.go | 413 +++++++++++++++++++ p2p/transport/webrtc/pb/message.proto | 20 + p2p/transport/webrtc/sdp.go | 110 +++++ p2p/transport/webrtc/transport.go | 432 ++++++++++++++++++++ p2p/transport/webrtc/transport_test.go | 274 +++++++++++++ p2p/transport/webrtc/udp_mux.go | 351 ++++++++++++++++ p2p/transport/webrtc/udp_mux_conn.go | 250 +++++++++++ p2p/transport/webrtc/udp_mux_test.go | 60 +++ p2p/transport/webrtc/util.go | 59 +++ p2p/transport/webrtc/util_test.go | 72 ++++ 21 files changed, 3282 insertions(+), 13 deletions(-) create mode 100644 p2p/transport/webrtc/connection.go create mode 100644 p2p/transport/webrtc/datachannel.go create mode 100644 p2p/transport/webrtc/deadline.go create mode 100644 p2p/transport/webrtc/deadline_test.go create mode 100644 p2p/transport/webrtc/errors.go create mode 100644 p2p/transport/webrtc/fetch_ip_linux_test.go create mode 100644 p2p/transport/webrtc/fetch_ip_test.go create mode 100644 p2p/transport/webrtc/listener.go create mode 100644 p2p/transport/webrtc/pb/Makefile create mode 100644 p2p/transport/webrtc/pb/message.pb.go create mode 100644 p2p/transport/webrtc/pb/message.proto create mode 100644 p2p/transport/webrtc/sdp.go create mode 100644 p2p/transport/webrtc/transport.go create mode 100644 p2p/transport/webrtc/transport_test.go create mode 100644 p2p/transport/webrtc/udp_mux.go create mode 100644 p2p/transport/webrtc/udp_mux_conn.go create mode 100644 p2p/transport/webrtc/udp_mux_test.go create mode 100644 p2p/transport/webrtc/util.go create mode 100644 p2p/transport/webrtc/util_test.go diff --git a/go.mod b/go.mod index 016d915280..7ce563b7ef 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 github.com/google/gopacket v1.1.19 + github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v0.5.4 github.com/ipfs/go-cid v0.3.2 @@ -47,6 +48,13 @@ require ( github.com/multiformats/go-multistream v0.3.3 github.com/multiformats/go-varint v0.0.6 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 + github.com/pion/datachannel v1.5.2 + github.com/pion/dtls/v2 v2.1.5 + github.com/pion/ice/v2 v2.2.6 + github.com/pion/logging v0.2.2 + github.com/pion/stun v0.3.5 + github.com/pion/transport v0.13.1 + github.com/pion/webrtc/v3 v3.1.43 github.com/prometheus/client_golang v1.13.0 github.com/raulk/go-watchdog v1.3.0 github.com/stretchr/testify v1.8.0 @@ -61,13 +69,12 @@ require ( require ( github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/containerd/cgroups v1.0.4 // indirect github.com/coreos/go-systemd/v22 v22.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgraph-io/badger v1.6.2 // indirect - github.com/dgraph-io/ristretto v0.0.2 // indirect + github.com/dgraph-io/ristretto v0.1.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect @@ -75,15 +82,16 @@ require ( github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect + github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/huin/goupnp v1.0.3 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/goprocess v0.1.4 // indirect github.com/klauspost/cpuid/v2 v2.1.1 // indirect github.com/koron/go-ssdp v0.0.3 // indirect + github.com/kr/pretty v0.3.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/marten-seemann/qpack v0.3.0 // indirect github.com/marten-seemann/qtls-go1-18 v0.1.3 // indirect @@ -94,9 +102,18 @@ require ( github.com/miekg/dns v1.1.50 // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect github.com/multiformats/go-base36 v0.1.0 // indirect - github.com/onsi/ginkgo v1.16.5 // indirect github.com/onsi/ginkgo/v2 v2.2.0 // indirect github.com/opencontainers/runtime-spec v1.0.2 // indirect + github.com/pion/interceptor v0.1.12 // indirect + github.com/pion/mdns v0.0.5 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.10 // indirect + github.com/pion/rtp v1.7.13 // indirect + github.com/pion/sctp v1.8.2 // indirect + github.com/pion/sdp/v3 v3.0.6 // indirect + github.com/pion/srtp/v2 v2.0.10 // indirect + github.com/pion/turn/v2 v2.0.8 // indirect + github.com/pion/udp v0.1.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect @@ -104,7 +121,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect - github.com/syndtr/goleveldb v1.0.0 // indirect + github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.15.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 7e93ae7a51..46818d0964 100644 --- a/go.sum +++ b/go.sum @@ -42,7 +42,6 @@ github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 h1:cTp8I5+VIo github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -61,7 +60,6 @@ github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6r github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= @@ -85,6 +83,7 @@ github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSV github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -95,8 +94,9 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/dgraph-io/badger v1.6.2 h1:mNw0qs90GVgGGWylh0umH5iag1j6n/PeJtNvL6KY/x8= github.com/dgraph-io/badger v1.6.2/go.mod h1:JW2yswe3V058sS0kZ2h/AXeDSqFjxnZcRrVH//y2UQE= -github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= +github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -146,6 +146,8 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= +github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= @@ -177,8 +179,9 @@ github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -286,12 +289,14 @@ github.com/koron/go-ssdp v0.0.3/go.mod h1:b2MxI6yh02pKrsyNoQUsk4+YNikaGhe4894J+Q github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= @@ -401,6 +406,8 @@ github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.2.0 h1:3ZNA3L1c5FYDFTTxbFeVGGD8jYvjYauHD30YgLxVsNI= @@ -408,6 +415,7 @@ github.com/onsi/ginkgo/v2 v2.2.0/go.mod h1:MEH45j8TBi6u9BMogfbp0stKC5cdGjumZj5Y7 github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/onsi/gomega v1.20.1 h1:PA/3qinGoukvymdIDV8pii6tiZgC8kbmJO6Z5+b002Q= github.com/opencontainers/runtime-spec v1.0.2 h1:UfAcuLBJB9Coz72x1hgl8O5RVzTdNiaglX6v2DM6FI0= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= @@ -415,6 +423,48 @@ github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTm github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/pion/datachannel v1.5.2 h1:piB93s8LGmbECrpO84DnkIVWasRMk3IimbcXkTQLE6E= +github.com/pion/datachannel v1.5.2/go.mod h1:FTGQWaHrdCwIJ1rw6xBIfZVkslikjShim5yr05XFuCQ= +github.com/pion/dtls/v2 v2.1.3/go.mod h1:o6+WvyLDAlXF7YiPB/RlskRoeK+/JtuaZa5emwQcWus= +github.com/pion/dtls/v2 v2.1.5 h1:jlh2vtIyUBShchoTDqpCCqiYCyRFJ/lvf/gQ8TALs+c= +github.com/pion/dtls/v2 v2.1.5/go.mod h1:BqCE7xPZbPSubGasRoDFJeTsyJtdD1FanJYL0JGheqY= +github.com/pion/ice/v2 v2.2.6 h1:R/vaLlI1J2gCx141L5PEwtuGAGcyS6e7E0hDeJFq5Ig= +github.com/pion/ice/v2 v2.2.6/go.mod h1:SWuHiOGP17lGromHTFadUe1EuPgFh/oCU6FCMZHooVE= +github.com/pion/interceptor v0.1.11/go.mod h1:tbtKjZY14awXd7Bq0mmWvgtHB5MDaRN7HV3OZ/uy7s8= +github.com/pion/interceptor v0.1.12 h1:CslaNriCFUItiXS5o+hh5lpL0t0ytQkFnUcbbCs2Zq8= +github.com/pion/interceptor v0.1.12/go.mod h1:bDtgAD9dRkBZpWHGKaoKb42FhDHTG2rX8Ii9LRALLVA= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns v0.0.5 h1:Q2oj/JB3NqfzY9xGZ1fPzZzK7sDSD8rZPOvcIQ10BCw= +github.com/pion/mdns v0.0.5/go.mod h1:UgssrvdD3mxpi8tMxAXbsppL3vJ4Jipw1mTCW+al01g= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.9/go.mod h1:qVPhiCzAm4D/rxb6XzKeyZiQK69yJpbUDJSF7TgrqNo= +github.com/pion/rtcp v1.2.10 h1:nkr3uj+8Sp97zyItdN60tE/S6vk4al5CPRR6Gejsdjc= +github.com/pion/rtcp v1.2.10/go.mod h1:ztfEwXZNLGyF1oQDttz/ZKIBaeeg/oWbRYqzBM9TL1I= +github.com/pion/rtp v1.7.13 h1:qcHwlmtiI50t1XivvoawdCGTP4Uiypzfrsap+bijcoA= +github.com/pion/rtp v1.7.13/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/sctp v1.8.0/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= +github.com/pion/sctp v1.8.2 h1:yBBCIrUMJ4yFICL3RIvR4eh/H2BTTvlligmSTy+3kiA= +github.com/pion/sctp v1.8.2/go.mod h1:xFe9cLMZ5Vj6eOzpyiKjT9SwGM4KpK/8Jbw5//jc+0s= +github.com/pion/sdp/v3 v3.0.5/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= +github.com/pion/sdp/v3 v3.0.6 h1:WuDLhtuFUUVpTfus9ILC4HRyHsW6TdugjEX/QY9OiUw= +github.com/pion/sdp/v3 v3.0.6/go.mod h1:iiFWFpQO8Fy3S5ldclBkpXqmWy02ns78NOKoLLL0YQw= +github.com/pion/srtp/v2 v2.0.10 h1:b8ZvEuI+mrL8hbr/f1YiJFB34UMrOac3R3N1yq2UN0w= +github.com/pion/srtp/v2 v2.0.10/go.mod h1:XEeSWaK9PfuMs7zxXyiN252AHPbH12NX5q/CFDWtUuA= +github.com/pion/stun v0.3.5 h1:uLUCBCkQby4S1cf6CGuR9QrVOKcvUwFeemaC865QHDg= +github.com/pion/stun v0.3.5/go.mod h1:gDMim+47EeEtfWogA37n6qXZS88L5V6LqFcf+DZA2UA= +github.com/pion/transport v0.12.2/go.mod h1:N3+vZQD9HlDP5GWkZ85LohxNsDcNgofQmyL6ojX5d8Q= +github.com/pion/transport v0.12.3/go.mod h1:OViWW9SP2peE/HbwBvARicmAVnesphkNkCVZIWJ6q9A= +github.com/pion/transport v0.13.0/go.mod h1:yxm9uXpK9bpBBWkITk13cLo1y5/ur5VQpG22ny6EP7g= +github.com/pion/transport v0.13.1 h1:/UH5yLeQtwm2VZIPjxwnNFxjS4DFhyLfS4GlfuKUzfA= +github.com/pion/transport v0.13.1/go.mod h1:EBxbqzyv+ZrmDb82XswEE0BjfQFtuw1Nu6sjnjWCsGg= +github.com/pion/turn/v2 v2.0.8 h1:KEstL92OUN3k5k8qxsXHpr7WWfrdp7iJZHx99ud8muw= +github.com/pion/turn/v2 v2.0.8/go.mod h1:+y7xl719J8bAEVpSXBXvTxStjJv3hbz9YFflvkpcGPw= +github.com/pion/udp v0.1.1 h1:8UAPvyqmsxK8oOjloDk4wUt63TzFe9WEJkg5lChlj7o= +github.com/pion/udp v0.1.1/go.mod h1:6AFo+CMdKQm7UiA0eUPA8/eVCTx8jBIITLZHc9DWX5M= +github.com/pion/webrtc/v3 v3.1.43 h1:YT3ZTO94UT4kSBvZnRAH82+0jJPUruiKr9CEstdlQzk= +github.com/pion/webrtc/v3 v3.1.43/go.mod h1:G/J8k0+grVsjC/rjCZ24AKoCCxcFFODgh7zThNZGs0M= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -452,8 +502,11 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= +github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/shurcooL/component v0.0.0-20170202220835-f88ec8f54cc4/go.mod h1:XhFIlyj5a1fBNx5aJTbKoIq0mNaPvOagO+HjB3EtxrY= github.com/shurcooL/events v0.0.0-20181021180414-410e4ca65f48/go.mod h1:5u70Mqkb5O5cxEA8nxTsgrgLehJeAw6Oc4Ab1c/P1HM= @@ -507,8 +560,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= @@ -562,6 +616,9 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220516162934-403b01795ae8/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM= golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -634,17 +691,26 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201201195509-5d6afe98e0b7/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211201190559-0a0e4e1bb54c/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220401154927-543a649e0bdd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220531201128-c960675eff93/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220630215102-69896b714898/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5 h1:KafLifaRFIuSJ5C+7CyFJOF9haxKNC1CEIDk8GX6X0k= golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -710,11 +776,13 @@ golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -730,6 +798,9 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220608164250-635b8c9b7f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.1-0.20221102194838-fc697a31fa06 h1:E1pm64FqQa4v8dHd/bAneyMkR4hk8LTJhoSlc5mc1cM= diff --git a/p2p/transport/webrtc/connection.go b/p2p/transport/webrtc/connection.go new file mode 100644 index 0000000000..bda2795eb4 --- /dev/null +++ b/p2p/transport/webrtc/connection.go @@ -0,0 +1,250 @@ +package libp2pwebrtc + +import ( + "context" + "os" + "sync" + + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + tpt "github.com/libp2p/go-libp2p/core/transport" + ma "github.com/multiformats/go-multiaddr" + "github.com/pion/webrtc/v3" +) + +var _ tpt.CapableConn = &connection{} + +type connection struct { + pc *webrtc.PeerConnection + transport *WebRTCTransport + scope network.ConnManagementScope + + localPeer peer.ID + privKey ic.PrivKey + localMultiaddr ma.Multiaddr + + remotePeer peer.ID + remoteKey ic.PubKey + remoteMultiaddr ma.Multiaddr + + streams map[uint16]*dataChannel + + accept chan network.MuxedStream + + ctx context.Context + cancel context.CancelFunc + m sync.Mutex +} + +func newConnection( + pc *webrtc.PeerConnection, + transport *WebRTCTransport, + scope network.ConnManagementScope, + + localPeer peer.ID, + privKey ic.PrivKey, + localMultiaddr ma.Multiaddr, + + remotePeer peer.ID, + remoteKey ic.PubKey, + remoteMultiaddr ma.Multiaddr, +) *connection { + accept := make(chan network.MuxedStream, 10) + + ctx, cancel := context.WithCancel(context.Background()) + + conn := &connection{ + pc: pc, + transport: transport, + scope: scope, + + localPeer: localPeer, + privKey: privKey, + localMultiaddr: localMultiaddr, + + remotePeer: remotePeer, + remoteKey: remoteKey, + remoteMultiaddr: remoteMultiaddr, + ctx: ctx, + cancel: cancel, + streams: make(map[uint16]*dataChannel), + + accept: accept, + } + + pc.OnDataChannel(func(dc *webrtc.DataChannel) { + log.Debugf("[%s] incoming datachannel: %s", localPeer, dc.Label()) + id := *dc.ID() + var stream *dataChannel + dc.OnOpen(func() { + // datachannel cannot be detached before opening + rwc, err := dc.Detach() + if err != nil { + log.Errorf("[%s] could not detch channel: %s", localPeer, dc.Label()) + return + } + stream = newDataChannel(dc, rwc, pc, nil, nil) + conn.addStream(id, stream) + accept <- stream + }) + + dc.OnClose(func() { + stream.remoteClosed() + conn.removeStream(id) + }) + }) + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + if state == webrtc.PeerConnectionStateClosed || state == webrtc.PeerConnectionStateDisconnected { + conn.Close() + } + }) + + return conn +} + +// ConnState implements transport.CapableConn +func (c *connection) ConnState() network.ConnectionState { + return network.ConnectionState{} +} + +// Implement network.MuxedConn + +func (c *connection) Close() error { + if c.IsClosed() { + return nil + } + + c.scope.Done() + // cleanup routine + for _, stream := range c.streams { + _ = stream.Close() + } + c.cancel() + _ = c.pc.Close() + return nil +} + +func (c *connection) IsClosed() bool { + select { + case <-c.ctx.Done(): + return true + default: + } + return false +} + +func (c *connection) OpenStream(ctx context.Context) (network.MuxedStream, error) { + if c.IsClosed() { + return nil, os.ErrClosed + } + + result := make(chan struct { + network.MuxedStream + error + }) + dc, err := c.pc.CreateDataChannel("", nil) + if err != nil { + return nil, err + } + + streamID := *dc.ID() + var stream *dataChannel + dc.OnOpen(func() { + rwc, err := dc.Detach() + if err != nil { + result <- struct { + network.MuxedStream + error + }{nil, + errDatachannel("could not detach", err), + } + return + } + stream = newDataChannel(dc, rwc, c.pc, nil, nil) + c.addStream(streamID, stream) + result <- struct { + network.MuxedStream + error + }{stream, err} + }) + + dc.OnClose(func() { + stream.remoteClosed() + c.removeStream(streamID) + }) + + select { + case <-ctx.Done(): + _ = dc.Close() + return nil, ctx.Err() + case r := <-result: + return r.MuxedStream, r.error + } +} + +func (c *connection) AcceptStream() (network.MuxedStream, error) { + select { + case <-c.ctx.Done(): + return nil, os.ErrClosed + case stream := <-c.accept: + return stream, nil + } +} + +// implement network.ConnSecurity +func (c *connection) LocalPeer() peer.ID { + return c.localPeer +} + +// only used during setup +func (c *connection) setRemotePeer(id peer.ID) { + c.remotePeer = id +} + +func (c *connection) LocalPrivateKey() ic.PrivKey { + return c.privKey +} + +func (c *connection) RemotePeer() peer.ID { + return c.remotePeer +} + +func (c *connection) RemotePublicKey() ic.PubKey { + return c.remoteKey +} + +func (c *connection) setRemotePublicKey(key ic.PubKey) { + c.remoteKey = key +} + +// implement network.ConnMultiaddrs +func (c *connection) LocalMultiaddr() ma.Multiaddr { + return c.localMultiaddr +} + +func (c *connection) RemoteMultiaddr() ma.Multiaddr { + return c.remoteMultiaddr +} + +// implement network.ConnScoper +func (c *connection) Scope() network.ConnScope { + return c.scope +} + +func (c *connection) Transport() tpt.Transport { + return c.transport +} + +func (c *connection) addStream(id uint16, stream *dataChannel) { + c.m.Lock() + defer c.m.Unlock() + c.streams[id] = stream +} + +func (c *connection) removeStream(id uint16) { + c.m.Lock() + defer c.m.Unlock() + delete(c.streams, id) +} diff --git a/p2p/transport/webrtc/datachannel.go b/p2p/transport/webrtc/datachannel.go new file mode 100644 index 0000000000..aa5d9746d1 --- /dev/null +++ b/p2p/transport/webrtc/datachannel.go @@ -0,0 +1,340 @@ +package libp2pwebrtc + +import ( + "bytes" + "context" + "io" + "os" + + "net" + + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-msgio/protoio" + "github.com/pion/datachannel" + "github.com/pion/webrtc/v3" + + pb "github.com/libp2p/go-libp2p/p2p/transport/webrtc/pb" +) + +var _ network.MuxedStream = &dataChannel{} + +const ( + // maxMessageSize is limited to 16384 bytes in the SDP. + maxMessageSize uint64 = 16384 + // Max message size limit in the SDP is limited to 16384 bytes. + // We keep a maximum of 2 messages in the buffer + maxBufferedAmount uint64 = 2 * maxMessageSize + // bufferedAmountLowThreshold and maxBufferedAmount are bound + // to a stream but congestion control is done on the whole + // SCTP association. This means that a single stream can monopolize + // the complete congestion control window (cwnd) if it does not + // read stream data and it's remote continues to send. We can + // add messages to the send buffer once there is space for 1 full + // sized message. + bufferedAmountLowThreshold uint64 = 16384 + + protoOverhead int = 5 + varintOverhead int = 2 +) + +const ( + stateOpen uint32 = iota + stateReadClosed + stateWriteClosed + stateClosed +) + +// Package pion detached data channel into a net.Conn +// and then a network.MuxedStream +type dataChannel struct { + channel *webrtc.DataChannel + rwc datachannel.ReadWriteCloser + laddr net.Addr + raddr net.Addr + readDeadline *deadline + writeDeadline *deadline + + closeWriteOnce sync.Once + closeReadOnce sync.Once + resetOnce sync.Once + + state uint32 + + ctx context.Context + cancel context.CancelFunc + m sync.Mutex + readBuf bytes.Buffer + writeAvailable chan struct{} + reader protoio.Reader + writer protoio.Writer +} + +func newDataChannel( + channel *webrtc.DataChannel, + rwc datachannel.ReadWriteCloser, + pc *webrtc.PeerConnection, + laddr, raddr net.Addr) *dataChannel { + ctx, cancel := context.WithCancel(context.Background()) + + result := &dataChannel{ + channel: channel, + rwc: rwc, + laddr: laddr, + raddr: raddr, + readDeadline: newDeadline(), + writeDeadline: newDeadline(), + ctx: ctx, + cancel: cancel, + writeAvailable: make(chan struct{}), + reader: protoio.NewDelimitedReader(rwc, 16384), + writer: protoio.NewDelimitedWriter(rwc), + } + + channel.SetBufferedAmountLowThreshold(bufferedAmountLowThreshold) + channel.OnBufferedAmountLow(func() { + result.writeAvailable <- struct{}{} + }) + + return result +} + +func (d *dataChannel) processControlMessage(msg pb.Message) { + d.m.Lock() + defer d.m.Unlock() + if d.state == stateClosed { + return + } + if msg.Flag == nil { + return + } + switch msg.GetFlag() { + case pb.Message_FIN: + if d.state == stateWriteClosed { + d.Close() + return + } + d.state = stateReadClosed + case pb.Message_STOP_SENDING: + if d.state == stateReadClosed { + d.Close() + return + } + d.state = stateWriteClosed + case pb.Message_RESET: + d.channel.Close() + } +} + +func (d *dataChannel) Read(b []byte) (int, error) { + for { + select { + case <-d.readDeadline.wait(): + return 0, os.ErrDeadlineExceeded + default: + } + + d.m.Lock() + read, err := d.readBuf.Read(b) + d.m.Unlock() + if state := d.getState(); err == io.EOF && (state == stateReadClosed || state == stateClosed) { + return read, io.EOF + } + if read > 0 { + return read, nil + } + + // read until data message + var msg pb.Message + signal := make(chan struct { + error + }) + + // read in a separate goroutine to enable read deadlines + go func() { + err = d.reader.ReadMsg(&msg) + if err != nil { + if err != io.EOF { + log.Warnf("error reading from datachannel: %v", err) + } + signal <- struct { + error + }{err} + return + } + + if state := d.getState(); state != stateClosed && state != stateReadClosed && msg.Message != nil { + d.m.Lock() + d.readBuf.Write(msg.Message) + d.m.Unlock() + } + d.processControlMessage(msg) + + signal <- struct{ error }{nil} + + }() + select { + case sig := <-signal: + if sig.error != nil { + return 0, sig.error + } + case <-d.readDeadline.wait(): + return 0, os.ErrDeadlineExceeded + } + + } +} + +func (d *dataChannel) Write(b []byte) (int, error) { + if s := d.getState(); s == stateWriteClosed || s == stateClosed { + return 0, io.ErrClosedPipe + } + + var err error + var ( + start int = 0 + end = len(b) + chunkSize = int(maxMessageSize) - protoOverhead - varintOverhead + n = 0 + ) + + for start < len(b) { + end = len(b) + if start+chunkSize < end { + end = start + chunkSize + } + chunk := b[start:end] + n, err = d.partialWrite(chunk) + if err != nil { + break + } + start += n + } + return start, err +} + +func (d *dataChannel) partialWrite(b []byte) (int, error) { + if s := d.getState(); s == stateWriteClosed || s == stateClosed { + return 0, io.ErrClosedPipe + } + select { + case <-d.writeDeadline.wait(): + return 0, os.ErrDeadlineExceeded + default: + } + msg := &pb.Message{Message: b} + if d.channel.BufferedAmount()+uint64(len(b))+uint64(varintOverhead) > maxBufferedAmount { + select { + case <-d.writeAvailable: + case <-d.writeDeadline.wait(): + return 0, os.ErrDeadlineExceeded + } + } + return d.writeMessage(msg) +} + +func (d *dataChannel) writeMessage(msg *pb.Message) (int, error) { + err := d.writer.WriteMsg(msg) + return len(msg.GetMessage()), err + +} + +func (d *dataChannel) Close() error { + select { + case <-d.ctx.Done(): + return nil + default: + } + + d.m.Lock() + d.state = stateClosed + d.m.Unlock() + + d.cancel() + d.CloseWrite() + _ = d.channel.Close() + return nil +} + +func (d *dataChannel) CloseRead() error { + var err error + d.closeReadOnce.Do(func() { + d.m.Lock() + if d.state != stateClosed { + d.state = stateReadClosed + } + d.m.Unlock() + msg := &pb.Message{ + Flag: pb.Message_STOP_SENDING.Enum(), + } + _, err = d.writeMessage(msg) + }) + return err + +} + +func (d *dataChannel) remoteClosed() { + d.m.Lock() + defer d.m.Unlock() + d.state = stateClosed + d.cancel() + +} + +func (d *dataChannel) CloseWrite() error { + var err error + d.closeWriteOnce.Do(func() { + d.m.Lock() + if d.state != stateClosed { + d.state = stateWriteClosed + } + d.m.Unlock() + msg := &pb.Message{ + Flag: pb.Message_FIN.Enum(), + } + _, err = d.writeMessage(msg) + }) + return err +} + +func (d *dataChannel) LocalAddr() net.Addr { + return d.laddr +} + +func (d *dataChannel) RemoteAddr() net.Addr { + return d.raddr +} + +func (d *dataChannel) Reset() error { + var err error + d.resetOnce.Do(func() { + msg := &pb.Message{Flag: pb.Message_RESET.Enum()} + _, err = d.writeMessage(msg) + d.Close() + }) + return err +} + +func (d *dataChannel) SetDeadline(t time.Time) error { + d.SetReadDeadline(t) + d.SetWriteDeadline(t) + return nil +} + +func (d *dataChannel) SetReadDeadline(t time.Time) error { + d.readDeadline.set(t) + return nil +} + +func (d *dataChannel) SetWriteDeadline(t time.Time) error { + d.writeDeadline.set(t) + return nil +} + +func (d *dataChannel) getState() uint32 { + d.m.Lock() + defer d.m.Unlock() + return d.state +} diff --git a/p2p/transport/webrtc/deadline.go b/p2p/transport/webrtc/deadline.go new file mode 100644 index 0000000000..171850de3b --- /dev/null +++ b/p2p/transport/webrtc/deadline.go @@ -0,0 +1,77 @@ +package libp2pwebrtc + +import ( + "sync" + "time" +) + +type deadline struct { + m sync.Mutex + timer *time.Timer + closeChan chan struct{} +} + +func newDeadline() *deadline { + return &deadline{ + timer: nil, + closeChan: make(chan struct{}), + } +} + +func (d *deadline) set(t time.Time) { + d.m.Lock() + defer d.m.Unlock() + // if an existing timer is set, stop the timer + // from firing and drain the channel + if d.timer != nil { + if !d.timer.Stop() { + <-d.closeChan + } + } + + // remove reference to existing stopped timer + // so it can be GC'd + d.timer = nil + + closed := isClosed(d.closeChan) + + // no deadline + if t.IsZero() { + if closed { + d.closeChan = make(chan struct{}) + } + return + } + + if duration := time.Until(t); duration < 0 { + if !closed { + close(d.closeChan) + } + } else { + if closed { + d.closeChan = make(chan struct{}) + } + d.timer = time.AfterFunc(duration, func() { + // check to ensure channel is not closed + // twice + select { + case <-d.closeChan: + default: + close(d.closeChan) + } + }) + } +} + +func (d *deadline) wait() <-chan struct{} { + return d.closeChan +} + +func isClosed(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + } + return false +} diff --git a/p2p/transport/webrtc/deadline_test.go b/p2p/transport/webrtc/deadline_test.go new file mode 100644 index 0000000000..82f2fd7fec --- /dev/null +++ b/p2p/transport/webrtc/deadline_test.go @@ -0,0 +1,56 @@ +package libp2pwebrtc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestDeadlineExtend(t *testing.T) { + dl := newDeadline() + start := time.Now() + dl.set(start.Add(500 * time.Millisecond)) + done := make(chan struct{}) + go func() { + <-dl.wait() + end := time.Now() + d := end.Sub(start) + require.GreaterOrEqual(t, d, 900*time.Millisecond) + require.LessOrEqual(t, d, 1100*time.Millisecond) + close(done) + }() + timer := time.AfterFunc(300*time.Millisecond, func() { + dl.set(start.Add(1000 * time.Millisecond)) + }) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal() + } + timer.Stop() +} + +func TestDeadlineSetToPast(t *testing.T) { + dl := newDeadline() + start := time.Now() + done := make(chan struct{}) + go func() { + <-dl.wait() + end := time.Now() + d := end.Sub(start) + require.GreaterOrEqual(t, d, 200*time.Millisecond) + require.LessOrEqual(t, d, 500*time.Millisecond) + close(done) + }() + timer := time.AfterFunc(300*time.Millisecond, func() { + dl.set(start.Add(200 * time.Millisecond)) + }) + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal() + } + timer.Stop() +} diff --git a/p2p/transport/webrtc/errors.go b/p2p/transport/webrtc/errors.go new file mode 100644 index 0000000000..55bcbb4d35 --- /dev/null +++ b/p2p/transport/webrtc/errors.go @@ -0,0 +1,49 @@ +package libp2pwebrtc + +import ( + "fmt" +) + +type errKind string + +const ( + errKindConnectionFailed errKind = "peerconnection failed" + errKindDatachannel errKind = "datachannel" + errKindMultiaddr errKind = "bad multiaddr" + errKindNoise errKind = "noise" + errKindInternal errKind = "internal" +) + +var ( + errDataChannelTimeout = errDatachannel("timed out waiting for datachannel", nil) +) + +type webRTCTransportError struct { + kind errKind + message string + nested error +} + +func (e *webRTCTransportError) Error() string { + return fmt.Sprintf("[webrtc-transport-error] %s : %s : %v", e.kind, e.message, e.nested) +} + +func errConnectionFailed(msg string, err error) error { + return &webRTCTransportError{kind: errKindConnectionFailed, message: msg, nested: err} +} + +func errDatachannel(msg string, err error) error { + return &webRTCTransportError{kind: errKindDatachannel, message: msg, nested: err} +} + +func errMultiaddr(msg string, err error) error { + return &webRTCTransportError{kind: errKindMultiaddr, message: msg, nested: err} +} + +func errNoise(msg string, err error) error { + return &webRTCTransportError{kind: errKindNoise, message: msg, nested: err} +} + +func errInternal(msg string, err error) error { + return &webRTCTransportError{kind: errKindInternal, message: msg, nested: err} +} diff --git a/p2p/transport/webrtc/fetch_ip_linux_test.go b/p2p/transport/webrtc/fetch_ip_linux_test.go new file mode 100644 index 0000000000..8bfe033e13 --- /dev/null +++ b/p2p/transport/webrtc/fetch_ip_linux_test.go @@ -0,0 +1,10 @@ +//go:build linux +// +build linux + +package libp2pwebrtc + +import "net" + +func getListenerAndDialerIP() (net.IP, net.IP) { + return net.IPv4(0, 0, 0, 0), net.IPv4(127, 0, 0, 1) +} diff --git a/p2p/transport/webrtc/fetch_ip_test.go b/p2p/transport/webrtc/fetch_ip_test.go new file mode 100644 index 0000000000..62dde5f434 --- /dev/null +++ b/p2p/transport/webrtc/fetch_ip_test.go @@ -0,0 +1,40 @@ +//go:build !linux +// +build !linux + +package libp2pwebrtc + +import "net" + +// non-linux builds need to bind to a non-loopback interface +// to accept incoming connections. 0.0.0.0 does not work since +// Pion will bind to a local interface which is not loopback +// and there may not be a route from, say 192.168.0.0/16 to 0.0.0.0. + +func getListenerAndDialerIP() (listenerIp net.IP, dialerIp net.IP) { + listenerIp = net.IPv4(0, 0, 0, 0) + dialerIp = net.IPv4(0, 0, 0, 0) + ifaces, err := net.Interfaces() + if err != nil { + return + } + for _, iface := range ifaces { + log.Debugf("checking interface: %s", iface.Name) + if iface.Flags&net.FlagUp == 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + return + } + for _, addr := range addrs { + if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() && ipnet.IP.IsPrivate() { + if ipnet.IP.To4() != nil { + listenerIp = ipnet.IP.To4() + dialerIp = listenerIp + return + } + } + } + } + return +} diff --git a/p2p/transport/webrtc/listener.go b/p2p/transport/webrtc/listener.go new file mode 100644 index 0000000000..ab58e53e4d --- /dev/null +++ b/p2p/transport/webrtc/listener.go @@ -0,0 +1,309 @@ +package libp2pwebrtc + +import ( + "context" + "crypto" + "encoding/hex" + "net" + "os" + "strings" + "sync" + "time" + + "github.com/libp2p/go-libp2p/core/network" + + tpt "github.com/libp2p/go-libp2p/core/transport" + ma "github.com/multiformats/go-multiaddr" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/multiformats/go-multibase" + "github.com/multiformats/go-multihash" + + "github.com/pion/ice/v2" + "github.com/pion/webrtc/v3" +) + +var ( + // since verification of the remote fingerprint is deferred until + // the noise handshake, a multihash with an arbitrary value is considered + // as the remote fingerprint during the intial PeerConnection connection + // establishment. + defaultMultihash *multihash.DecodedMultihash = nil +) + +func init() { + // populate default multihash + encoded, err := hex.DecodeString("ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad") + if err != nil { + panic(err) + } + + defaultMultihash = &multihash.DecodedMultihash{ + Code: multihash.SHA2_256, + Name: multihash.Codes[multihash.SHA2_256], + Digest: encoded, + Length: len(encoded), + } + +} + +// / implement net.Listener +type listener struct { + transport *WebRTCTransport + config webrtc.Configuration + localFingerprint webrtc.DTLSFingerprint + localFingerprintMultibase string + mux *udpMuxNewAddr + ctx context.Context + cancel context.CancelFunc + localMultiaddr ma.Multiaddr + connChan chan tpt.CapableConn + wg sync.WaitGroup +} + +func newListener(transport *WebRTCTransport, laddr ma.Multiaddr, socket net.PacketConn, config webrtc.Configuration) (*listener, error) { + mux := NewUDPMuxNewAddr(ice.UDPMuxParams{UDPConn: socket}, make(chan candidateAddr)) + localFingerprints, err := config.Certificates[0].GetFingerprints() + if err != nil { + return nil, err + } + + localMh, err := hex.DecodeString(strings.ReplaceAll(localFingerprints[0].Value, ":", "")) + if err != nil { + return nil, err + } + localMhBuf, _ := multihash.Encode(localMh, multihash.SHA2_256) + localFpMultibase, _ := multibase.Encode(multibase.Base58BTC, localMhBuf) + + ctx, cancel := context.WithCancel(context.Background()) + + l := &listener{ + mux: mux, + transport: transport, + config: config, + localFingerprint: localFingerprints[0], + localFingerprintMultibase: localFpMultibase, + localMultiaddr: laddr, + ctx: ctx, + cancel: cancel, + connChan: make(chan tpt.CapableConn, 20), + } + + l.wg.Add(1) + go l.startAcceptLoop() + return l, err +} + +func (l *listener) startAcceptLoop() { + defer l.wg.Done() + for { + select { + case <-l.ctx.Done(): + return + case addr := <-l.mux.newAddrChan: + go func() { + ctx, cancelFunc := context.WithTimeout(context.Background(), 20*time.Second) + defer cancelFunc() + conn, err := l.accept(ctx, addr) + if err != nil { + log.Debugf("could not accept connection: %v", err) + return + } + l.connChan <- conn + }() + } + } +} + +func (l *listener) Accept() (tpt.CapableConn, error) { + select { + case <-l.ctx.Done(): + return nil, os.ErrClosed + case conn := <-l.connChan: + return conn, nil + } +} + +func (l *listener) Close() error { + select { + case <-l.ctx.Done(): + return nil + default: + } + l.cancel() + l.wg.Wait() + return nil +} + +func (l *listener) Addr() net.Addr { + return l.mux.LocalAddr() +} + +func (l *listener) Multiaddr() ma.Multiaddr { + return l.localMultiaddr +} + +func (l *listener) accept(ctx context.Context, addr candidateAddr) (tpt.CapableConn, error) { + var ( + scope network.ConnManagementScope + pc *webrtc.PeerConnection + ) + + cleanup := func() { + if scope != nil { + scope.Done() + } + if pc != nil { + _ = pc.Close() + } + } + + remoteMultiaddr, err := manet.FromNetAddr(addr.raddr) + if err != nil { + return nil, err + } + + scope, err = l.transport.rcmgr.OpenConnection(network.DirInbound, false, remoteMultiaddr) + if err != nil { + defer cleanup() + return nil, err + } + + settingEngine := webrtc.SettingEngine{} + settingEngine.SetAnsweringDTLSRole(webrtc.DTLSRoleServer) + settingEngine.SetICECredentials(addr.ufrag, addr.ufrag) + settingEngine.SetLite(true) + settingEngine.SetICEUDPMux(l.mux) + settingEngine.DisableCertificateFingerprintVerification(true) + settingEngine.DetachDataChannels() + + api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + pc, err = api.NewPeerConnection(l.config) + if err != nil { + defer cleanup() + return nil, err + } + + // signaling channel wraps an error in a struct to make + // the error nullable. + signalChan := make(chan struct{ error }) + var wrappedChannel *dataChannel + var handshakeOnce sync.Once + // this enforces that the correct data channel label is used + // for the handshake + handshakeChannel, err := pc.CreateDataChannel("handshake", &webrtc.DataChannelInit{ + Negotiated: func(v bool) *bool { return &v }(true), + ID: func(v uint16) *uint16 { return &v }(0), + }) + if err != nil { + defer cleanup() + return nil, err + } + + // The raw datachannel is wrapped in the libp2p abstraction + // as early as possible to allow any messages sent by the remote + // to be buffered. This is done since the dialer leads the listener + // in the handshake process, and a faster dialer could have set up + // their connection and started sending Noise handshake messages before + // the listener has set up the onmessage callback. In this use case, + // since the data channels are negotiated out-of-band, they will be + // instantly in `readyState=open` once the SCTP connection is set up. + // Therefore, we wrap the datachannel before performing the + // offer-answer exchange, so any messages sent from the remote get + // buffered. + handshakeChannel.OnOpen(func() { + log.Debugf("handshake channel open") + rwc, err := handshakeChannel.Detach() + if err != nil { + signalChan <- struct{ error }{errDatachannel("could not detach", err)} + return + } + wrappedChannel = newDataChannel( + handshakeChannel, + rwc, + pc, + l.mux.LocalAddr(), + addr.raddr, + ) + handshakeOnce.Do(func() { + signalChan <- struct{ error }{nil} + }) + }) + + // Checking the peerconnection state is not necessary in this case as any + // error caused while accepting will trigger the onerror callback of the + // handshake channel. + handshakeChannel.OnError(func(e error) { + handshakeOnce.Do(func() { + signalChan <- struct{ error }{e} + + }) + }) + + clientSdpString := renderClientSdp(sdpArgs{ + Addr: addr.raddr, + Fingerprint: defaultMultihash, + Ufrag: addr.ufrag, + }) + clientSdp := webrtc.SessionDescription{SDP: clientSdpString, Type: webrtc.SDPTypeOffer} + pc.SetRemoteDescription(clientSdp) + + answer, err := pc.CreateAnswer(nil) + if err != nil { + defer cleanup() + return nil, err + } + + err = pc.SetLocalDescription(answer) + if err != nil { + defer cleanup() + return nil, err + } + + // await datachannel moving to open state + select { + case <-ctx.Done(): + defer cleanup() + return nil, ctx.Err() + case signal := <-signalChan: + if signal.error != nil { + defer cleanup() + log.Debugf("datachannel: ", signal.error) + return nil, errDatachannel("datachannel error", signal.error) + } + } + + // The connection is instantiated before performing the Noise handshake. This is + // to handle the case where the remote is faster and attempts to initiate a stream + // before the ondatachannel callback can be set. + conn := newConnection( + pc, + l.transport, + scope, + l.transport.localPeerId, + l.transport.privKey, + l.localMultiaddr, + "", + nil, + remoteMultiaddr, + ) + + // we do not yet know A's peer ID so accept any inbound + secureConn, err := l.transport.noiseHandshake(ctx, pc, wrappedChannel, "", crypto.SHA256, true) + if err != nil { + defer cleanup() + return nil, err + } + + // earliest point where we know the remote's peerID + err = scope.SetPeer(secureConn.RemotePeer()) + if err != nil { + defer cleanup() + return nil, err + } + + conn.setRemotePeer(secureConn.RemotePeer()) + conn.setRemotePublicKey(secureConn.RemotePublicKey()) + + return conn, nil +} diff --git a/p2p/transport/webrtc/pb/Makefile b/p2p/transport/webrtc/pb/Makefile new file mode 100644 index 0000000000..9555a98a49 --- /dev/null +++ b/p2p/transport/webrtc/pb/Makefile @@ -0,0 +1,9 @@ +pbgos := $(patsubst %.proto,%.pb.go,$(wildcard *.proto)) + +all: $(pbgos) + +%.pb.go: %.proto + protoc --gogofast_out=. --proto_path=$(GOPATH)/src:. $< + +clean: + rm -f *.pb.go diff --git a/p2p/transport/webrtc/pb/message.pb.go b/p2p/transport/webrtc/pb/message.pb.go new file mode 100644 index 0000000000..a8a261af1b --- /dev/null +++ b/p2p/transport/webrtc/pb/message.pb.go @@ -0,0 +1,413 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: message.proto + +package webrtc_pb + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type Message_Flag int32 + +const ( + // The sender will no longer send messages on the stream. + Message_FIN Message_Flag = 0 + // The sender will no longer read messages on the stream. Incoming data is + // being discarded on receipt. + Message_STOP_SENDING Message_Flag = 1 + // The sender abruptly terminates the sending part of the stream. The + // receiver can discard any data that it already received on that stream. + Message_RESET Message_Flag = 2 +) + +var Message_Flag_name = map[int32]string{ + 0: "FIN", + 1: "STOP_SENDING", + 2: "RESET", +} + +var Message_Flag_value = map[string]int32{ + "FIN": 0, + "STOP_SENDING": 1, + "RESET": 2, +} + +func (x Message_Flag) Enum() *Message_Flag { + p := new(Message_Flag) + *p = x + return p +} + +func (x Message_Flag) String() string { + return proto.EnumName(Message_Flag_name, int32(x)) +} + +func (x *Message_Flag) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Message_Flag_value, data, "Message_Flag") + if err != nil { + return err + } + *x = Message_Flag(value) + return nil +} + +func (Message_Flag) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_33c57e4bae7b9afd, []int{0, 0} +} + +type Message struct { + Flag *Message_Flag `protobuf:"varint,1,opt,name=flag,enum=webrtc.pb.Message_Flag" json:"flag,omitempty"` + Message []byte `protobuf:"bytes,2,opt,name=message" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_33c57e4bae7b9afd, []int{0} +} +func (m *Message) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return m.Size() +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetFlag() Message_Flag { + if m != nil && m.Flag != nil { + return *m.Flag + } + return Message_FIN +} + +func (m *Message) GetMessage() []byte { + if m != nil { + return m.Message + } + return nil +} + +func init() { + proto.RegisterEnum("webrtc.pb.Message_Flag", Message_Flag_name, Message_Flag_value) + proto.RegisterType((*Message)(nil), "webrtc.pb.Message") +} + +func init() { proto.RegisterFile("message.proto", fileDescriptor_33c57e4bae7b9afd) } + +var fileDescriptor_33c57e4bae7b9afd = []byte{ + // 161 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x4d, 0x2d, 0x2e, + 0x4e, 0x4c, 0x4f, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x2c, 0x4f, 0x4d, 0x2a, 0x2a, + 0x49, 0xd6, 0x2b, 0x48, 0x52, 0xaa, 0xe3, 0x62, 0xf7, 0x85, 0xc8, 0x09, 0x69, 0x73, 0xb1, 0xa4, + 0xe5, 0x24, 0xa6, 0x4b, 0x30, 0x2a, 0x30, 0x6a, 0xf0, 0x19, 0x89, 0xeb, 0xc1, 0x15, 0xe9, 0x41, + 0x55, 0xe8, 0xb9, 0xe5, 0x24, 0xa6, 0x07, 0x81, 0x15, 0x09, 0x49, 0x70, 0xb1, 0x43, 0xcd, 0x94, + 0x60, 0x52, 0x60, 0xd4, 0xe0, 0x09, 0x82, 0x71, 0x95, 0x74, 0xb8, 0x58, 0x40, 0xea, 0x84, 0xd8, + 0xb9, 0x98, 0xdd, 0x3c, 0xfd, 0x04, 0x18, 0x84, 0x04, 0xb8, 0x78, 0x82, 0x43, 0xfc, 0x03, 0xe2, + 0x83, 0x5d, 0xfd, 0x5c, 0x3c, 0xfd, 0xdc, 0x05, 0x18, 0x85, 0x38, 0xb9, 0x58, 0x83, 0x5c, 0x83, + 0x5d, 0x43, 0x04, 0x98, 0x9c, 0x78, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, + 0x23, 0x39, 0x46, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x2b, 0x2a, 0x76, 0x30, 0xa8, 0x00, 0x00, + 0x00, +} + +func (m *Message) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Message) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Message) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if m.Message != nil { + i -= len(m.Message) + copy(dAtA[i:], m.Message) + i = encodeVarintMessage(dAtA, i, uint64(len(m.Message))) + i-- + dAtA[i] = 0x12 + } + if m.Flag != nil { + i = encodeVarintMessage(dAtA, i, uint64(*m.Flag)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintMessage(dAtA []byte, offset int, v uint64) int { + offset -= sovMessage(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Message) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Flag != nil { + n += 1 + sovMessage(uint64(*m.Flag)) + } + if m.Message != nil { + l = len(m.Message) + n += 1 + l + sovMessage(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovMessage(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozMessage(x uint64) (n int) { + return sovMessage(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Message) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Message: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Flag", wireType) + } + var v Message_Flag + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= Message_Flag(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Flag = &v + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMessage + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthMessage + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthMessage + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Message = append(m.Message[:0], dAtA[iNdEx:postIndex]...) + if m.Message == nil { + m.Message = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMessage(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMessage + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMessage(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMessage + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthMessage + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupMessage + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthMessage + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthMessage = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMessage = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupMessage = fmt.Errorf("proto: unexpected end of group") +) diff --git a/p2p/transport/webrtc/pb/message.proto b/p2p/transport/webrtc/pb/message.proto new file mode 100644 index 0000000000..eab3ceb720 --- /dev/null +++ b/p2p/transport/webrtc/pb/message.proto @@ -0,0 +1,20 @@ +syntax = "proto2"; + +package webrtc.pb; + +message Message { + enum Flag { + // The sender will no longer send messages on the stream. + FIN = 0; + // The sender will no longer read messages on the stream. Incoming data is + // being discarded on receipt. + STOP_SENDING = 1; + // The sender abruptly terminates the sending part of the stream. The + // receiver can discard any data that it already received on that stream. + RESET = 2; + } + + optional Flag flag=1; + + optional bytes message = 2; +} diff --git a/p2p/transport/webrtc/sdp.go b/p2p/transport/webrtc/sdp.go new file mode 100644 index 0000000000..1252d119ab --- /dev/null +++ b/p2p/transport/webrtc/sdp.go @@ -0,0 +1,110 @@ +package libp2pwebrtc + +import ( + "crypto" + "fmt" + "net" + + "github.com/multiformats/go-multihash" +) + +type sdpArgs struct { + Addr *net.UDPAddr + Ufrag string + Fingerprint *multihash.DecodedMultihash +} + +const clientSDP string = ` +v=0 +o=- 0 0 IN %s %s +s=- +c=IN %s %s +t=0 0 +m=application %d UDP/DTLS/SCTP webrtc-datachannel +a=mid:0 +a=ice-options:trickle +a=ice-ufrag:%s +a=ice-pwd:%s +a=fingerprint:%s +a=setup:actpass +a=sctp-port:5000 +a=max-message-size:16384 +` + +func renderClientSdp(args sdpArgs) string { + ipVersion := "IP4" + if args.Addr.IP.To4() == nil { + ipVersion = "IP6" + } + return fmt.Sprintf( + clientSDP, + ipVersion, + args.Addr.IP, + ipVersion, + args.Addr.IP, + args.Addr.Port, + args.Ufrag, + args.Ufrag, + fingerprintToSDP(args.Fingerprint), + ) +} + +const serverSDP string = ` +v=0 +o=- 0 0 IN %s %s +s=- +t=0 0 +a=ice-lite +m=application %d UDP/DTLS/SCTP webrtc-datachannel +c=IN %s %s +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:%s +a=ice-pwd:%s +a=fingerprint:%s +a=setup:passive +a=sctp-port:5000 +a=max-message-size:16384 +a=candidate:1 1 UDP 1 %s %d typ host +` + +func renderServerSdp(args sdpArgs) string { + ipVersion := "IP4" + if args.Addr.IP.To4() == nil { + ipVersion = "IP6" + } + fp := fingerprintToSDP(args.Fingerprint) + return fmt.Sprintf( + serverSDP, + ipVersion, + args.Addr.IP, + args.Addr.Port, + ipVersion, + args.Addr.IP, + args.Ufrag, + args.Ufrag, + fp, + args.Addr.IP, + args.Addr.Port, + ) +} + +func getSupportedSDPHash(code uint64) (crypto.Hash, bool) { + switch code { + case multihash.MD5: + return crypto.MD5, true + case multihash.SHA1: + return crypto.SHA1, true + case multihash.SHA3_224: + return crypto.SHA3_224, true + case multihash.SHA2_256: + return crypto.SHA256, true + case multihash.SHA3_384: + return crypto.SHA3_384, true + case multihash.SHA2_512: + return crypto.SHA512, true + } + // default to sha256 but the dialer will fail + // the multiaddr first + return crypto.SHA256, false +} diff --git a/p2p/transport/webrtc/transport.go b/p2p/transport/webrtc/transport.go new file mode 100644 index 0000000000..ff975d8b65 --- /dev/null +++ b/p2p/transport/webrtc/transport.go @@ -0,0 +1,432 @@ +package libp2pwebrtc + +import ( + "context" + "crypto" + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "crypto/x509" + "encoding/hex" + "fmt" + "net" + "strings" + "sync" + "time" + + "github.com/google/uuid" + ic "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/sec" + tpt "github.com/libp2p/go-libp2p/core/transport" + "github.com/libp2p/go-libp2p/p2p/security/noise" + + logging "github.com/ipfs/go-log/v2" + ma "github.com/multiformats/go-multiaddr" + mafmt "github.com/multiformats/go-multiaddr-fmt" + manet "github.com/multiformats/go-multiaddr/net" + "github.com/multiformats/go-multihash" + + "github.com/pion/dtls/v2/pkg/crypto/fingerprint" + "github.com/pion/webrtc/v3" +) + +var log = logging.Logger("webrtc-transport") + +var dialMatcher = mafmt.And(mafmt.IP, mafmt.Base(ma.P_UDP), mafmt.Base(ma.P_WEBRTC), mafmt.Base(ma.P_CERTHASH)) + +type WebRTCTransport struct { + webrtcConfig webrtc.Configuration + rcmgr network.ResourceManager + privKey ic.PrivKey + noiseTpt *noise.Transport + localPeerId peer.ID +} + +var _ tpt.Transport = &WebRTCTransport{} + +type Option func(*WebRTCTransport) error + +func New(privKey ic.PrivKey, rcmgr network.ResourceManager, opts ...Option) (*WebRTCTransport, error) { + localPeerId, err := peer.IDFromPrivateKey(privKey) + if err != nil { + return nil, errInternal("could not get local peer ID", err) + } + // We use elliptic P-256 since it is widely supported by browsers. + // See: https://github.com/libp2p/specs/pull/412#discussion_r968294244 + pk, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) + if err != nil { + return nil, errInternal("could not generate key for cert", err) + } + cert, err := webrtc.GenerateCertificate(pk) + if err != nil { + return nil, errInternal("could not generate certificate", err) + } + config := webrtc.Configuration{ + Certificates: []webrtc.Certificate{*cert}, + } + noiseTpt, err := noise.New(noise.ID, privKey, nil) + if err != nil { + return nil, errInternal("unable to create noise transport", err) + } + return &WebRTCTransport{rcmgr: rcmgr, webrtcConfig: config, privKey: privKey, noiseTpt: noiseTpt, localPeerId: localPeerId}, nil +} + +func (t *WebRTCTransport) Protocols() []int { + return []int{ma.P_WEBRTC, ma.P_CERTHASH} +} + +func (t *WebRTCTransport) Proxy() bool { + return false +} + +func (t *WebRTCTransport) CanDial(addr ma.Multiaddr) bool { + return dialMatcher.Matches(addr) +} + +func (t *WebRTCTransport) Listen(addr ma.Multiaddr) (tpt.Listener, error) { + addr, wrtcComponent := ma.SplitLast(addr) + isWebrtc := wrtcComponent.Equal(ma.StringCast("/webrtc")) + if !isWebrtc { + return nil, errMultiaddr("must listen on webrtc multiaddr", nil) + } + nw, host, err := manet.DialArgs(addr) + if err != nil { + return nil, errMultiaddr("listener could not fetch dialargs", err) + } + udpAddr, err := net.ResolveUDPAddr(nw, host) + if err != nil { + return nil, errMultiaddr("listener could not resolve udp address", err) + } + + socket, err := net.ListenUDP(nw, udpAddr) + if err != nil { + return nil, errInternal("could not listen on udp", err) + } + + // construct multiaddr + listenerMultiaddr, err := manet.FromNetAddr(socket.LocalAddr()) + if err != nil { + _ = socket.Close() + return nil, err + } + + listenerFingerprint, err := t.getCertificateFingerprint() + if err != nil { + _ = socket.Close() + return nil, err + } + + encodedLocalFingerprint, err := encodeDTLSFingerprint(listenerFingerprint) + if err != nil { + _ = socket.Close() + return nil, err + } + + certMultiaddress, err := ma.NewMultiaddr(fmt.Sprintf("/webrtc/certhash/%s", encodedLocalFingerprint)) + if err != nil { + _ = socket.Close() + return nil, err + } + + listenerMultiaddr = listenerMultiaddr.Encapsulate(certMultiaddress) + + // log.Debugf("can be dialed at: %s", listenerMultiaddr.Encapsulate(ma.StringCast(fmt.Sprintf("/p2p/%s", t.localPeerId)))) + + return newListener( + t, + listenerMultiaddr, + socket, + t.webrtcConfig, + ) +} + +func (t *WebRTCTransport) Dial( + ctx context.Context, + remoteMultiaddr ma.Multiaddr, + p peer.ID, +) (tpt.CapableConn, error) { + var ( + pc *webrtc.PeerConnection + wrappedChannel *dataChannel + ) + scope, err := t.rcmgr.OpenConnection(network.DirOutbound, false, remoteMultiaddr) + + cleanup := func() { + if pc != nil { + _ = pc.Close() + } + if scope != nil { + scope.Done() + } + } + + if err != nil { + defer cleanup() + return nil, err + } + + err = scope.SetPeer(p) + if err != nil { + defer cleanup() + return nil, err + } + + remoteMultihash, err := decodeRemoteFingerprint(remoteMultiaddr) + if err != nil { + defer cleanup() + return nil, errMultiaddr("could not decode fingerprint", err) + } + remoteHashFunction, ok := getSupportedSDPHash(remoteMultihash.Code) + if !ok { + return nil, errMultiaddr("unsupported hash function", nil) + } + + rnw, rhost, err := manet.DialArgs(remoteMultiaddr) + if err != nil { + defer cleanup() + return nil, errMultiaddr("could not generate dial args", err) + } + + raddr, err := net.ResolveUDPAddr(rnw, rhost) + if err != nil { + defer cleanup() + return nil, errMultiaddr("could not resolve udp address", err) + } + + // Instead of encoding the local fingerprint we + // instead generate a random uuid as the connection ufrag. + // The only requirement here is that the ufrag and password + // must be equal, which will allow the server to determine + // the password using the STUN message. + ufrag := "libp2p+webrtc+v1/" + strings.ReplaceAll(uuid.New().String(), "-", "") + + settingEngine := webrtc.SettingEngine{} + settingEngine.SetICECredentials(ufrag, ufrag) + settingEngine.SetLite(false) + settingEngine.DetachDataChannels() + api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + pc, err = api.NewPeerConnection(t.webrtcConfig) + if err != nil { + defer cleanup() + return nil, errInternal("could not instantiate peerconnection", err) + } + + signalChan := make(chan struct{ error }) + var connectedOnce sync.Once + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + switch state { + case webrtc.PeerConnectionStateConnected: + connectedOnce.Do(func() { + signalChan <- struct{ error }{nil} + }) + case webrtc.PeerConnectionStateFailed: + connectedOnce.Do(func() { + err := errConnectionFailed("peerconnection move to failed state", nil) + signalChan <- struct{ error }{err} + }) + } + }) + + // We need to set negotiated = true for this channel on both + // the client and server to avoid DCEP errors. + handshakeChannel, err := pc.CreateDataChannel("handshake", &webrtc.DataChannelInit{ + Negotiated: func(v bool) *bool { return &v }(true), + ID: func(v uint16) *uint16 { return &v }(0), + }) + if err != nil { + defer cleanup() + return nil, errDatachannel("could not create", err) + } + handshakeChannel.OnOpen(func() { + rwc, err := handshakeChannel.Detach() + if err != nil { + signalChan <- struct{ error }{err} + return + } + wrappedChannel = newDataChannel(handshakeChannel, rwc, pc, nil, raddr) + cp, err := handshakeChannel.Transport().Transport().ICETransport().GetSelectedCandidatePair() + if cp == nil || err != nil { + err = errDatachannel("could not fetch selected candidate pair", err) + signalChan <- struct{ error }{err} + return + } + + laddr := &net.UDPAddr{IP: net.ParseIP(cp.Local.Address), Port: int(cp.Local.Port)} + wrappedChannel.laddr = laddr + signalChan <- struct{ error }{nil} + }) + + // do offer-answer exchange + offer, err := pc.CreateOffer(nil) + if err != nil { + defer cleanup() + return nil, errConnectionFailed("could not create offer", err) + } + + err = pc.SetLocalDescription(offer) + if err != nil { + defer cleanup() + return nil, errConnectionFailed("could not set local description", err) + } + + answerSdpString := renderServerSdp(sdpArgs{ + Addr: raddr, + Fingerprint: remoteMultihash, + Ufrag: ufrag, + }) + + answer := webrtc.SessionDescription{SDP: answerSdpString, Type: webrtc.SDPTypeAnswer} + err = pc.SetRemoteDescription(answer) + if err != nil { + defer cleanup() + return nil, errConnectionFailed("could not set remote description", err) + } + + // await peerconnection opening + select { + case signal := <-signalChan: + if signal.error != nil { + defer cleanup() + return nil, signal.error + } + case <-ctx.Done(): + scope.Done() + defer cleanup() + return nil, errDataChannelTimeout + } + + // await datachannel opening + select { + case signal := <-signalChan: + if signal.error != nil { + defer cleanup() + return nil, signal.error + } + case <-ctx.Done(): + scope.Done() + defer cleanup() + return nil, errDataChannelTimeout + } + + // the local address of the selected candidate pair should be the + // local address for the connection, since different datachannels + // are multiplexed over the same SCTP connection + localAddr, err := manet.FromNetAddr(wrappedChannel.LocalAddr()) + if err != nil { + defer cleanup() + return nil, err + } + + conn := newConnection( + pc, + t, + scope, + t.localPeerId, + t.privKey, + localAddr, + p, + nil, + remoteMultiaddr, + ) + tctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + secureConn, err := t.noiseHandshake(tctx, pc, wrappedChannel, p, remoteHashFunction, false) + if err != nil { + defer cleanup() + return nil, err + } + + conn.setRemotePublicKey(secureConn.RemotePublicKey()) + + return conn, nil +} + +func (t *WebRTCTransport) getCertificateFingerprint() (webrtc.DTLSFingerprint, error) { + fps, err := t.webrtcConfig.Certificates[0].GetFingerprints() + if err != nil { + return webrtc.DTLSFingerprint{}, err + } + return fps[0], nil +} + +func (t *WebRTCTransport) generateNoisePrologue(pc *webrtc.PeerConnection, hash crypto.Hash, inbound bool) ([]byte, error) { + raw := pc.SCTP().Transport().GetRemoteCertificate() + cert, err := x509.ParseCertificate(raw) + if err != nil { + return nil, err + } + // guess hash algorithm + localFp, err := t.getCertificateFingerprint() + if err != nil { + return nil, err + } + + remoteFp, err := fingerprint.Fingerprint(cert, hash) + if err != nil { + return nil, err + } + remoteFp = strings.ReplaceAll(strings.ToLower(remoteFp), ":", "") + remoteFpBytes, err := hex.DecodeString(remoteFp) + if err != nil { + return nil, err + } + + local := strings.ReplaceAll(localFp.Value, ":", "") + localBytes, err := hex.DecodeString(local) + if err != nil { + return nil, err + } + + localEncoded, err := multihash.Encode(localBytes, multihash.SHA2_256) + if err != nil { + log.Debugf("could not encode multihash for local fingerprint") + return nil, err + } + remoteEncoded, err := multihash.Encode(remoteFpBytes, multihash.SHA2_256) + if err != nil { + log.Debugf("could not encode multihash for remote fingerprint") + return nil, err + } + + result := []byte("libp2p-webrtc-noise:") + if inbound { + result = append(result, remoteEncoded...) + result = append(result, localEncoded...) + } else { + result = append(result, localEncoded...) + result = append(result, remoteEncoded...) + } + return result, nil +} + +func (t *WebRTCTransport) noiseHandshake(ctx context.Context, pc *webrtc.PeerConnection, datachannel *dataChannel, peer peer.ID, hash crypto.Hash, inbound bool) (secureConn sec.SecureConn, err error) { + prologue, err := t.generateNoisePrologue(pc, hash, inbound) + if err != nil { + return nil, errNoise("could not generate prologue", err) + } + sessionTransport, err := t.noiseTpt.WithSessionOptions( + noise.Prologue(prologue), + noise.DisablePeerIDCheck(), + ) + if err != nil { + return nil, errNoise("could not instantiate transport", err) + } + if inbound { + secureConn, err = sessionTransport.SecureOutbound(ctx, datachannel, "") + if err != nil { + err = errNoise("failed to secure inbound", err) + return + } + } else { + secureConn, err = sessionTransport.SecureInbound(ctx, datachannel, peer) + if err != nil { + err = errNoise("failed to secure outbound", err) + return + } + } + return secureConn, nil +} diff --git a/p2p/transport/webrtc/transport_test.go b/p2p/transport/webrtc/transport_test.go new file mode 100644 index 0000000000..25af292c02 --- /dev/null +++ b/p2p/transport/webrtc/transport_test.go @@ -0,0 +1,274 @@ +package libp2pwebrtc + +import ( + "context" + "fmt" + "net" + "os" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + tpt "github.com/libp2p/go-libp2p/core/transport" + "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multibase" + "github.com/multiformats/go-multihash" + "github.com/stretchr/testify/require" + "golang.org/x/crypto/sha3" +) + +func getTransport(t *testing.T) (tpt.Transport, peer.ID) { + privKey, _, err := crypto.GenerateKeyPair(crypto.Ed25519, -1) + require.NoError(t, err) + rcmgr := &network.NullResourceManager{} + transport, err := New(privKey, rcmgr) + require.NoError(t, err) + peerID, err := peer.IDFromPrivateKey(privKey) + require.NoError(t, err) + return transport, peerID +} + +var ( + listenerIp net.IP + dialerIp net.IP +) + +func TestMain(m *testing.M) { + listenerIp, dialerIp = getListenerAndDialerIP() + os.Exit(m.Run()) +} + +func TestTransportWebRTC_CanDial(t *testing.T) { + tr, _ := getTransport(t) + invalid := []string{ + "/ip4/1.2.3.4/udp/1234/webrtc", + "/dns/test.test/udp/1234/webrtc", + "/dns/test.test/udp/1234/webrtc/certhash/uEiAsGPzpiPGQzSlVHRXrUCT5EkTV7YFrV4VZ3hpEKTd_zg", + } + + valid := []string{ + "/ip4/1.2.3.4/udp/1234/webrtc/certhash/uEiAsGPzpiPGQzSlVHRXrUCT5EkTV7YFrV4VZ3hpEKTd_zg", + "/ip6/0:0:0:0:0:0:0:1/udp/1234/webrtc/certhash/uEiAsGPzpiPGQzSlVHRXrUCT5EkTV7YFrV4VZ3hpEKTd_zg", + "/ip6/::1/udp/1234/webrtc/certhash/uEiAsGPzpiPGQzSlVHRXrUCT5EkTV7YFrV4VZ3hpEKTd_zg", + } + + for _, addr := range invalid { + ma, err := multiaddr.NewMultiaddr(addr) + require.NoError(t, err) + require.Equal(t, false, tr.CanDial(ma)) + } + + for _, addr := range valid { + ma, err := multiaddr.NewMultiaddr(addr) + require.NoError(t, err) + require.Equal(t, true, tr.CanDial(ma), addr) + } +} + +func TestTransportWebRTC_ListenFailsOnNonWebRTCMultiaddr(t *testing.T) { + tr, _ := getTransport(t) + testAddrs := []string{ + "/ip4/0.0.0.0/udp/0", + "/ip4/0.0.0.0/tcp/0/wss", + } + for _, addr := range testAddrs { + listenMultiaddr, err := multiaddr.NewMultiaddr(addr) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.Error(t, err) + require.Nil(t, listener) + } +} + +func TestTransportWebRTC_DialFailsOnUnsupportedHashFunction(t *testing.T) { + tr, _ := getTransport(t) + hash := sha3.New512() + certhash := func() string { + _, err := hash.Write([]byte("test-data")) + require.NoError(t, err) + mh, err := multihash.Encode(hash.Sum([]byte{}), multihash.SHA3_512) + require.NoError(t, err) + certhash, err := multibase.Encode(multibase.Base58BTC, mh) + require.NoError(t, err) + return certhash + }() + testaddr, err := multiaddr.NewMultiaddr("/ip4/1.2.3.4/udp/1234/webrtc/certhash/" + certhash) + require.NoError(t, err) + _, err = tr.Dial(context.Background(), testaddr, "") + require.ErrorContains(t, err, "unsupported hash function") +} + +func TestTransportWebRTC_CanListenSingle(t *testing.T) { + tr, listeningPeer := getTransport(t) + tr1, connectingPeer := getTransport(t) + listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp)) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + + go func() { + _, err := tr1.Dial(context.Background(), listener.Multiaddr(), listeningPeer) + require.NoError(t, err) + }() + + conn, err := listener.Accept() + require.NoError(t, err) + require.NotNil(t, conn) + + require.Equal(t, connectingPeer, conn.RemotePeer()) +} + +func TestTransportWebRTC_CanListenMultiple(t *testing.T) { + tr, listeningPeer := getTransport(t) + listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp)) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + count := 5 + + for i := 0; i < count; i++ { + go func() { + ctr, _ := getTransport(t) + conn, err := ctr.Dial(context.Background(), listener.Multiaddr(), listeningPeer) + require.NoError(t, err) + require.Equal(t, conn.RemotePeer(), listeningPeer) + }() + } + + for i := 0; i < count; i++ { + _, err := listener.Accept() + require.NoError(t, err) + t.Logf("listener accepted connection: %d", i) + } +} + +func TestTransportWebRTC_ListenerCanCreateStreams(t *testing.T) { + tr, listeningPeer := getTransport(t) + tr1, connectingPeer := getTransport(t) + listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp)) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + + go func() { + conn, err := listener.Accept() + require.NoError(t, err) + t.Logf("listener accepted connection") + + require.Equal(t, connectingPeer, conn.RemotePeer()) + + stream, err := conn.OpenStream(context.Background()) + require.NoError(t, err) + t.Logf("listener opened stream") + _, err = stream.Write([]byte("test")) + require.NoError(t, err) + }() + + streamChan := make(chan network.MuxedStream) + go func() { + conn, err := tr1.Dial(context.Background(), listener.Multiaddr(), listeningPeer) + require.NoError(t, err) + t.Logf("connection opened by dialer") + stream, err := conn.AcceptStream() + require.NoError(t, err) + t.Logf("dialer accepted stream") + streamChan <- stream + }() + + var stream network.MuxedStream + select { + case stream = <-streamChan: + case <-time.After(3 * time.Second): + t.Fatal("stream opening timed out") + } + buf := make([]byte, 100) + stream.SetReadDeadline(time.Now().Add(3 * time.Second)) + n, err := stream.Read(buf) + require.NoError(t, err) + require.Equal(t, "test", string(buf[:n])) + +} + +func TestTransportWebRTC_DialerCanCreateStreams(t *testing.T) { + tr, listeningPeer := getTransport(t) + listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp)) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + + tr1, connectingPeer := getTransport(t) + done := make(chan struct{}) + + go func() { + lconn, err := listener.Accept() + require.NoError(t, err) + t.Logf("listener accepted connection") + require.Equal(t, connectingPeer, lconn.RemotePeer()) + + stream, err := lconn.AcceptStream() + require.NoError(t, err) + t.Logf("listener accepted stream") + buf := make([]byte, 100) + n, err := stream.Read(buf) + require.NoError(t, err) + require.Equal(t, "test", string(buf[:n])) + + done <- struct{}{} + }() + + go func() { + conn, err := tr1.Dial(context.Background(), listener.Multiaddr(), listeningPeer) + require.NoError(t, err) + t.Logf("dialer opened connection") + stream, err := conn.OpenStream(context.Background()) + require.NoError(t, err) + t.Logf("dialer opened stream") + _, err = stream.Write([]byte("test")) + require.NoError(t, err) + + }() + select { + case <-done: + case <-time.After(10 * time.Second): + t.Fatal("timed out") + } + +} + +func TestTransportWebRTC_PeerConnectionDTLSFailed(t *testing.T) { + tr, listeningPeer := getTransport(t) + listenMultiaddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp)) + require.NoError(t, err) + listener, err := tr.Listen(listenMultiaddr) + require.NoError(t, err) + + tr1, _ := getTransport(t) + + go func() { + listener.Accept() + }() + + badMultiaddr, _ := multiaddr.SplitFunc(listener.Multiaddr(), func(component multiaddr.Component) bool { + return component.Protocol().Code == multiaddr.P_CERTHASH + }) + + encodedCerthash, err := multihash.Encode(defaultMultihash.Digest, defaultMultihash.Code) + require.NoError(t, err) + badEncodedCerthash, err := multibase.Encode(multibase.Base58BTC, encodedCerthash) + require.NoError(t, err) + badCerthash, err := multiaddr.NewMultiaddr(fmt.Sprintf("/certhash/%s", badEncodedCerthash)) + require.NoError(t, err) + badMultiaddr = badMultiaddr.Encapsulate(badCerthash) + + conn, err := tr1.Dial(context.Background(), badMultiaddr, listeningPeer) + require.Nil(t, conn) + require.Error(t, err) + + webrtcErr, ok := err.(*webRTCTransportError) + require.True(t, ok, "could not cast to webRTCTransportError") + require.Equal(t, webrtcErr.kind, errKindConnectionFailed) + require.Contains(t, webrtcErr.message, "failed") + +} diff --git a/p2p/transport/webrtc/udp_mux.go b/p2p/transport/webrtc/udp_mux.go new file mode 100644 index 0000000000..b12054fa2c --- /dev/null +++ b/p2p/transport/webrtc/udp_mux.go @@ -0,0 +1,351 @@ +package libp2pwebrtc + +// udpMuxNewAddr is mostly similar to UDPMuxDefault exported from +// pion/ice [https://github.com/pion/ice/blob/master/udp_mux.go] with +// the only difference being the additional channel to notify the libp2p +// transport of any STUN requests from unknown ufrags. + +import ( + "errors" + "io" + "net" + "os" + "strings" + "sync" + + "github.com/pion/ice/v2" + "github.com/pion/logging" + "github.com/pion/stun" +) + +type candidateAddr struct { + raddr *net.UDPAddr + ufrag string +} + +var _ ice.UDPMux = &udpMuxNewAddr{} + +// udpMuxNewAddr is an implementation of the interface +type udpMuxNewAddr struct { + params ice.UDPMuxParams + + closedChan chan struct{} + closeOnce sync.Once + + // connsIPv4 and connsIPv6 are maps of all udpMuxedConn indexed by ufrag|network|candidateType + connsIPv4, connsIPv6 map[string]*udpMuxedConn + + addressMapMu sync.RWMutex + addressMap map[string]*udpMuxedConn + + // buffer pool to recycle buffers for net.UDPAddr encodes/decodes + pool *sync.Pool + + mu sync.Mutex + + newAddrChan chan candidateAddr + newAddrs map[*net.UDPAddr]struct{} +} + +const maxAddrSize = 512 +const receiveMTU = 1500 + +// NewUDPMuxNewAddr creates an implementation of UDPMux +func NewUDPMuxNewAddr(params ice.UDPMuxParams, newAddrChan chan candidateAddr) *udpMuxNewAddr { + if params.Logger == nil { + params.Logger = logging.NewDefaultLoggerFactory().NewLogger("ice") + } + + m := &udpMuxNewAddr{ + addressMap: map[string]*udpMuxedConn{}, + params: params, + connsIPv4: make(map[string]*udpMuxedConn), + connsIPv6: make(map[string]*udpMuxedConn), + closedChan: make(chan struct{}), + pool: &sync.Pool{ + New: func() interface{} { + // big enough buffer to fit both packet and address + return newBufferHolder(receiveMTU + maxAddrSize) + }, + }, + newAddrChan: newAddrChan, + newAddrs: make(map[*net.UDPAddr]struct{}), + } + + go m.connWorker() + + return m +} + +// LocalAddr returns the listening address of this UDPMuxNewAddr +func (m *udpMuxNewAddr) LocalAddr() net.Addr { + return m.params.UDPConn.LocalAddr() +} + +// GetConn returns a PacketConn given the connection's ufrag and network +// creates the connection if an existing one can't be found +func (m *udpMuxNewAddr) GetConn(ufrag string, isIPv6 bool) (net.PacketConn, error) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.IsClosed() { + return nil, io.ErrClosedPipe + } + + if conn, ok := m.getConn(ufrag, isIPv6); ok { + return conn, nil + } + + c := m.createMuxedConn(ufrag) + go func() { + <-c.CloseChannel() + m.removeConn(ufrag) + }() + + if isIPv6 { + m.connsIPv6[ufrag] = c + } else { + m.connsIPv4[ufrag] = c + } + + return c, nil +} + +// RemoveConnByUfrag stops and removes the muxed packet connection +func (m *udpMuxNewAddr) RemoveConnByUfrag(ufrag string) { + removedConns := make([]*udpMuxedConn, 0, 2) + + // Keep lock section small to avoid deadlock with conn lock + m.mu.Lock() + if c, ok := m.connsIPv4[ufrag]; ok { + delete(m.connsIPv4, ufrag) + removedConns = append(removedConns, c) + } + if c, ok := m.connsIPv6[ufrag]; ok { + delete(m.connsIPv6, ufrag) + removedConns = append(removedConns, c) + } + m.mu.Unlock() + + m.addressMapMu.Lock() + defer m.addressMapMu.Unlock() + + for _, c := range removedConns { + addresses := c.getAddresses() + for _, addr := range addresses { + delete(m.addressMap, addr) + } + } +} + +// IsClosed returns true if the mux had been closed +func (m *udpMuxNewAddr) IsClosed() bool { + select { + case <-m.closedChan: + return true + default: + return false + } +} + +// Close the mux, no further connections could be created +func (m *udpMuxNewAddr) Close() error { + var err error + m.closeOnce.Do(func() { + m.mu.Lock() + defer m.mu.Unlock() + + for _, c := range m.connsIPv4 { + _ = c.Close() + } + for _, c := range m.connsIPv6 { + _ = c.Close() + } + + m.connsIPv4 = make(map[string]*udpMuxedConn) + m.connsIPv6 = make(map[string]*udpMuxedConn) + + close(m.closedChan) + }) + return err +} + +func (m *udpMuxNewAddr) removeConn(key string) { + // keep lock section small to avoid deadlock with conn lock + c := func() *udpMuxedConn { + m.mu.Lock() + defer m.mu.Unlock() + + if c, ok := m.connsIPv4[key]; ok { + delete(m.connsIPv4, key) + return c + } + + if c, ok := m.connsIPv6[key]; ok { + delete(m.connsIPv6, key) + return c + } + + return nil + }() + + if c == nil { + return + } + + m.addressMapMu.Lock() + defer m.addressMapMu.Unlock() + + addresses := c.getAddresses() + for _, addr := range addresses { + delete(m.addressMap, addr) + } +} + +func (m *udpMuxNewAddr) writeTo(buf []byte, raddr net.Addr) (n int, err error) { + return m.params.UDPConn.WriteTo(buf, raddr) +} + +func (m *udpMuxNewAddr) registerConnForAddress(conn *udpMuxedConn, addr string) { + if m.IsClosed() { + return + } + + m.addressMapMu.Lock() + defer m.addressMapMu.Unlock() + + existing, ok := m.addressMap[addr] + if ok { + existing.removeAddress(addr) + } + m.addressMap[addr] = conn + + m.params.Logger.Debugf("Registered %s for %s", addr, conn.params.Key) +} + +func (m *udpMuxNewAddr) createMuxedConn(key string) *udpMuxedConn { + c := newUDPMuxedConn(&udpMuxedConnParams{ + Mux: m, + Key: key, + AddrPool: m.pool, + LocalAddr: m.LocalAddr(), + Logger: m.params.Logger, + }) + return c +} + +func (m *udpMuxNewAddr) connWorker() { + logger := m.params.Logger + + defer func() { + _ = m.Close() + }() + + buf := make([]byte, receiveMTU) + for { + n, addr, err := m.params.UDPConn.ReadFrom(buf) + if m.IsClosed() { + return + } else if err != nil { + if os.IsTimeout(err) { + continue + } else if err != io.EOF { + logger.Errorf("could not read udp packet: %v", err) + } + + return + } + + udpAddr, ok := addr.(*net.UDPAddr) + if !ok { + logger.Errorf("underlying PacketConn did not return a UDPAddr") + return + } + + // If we have already seen this address dispatch to the appropriate destination + m.addressMapMu.Lock() + destinationConn := m.addressMap[addr.String()] + m.addressMapMu.Unlock() + + // If we haven't seen this address before but is a STUN packet lookup by ufrag + if destinationConn == nil && stun.IsMessage(buf[:n]) { + msg := &stun.Message{ + Raw: append([]byte{}, buf[:n]...), + } + // log.Info("received new stun message: %v", *msg) + + if err = msg.Decode(); err != nil { + m.params.Logger.Warnf("Failed to handle decode ICE from %s: %v\n", addr.String(), err) + continue + } + + ufrag, ufragErr := ufragFromStunMessage(msg, false) + if ufragErr != nil { + m.params.Logger.Warnf("%v", ufragErr) + log.Warnf("%v", ufragErr) + continue + } + + isIPv6 := udpAddr.IP.To4() == nil + + m.mu.Lock() + destinationConn, ok = m.getConn(ufrag, isIPv6) + m.mu.Unlock() + + // notify that a new connection is requested + if !ok { + // log.Debugf("new connection requested: %v %v", udpAddr, ufrag) + m.newAddrChan <- candidateAddr{raddr: udpAddr, ufrag: ufrag} + m.mu.Lock() + m.newAddrs[udpAddr] = struct{}{} + m.mu.Unlock() + dc, _ := m.GetConn(ufrag, isIPv6) + destinationConn = dc.(*udpMuxedConn) + } + } + + if destinationConn == nil { + m.params.Logger.Debugf("dropping packet from %s, addr: %s", udpAddr.String(), addr.String()) + continue + } + + if err = destinationConn.writePacket(buf[:n], udpAddr); err != nil { + m.params.Logger.Errorf("could not write packet: %v", err) + } + } +} + +func ufragFromStunMessage(msg *stun.Message, local_ufrag bool) (string, error) { + attr, stunAttrErr := msg.Get(stun.AttrUsername) + if stunAttrErr != nil { + return "", stunAttrErr + } + ufrag := strings.Split(string(attr), ":") + if len(ufrag) < 2 { + return "", errors.New("invalid STUN username attribute") + } + if local_ufrag { + return ufrag[1], nil + } else { + return ufrag[0], nil + } +} + +func (m *udpMuxNewAddr) getConn(ufrag string, isIPv6 bool) (val *udpMuxedConn, ok bool) { + if isIPv6 { + val, ok = m.connsIPv6[ufrag] + } else { + val, ok = m.connsIPv4[ufrag] + } + return +} + +type bufferHolder struct { + buffer []byte +} + +func newBufferHolder(size int) *bufferHolder { + return &bufferHolder{ + buffer: make([]byte, size), + } +} diff --git a/p2p/transport/webrtc/udp_mux_conn.go b/p2p/transport/webrtc/udp_mux_conn.go new file mode 100644 index 0000000000..9369a2899a --- /dev/null +++ b/p2p/transport/webrtc/udp_mux_conn.go @@ -0,0 +1,250 @@ +package libp2pwebrtc + +// This file is copied as is from https://github.com/pion/ice/blob/master/udp_muxed_conn.go . +// The udpMuxedConn struct is not exported from pion/ice and is required for implementing +// the custom udp muxer. + +import ( + "encoding/binary" + "io" + "net" + "sync" + "time" + + "github.com/pion/logging" + "github.com/pion/transport/packetio" +) + +type udpMuxedConnParams struct { + Mux *udpMuxNewAddr + AddrPool *sync.Pool + Key string + LocalAddr net.Addr + Logger logging.LeveledLogger +} + +// udpMuxedConn represents a logical packet conn for a single remote as identified by ufrag +type udpMuxedConn struct { + params *udpMuxedConnParams + // remote addresses that we have sent to on this conn + addresses []string + + // channel holding incoming packets + buffer *packetio.Buffer + closedChan chan struct{} + closeOnce sync.Once + mu sync.Mutex +} + +func newUDPMuxedConn(params *udpMuxedConnParams) *udpMuxedConn { + p := &udpMuxedConn{ + params: params, + buffer: packetio.NewBuffer(), + closedChan: make(chan struct{}), + } + + return p +} + +func (c *udpMuxedConn) ReadFrom(b []byte) (n int, raddr net.Addr, err error) { + buf := c.params.AddrPool.Get().(*bufferHolder) + defer c.params.AddrPool.Put(buf) + + // read address + total, err := c.buffer.Read(buf.buffer) + if err != nil { + return 0, nil, err + } + + dataLen := int(binary.LittleEndian.Uint16(buf.buffer[:2])) + if dataLen > total || dataLen > len(b) { + return 0, nil, io.ErrShortBuffer + } + + // read data and then address + offset := 2 + copy(b, buf.buffer[offset:offset+dataLen]) + offset += dataLen + + // read address len & decode address + addrLen := int(binary.LittleEndian.Uint16(buf.buffer[offset : offset+2])) + offset += 2 + + if raddr, err = decodeUDPAddr(buf.buffer[offset : offset+addrLen]); err != nil { + return 0, nil, err + } + + return dataLen, raddr, nil +} + +func (c *udpMuxedConn) WriteTo(buf []byte, raddr net.Addr) (n int, err error) { + if c.isClosed() { + return 0, io.ErrClosedPipe + } + // each time we write to a new address, we'll register it with the mux + addr := raddr.String() + if !c.containsAddress(addr) { + c.addAddress(addr) + } + + return c.params.Mux.writeTo(buf, raddr) +} + +func (c *udpMuxedConn) LocalAddr() net.Addr { + return c.params.LocalAddr +} + +func (c *udpMuxedConn) SetDeadline(tm time.Time) error { + return nil +} + +func (c *udpMuxedConn) SetReadDeadline(tm time.Time) error { + return nil +} + +func (c *udpMuxedConn) SetWriteDeadline(tm time.Time) error { + return nil +} + +func (c *udpMuxedConn) CloseChannel() <-chan struct{} { + return c.closedChan +} + +func (c *udpMuxedConn) Close() error { + var err error + c.closeOnce.Do(func() { + err = c.buffer.Close() + close(c.closedChan) + }) + c.mu.Lock() + defer c.mu.Unlock() + c.addresses = nil + return err +} + +func (c *udpMuxedConn) isClosed() bool { + select { + case <-c.closedChan: + return true + default: + return false + } +} + +func (c *udpMuxedConn) getAddresses() []string { + c.mu.Lock() + defer c.mu.Unlock() + addresses := make([]string, len(c.addresses)) + copy(addresses, c.addresses) + return addresses +} + +func (c *udpMuxedConn) addAddress(addr string) { + c.mu.Lock() + c.addresses = append(c.addresses, addr) + c.mu.Unlock() + + // map it on mux + c.params.Mux.registerConnForAddress(c, addr) +} + +func (c *udpMuxedConn) removeAddress(addr string) { + c.mu.Lock() + defer c.mu.Unlock() + + newAddresses := make([]string, 0, len(c.addresses)) + for _, a := range c.addresses { + if a != addr { + newAddresses = append(newAddresses, a) + } + } + + c.addresses = newAddresses +} + +func (c *udpMuxedConn) containsAddress(addr string) bool { + c.mu.Lock() + defer c.mu.Unlock() + for _, a := range c.addresses { + if addr == a { + return true + } + } + return false +} + +func (c *udpMuxedConn) writePacket(data []byte, addr *net.UDPAddr) error { + // write two packets, address and data + buf := c.params.AddrPool.Get().(*bufferHolder) + defer c.params.AddrPool.Put(buf) + + // format of buffer | data len | data bytes | addr len | addr bytes | + if len(buf.buffer) < len(data)+maxAddrSize { + return io.ErrShortBuffer + } + // data len + binary.LittleEndian.PutUint16(buf.buffer, uint16(len(data))) + offset := 2 + + // data + copy(buf.buffer[offset:], data) + offset += len(data) + + // write address first, leaving room for its length + n, err := encodeUDPAddr(addr, buf.buffer[offset+2:]) + if err != nil { + return nil + } + total := offset + n + 2 + + // address len + binary.LittleEndian.PutUint16(buf.buffer[offset:], uint16(n)) + + if _, err := c.buffer.Write(buf.buffer[:total]); err != nil { + return err + } + return nil +} + +func encodeUDPAddr(addr *net.UDPAddr, buf []byte) (int, error) { + ipdata, err := addr.IP.MarshalText() + if err != nil { + return 0, err + } + total := 2 + len(ipdata) + 2 + len(addr.Zone) + if total > len(buf) { + return 0, io.ErrShortBuffer + } + + binary.LittleEndian.PutUint16(buf, uint16(len(ipdata))) + offset := 2 + n := copy(buf[offset:], ipdata) + offset += n + binary.LittleEndian.PutUint16(buf[offset:], uint16(addr.Port)) + offset += 2 + copy(buf[offset:], addr.Zone) + return total, nil +} + +func decodeUDPAddr(buf []byte) (*net.UDPAddr, error) { + addr := net.UDPAddr{} + + offset := 0 + ipLen := int(binary.LittleEndian.Uint16(buf[:2])) + offset += 2 + // basic bounds checking + if ipLen+offset > len(buf) { + return nil, io.ErrShortBuffer + } + if err := addr.IP.UnmarshalText(buf[offset : offset+ipLen]); err != nil { + return nil, err + } + offset += ipLen + addr.Port = int(binary.LittleEndian.Uint16(buf[offset : offset+2])) + offset += 2 + zone := make([]byte, len(buf[offset:])) + copy(zone, buf[offset:]) + addr.Zone = string(zone) + + return &addr, nil +} diff --git a/p2p/transport/webrtc/udp_mux_test.go b/p2p/transport/webrtc/udp_mux_test.go new file mode 100644 index 0000000000..eedf518ffc --- /dev/null +++ b/p2p/transport/webrtc/udp_mux_test.go @@ -0,0 +1,60 @@ +package libp2pwebrtc + +import ( + "fmt" + "net" + "os" + "strings" + "testing" + "time" + + "github.com/pion/ice/v2" + "github.com/pion/logging" + "github.com/pion/stun" +) + +func TestUdpMuxNewAddrNewStun(t *testing.T) { + serverConn, err := net.ListenUDP("udp4", &net.UDPAddr{IP: listenerIp, Port: 0}) + if err != nil { + panic(err) + } + + clientConn, err := net.ListenUDP("udp", &net.UDPAddr{IP: dialerIp, Port: 0}) + loggerFactory := logging.NewDefaultLoggerFactory() + loggerFactory.Writer = os.Stdout + loggerFactory.DefaultLogLevel = logging.LogLevelDebug + + logger := loggerFactory.NewLogger("mux-test") + + newAddrChan := make(chan candidateAddr, 1) + _ = NewUDPMuxNewAddr(ice.UDPMuxParams{UDPConn: serverConn, Logger: logger}, newAddrChan) + + certhash := "496612170D1C91AE574CC636DDD597D27D62C99A7FB9A3F47003E7439173235E" + go func() { + <-time.After(1 * time.Second) + msg := stun.MustBuild( + stun.TransactionID, + stun.BindingRequest, + ice.AttrControl{Role: ice.Controlling}, + stun.NewUsername(fmt.Sprintf("%s:%s", certhash, certhash)), + ) + msg.Encode() + _, err := clientConn.WriteTo(msg.Raw, serverConn.LocalAddr()) + if err != nil { + panic(err) + } + }() + + select { + case addr := <-newAddrChan: + hash := addr.ufrag + if err != nil { + t.Fatal(err) + } + if !strings.EqualFold(hash, certhash) { + t.Fatalf("expected hash: %s, received: %s", certhash, hash) + } + case <-time.After(20 * time.Second): + t.Fatal("test timed out") + } +} diff --git a/p2p/transport/webrtc/util.go b/p2p/transport/webrtc/util.go new file mode 100644 index 0000000000..572ddc1c2b --- /dev/null +++ b/p2p/transport/webrtc/util.go @@ -0,0 +1,59 @@ +package libp2pwebrtc + +import ( + "encoding/hex" + "strings" + + ma "github.com/multiformats/go-multiaddr" + "github.com/multiformats/go-multibase" + mh "github.com/multiformats/go-multihash" + "github.com/pion/webrtc/v3" +) + +func maFingerprintToSdp(fp string) string { + result := "" + first := true + for pos, char := range fp { + if pos%2 == 0 { + if first { + first = false + } else { + result += ":" + } + } + result += string(char) + } + return result +} + +func fingerprintToSDP(fp *mh.DecodedMultihash) string { + if fp == nil { + return "" + } + fpDigest := maFingerprintToSdp(hex.EncodeToString(fp.Digest)) + return "sha-256 " + fpDigest +} + +func decodeRemoteFingerprint(maddr ma.Multiaddr) (*mh.DecodedMultihash, error) { + remoteFingerprintMultibase, err := maddr.ValueForProtocol(ma.P_CERTHASH) + if err != nil { + return nil, err + } + _, data, err := multibase.Decode(remoteFingerprintMultibase) + if err != nil { + return nil, err + } + return mh.Decode(data) +} + +func encodeDTLSFingerprint(fp webrtc.DTLSFingerprint) (string, error) { + digest, err := hex.DecodeString(strings.ReplaceAll(fp.Value, ":", "")) + if err != nil { + return "", err + } + encoded, err := mh.Encode(digest, mh.SHA2_256) + if err != nil { + return "", err + } + return multibase.Encode(multibase.Base58BTC, encoded) +} diff --git a/p2p/transport/webrtc/util_test.go b/p2p/transport/webrtc/util_test.go new file mode 100644 index 0000000000..37454ea257 --- /dev/null +++ b/p2p/transport/webrtc/util_test.go @@ -0,0 +1,72 @@ +package libp2pwebrtc + +import ( + "net" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMaFingerprintToSdp(t *testing.T) { + certhash := "496612170D1C91AE574CC636DDD597D27D62C99A7FB9A3F47003E7439173235E" + expected := "49:66:12:17:0D:1C:91:AE:57:4C:C6:36:DD:D5:97:D2:7D:62:C9:9A:7F:B9:A3:F4:70:03:E7:43:91:73:23:5E" + result := maFingerprintToSdp(certhash) + require.Equal(t, expected, result) +} + +const expectedServerSDP = ` +v=0 +o=- 0 0 IN IP4 0.0.0.0 +s=- +t=0 0 +a=ice-lite +m=application 37826 UDP/DTLS/SCTP webrtc-datachannel +c=IN IP4 0.0.0.0 +a=mid:0 +a=ice-options:ice2 +a=ice-ufrag:d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581 +a=ice-pwd:d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581 +a=fingerprint:sha-256 ba:78:16:bf:8f:01:cf:ea:41:41:40:de:5d:ae:22:23:b0:03:61:a3:96:17:7a:9c:b4:10:ff:61:f2:00:15:ad +a=setup:passive +a=sctp-port:5000 +a=max-message-size:16384 +a=candidate:1 1 UDP 1 0.0.0.0 37826 typ host +` + +func TestRenderServerSDP(t *testing.T) { + args := sdpArgs{ + Addr: &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 37826}, + Ufrag: "d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581", + Fingerprint: defaultMultihash, + } + + sdp := renderServerSdp(args) + require.Equal(t, expectedServerSDP, sdp) +} + +const expectedClientSDP = ` +v=0 +o=- 0 0 IN IP4 0.0.0.0 +s=- +c=IN IP4 0.0.0.0 +t=0 0 +m=application 37826 UDP/DTLS/SCTP webrtc-datachannel +a=mid:0 +a=ice-options:trickle +a=ice-ufrag:d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581 +a=ice-pwd:d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581 +a=fingerprint:sha-256 ba:78:16:bf:8f:01:cf:ea:41:41:40:de:5d:ae:22:23:b0:03:61:a3:96:17:7a:9c:b4:10:ff:61:f2:00:15:ad +a=setup:actpass +a=sctp-port:5000 +a=max-message-size:16384 +` + +func TestRenderClientSDP(t *testing.T) { + args := sdpArgs{ + Addr: &net.UDPAddr{IP: net.IPv4(0, 0, 0, 0), Port: 37826}, + Ufrag: "d2c0fc07-8bb3-42ae-bae2-a6fce8a0b581", + Fingerprint: defaultMultihash, + } + sdp := renderClientSdp(args) + require.Equal(t, expectedClientSDP, sdp) +}