Skip to content

Commit

Permalink
update build config and add test (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored and Wu Tao committed Jul 5, 2019
1 parent 5121fef commit ed65027
Show file tree
Hide file tree
Showing 16 changed files with 1,186 additions and 663 deletions.
18 changes: 18 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
language: scala

jdk:
- oraclejdk8

scala:
- "2.11.7"
- "2.12.7"

cache:
directories:
- $HOME/.m2/repository
- $HOME/.sbt
- $HOME/.ivy2

script:
- ./scripts/travis.sh

32 changes: 27 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
version := "0.0.4-SNAPSHOT"
version := "1.11.5-1-SNAPSHOT"

organization := "com.xiaomi"
organization := "com.xiaomi.infra"

name := "pegasus-scala-client"

scalaVersion := "2.11.7"
scalaVersion := "2.12.7"

crossScalaVersions := Seq("2.11.7", "2.12.7")

publishMavenStyle := true

scalafmtOnCompile := true

//custom repository
resolvers ++= Seq(
//"Remote Maven Repository" at "http://your-url/",
"Local Maven Repository" at "file://" + Path.userHome.absolutePath + "/.m2/repository"
)

//custom publish url
publishTo := {
val nexus = "http://your-url/"
if (isSnapshot.value) Some("snapshots" at nexus + "snapshots")
else Some("releases" at nexus + "releases")
}

credentials += Credentials(
new File((Path.userHome / ".sbt" / ".credentials").toString()))

libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "20.0",
"com.xiaomi.infra" % "pegasus-client" % "1.11.2-thrift-0.11.0-inlined",
"com.google.guava" % "guava" % "21.0",
"com.xiaomi.infra" % "pegasus-client" % "1.11.5-thrift-0.11.0-inlined",
"org.scalatest" %% "scalatest" % "3.0.3" % Test
)
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=0.13.13
sbt.version=1.2.8
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1")
4 changes: 4 additions & 0 deletions scripts/format-all.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/usr/bin/env bash

# scala format tool,see https://github.com/scalameta/scalafmt
sbt scalafmtSbt scalafmt test:scalafmt
44 changes: 44 additions & 0 deletions scripts/travis.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/usr/bin/env bash

set -e

SCRIPT_DIR=$(dirname "${BASH_SOURCE[0]}")
PROJECT_DIR=$(dirname "${SCRIPT_DIR}")
cd "${PROJECT_DIR}" || exit 1

# lint all scripts, abort if there's any warning.
function shellcheck_must_pass()
{
if [[ $(shellcheck "$1") ]]; then
echo "shellcheck $1 failed"
shellcheck "$1"
exit 1
fi
}
shellcheck_must_pass ./scripts/travis.sh

# check format
sbt scalafmtSbtCheck scalafmtCheck test:scalafmtCheck

# install java-client dependency
git clone https://github.com/XiaoMi/pegasus-java-client.git
cd pegasus-java-client
git checkout 1.11.5-thrift-0.11.0-inlined-release
mvn clean package -DskipTests
mvn clean install -DskipTests
cd ..

# start pegasus onebox environment
wget https://github.com/XiaoMi/pegasus/releases/download/v1.11.5/pegasus-server-1.11.5-ba0661d--release.zip
unzip pegasus-server-1.11.5-ba0661d--release.zip
cd pegasus-server-1.11.5-ba0661d--release

./run.sh start_onebox -w
cd ../

if ! sbt test
then
cd pegasus-server-1.11.5-ba0661d--release
./run.sh list_onebox
exit 1
fi
33 changes: 19 additions & 14 deletions src/main/scala/com/xiaomi/infra/pegasus/scalaclient/Models.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,33 @@ import com.xiaomi.infra.pegasus.client.PException
import com.xiaomi.infra.pegasus.client.{FilterType, PException}
import com.xiaomi.infra.pegasus.scalaclient.{Serializer => SER}
import scala.concurrent.duration._

/**
* [Copyright]
* Author: oujinliang
* 3/27/18 8:29 PM
*/

