From 54724170ce09d8b9443065a4d2cf198953fa7e96 Mon Sep 17 00:00:00 2001 From: Gabriel Russo Date: Tue, 9 Feb 2021 18:49:23 +0000 Subject: [PATCH] [PLAT-28297] Add a cli option to specify compression --- devbox/agent/src/DevboxAgentMain.scala | 9 ++++++-- devbox/common/src/CompressionMode.scala | 12 +++++++++++ devbox/common/src/Rpc.scala | 2 +- devbox/src/DevboxMain.scala | 11 ++++++++-- devbox/src/syncer/AgentReadWriteActor.scala | 13 +++++++++-- devbox/src/syncer/Syncer.scala | 5 ++++- devbox/test/src/devbox/DevboxTests.scala | 24 +++++++++++++++------ launcher/src/Main.scala | 4 +++- 8 files changed, 65 insertions(+), 15 deletions(-) create mode 100644 devbox/common/src/CompressionMode.scala diff --git a/devbox/agent/src/DevboxAgentMain.scala b/devbox/agent/src/DevboxAgentMain.scala index d9b53c6..1e8802c 100644 --- a/devbox/agent/src/DevboxAgentMain.scala +++ b/devbox/agent/src/DevboxAgentMain.scala @@ -13,6 +13,7 @@ import devbox.common._ import devbox.common.Cli import devbox.common.Cli.Arg import devbox.common.Cli.pathScoptRead +import devbox.common.CompressionMode import os.Path object DevboxAgentMain { @@ -190,11 +191,15 @@ object DevboxAgentMain { } client.writeMsg(Response.Ack()) - case Rpc.WriteChunk(root, path, offset, data) => + case Rpc.WriteChunk(root, path, offset, data, compressed) => val p = wd / root / path + val bytes = compressed match { + case CompressionMode.gzip => Util.gunzip(data.value) + case _ => data.value + } withWritable(p){ - os.write.write(p, data.value, Seq(StandardOpenOption.WRITE), 0, offset) + os.write.write(p, bytes, Seq(StandardOpenOption.WRITE), 0, offset) } client.writeMsg(Response.Ack()) diff --git a/devbox/common/src/CompressionMode.scala b/devbox/common/src/CompressionMode.scala new file mode 100644 index 0000000..dc4b154 --- /dev/null +++ b/devbox/common/src/CompressionMode.scala @@ -0,0 +1,12 @@ +package devbox.common + +import upickle.default.{ReadWriter, readwriter} + +object CompressionMode extends Enumeration { + type CompressionMode = Value + val gzip, ssh, none = Value + + implicit val rw: ReadWriter[CompressionMode] = readwriter[Int].bimap[CompressionMode](_.id, apply(_)) +} + + diff --git a/devbox/common/src/Rpc.scala b/devbox/common/src/Rpc.scala index 9419493..09307c3 100644 --- a/devbox/common/src/Rpc.scala +++ b/devbox/common/src/Rpc.scala @@ -33,7 +33,7 @@ object Rpc{ case class PutLink(root: os.RelPath, path: os.SubPath, dest: String) extends Rpc with PathRpc with Action object PutLink{ implicit val rw: ReadWriter[PutLink] = macroRW } - case class WriteChunk(root: os.RelPath, path: os.SubPath, offset: Long, data: Bytes) extends Rpc + case class WriteChunk(root: os.RelPath, path: os.SubPath, offset: Long, data: Bytes, compression: CompressionMode.Value) extends Rpc object WriteChunk{ implicit val rw: ReadWriter[WriteChunk] = macroRW } case class SetSize(root: os.RelPath, path: os.SubPath, offset: Long) extends Rpc with PathRpc with Action diff --git a/devbox/src/DevboxMain.scala b/devbox/src/DevboxMain.scala index 6e0698e..c41b598 100644 --- a/devbox/src/DevboxMain.scala +++ b/devbox/src/DevboxMain.scala @@ -28,7 +28,8 @@ object DevboxMain { healthCheckInterval: Int = 0, retryInterval: Int = 0, noSync: Boolean = false, - proxyGit: Boolean = true) + proxyGit: Boolean = true, + compression: CompressionMode.Value = CompressionMode.ssh) val signature = Seq( Arg[Config, String]( @@ -81,6 +82,11 @@ object DevboxMain { "Don't sync .git directories and proxy git commands back to the laptop", (c, v) => c.copy(proxyGit = v) ), + Arg[Config, String]( + "compression", None, + s"Enables compression. Possible values are ${CompressionMode.values.mkString(",")}. Defaults to ssh.", + (c, v) => c.copy(compression = CompressionMode.withName(v)) + ), ) @@ -211,7 +217,8 @@ object DevboxMain { case (path, sig) => sig } }, - Option(config.syncIgnore) + Option(config.syncIgnore), + config.compression ) Util.autoclose(syncer){syncer => diff --git a/devbox/src/syncer/AgentReadWriteActor.scala b/devbox/src/syncer/AgentReadWriteActor.scala index 3cb8c8c..5835105 100644 --- a/devbox/src/syncer/AgentReadWriteActor.scala +++ b/devbox/src/syncer/AgentReadWriteActor.scala @@ -17,7 +17,8 @@ object AgentReadWriteActor{ case class Close() extends Msg } class AgentReadWriteActor(agent: AgentApi, - onResponse: Response => Unit) + onResponse: Response => Unit, + compression: CompressionMode.Value) (implicit ac: castor.Context, logger: SyncLogger) extends castor.StateMachineActor[AgentReadWriteActor.Msg](){ @@ -210,11 +211,19 @@ class AgentReadWriteActor(agent: AgentApi, } }) () + val data = if (n < byteArr.length) byteArr.take(n) else byteArr + val bytes = new Bytes( + compression match { + case CompressionMode.gzip => Util.gzip(data) + case _ => data + } + ) val msg = Rpc.WriteChunk( dest, subPath, offset, - new Bytes(if (n < byteArr.length) byteArr.take(n) else byteArr) + bytes, + compression, ) logger.filesAndBytes(0, n) Some(msg) diff --git a/devbox/src/syncer/Syncer.scala b/devbox/src/syncer/Syncer.scala index 8d24a52..fad3536 100644 --- a/devbox/src/syncer/Syncer.scala +++ b/devbox/src/syncer/Syncer.scala @@ -16,12 +16,14 @@ class Syncer(agent: AgentApi, debounceMillis: Int, proxyGit: Boolean, signatureTransformer: (os.SubPath, Sig) => Sig, - syncIgnore: Option[String]) + syncIgnore: Option[String], + compression: CompressionMode.Value) (implicit ac: castor.Context, logger: SyncLogger) extends AutoCloseable{ val syncIgnoreRegex = syncIgnore.map(com.google.re2j.Pattern.compile(_)) println(s"Syncing ${mapping.map{case (from, to) => s"$from:$to"}.mkString(", ")}") + println(s"Compression mode: ${compression}") /** Skippers to use on each repository, used by the SkipScan actor and also sent to the remote endpoint */ val skippers = mapping.map { case (base, _) => Skipper.fromString(ignoreStrategy, base, proxyGit) } @@ -29,6 +31,7 @@ class Syncer(agent: AgentApi, val agentActor: AgentReadWriteActor = new AgentReadWriteActor( agent, x => skipActor.send(SkipScanActor.Receive(x)), + compression, ) val syncActor = new SyncActor( diff --git a/devbox/test/src/devbox/DevboxTests.scala b/devbox/test/src/devbox/DevboxTests.scala index 925d4f9..aecbf2d 100644 --- a/devbox/test/src/devbox/DevboxTests.scala +++ b/devbox/test/src/devbox/DevboxTests.scala @@ -33,6 +33,12 @@ object DevboxTests extends TestSuite{ test("git") - testCase(1, ignoreStrategy = "") test("restart") - testCase(1, restartInterval = Some(1)) test("reconnect") - testCase(1, randomKillInterval = Some(50)) + + // Test different compression modes + test("compression-gzip") - testCase(1, compression = CompressionMode.gzip) + // ssh compression here doesn't do anything since we don't ssh anywhere. + test("compression-ssh") - testCase(1, compression = CompressionMode.ssh) + test("compression-none") - testCase(1, compression = CompressionMode.none) } test("oslib") { @@ -89,7 +95,8 @@ object DevboxTests extends TestSuite{ ignoreStrategy: String = "dotgit", restartInterval: Option[Int] = None, randomKillInterval: Option[Int] = None, - parallel: Boolean = true) + parallel: Boolean = true, + compression: CompressionMode.Value = CompressionMode.none) (implicit tp: TestPath) = { walkValidate( tp.value.mkString("-"), @@ -99,7 +106,8 @@ object DevboxTests extends TestSuite{ ignoreStrategy = ignoreStrategy, restartInterval = restartInterval, randomKillInterval = randomKillInterval, - parallel = parallel + parallel = parallel, + compression = compression, ) } def walkValidate(label: String, @@ -110,7 +118,8 @@ object DevboxTests extends TestSuite{ ignoreStrategy: String = "dotgit", restartInterval: Option[Int] = None, randomKillInterval: Option[Int] = None, - parallel: Boolean = true) = { + parallel: Boolean = true, + compression: CompressionMode.Value = CompressionMode.none) = { val (src, dest, log, commits, commitsIndicesToCheck, repo) = initializeWalk(label, uri, validateInterval, commitIndicesToCheck0) @@ -134,7 +143,8 @@ object DevboxTests extends TestSuite{ ignoreStrategy, exitOnError = true, signatureMapping = (_, sig) => sig, - randomKill = randomKillInterval + randomKill = randomKillInterval, + compression = compression, ) (logger, ac, syncer) @@ -262,7 +272,8 @@ object DevboxTests extends TestSuite{ exitOnError: Boolean, signatureMapping: (os.SubPath, Sig) => Sig, healthCheckInterval: Int = 0, - randomKill: Option[Int] = None) + randomKill: Option[Int] = None, + compression: CompressionMode.Value = CompressionMode.none) (implicit ac: castor.Context, logger: SyncLogger) = { @@ -288,7 +299,8 @@ object DevboxTests extends TestSuite{ debounceMillis, proxyGit = false, signatureTransformer = signatureMapping, - syncIgnore = None + syncIgnore = None, + compression = compression ) syncer } diff --git a/launcher/src/Main.scala b/launcher/src/Main.scala index 90e29f8..a799029 100644 --- a/launcher/src/Main.scala +++ b/launcher/src/Main.scala @@ -3,6 +3,7 @@ package launcher import cmdproxy.ProxyServer import devbox.DevboxMain import devbox.common.Cli +import devbox.common.CompressionMode object Main { def main(args: Array[String]): Unit = { @@ -60,7 +61,8 @@ object Main { prepResult.exitCode == 0 }, { (port: Option[Int]) => Seq( - "ssh", "-C", + "ssh", + "-o", s"Compression=${if (config.compression == CompressionMode.ssh) "yes" else "no"}", "-o", "ExitOnForwardFailure=yes", "-o", "ServerAliveInterval=4", "-o", "ServerAliveCountMax=4"