[SNAP-2356] release compressed buffers that are skipped (#1040)
- added a releaseExecutionMemory in StoreUnifiedManager to pair with acquireExecutionMemory
- in case compression does not reduce the size enough, decrement the accounting for the
  compressed buffer in heap configurations
- some cleanups replacing UnsafeHolder.release calls with BufferAllocator.release
- fixed an occasional failure in SnappyStreamingSuite
- excluded abrt-watch-log from suspected exceptions in hydra; it runs on CentOS/RHEL by
  default with "Exception" in its command line and so gets flagged as a suspect
- made the dtests check task depend on scalaTest
- added logging around beforeClass/afterClass invocation
Sumedh Wale authored May 24, 2018
1 parent c8887c6 commit 73838ca
Showing 14 changed files with 91 additions and 84 deletions.
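In outline, the leak being fixed (a minimal hypothetical sketch assembled from the CompressionUtils and StoreUnifiedManager hunks below; the acquire signature, size bound and skip condition are paraphrased assumptions, not the exact code):

    import java.nio.ByteBuffer

    import com.gemstone.gemfire.internal.shared.BufferAllocator

    import org.apache.spark.memory.MemoryManagerCallback.{allocateExecutionMemory, releaseExecutionMemory}
    import org.apache.spark.sql.store.CompressionUtils.COMPRESSION_OWNER

    // Compress `input` into a scratch buffer accounted under COMPRESSION_OWNER.
    // If compression does not reduce the size enough, hand back the original
    // buffer and release the scratch one together with its accounting: before
    // this commit only direct buffers were freed on that path, so the heap
    // accounting for skipped compressed buffers leaked.
    def codecCompressSketch(input: ByteBuffer, allocator: BufferAllocator,
        compressInto: (ByteBuffer, ByteBuffer) => Int): ByteBuffer = {
      val result = allocateExecutionMemory(input.remaining(), COMPRESSION_OWNER, allocator)
      val resultLen = compressInto(input, result) // stand-in for the real codec call
      if (resultLen < input.remaining()) {
        result.limit(resultLen)
        result // a new buffer: callers detect success via `result ne input`
      } else {
        releaseExecutionMemory(result, COMPRESSION_OWNER, releaseBuffer = true)
        input // unchanged input signals that compression was skipped
      }
    }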
2 changes: 1 addition & 1 deletion build.gradle
@@ -1105,7 +1105,7 @@ task checkAll {
     dependsOn ":gemfire-connector:check"
   }
   if (!rootProject.hasProperty('smoke.skip')) {
-    dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:scalaTest"
+    dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:check"
   }
   mustRunAfter buildAll
 }
@@ -253,7 +253,7 @@ class SnappySQLQuerySuite extends SnappyFunSuite {
       "(exists (select col1 from r2 where r2.col1=r1.col1) " +
       "or exists(select col1 from r3 where r3.col1=r1.col1))")
 
-    val result = df.collect()
+    df.collect()
     checkAnswer(df, Seq(Row(1, "1", "1", 100),
       Row(2, "2", "2", 2), Row(4, "4", "4", 4) ))
   }
22 changes: 9 additions & 13 deletions cluster/src/test/scala/org/apache/spark/sql/store/BugTest.scala
@@ -16,14 +16,11 @@
  */
 package org.apache.spark.sql.store
 
-import java.util.Properties
-
 import com.pivotal.gemfirexd.Attribute
 import com.pivotal.gemfirexd.security.{LdapTestServer, SecurityTestUtils}
 import io.snappydata.util.TestUtils
-import io.snappydata.{Constant, PlanTest, SnappyFunSuite}
-import org.junit.Assert.assertEquals
-import org.scalatest.BeforeAndAfterAll
+import io.snappydata.{Constant, SnappyFunSuite}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 
 import org.apache.spark.SparkConf
 
@@ -33,11 +30,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     this.stopAll()
   }
 
   protected override def newSparkConf(addOn: (SparkConf) => SparkConf): SparkConf = {
     val ldapProperties = SecurityTestUtils.startLdapServerAndGetBootProperties(0, 0, sysUser,
       getClass.getResource("/auth.ldif").getPath)
-    import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
+    import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
     for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
       System.setProperty(k, ldapProperties.getProperty(k))
     }
@@ -63,11 +60,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
     if (ldapServer.isServerStarted) {
       ldapServer.stopService()
     }
-    import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
+    import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
     for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
       System.clearProperty(k)
       System.clearProperty("gemfirexd." + k)
       System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
-      System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
     }
     System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.USERNAME_ATTR)
     System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.PASSWORD_ATTR)
@@ -89,8 +86,7 @@
 
     // TODO : Use the actual connection pool limit
     val limit = 500
-
-    for (i <- 1 to limit) {
+    for (_ <- 1 to limit) {
       val snc2 = snc.newSession()
       snc2.snappySession.conf.set(Attribute.USERNAME_ATTR, user2)
       snc2.snappySession.conf.set(Attribute.PASSWORD_ATTR, user2)
@@ -106,7 +102,7 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
     val session = snc.newSession()
     session.snappySession.conf.set(Attribute.USERNAME_ATTR, user1)
     session.snappySession.conf.set(Attribute.PASSWORD_ATTR, user1)
-
+
     session.sql(s"create table ujli ( " +
       "aagmaterial string," +
       "accountassignmentgroup string," +
@@ -212,7 +208,7 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
       "vendor string," +
      "versioncode string )")
 
-    session.sql ("create table ujs (" +
+    session.sql("create table ujs (" +
       "uuid string," +
       "bravoequitycode string," +
       "controllingarea string," +
@@ -209,6 +209,14 @@ object MemoryManagerCallback extends Logging {
     }
     allocator.allocate(size, owner).order(ByteOrder.LITTLE_ENDIAN)
   }
+
+  /** release and accounting for byte buffer allocated by [[allocateExecutionMemory]] */
+  def releaseExecutionMemory(buffer: ByteBuffer, owner: String, releaseBuffer: Boolean): Unit = {
+    if (releaseBuffer) BufferAllocator.releaseBuffer(buffer)
+    if (buffer.hasArray) {
+      StoreCallbacksImpl.releaseStorageMemory(owner, buffer.capacity(), offHeap = false)
+    }
+  }
 }
 
 final class DefaultMemoryConsumer(taskMemoryManager: TaskMemoryManager,
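A hedged usage sketch of the new pair; the acquire signature is inferred from the body shown above and may differ:

    import java.nio.ByteBuffer

    import com.gemstone.gemfire.internal.shared.BufferAllocator

    import org.apache.spark.memory.MemoryManagerCallback

    // Acquire a scratch buffer accounted to `owner`, use it, then release the
    // buffer and, for heap allocations (buffer.hasArray), decrement the
    // storage accounting in a single call on the way out.
    def withExecutionBuffer[T](size: Int, owner: String,
        allocator: BufferAllocator)(f: ByteBuffer => T): T = {
      val buffer = MemoryManagerCallback.allocateExecutionMemory(size, owner, allocator)
      try f(buffer) finally {
        MemoryManagerCallback.releaseExecutionMemory(buffer, owner, releaseBuffer = true)
      }
    }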
@@ -26,14 +26,14 @@ import scala.util.control.NonFatal
 
 import com.gemstone.gemfire.cache.EntryDestroyedException
 import com.gemstone.gemfire.internal.cache.{BucketRegion, GemFireCacheImpl, LocalRegion, NonLocalRegionEntry, PartitionedRegion, RegionEntry, TXStateInterface}
-import com.gemstone.gemfire.internal.shared.FetchRequest
-import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder
+import com.gemstone.gemfire.internal.shared.{BufferAllocator, FetchRequest}
 import com.koloboke.function.IntObjPredicate
 import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer
 import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection
 import io.snappydata.collection.IntObjectHashMap
 import io.snappydata.thrift.common.BufferedBlob
 
+import org.apache.spark.memory.MemoryManagerCallback
 import org.apache.spark.sql.execution.columnar.encoding.{ColumnDecoder, ColumnDeleteDecoder, ColumnEncoding, UpdatedColumnDecoder, UpdatedColumnDecoderBase}
 import org.apache.spark.sql.execution.columnar.impl._
 import org.apache.spark.sql.execution.row.PRValuesIterator
@@ -333,7 +333,7 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
     val result = CompressionUtils.codecDecompressIfRequired(
       buffer.order(ByteOrder.LITTLE_ENDIAN), allocator)
     if (result ne buffer) {
-      UnsafeHolder.releaseIfDirectBuffer(buffer)
+      BufferAllocator.releaseBuffer(buffer)
       // decompressed buffer will be ordered by LITTLE_ENDIAN while non-decompressed
       // is returned with BIG_ENDIAN in order to distinguish the two cases
       result
@@ -404,11 +404,11 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
       override def test(col: Int, buffer: ByteBuffer): Boolean = {
         // release previous set of buffers immediately
         if (buffer ne null) {
-          if (buffer.isDirect) UnsafeHolder.releaseDirectBuffer(buffer)
-          // release from accounting if decompressed buffer
-          else if (buffer.order() eq ByteOrder.LITTLE_ENDIAN) {
-            StoreCallbacksImpl.releaseStorageMemory(CompressionUtils.DECOMPRESSION_OWNER,
-              buffer.capacity(), offHeap = false)
+          if (!BufferAllocator.releaseBuffer(buffer) &&
+              (buffer.order() eq ByteOrder.LITTLE_ENDIAN)) {
+            MemoryManagerCallback.releaseExecutionMemory(buffer,
+              CompressionUtils.DECOMPRESSION_OWNER, releaseBuffer = false)
          }
         }
         true
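The rewritten predicate leans on the convention documented in the previous hunk: decompressed buffers come back ordered LITTLE_ENDIAN while pass-through buffers keep BIG_ENDIAN, so only buffers actually produced by decompression decrement the DECOMPRESSION_OWNER accounting. BufferAllocator.releaseBuffer presumably returns true when it frees a direct buffer, which short-circuits the accounting path for off-heap buffers.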
@@ -30,7 +30,7 @@ import com.gemstone.gemfire.internal.cache.partitioned.PREntriesIterator
 import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView
 import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer
 import com.gemstone.gemfire.internal.shared._
-import com.gemstone.gemfire.internal.shared.unsafe.{DirectBufferAllocator, UnsafeHolder}
+import com.gemstone.gemfire.internal.shared.unsafe.DirectBufferAllocator
 import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer.REFERENCE_SIZE
 import com.gemstone.gemfire.internal.{ByteBufferDataInput, DSCODE, DSFIDFactory, DataSerializableFixedID, HeapDataOutputStream}
 import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils
@@ -461,14 +461,13 @@ class ColumnFormatValue extends SerializedDiskBuffer
     val perfStats = getCachePerfStats(context)
     val startDecompression = perfStats.startDecompression()
     val decompressed = CompressionUtils.codecDecompress(buffer, allocator, position, -typeId)
-    val isManagedDirect = allocator.isManagedDirect
     try {
       // update decompression stats
       perfStats.endDecompression(startDecompression)
       val newValue = copy(decompressed, isCompressed = false, changeOwnerToStorage = false)
       if (!isDirect || this.refCount <= 2) {
         val updateStats = (context ne null) && !fromDisk
-        if (updateStats && !isManagedDirect) {
+        if (updateStats && !isDirect) {
           // acquire the increased memory after decompression
           val numBytes = decompressed.capacity() - buffer.capacity()
           if (!StoreCallbacksImpl.acquireStorageMemory(context.getFullPath,
@@ -483,22 +482,18 @@ class ColumnFormatValue extends SerializedDiskBuffer
           }
         }
         this.columnBuffer = newBuffer
         this.decompressionState = 1
-        if (isDirect) {
-          UnsafeHolder.releaseDirectBuffer(buffer)
-        }
+        allocator.release(buffer)
         perfStats.incDecompressedReplaced()
         this
       } else {
         perfStats.incDecompressedReplaceSkipped()
         newValue
       }
     } finally {
-      if (!isManagedDirect) {
-        // release the memory acquired for decompression
-        // (any on-the-fly returned buffer will be part of runtime overhead)
-        StoreCallbacksImpl.releaseStorageMemory(CompressionUtils.DECOMPRESSION_OWNER,
-          decompressed.capacity(), offHeap = false)
-      }
+      // release the memory acquired for decompression
+      // (any on-the-fly returned buffer will be part of runtime overhead)
+      MemoryManagerCallback.releaseExecutionMemory(decompressed,
+        CompressionUtils.DECOMPRESSION_OWNER, releaseBuffer = false)
     }
   }
 }
@@ -523,7 +518,6 @@ class ColumnFormatValue extends SerializedDiskBuffer
       val compressed = CompressionUtils.codecCompress(compressionCodecId,
         buffer, bufferLen, allocator)
       if (compressed ne buffer) {
-        val isManagedDirect = allocator.isManagedDirect
         try {
           // update compression stats
           perfStats.endCompression(startCompression, bufferLen, compressed.limit())
@@ -534,9 +528,7 @@ class ColumnFormatValue extends SerializedDiskBuffer
           val newBuffer = if (compressed.capacity() >= size + 32) {
             val trimmed = allocator.allocateForStorage(size).order(ByteOrder.LITTLE_ENDIAN)
             trimmed.put(compressed)
-            if (isDirect) {
-              UnsafeHolder.releaseDirectBuffer(compressed)
-            }
+            allocator.release(compressed)
             trimmed.rewind()
             trimmed
           } else transferToStorage(compressed, allocator)
@@ -547,11 +539,10 @@ class ColumnFormatValue extends SerializedDiskBuffer
           }
           this.columnBuffer = newBuffer
           this.decompressionState = 0
-          if (isDirect) {
-            UnsafeHolder.releaseDirectBuffer(buffer)
-          }
-          // release storage memory
-          if (updateStats && !isManagedDirect) {
+          allocator.release(buffer)
+          // release storage memory for the buffer being transferred to storage
+          // (memory for compressed buffer will be marked released in any case)
+          if (updateStats && !isDirect) {
             StoreCallbacksImpl.releaseStorageMemory(context.getFullPath,
               buffer.capacity() - newBuffer.capacity(), offHeap = false)
           }
@@ -566,10 +557,8 @@
       } finally {
         // release the memory acquired for compression
         // (any on-the-fly returned buffer will be part of runtime overhead)
-        if (!isManagedDirect) {
-          StoreCallbacksImpl.releaseStorageMemory(CompressionUtils.COMPRESSION_OWNER,
-            compressed.capacity(), offHeap = false)
-        }
+        MemoryManagerCallback.releaseExecutionMemory(compressed,
+          CompressionUtils.COMPRESSION_OWNER, releaseBuffer = false)
       }
     } else {
       // update skipped compression stats
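The pattern running through these hunks: allocator.release handles heap and direct buffers alike, so the explicit isDirect checks around UnsafeHolder.releaseDirectBuffer go away, and the heap-only accounting decrement is centralized in MemoryManagerCallback.releaseExecutionMemory, which tests buffer.hasArray internally (see the StoreUnifiedManager hunk above). That also makes the isManagedDirect flag redundant, since the release call is now unconditional and a no-op on accounting for direct buffers.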
@@ -29,8 +29,7 @@ import com.esotericsoftware.kryo.io.{Input, Output}
 import com.esotericsoftware.kryo.{Kryo, KryoSerializable}
 import com.gemstone.gemfire.cache.IsolationLevel
 import com.gemstone.gemfire.internal.cache.{BucketRegion, CachePerfStats, GemFireCacheImpl, LocalRegion, PartitionedRegion, TXManagerImpl}
-import com.gemstone.gemfire.internal.shared.SystemProperties
-import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder
+import com.gemstone.gemfire.internal.shared.{BufferAllocator, SystemProperties}
 import com.pivotal.gemfirexd.internal.engine.Misc
 import com.pivotal.gemfirexd.internal.engine.ddl.catalog.GfxdSystemProcedures
 import com.pivotal.gemfirexd.internal.iapi.services.context.ContextService
@@ -599,7 +598,7 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie
         iter.next()
       }
       // release the batch buffers
-      batch.buffers.foreach(UnsafeHolder.releaseIfDirectBuffer)
+      batch.buffers.foreach(b => if (b ne null) BufferAllocator.releaseBuffer(b))
     }
   }
 
@@ -19,15 +19,14 @@ package org.apache.spark.sql.store
 
 import java.nio.{ByteBuffer, ByteOrder}
 
-import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder
 import com.gemstone.gemfire.internal.shared.{BufferAllocator, HeapBufferAllocator, SystemProperties}
 import com.ning.compress.lzf.{LZFDecoder, LZFEncoder}
 import io.snappydata.Constant
 import net.jpountz.lz4.LZ4Factory
 import org.xerial.snappy.Snappy
 
 import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec, LZFCompressionCodec, SnappyCompressionCodec}
-import org.apache.spark.memory.MemoryManagerCallback.allocateExecutionMemory
+import org.apache.spark.memory.MemoryManagerCallback.{allocateExecutionMemory, releaseExecutionMemory}
 
 /**
  * Utility methods for compression/decompression.
@@ -92,8 +91,8 @@ object CompressionUtils {
       result.limit(resultLen + COMPRESSION_HEADER_SIZE)
       result
     } else {
-      // release the compressed buffer if required
-      UnsafeHolder.releaseIfDirectBuffer(result)
+      // release the compressed buffer
+      releaseExecutionMemory(result, COMPRESSION_OWNER, releaseBuffer = true)
       input
     }
   }
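Callers detect a skipped compression by reference equality (result ne buffer, as in the ColumnBatchIteratorOnRS and ColumnFormatValue hunks above), so returning the untouched input after releasing the scratch buffer, together with its heap accounting, preserves the existing contract while fixing the leak named in the commit title.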
@@ -150,22 +150,22 @@ class ColumnTableTest
     dataDF.write.insertInto(tableName)
 
     var query = s"SELECT sum(Col1) as summ FROM $tableName where col1 > .0001 having summ > .001"
-    snc.sql(query).collect
+    snc.sql(query).collect()
 
     snc.sql(s"create or replace view $viewName as ($query)")
 
     query = s"SELECT sum(Col1) as summ FROM $tableName where col1 > .0001BD having summ > .001bD"
-    snc.sql(query).collect
+    snc.sql(query).collect()
 
     snc.sql(s"create or replace view $viewName as ($query)")
 
     query = s"SELECT sum(Col1) as summ FROM $tableName having summ > .001f"
-    snc.sql(query).collect
+    snc.sql(query).collect()
 
     snc.sql(s"create or replace view $viewName as ($query)")
 
     query = s"SELECT sum(Col1) as summ FROM $tableName having summ > .001d"
-    snc.sql(query).collect
+    snc.sql(query).collect()
 
     snc.sql(s"create or replace view $viewName as ($query)")
 
@@ -240,6 +240,7 @@ class SnappyStreamingSuite
     kafkaUtils.createTopic(topic)
 
     val add = kafkaUtils.brokerAddress
+    ssnc.sql("drop table if exists directKafkaStream")
     ssnc.sql("create stream table directKafkaStream (" +
       " publisher string, advertiser string)" +
       " using directkafka_stream options(" +
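Dropping the table first makes the test re-runnable; the occasional failure mentioned in the commit message was presumably a leftover directKafkaStream table from an earlier run colliding with the create.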
2 changes: 2 additions & 0 deletions dtests/build.gradle
@@ -156,3 +156,5 @@ testClasses.doLast {
     }
   }
 }
+
+check.dependsOn test, scalaTest
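With this, the dtests check task runs both test and scalaTest, which is what the root build.gradle hunk above now relies on: checkAll depends on the dtests check task instead of invoking its scalaTest task directly.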
@@ -23,8 +23,8 @@ import io.snappydata.SnappyTestRunner
 import scala.sys.process._
 
 /**
- * Class extending can mix match methods like searchExceptions
- */
+ * Class extending can mix match methods like searchExceptions
+ */
 class SnappyHydraTestRunner extends SnappyTestRunner {
 
   var SNAPPYDATA_SOURCE_DIR = ""
@@ -52,19 +52,19 @@ class SnappyHydraTestRunner extends SnappyTestRunner {
     val c14 = "grep -v java.lang.reflect.InvocationTargetException"
     val c15 = "grep -v org.apache.spark.storage.ShuffleBlockFetcherIterator." +
         "throwFetchFailedException"
-    /*val c16 = "grep -v org.apache.spark.SparkException:[[:space:]]*Exception[[:space:]]*thrown" +
-        "[[:space:]]*in[[:space:]]*awaitResult"*/
     val c16 = Seq("grep", "-v", "org.apache.spark.SparkException: Exception thrown in awaitResult")
-    /*val c17 = "grep \'status:[[:space:]]*stopping\'[[:space:]]*-e[[:space:]]*\'java.lang" +
-        ".IllegalStateException\'"*/
+    /* val c17 = "grep \'status:[[:space:]]*stopping\'[[:space:]]*-e[[:space:]]*\'java.lang" +
+        ".IllegalStateException\'" */
     val c18 = "grep -v com.gemstone.gemfire.distributed.LockServiceDestroyedException"
-    /*val c19 = "grep GemFireIOException:[[:space:]]*Current[[:space:]]*operations[[:space:]]*did" +
+    /*
+    val c19 = "grep GemFireIOException:[[:space:]]*Current[[:space:]]*operations[[:space:]]*did" +
         "[[:space:]]*not[[:space:]]*distribute[[:space:]]*within"
     val c20 = "grep SparkException:[[:space:]]*External[[:space:]]*scheduler[[:space:]]*cannot" +
-        "[[:space:]]*be[[:space:]]*instantiated"*/
+        "[[:space:]]*be[[:space:]]*instantiated" */
     val c21 = Seq("grep", "-v", "Failed to retrieve information for")
+    val c22 = "grep -v abrt-watch-log"
     val command1 = c1 #| c2 #| c3 #| c4 #| c5 #| c6 #| c7 #| c8 #| c12 #| c13 #| c14 #| c15 #|
-        c16 #| /*c17 #|*/ c18 #| /*c19 #| c20 #|*/ c21
+        c16 #| /* c17 #| */ c18 #| /* c19 #| c20 #| */ c21 #| c22
     // TODO : handle case where the logDir path is incorrect or doesn't exists
     try {
       val output1: String = command1.!!
@@ -73,7 +73,7 @@ class SnappyHydraTestRunner extends SnappyTestRunner {
     }
     catch {
       case r: java.lang.RuntimeException =>
-        if (r.getMessage().contains("Nonzero exit value: 1")) {
+        if (r.getMessage.contains("Nonzero exit value: 1")) {
          // scalastyle:off println
          println("No unexpected Exceptions observed during smoke bt run.")
        }
@@ -83,6 +83,4 @@
       case i: Throwable => throw i
     }
   }
-
 }
-

[Diffs for the remaining changed files were not loaded in this view.]