case class PegasusResult(bytes: Array[Byte]) {
def as[V](implicit ser: SER[V]): V = ser.deserialize(bytes)
def asOpt[V](implicit ser: SER[V]): Option[V] = if (isNull) None else Some(ser.deserialize(bytes))
def isNull = bytes == null
def as[V](implicit ser: SER[V]): V = ser.deserialize(bytes)
def asOpt[V](implicit ser: SER[V]): Option[V] =
if (isNull) None else Some(ser.deserialize(bytes))
def isNull = bytes == null
}

case class PegasusResultList(list: List[Array[Byte]]) {
def as[V](implicit ser: SER[V]): List[V] = list.map(ser.deserialize)
def asOpt[V](implicit ser: SER[V]): List[Option[V]] = list.map {v => if (v == null) None else Some(ser.deserialize(v))}
def as[V](implicit ser: SER[V]): List[V] = list.map(ser.deserialize)
def asOpt[V](implicit ser: SER[V]): List[Option[V]] = list.map { v =>
if (v == null) None else Some(ser.deserialize(v))
}
}

case class PegasusKey[H, S](hashKey: H, sortKey: S)

case class BatchGetResult[V](count: Int, result: List[Either[PException, V]])

case class BatchMultiGetResult[H, S, V](count: Int, values: List[Either[PException, HashKeyData[H, S, V]]])
case class BatchMultiGetResult[H, S, V](
count: Int,
values: List[Either[PException, HashKeyData[H, S, V]]])

case class MultiGetResult[S, V](allFetched: Boolean, values: List[(S, V)])

Expand All @@ -39,11 +44,11 @@ case class MultiGetSortKeysResult[S](allFetched: Boolean, values: List[S])

object Options {

case class MultiGet(
startInclusive: Boolean = true,
stopInclusive: Boolean = false, // if the stopSortKey is included
sortKeyFilterType: FilterType = FilterType.FT_NO_FILTER, // filter type for sort key
sortKeyFilterPattern: Array[Byte] = null, // filter pattern for sort key
noValue: Boolean = false // only fetch hash_key and sort_key, but not fetch value
)
case class MultiGet(
startInclusive: Boolean = true,
stopInclusive: Boolean = false, // if the stopSortKey is included
sortKeyFilterType: FilterType = FilterType.FT_NO_FILTER, // filter type for sort key
sortKeyFilterPattern: Array[Byte] = null, // filter pattern for sort key
noValue: Boolean = false // only fetch hash_key and sort_key, but not fetch value
)
}
191 changes: 111 additions & 80 deletions src/main/scala/com/xiaomi/infra/pegasus/scalaclient/PegasusUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,13 @@ package com.xiaomi.infra.pegasus.scalaclient

import java.util

import com.xiaomi.infra.pegasus.client.{FilterType, MultiGetOptions, PegasusTableInterface, HashKeyData => PHashKeyData, SetItem => PSetItem}
import com.xiaomi.infra.pegasus.client.{
FilterType,
MultiGetOptions,
PegasusTableInterface,
HashKeyData => PHashKeyData,
SetItem => PSetItem
}
import com.xiaomi.infra.pegasus.scalaclient.{Serializer => SER}
import org.apache.commons.lang3.tuple.Pair

Expand All @@ -14,87 +20,112 @@ import scala.concurrent.duration._
* Author: oujinliang
* 3/15/18 10:34 PM
*/

