Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for data transfer encryption via rc4 and aes #236

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ env:
- PLATFORM=hdp2
- PLATFORM=cdh5
- PLATFORM=cdh6
- PLATFORM=cdh6 ENCRYPT_DATA_TRANSFER=true
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=authentication
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=integrity
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy
- PLATFORM=cdh6 KERBEROS=true RPC_PROTECTION=privacy ENCRYPT_DATA_TRANSFER=true
before_install:
- export GO111MODULE=on # Travis installs into $GOPATH/src, which disables module support by default.
install:
Expand Down
42 changes: 42 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"

"github.com/colinmarc/hdfs/v2/hadoopconf"
"github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_common"
hdfs "github.com/colinmarc/hdfs/v2/internal/protocol/hadoop_hdfs"
"github.com/colinmarc/hdfs/v2/internal/rpc"
krb "gopkg.in/jcmturner/gokrb5.v7/client"
Expand Down Expand Up @@ -68,6 +69,13 @@ type ClientOptions struct {
// multi-namenode setup (for example: 'nn/_HOST'). It is required if
// KerberosClient is provided.
KerberosServicePrincipleName string
// EncryptDataTransfer specifies whether data transfer between the client
// and datanodes must be encrypted; when set, a SASL negotiation is
// performed before any data is transferred. This corresponds to
// dfs.encrypt.data.transfer in the Hadoop configuration.
EncryptDataTransfer bool
// SecureDataNode specifies whether we're protecting our data transfer
// communication via dfs.data.transfer.protection
SecureDataNode bool
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was something else, actually? We've always sent the block token, and I think this is just a server-side option for verifying them (with or without kerberos). In any case it should be EnableBlockAccessToken

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out you're right — this should be changed to reference dfs.data.transfer.protection, not dfs.block.access.token.enable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if the new comments / updated name/comment is good.

}

// ClientOptionsFromConf attempts to load any relevant configuration options
Expand Down Expand Up @@ -116,6 +124,26 @@ func ClientOptionsFromConf(conf hadoopconf.HadoopConf) ClientOptions {
options.KerberosServicePrincipleName = strings.Split(conf["dfs.namenode.kerberos.principal"], "@")[0]
}

dataTransferProt := strings.Split(strings.ToLower(conf["dfs.data.transfer.protection"]), ",")
for _, val := range dataTransferProt {
switch val {
case "privacy":
options.EncryptDataTransfer = true
fallthrough
case "integrity", "authentication":
options.SecureDataNode = true
}
}

// dfs.encrypt.data.transfer set to true overrides dfs.data.transfer.protection and
// requires both privacy and integrity for communication. If dfs.encrypt.data.transfer is
// "true" we explicitly set EncryptDataTransfer and SecureDataNode to true, regardless of
// what they already were.
if conf["dfs.encrypt.data.transfer"] == "true" {
options.EncryptDataTransfer = true
options.SecureDataNode = true
}

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment for this method needs to be updated (to say that we munge these fields)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment updated. let me know if this is good.

return options
}

Expand Down Expand Up @@ -148,6 +176,20 @@ func NewClient(options ClientOptions) (*Client, error) {
return &Client{namenode: namenode, options: options}, nil
}

// datanodeDialFunc returns the dial function to use for connections to
// datanodes. When the client is configured for encrypted or secured data
// transfer, the configured dialer is wrapped in a DatanodeSaslDialer
// carrying the given block token and the namenode's encryption keys;
// otherwise the configured dialer is returned unchanged.
func (c *Client) datanodeDialFunc(token *hadoop_common.TokenProto) func(ctx context.Context, network, addr string) (net.Conn, error) {
	if !c.options.EncryptDataTransfer && !c.options.SecureDataNode {
		// No wrapping needed; hand back the plain dialer (possibly nil,
		// in which case callers fall back to their default dialer).
		return c.options.DatanodeDialFunc
	}

	dialer := &rpc.DatanodeSaslDialer{
		Dialer:    c.options.DatanodeDialFunc,
		Key:       c.namenode.GetEncryptionKeys(),
		Privacy:   c.options.EncryptDataTransfer,
		Integrity: c.options.SecureDataNode,
		Token:     token,
	}

	return dialer.DialContext
}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we avoid wrapping it at all if those options are false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at the definition for DialContext in the DatanodeSaslDialer it indeed will just call Dial and won't wrap the connection if those values are false. If you'd prefer though I can have this just return the DatanodeDialFunc if those values are false rather than using the sasl dialer


// New returns Client connected to the namenode(s) specified by address, or an
// error if it can't connect. Multiple namenodes can be specified by separating
// them with commas, for example "nn1:9000,nn2:9000".
Expand Down
4 changes: 4 additions & 0 deletions cmd/hdfs/test/helper.bash
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ export HADOOP_FS=${HADOOP_FS-"hadoop fs"}
export ROOT_TEST_DIR="$BATS_TEST_DIRNAME/../../.."
export HDFS="$ROOT_TEST_DIR/hdfs"

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

