Skip to content

Commit

Permalink
Extract the native component without copying it into memory (#198)
Browse files Browse the repository at this point in the history
Use message digest filter streams to limit the required memory during
native executable extraction.
  • Loading branch information
pfifer authored Apr 10, 2018
1 parent 191ecf4 commit a51acee
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 45 deletions.
23 changes: 16 additions & 7 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,6 @@
<artifactId>protobuf-java</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.13</version>
<optional>true</optional>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
Expand All @@ -109,13 +102,29 @@
<artifactId>confluex-mock-http</artifactId>
<version>0.4.3</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.2.22</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.7</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ private void deletePipes() {
}

private void startChildProcess() throws IOException, InterruptedException {
log.info("Asking for trace");
List<String> args = new ArrayList<>(Arrays.asList(pathToExecutable, "-o", outPipe.getAbsolutePath(), "-i",
inPipe.getAbsolutePath(), "-c", protobufToHex(config.toProtobufMessage()), "-k",
protobufToHex(makeSetCredentialsMessage(config.getCredentialsProvider(), false)), "-t"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.security.DigestInputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.util.Arrays;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Copies a stream into a directory under a file name that embeds the hex digest of the copied content.
 *
 * <p>The data is streamed to a temporary file while the digest is calculated, so the full payload is never
 * buffered in memory by this class.
 */
public class HashedFileCopier {
    private static final Logger log = LoggerFactory.getLogger(HashedFileCopier.class);

    static final String MESSAGE_DIGEST_ALGORITHM = "SHA-1";
    static final String TEMP_PREFIX = "kpl";
    static final String TEMP_SUFFIX = ".tmp";
    static final String LOCK_SUFFIX = ".lock";

    /**
     * Writes {@code sourceData} to a file in {@code destinationDirectory} named by applying the upper-case hex
     * digest of the data to {@code fileNameFormat}.
     *
     * <p>If a file with that name already exists, has the same length and the same digest, it is left untouched
     * and returned. Otherwise the freshly written temporary file replaces it. The check and the replacement run
     * while holding a file lock on {@code <final name> + ".lock"} in the same directory. The temporary file is
     * removed before returning whenever it still exists.
     *
     * @param sourceData stream to copy; it is read to the end but not closed by this method
     * @param destinationDirectory directory that receives the temporary, lock and final files
     * @param fileNameFormat {@link String#format} pattern with a single {@code %s} placeholder for the hex digest
     * @return the final file containing the copied data
     * @throws IOException if writing, reading or moving a file fails
     * @throws Exception for any other failure, e.g. the digest algorithm being unavailable
     */
    public static File copyFileFrom(InputStream sourceData, File destinationDirectory, String fileNameFormat)
            throws Exception {
        File tempFile = null;
        try {
            tempFile = File.createTempFile(TEMP_PREFIX, TEMP_SUFFIX, destinationDirectory);
            log.debug("Extracting file with format {}", fileNameFormat);

            byte[] digest = writeAndDigest(sourceData, tempFile);
            log.debug("Calculated digest of new file: {}", Arrays.toString(digest));
            String digestHex = DatatypeConverter.printHexBinary(digest);
            File finalFile = new File(destinationDirectory, String.format(fileNameFormat, digestHex));
            File lockFile = new File(destinationDirectory, String.format(fileNameFormat + LOCK_SUFFIX, digestHex));
            log.debug("Preparing to check and copy {} to {}", tempFile.getAbsolutePath(), finalFile.getAbsolutePath());
            try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
                    FileLock lock = lockFOS.getChannel().lock()) {
                // Only pay for hashing the existing file when the cheap length check already matches.
                if (finalFile.exists() && finalFile.length() == tempFile.length()) {
                    byte[] existingFileDigest = digestOf(finalFile);
                    if (Arrays.equals(digest, existingFileDigest)) {
                        //
                        // The existing file matches the expected file, it's ok to just drop out now
                        //
                        log.info("'{}' already exists, and matches. Not overwriting.", finalFile.getAbsolutePath());
                        return finalFile;
                    }
                    log.warn(
                            "Detected a mismatch between the existing file, and the new file. "
                                    + "Will overwrite the existing file. " + "Existing: {} -- New File: {}",
                            Arrays.toString(existingFileDigest), Arrays.toString(digest));
                }

                try {
                    // File.renameTo is platform dependent and may refuse to replace an existing destination;
                    // Files.move with REPLACE_EXISTING makes the overwrite explicit.
                    Files.move(tempFile.toPath(), finalFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
                } catch (IOException e) {
                    log.error("Failed to rename '{}' to '{}'", tempFile.getAbsolutePath(),
                            finalFile.getAbsolutePath(), e);
                    throw new IOException("Failed to rename extracted file", e);
                }
            }
            return finalFile;
        } finally {
            if (tempFile != null && tempFile.exists()) {
                if (!tempFile.delete()) {
                    log.warn("Unable to delete temp file: {}", tempFile.getAbsolutePath());
                }
            }
        }
    }

    /**
     * Streams {@code sourceData} into {@code target} and returns the digest of the bytes written.
     * The output stream is always closed, even when the copy fails.
     */
    private static byte[] writeAndDigest(InputStream sourceData, File target) throws Exception {
        // Create the digest first so a failure here cannot leave an open file handle behind.
        MessageDigest messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM);
        try (DigestOutputStream digestOutputStream = new DigestOutputStream(new FileOutputStream(target),
                messageDigest)) {
            IOUtils.copy(sourceData, digestOutputStream);
        }
        return messageDigest.digest();
    }

    /**
     * Reads {@code file} to the end and returns the digest of its content.
     */
    private static byte[] digestOf(File file) throws Exception {
        MessageDigest messageDigest = MessageDigest.getInstance(MESSAGE_DIGEST_ALGORITHM);
        try (DigestInputStream digestInputStream = new DigestInputStream(new FileInputStream(file), messageDigest)) {
            byte[] discardedBytes = new byte[8192];
            while (digestInputStream.read(discardedBytes) != -1) {
                //
                // This is just used for the side effect of the digest input stream
                //
            }
        }
        return messageDigest.digest();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -861,8 +861,21 @@ private String extractBinaries() {
if (binPath != null && !binPath.trim().isEmpty()) {
pathToExecutable = binPath.trim();
log.warn("Using non-default native binary at " + pathToExecutable);
pathToLibDir = "";
return "";

File parent = new File(binPath).getParentFile();
pathToLibDir = parent.getAbsolutePath();
CertificateExtractor certificateExtractor = new CertificateExtractor();

try {
String caDirectory = certificateExtractor
.extractCertificates(parent.getAbsoluteFile());
watchFiles.addAll(certificateExtractor.getExtractedCertificates());
FileAgeManager.instance().registerFiles(watchFiles);
return caDirectory;
} catch (IOException ioex) {
log.error("Exception while extracting certificates. Returning no CA directory", ioex);
return "";
}
} else {
log.info("Extracting binaries to " + tmpDir);
try {
Expand All @@ -873,39 +886,14 @@ private String extractBinaries() {

String extension = os.equals("windows") ? ".exe" : "";
String executableName = "kinesis_producer" + extension;
byte[] bin = IOUtils.toByteArray(
this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName));
MessageDigest md = MessageDigest.getInstance("SHA1");
String mdHex = DatatypeConverter.printHexBinary(md.digest(bin)).toLowerCase();

pathToExecutable = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + extension).toString();
File extracted = new File(pathToExecutable);
watchFiles.add(extracted);

// use dedicated lock-file to limit access to executable by a single process
final String pathToLock = Paths.get(pathToTmpDir, "kinesis_producer_" + mdHex + ".lock").toString();
final File lockFile = new File(pathToLock);
try (FileOutputStream lockFOS = new FileOutputStream(lockFile);
FileLock lock = lockFOS.getChannel().lock()) {
if (extracted.exists()) {
boolean contentEqual = false;
if (extracted.length() == bin.length) {
try (InputStream executableIS = new FileInputStream(extracted)) {
byte[] existingBin = IOUtils.toByteArray(executableIS);
contentEqual = Arrays.equals(bin, existingBin);
}
}
if (!contentEqual) {
throw new SecurityException("The contents of the binary " + extracted.getAbsolutePath()
+ " is not what it's expected to be.");
}
} else {
try (OutputStream fos = new FileOutputStream(extracted)) {
IOUtils.write(bin, fos);
}
extracted.setExecutable(true);
}
}
InputStream is = this.getClass().getClassLoader().getResourceAsStream(root + "/" + os + "/" + executableName);
String resultFileFormat = "kinesis_producer_%s" + extension;

File extracted = HashedFileCopier.copyFileFrom(is, tmpDirFile, resultFileFormat);
watchFiles.add(extracted);
extracted.setExecutable(true);
pathToExecutable = extracted.getAbsolutePath();

CertificateExtractor certificateExtractor = new CertificateExtractor();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.producer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.DigestInputStream;
import java.security.MessageDigest;

import javax.xml.bind.DatatypeConverter;

import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Unit tests for {@link HashedFileCopier}. Each test runs against its own temporary directory, which is
 * inspected and removed again in {@link #after()}.
 */
public class HashedFileCopierTest {

    private static final Logger log = LoggerFactory.getLogger(HashedFileCopierTest.class);

    private static final String TEST_FILE_PREFIX = "res-file.";
    private static final String TEST_FILE_SUFFIX = ".txt";
    private static final String TEST_FILE_FORMAT = TEST_FILE_PREFIX + "%s" + TEST_FILE_SUFFIX;

    private File tempDir;

    @Before
    public void before() throws Exception {
        tempDir = Files.createTempDirectory("kpl-unit-tests").toFile();
    }

    /**
     * Removes everything the test is expected to have created and fails if the copier left any of its
     * temporary files behind.
     */
    @After
    public void after() throws Exception {
        int extraTempFiles = 0;
        if (tempDir != null && tempDir.isDirectory()) {
            // The stream must be closed before the directory itself is deleted.
            try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(tempDir.toPath())) {
                for (Path entry : directoryStream) {
                    String filename = entry.toFile().getName();
                    if (filename.startsWith(HashedFileCopier.TEMP_PREFIX)
                            && filename.endsWith(HashedFileCopier.TEMP_SUFFIX)) {
                        Files.delete(entry);
                        extraTempFiles++;
                        continue;
                    }
                    if (filename.startsWith(TEST_FILE_PREFIX) && filename.endsWith(TEST_FILE_SUFFIX)) {
                        Files.delete(entry);
                        continue;
                    }
                    if (filename.startsWith(TEST_FILE_PREFIX) && filename.endsWith(HashedFileCopier.LOCK_SUFFIX)) {
                        Files.delete(entry);
                        continue;
                    }
                    log.warn("Unexpected file {} found. Not deleting the file.", entry);
                }
            }
            Files.delete(tempDir.toPath());
        }

        assertThat("Copier didn't clean up all temporary files.", extraTempFiles, equalTo(0));
    }

    /** Copying into an empty directory produces the digest-named file with the source content. */
    @Test
    public void normalFileCopyTest() throws Exception {
        File resultFile = copyTestData();
        File expectedFile = makeTestFile();

        assertThat(resultFile, equalTo(expectedFile));
        assertThat(expectedFile.exists(), equalTo(true));

        byte[] writtenBytes = Files.readAllBytes(resultFile.toPath());
        byte[] expectedBytes = testDataBytes();

        assertThat(writtenBytes, equalTo(expectedBytes));
    }

    /** An identical file already at the destination is returned with its content intact. */
    @Test
    public void fileExistsTest() throws Exception {
        File expectedFile = makeTestFile();
        Files.write(expectedFile.toPath(), testDataBytes());

        File resultFile = copyTestData();
        assertThat(resultFile, equalTo(expectedFile));

        byte[] expectedData = testDataBytes();
        byte[] actualData = Files.readAllBytes(resultFile.toPath());

        assertThat(actualData, equalTo(expectedData));
    }

    /** An existing file with a different length is replaced by the source content. */
    @Test
    public void lengthMismatchTest() throws Exception {
        File expectedFile = makeTestFile();
        try (FileOutputStream fso = new FileOutputStream(expectedFile)) {
            fso.write(testDataBytes());
            fso.write("This is some extra crap".getBytes(Charset.forName("UTF-8")));
        }

        File resultFile = copyTestData();
        assertThat(resultFile, equalTo(expectedFile));

        byte[] expectedData = testDataBytes();
        byte[] actualData = Files.readAllBytes(resultFile.toPath());

        assertThat(actualData, equalTo(expectedData));
    }

    /** An existing file with the same length but different content is replaced by the source content. */
    @Test
    public void hashMismatchTest() throws Exception {
        File expectedFile = makeTestFile();
        byte[] testData = testDataBytes();
        // Flip one byte so the length still matches but the digest does not.
        testData[10] = (byte) ~testData[10];

        Files.write(expectedFile.toPath(), testData);

        File resultFile = copyTestData();
        assertThat(resultFile, equalTo(expectedFile));

        byte[] expectedData = testDataBytes();
        byte[] actualData = Files.readAllBytes(resultFile.toPath());

        assertThat(actualData, equalTo(expectedData));
    }

    /** Runs the copier on the test resource and closes the resource stream afterwards. */
    private File copyTestData() throws Exception {
        try (InputStream in = testDataInputStream()) {
            return HashedFileCopier.copyFileFrom(in, tempDir, TEST_FILE_FORMAT);
        }
    }

    private File makeTestFile() throws Exception {
        return new File(tempDir, String.format(TEST_FILE_FORMAT, hexDigestForTestData()));
    }

    private String hexDigestForTestData() throws Exception {
        return DatatypeConverter.printHexBinary(hashForTestData());
    }

    private byte[] testDataBytes() throws Exception {
        try (InputStream in = testDataInputStream()) {
            return IOUtils.toByteArray(in);
        }
    }

    private byte[] hashForTestData() throws Exception {
        MessageDigest messageDigest = MessageDigest.getInstance(HashedFileCopier.MESSAGE_DIGEST_ALGORITHM);
        try (DigestInputStream dis = new DigestInputStream(testDataInputStream(), messageDigest)) {
            // Reading through the stream is what feeds the digest; the bytes themselves are not needed.
            IOUtils.toByteArray(dis);
        }
        return messageDigest.digest();
    }

    private InputStream testDataInputStream() {
        return this.getClass().getClassLoader().getResourceAsStream("test-data/test.txt");
    }
}
Loading

0 comments on commit a51acee

Please sign in to comment.