private [scalaclient] trait PegasusUtil {
implicit def convertOption(option: Options.MultiGet) = {
val result = new MultiGetOptions()
result.startInclusive = option.startInclusive
result.stopInclusive = option.stopInclusive
result.sortKeyFilterType = option.sortKeyFilterType
result.sortKeyFilterPattern = option.sortKeyFilterPattern
result.noValue = option.noValue
result
}

implicit def ser[A](a: A)(implicit ser: SER[A]): Array[Byte] = ser.serialize(a)
implicit def ser[A](list: Seq[A])(implicit ser: SER[A]): util.List[Array[Byte]] = {
val javaList = new util.ArrayList[Array[Byte]](list.size)
list.foreach { a => javaList.add(ser.serialize(a)) }
javaList
}

implicit def deser[A](bytes: Array[Byte])(implicit ser: SER[A]): A = ser.deserialize(bytes)

@inline implicit def durationToMs(timeout: Duration): Int = timeout.toMillis.toInt

def pegasusKeysToPairList[H, S](keys: Seq[PegasusKey[H, S]])
(implicit hSer: SER[H], sSer: SER[S]): util.List[Pair[Array[Byte], Array[Byte]]] = {
keys.map(k => Pair.of(hSer.serialize(k.hashKey), sSer.serialize(k.sortKey)))
private[scalaclient] trait PegasusUtil {
implicit def convertOption(option: Options.MultiGet) = {
val result = new MultiGetOptions()
result.startInclusive = option.startInclusive
result.stopInclusive = option.stopInclusive
result.sortKeyFilterType = option.sortKeyFilterType
result.sortKeyFilterPattern = option.sortKeyFilterPattern
result.noValue = option.noValue
result
}

implicit def ser[A](a: A)(implicit ser: SER[A]): Array[Byte] =
ser.serialize(a)
implicit def ser[A](list: Seq[A])(
implicit ser: SER[A]): util.List[Array[Byte]] = {
val javaList = new util.ArrayList[Array[Byte]](list.size)
list.foreach { a =>
javaList.add(ser.serialize(a))
}

def pairToEither[A, B](pair: Pair[A, B]): Either[A, B] = {
if (pair.getLeft != null) Left(pair.getLeft) else Right(pair.getRight)
}

def pairToEither[A, B, C](pair: Pair[A, B], f: B => C): Either[A, C] = {
if (pair.getLeft != null) Left(pair.getLeft) else Right(f(pair.getRight))
}

def bytesPairToTuple[A, B](pair: Pair[Array[Byte], Array[Byte]])
(implicit aSer: SER[A], bSer: SER[B]) = {
(aSer.deserialize(pair.getLeft), bSer.deserialize(pair.getRight))
}

def tupleToBytesPair[A, B](t: (A, B))(implicit aSer: SER[A], bSer: SER[B]): Pair[Array[Byte], Array[Byte]] = {
Pair.of(aSer.serialize(t._1), bSer.serialize(t._2))
javaList
}

implicit def deser[A](bytes: Array[Byte])(implicit ser: SER[A]): A =
ser.deserialize(bytes)

@inline implicit def durationToMs(timeout: Duration): Int =
timeout.toMillis.toInt

def pegasusKeysToPairList[H, S](keys: Seq[PegasusKey[H, S]])(
implicit hSer: SER[H],
sSer: SER[S]): util.List[Pair[Array[Byte], Array[Byte]]] = {
keys.map(k => Pair.of(hSer.serialize(k.hashKey), sSer.serialize(k.sortKey)))
}

def pairToEither[A, B](pair: Pair[A, B]): Either[A, B] = {
if (pair.getLeft != null) Left(pair.getLeft) else Right(pair.getRight)
}

def pairToEither[A, B, C](pair: Pair[A, B], f: B => C): Either[A, C] = {
if (pair.getLeft != null) Left(pair.getLeft) else Right(f(pair.getRight))
}

def bytesPairToTuple[A, B](pair: Pair[Array[Byte], Array[Byte]])(
implicit aSer: SER[A],
bSer: SER[B]) = {
(aSer.deserialize(pair.getLeft), bSer.deserialize(pair.getRight))
}

def tupleToBytesPair[A, B](t: (A, B))(
implicit aSer: SER[A],
bSer: SER[B]): Pair[Array[Byte], Array[Byte]] = {
Pair.of(aSer.serialize(t._1), bSer.serialize(t._2))
}

def toListOfOption[A](list: java.util.List[A]): List[Option[A]] = {
list.toList.map(Option.apply)
}

def convertMultiGetKeys[H, S](keys: Seq[(H, Seq[S])])(implicit hSer: SER[H],
sSer: SER[S])
: java.util.List[Pair[Array[Byte], util.List[Array[Byte]]]] = {
keys.map {
case (hashKey, sortKeys) =>
Pair.of(hSer.serialize(hashKey), ser(sortKeys))
}

def toListOfOption[A](list: java.util.List[A]): List[Option[A]] = {
list.toList.map(Option.apply)
}

def convertMultiGetKeys[H, S](keys: Seq[(H, Seq[S])])(implicit hSer: SER[H], sSer: SER[S]): java.util.List[Pair[Array[Byte], util.List[Array[Byte]]]] = {
keys.map { case (hashKey, sortKeys) =>
Pair.of(hSer.serialize(hashKey), ser(sortKeys))
}
}

def convertToHashKeyData[H, S, V](data: PHashKeyData)
(implicit hSer: SER[H], sSer: SER[S], vSer: SER[V]): HashKeyData[H, S, V] = {
HashKeyData[H, S, V](hSer.deserialize(data.hashKey), data.values.toList.map(bytesPairToTuple[S, V]))
}

def convertValue[S, V](value: (S, Array[Byte]))(implicit ser: SER[V]) = (value._1, ser.deserialize(value._2))

def convertHashKeyData[H, S, V](data: HashKeyData[H, S, Array[Byte]])(implicit ser: SER[V]): HashKeyData[H, S, V] = {
HashKeyData[H, S, V](data.hashKey, data.values.map(convertValue[S, V]))
}

def convertHashKeyData[H, S, V](data: HashKeyData[H, S, V])
(implicit hSer: SER[H], sSer: SER[S], vSer: SER[V]): PHashKeyData = {
new PHashKeyData(data.hashKey, data.values.map(tupleToBytesPair(_)))
}

def convertMultiGetResult[S](result: PegasusTableInterface.MultiGetResult)(implicit sSer: SER[S]) = {
MultiGetResult(result.allFetched, result.values.toList.map {p => (sSer.deserialize(p.getLeft), p.getRight)})
}

def convertSetItem[H, S, V](item: SetItem[H, S, V])
(implicit hSer: SER[H], sSer: SER[S], vSer: SER[V]) = {
new PSetItem(item.hashKey, item.sortKey, item.value, item.ttl.toSeconds.toInt)
}

}

def convertToHashKeyData[H, S, V](data: PHashKeyData)(
implicit hSer: SER[H],
sSer: SER[S],
vSer: SER[V]): HashKeyData[H, S, V] = {
HashKeyData[H, S, V](hSer.deserialize(data.hashKey),
data.values.toList.map(bytesPairToTuple[S, V]))
}

def convertValue[S, V](value: (S, Array[Byte]))(implicit ser: SER[V]) =
(value._1, ser.deserialize(value._2))

def convertHashKeyData[H, S, V](data: HashKeyData[H, S, Array[Byte]])(
implicit ser: SER[V]): HashKeyData[H, S, V] = {
HashKeyData[H, S, V](data.hashKey, data.values.map(convertValue[S, V]))
}

def convertHashKeyData[H, S, V](data: HashKeyData[H, S, V])(
implicit hSer: SER[H],
sSer: SER[S],
vSer: SER[V]): PHashKeyData = {
new PHashKeyData(data.hashKey, data.values.map(tupleToBytesPair(_)))
}

def convertMultiGetResult[S](result: PegasusTableInterface.MultiGetResult)(
implicit sSer: SER[S]) = {
MultiGetResult(result.allFetched, result.values.toList.map { p =>
(sSer.deserialize(p.getLeft), p.getRight)
})
}

def convertSetItem[H, S, V](item: SetItem[H, S, V])(implicit hSer: SER[H],
sSer: SER[S],
vSer: SER[V]) = {
new PSetItem(item.hashKey,
item.sortKey,
item.value,
item.ttl.toSeconds.toInt)
}

}

object PegasusUtil extends PegasusUtil
object PegasusUtil extends PegasusUtil
Loading

0 comments on commit ed65027

Please sign in to comment.