# stolen from https://github.com/sstephenson/rbenv/blob/master/test/test_helper.bash

flunk() {
Expand Down
5 changes: 3 additions & 2 deletions file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,12 @@ func (f *FileReader) Checksum() ([]byte, error) {
paddedLength := 32
totalLength := 0
checksum := md5.New()

for _, block := range f.blocks {
cr := &rpc.ChecksumReader{
Block: block,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

err := cr.SetDeadline(f.deadline)
Expand Down Expand Up @@ -405,7 +406,7 @@ func (f *FileReader) getNewBlockReader() error {
Block: block,
Offset: int64(off - start),
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

return f.SetDeadline(f.deadline)
Expand Down
4 changes: 2 additions & 2 deletions file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ func TestFileReadDeadline(t *testing.T) {
file, err := client.Open("/_test/foo.txt")
require.NoError(t, err)

file.SetDeadline(time.Now().Add(100 * time.Millisecond))
file.SetDeadline(time.Now().Add(200 * time.Millisecond))
_, err = file.Read([]byte{0, 0})
assert.NoError(t, err)

time.Sleep(100 * time.Millisecond)
time.Sleep(200 * time.Millisecond)
_, err = file.Read([]byte{0, 0})
assert.NotNil(t, err)
}
Expand Down
7 changes: 4 additions & 3 deletions file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (c *Client) Append(name string) (*FileWriter, error) {
Offset: int64(block.B.GetNumBytes()),
Append: true,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

err = f.blockWriter.SetDeadline(f.deadline)
Expand Down Expand Up @@ -262,12 +262,13 @@ func (f *FileWriter) startNewBlock() error {
return &os.PathError{"create", f.name, interpretException(err)}
}

block := addBlockResp.GetBlock()
f.blockWriter = &rpc.BlockWriter{
ClientName: f.client.namenode.ClientName,
Block: addBlockResp.GetBlock(),
Block: block,
BlockSize: f.blockSize,
UseDatanodeHostname: f.client.options.UseDatanodeHostname,
DialFunc: f.client.options.DatanodeDialFunc,
DialFunc: f.client.datanodeDialFunc(block.GetBlockToken()),
}

return f.blockWriter.SetDeadline(f.deadline)
Expand Down
4 changes: 4 additions & 0 deletions fixtures.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
set -e

# jdk11 is missing some APIs that the older jars here rely on
# so point at openjdk8 for now
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

HADOOP_FS=${HADOOP_FS-"hadoop fs"}
$HADOOP_FS -mkdir -p "/_test"
$HADOOP_FS -chmod 777 "/_test"
Expand Down
65 changes: 65 additions & 0 deletions internal/rpc/aes_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rpc

import (
"crypto/aes"
"crypto/cipher"
"net"
"time"
)

type aesConn struct {
conn net.Conn

encStream cipher.StreamWriter
decStream cipher.StreamReader
}

func newAesConn(conn net.Conn, inKey, outKey, inIv, outIv []byte) (net.Conn, error) {
c := &aesConn{conn: conn}

encBlock, err := aes.NewCipher(inKey)
if err != nil {
return nil, err
}

decBlock, err := aes.NewCipher(outKey)
if err != nil {
return nil, err
}

c.encStream = cipher.StreamWriter{S: cipher.NewCTR(encBlock, inIv), W: conn}
c.decStream = cipher.StreamReader{S: cipher.NewCTR(decBlock, outIv), R: conn}
return c, nil
}

func (d *aesConn) Close() error {
return d.conn.Close()
}

func (d *aesConn) LocalAddr() net.Addr {
return d.conn.LocalAddr()
}

func (d *aesConn) RemoteAddr() net.Addr {
return d.conn.RemoteAddr()
}

func (d *aesConn) SetDeadline(t time.Time) error {
return d.conn.SetDeadline(t)
}

func (d *aesConn) SetReadDeadline(t time.Time) error {
return d.conn.SetReadDeadline(t)
}

func (d *aesConn) SetWriteDeadline(t time.Time) error {
return d.conn.SetWriteDeadline(t)
}

func (d *aesConn) Write(b []byte) (n int, err error) {
return d.encStream.Write(b)
}

func (d *aesConn) Read(b []byte) (n int, err error) {
return d.decStream.Read(b)
}
19 changes: 6 additions & 13 deletions internal/rpc/block_write_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,33 +330,26 @@ func (s *blockWriteStream) writePacket(p outboundPacket) error {
DataLen: proto.Int32(int32(len(p.data))),
}

header := make([]byte, 6)
// Don't ask me why this doesn't include the header proto...
totalLength := len(p.data) + len(p.checksums) + 4

header := make([]byte, 6, 6+totalLength)
infoBytes, err := proto.Marshal(headerInfo)
if err != nil {
return err
}

// Don't ask me why this doesn't include the header proto...
totalLength := len(p.data) + len(p.checksums) + 4
binary.BigEndian.PutUint32(header, uint32(totalLength))
binary.BigEndian.PutUint16(header[4:], uint16(len(infoBytes)))
header = append(header, infoBytes...)
header = append(header, p.checksums...)
header = append(header, p.data...)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you give header some extra capacity up on line 333?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!


_, err = s.conn.Write(header)
if err != nil {
return err
}

_, err = s.conn.Write(p.checksums)
if err != nil {
return err
}

_, err = s.conn.Write(p.data)
if err != nil {
return err
}

return nil
}

Expand Down
86 changes: 81 additions & 5 deletions internal/rpc/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,12 @@ type tokenChallenge struct {
algorithm string
}

// parseChallenge returns a tokenChallenge parsed from a challenge response from
// the namenode.
func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) {
func parseChallenge(challenge []byte) (*tokenChallenge, error) {
tokenChallenge := tokenChallenge{}

matched := challengeRegexp.FindAllSubmatch(auth.Challenge, -1)
matched := challengeRegexp.FindAllSubmatch(challenge, -1)
if matched == nil {
return nil, fmt.Errorf("invalid token challenge: %s", auth.Challenge)
return nil, fmt.Errorf("invalid token challenge: %s", challenge)
}

for _, m := range matched {
Expand All @@ -64,3 +62,81 @@ func parseChallenge(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error)

return &tokenChallenge, nil
}

// parseChallengeAuth returns a tokenChallenge parsed from the challenge
// payload of a SASL auth message received from the namenode. It is a thin
// wrapper around parseChallenge that extracts the raw challenge bytes.
func parseChallengeAuth(auth *hadoop.RpcSaslProto_SaslAuth) (*tokenChallenge, error) {
	return parseChallenge(auth.Challenge)
}

// cipherType is a bitmask of the data transfer encryption ciphers that
// can be offered during SASL negotiation with a datanode.
type cipherType uint8

const (
	cipherUnknown cipherType = 0
	// iota is 1 on the cipherDES line, so flag values start at 1<<1;
	// bit 0 is left unused. The values themselves are only used as
	// distinct bits and never cross a protocol boundary.
	cipherDES cipherType = 1 << iota
	cipher3DES
	cipherRC4
	cipherRC440
	cipherRC456
	cipherAESCBC
)

// String returns the SASL option name for the cipher, or the empty
// string for cipherUnknown or a combined bitmask.
func (c cipherType) String() string {
	switch c {
	case cipherDES:
		return "des"
	case cipher3DES:
		return "3des"
	case cipherRC4:
		return "rc4"
	case cipherRC440:
		return "rc4-40"
	case cipherRC456:
		return "rc4-56"
	case cipherAESCBC:
		return "aes-cbc"
	}
	return ""
}

// getCipher maps a SASL cipher option name to its cipherType flag,
// returning cipherUnknown for unrecognized names.
func getCipher(s string) cipherType {
	switch s {
	case "des":
		return cipherDES
	case "3des":
		return cipher3DES
	case "rc4":
		return cipherRC4
	case "rc4-40":
		return cipherRC440
	case "rc4-56":
		return cipherRC456
	case "aes-cbc":
		return cipherAESCBC
	}
	return cipherUnknown
}

// chooseCipher picks a cipher from the options offered by the server,
// preferring rc4, then rc4-56, then rc4-40. DES and 3DES are
// deliberately never selected, and aes-cbc is not negotiated here (AES
// is established separately via the negotiated cipher option).
// cipherUnknown is returned when no acceptable cipher is offered.
func chooseCipher(cipherOpts []string) cipherType {
	var avail cipherType
	for _, c := range cipherOpts {
		avail |= getCipher(c)
	}

	switch {
	case avail&cipherRC4 != 0:
		return cipherRC4
	case avail&cipherRC456 != 0:
		return cipherRC456
	case avail&cipherRC440 != 0:
		return cipherRC440
	}

	return cipherUnknown
}
3 changes: 1 addition & 2 deletions internal/rpc/checksum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type ChecksumReader struct {
// UseDatanodeHostname specifies whether the datanodes should be connected to
// via their hostnames (if true) or IP addresses (if false).
UseDatanodeHostname bool
// DialFunc is used to connect to the datanodes. If nil, then
// (&net.Dialer{}).DialContext is used.
// DialFunc is used to connect to the datanodes. If nil, then (&net.Dialer{}).DialContext is used
DialFunc func(ctx context.Context, network, addr string) (net.Conn, error)

deadline time.Time
Expand Down
Loading