Skip to content

Commit

Permalink
Merge branch 'master' into spark_2.3_merge
Browse files Browse the repository at this point in the history
Conflicts:
	core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala
  • Loading branch information
ymahajan committed May 25, 2018
2 parents 7f9c6b3 + 73838ca commit 0190cc4
Show file tree
Hide file tree
Showing 20 changed files with 3,118 additions and 534 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ task checkAll {
dependsOn ":gemfire-connector:check"
}
if (!rootProject.hasProperty('smoke.skip')) {
dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:scalaTest"
dependsOn buildDtests, ":snappy-dtests_${scalaBinaryVersion}:check"
}
mustRunAfter buildAll
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ class SnappySQLQuerySuite extends SnappyFunSuite {
"(exists (select col1 from r2 where r2.col1=r1.col1) " +
"or exists(select col1 from r3 where r3.col1=r1.col1))")

val result = df.collect()
df.collect()
checkAnswer(df, Seq(Row(1, "1", "1", 100),
Row(2, "2", "2", 2), Row(4, "4", "4", 4) ))
}
Expand Down
246 changes: 235 additions & 11 deletions cluster/src/test/scala/org/apache/spark/sql/store/BugTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
*/
package org.apache.spark.sql.store

import java.util.Properties

import com.pivotal.gemfirexd.Attribute
import com.pivotal.gemfirexd.security.{LdapTestServer, SecurityTestUtils}
import io.snappydata.util.TestUtils
import io.snappydata.{Constant, PlanTest, SnappyFunSuite}
import io.snappydata.{Constant, SnappyFunSuite}
import org.junit.Assert.assertEquals
import org.scalatest.BeforeAndAfterAll
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}

import org.apache.spark.SparkConf

Expand All @@ -33,11 +30,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
override def beforeAll(): Unit = {
this.stopAll()
}

protected override def newSparkConf(addOn: (SparkConf) => SparkConf): SparkConf = {
val ldapProperties = SecurityTestUtils.startLdapServerAndGetBootProperties(0, 0, sysUser,
getClass.getResource("/auth.ldif").getPath)
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
System.setProperty(k, ldapProperties.getProperty(k))
}
Expand All @@ -63,11 +60,11 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
if (ldapServer.isServerStarted) {
ldapServer.stopService()
}
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE}
import com.pivotal.gemfirexd.Property.{AUTH_LDAP_SEARCH_BASE, AUTH_LDAP_SERVER}
for (k <- List(Attribute.AUTH_PROVIDER, AUTH_LDAP_SERVER, AUTH_LDAP_SEARCH_BASE)) {
System.clearProperty(k)
System.clearProperty("gemfirexd." + k)
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + k)
}
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.USERNAME_ATTR)
System.clearProperty(Constant.STORE_PROPERTY_PREFIX + Attribute.PASSWORD_ATTR)
Expand All @@ -89,8 +86,7 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {

// TODO : Use the actual connection pool limit
val limit = 500

for (i <- 1 to limit) {
for (_ <- 1 to limit) {
val snc2 = snc.newSession()
snc2.snappySession.conf.set(Attribute.USERNAME_ATTR, user2)
snc2.snappySession.conf.set(Attribute.PASSWORD_ATTR, user2)
Expand All @@ -100,4 +96,232 @@ class BugTest extends SnappyFunSuite with BeforeAndAfterAll {
assertEquals(1, rs.length)
}
}

test("SNAP-2342 nested query involving joins & union throws Exception") {
val user1 = "gemfire1"
val session = snc.newSession()
session.snappySession.conf.set(Attribute.USERNAME_ATTR, user1)
session.snappySession.conf.set(Attribute.PASSWORD_ATTR, user1)

session.sql(s"create table ujli ( " +
"aagmaterial string," +
"accountassignmentgroup string," +
"accounttype string," +
"allocationcycle string," +
"allocationsegment string," +
"asset string," +
"billingdocument string," +
"billingdocumentitem string," +
"bravoequitycode string," +
"bravominorcode string," +
"bsegdocumentlinenumber string," +
"businessplace string," +
"businesstransaction string," +
"controllingarea string," +
"copadocumentnumber string," +
"copaobjectnumber string," +
"costcenter string," +
"costelement string," +
"countryofshiptocustomer string," +
"createdby string," +
"creationtime string," +
"customer string," +
"customergroup string," +
"debitcreditindicator string," +
"distributionchannel string," +
"division string," +
"documentdate string," +
"documentheadertext string," +
"documentlinenumberinsourcesystem string," +
"documentnumberinsourcesystem string," +
"documenttype string," +
"edgcreateditemindoc string," +
"entrydate string," +
"errorstatus string," +
"fidocumentquantity string," +
"fiscalperiod string," +
"fiscalyear string," +
"fsid string," +
"functionalareacode string," +
"glaccountcode string," +
"hleamount string," +
"indexfromcopa string," +
"itemcategory string," +
"itemtext string," +
"kitmaterial string," +
"kittype string," +
"leamount string," +
"lebillingtype string," +
"lecode string," +
"lecurrencycode string," +
"lesalesqty string," +
"lesalesqtyuom string," +
"ledgercode string," +
"localcompanycode string," +
"localdocumenttype string," +
"localfiscalperiod string," +
"localfiscalyear string," +
"localfunctionalareacode string," +
"localglaccountcode string," +
"locallecurrencycode string," +
"localledgercode string," +
"localmrccode string," +
"localprofitcenter string," +
"localsku string," +
"localversioncode string," +
"mrccode string," +
"parentdocumentnumberinsourcesystem string," +
"partnercostcenter string," +
"partnerfunctionalarea string," +
"partnerprofitcenter string," +
"partnersegment string," +
"payer string," +
"pcadocnumber string," +
"pcaitemnumber string," +
"plant string," +
"postingdate string," +
"postingkey string," +
"producthierarchy string," +
"psegment string," +
"rclnt string," +
"reference string," +
"referencedocument string," +
"referencetransaction string," +
"regionofshiptocustomer string," +
"salesdoctype string," +
"salesgroup string," +
"salesoffice string," +
"salesorder string," +
"salesorderitem string," +
"salesorganization string," +
"sectorproductgroup string," +
"shipto string," +
"sleamount string," +
"sourcesystemid string," +
"tradingpartner string," +
"transactioncode string," +
"transactioncurrencyamount string," +
"transactioncurrencycode string," +
"transactiontype string," +
"ujlkey string," +
"valuefieldfromcopa string," +
"vendor string," +
"versioncode string )")

session.sql("create table ujs (" +
"uuid string," +
"bravoequitycode string," +
"controllingarea string," +
"costcenter string," +
"creationtime string," +
"debitcreditindicator string," +
"errstatus string," +
"fiscalyear string," +
"fsid string," +
"functionalareacode string," +
"glaccountcode string," +
"hleamount string," +
"leamount string," +
"lecode string," +
"lecostelement string," +
"lecurrencycode string," +
"leplant string," +
"ledgercode string," +
"localcompanycode string," +
"localfiscalyear string," +
"localfunctionalareacode string," +
"localglaccountcode string," +
"locallecurrencycode string," +
"localledgercode string," +
"localmrccode string," +
"localprofitcenter string," +
"localversioncode string," +
"mrccode string," +
"partnerfunctionalarea string," +
"partnerprofitcenter string," +
"partnersegment string," +
"referencetransaction string," +
"sleamount string," +
"sourceadditionalkey string," +
"sourcesystemid string," +
"tradingpartner string," +
"transactioncurrencyamount string," +
"transactioncurrencycode string," +
"transactiontype string," +
"versioncode string)")

session.sql("create table gfs (" +
"gfs string, " +
" gfsdescription string, " +
" globalfunctionalarea string )")

session.sql("create table bravo (" +
" bravo string," +
"bravodescription string," +
" gfs string, " +
" gfsdescription string)")

session.sql("create table gtw (" +
"gfs string," +
"gfsdescription string," +
"gtw string," +
"gtwdescription string)")

session.sql("create table coa (" +
"accounttype string," +
"errorcode string," +
"errormessage string," +
"errorstatus string," +
"gfs string," +
"gfsdescription string," +
"globalfunctionalarea string," +
"indicevalue string," +
"localfunctionalarea string," +
"localgl string," +
"localgldescription string)")

session.sql(s"create or replace view TrialBalance as " +
s"( select leUniversal,gfs,first(gfsDescription) as gfsDescription, " +
s"first(bravo) as bravo, " +
s"first(bravoDescription) as bravoDescription, first(gtw) as gtw, " +
s"first(gtwDescription) as gtwDescription, " +
s"first(globalFunctionalArea) as globalFunctionalArea," +
s"format_number(sum(credit),2) as credit," +
s" format_number(sum(debit),2) as debit,format_number(sum(total),2) as total from" +
s" ( select a.leCode as leUniversal,a.localCompanyCode as leLocal," +
s" a.mrcCode as mrcUniversal," +
s" a.sourceSystemId as sourceSystem,a.glAccountCode as gfs," +
s"a.localGlAccountCode as localGl," +
s" SUM(hleAmount) as debit,SUM(sleAmount) as credit,SUM(leAmount) as total," +
s" first(b.gfsDescription) as gfsDescription," +
s" first(b.globalFunctionalArea) as globalFunctionalArea," +
s" first((case when a.sourceSystemId='project_one' then e.localGlDescription " +
s" when a.sourceSystemId='btb_latam' then b.gfsDescription else '' end)) " +
s" as localGlDescription ," +
s" first(c.bravoDescription) as bravoDescription," +
s"first(d.gtwDescription) as gtwDescription, " +
s" first(c.bravo) as bravo, first(d.gtw) as gtw from ( select ledgerCode,leCode," +
s" localCompanyCode,mrcCode,fiscalYear,sourceSystemId,localGlAccountCode," +
s" glAccountCode,last(localFunctionalAreaCode),SUM(leAmount) as leAmount," +
s" SUM(hleAmount) as hleAmount,SUM(sleAmount) as sleAmount, glAccountCode ," +
s" 'Local GL' as accountType,localGlAccountCode as localGl from " +
s" ( select ledgerCode,leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
s" localGlAccountCode,glAccountCode,localFunctionalAreaCode,leAmount,hleAmount,sleAmount" +
s" from ujli where ledgerCode='0L' and leCode='7600' " +
s" AND fiscalYear='2017' and fiscalPeriod<=3 AND sourceSystemId='btb_latam' union all" +
s" select ledgerCode,leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
s" localGlAccountCode,glAccountCode,localFunctionalAreaCode,leAmount,hleAmount," +
s" sleAmount from ujs where ledgerCode='0L' and leCode='7600'" +
s" AND fiscalYear='2017' AND sourceSystemId='btb_latam' ) group by ledgerCode," +
s" leCode,localCompanyCode,mrcCode,fiscalYear,sourceSystemId," +
s" localGlAccountCode,glAccountCode ) a" +
s" left join gfs b on (a.glAccountCode=b.gfs) left join " +
s" bravo c " +
s" on (a.glAccountCode=c.gfs) left join gtw d on (a.glAccountCode=d.gfs)" +
s" left join coa e on(a.accountType=e.accountType and " +
s" a.glAccountCode = e.gfs and a.localGl = e.localGl ) group by a.leCode," +
s"a.localCompanyCode," +
s" a.mrcCode,a.sourceSystemId,a.glAccountCode,a.localGlAccountCode," +
s"c.bravo,d.gtw) group by leUniversal,gfs)")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,14 @@ object MemoryManagerCallback extends Logging {
}
allocator.allocate(size, owner).order(ByteOrder.LITTLE_ENDIAN)
}

/** release and accounting for byte buffer allocated by [[allocateExecutionMemory]] */
def releaseExecutionMemory(buffer: ByteBuffer, owner: String, releaseBuffer: Boolean): Unit = {
if (releaseBuffer) BufferAllocator.releaseBuffer(buffer)
if (buffer.hasArray) {
StoreCallbacksImpl.releaseStorageMemory(owner, buffer.capacity(), offHeap = false)
}
}
}

final class DefaultMemoryConsumer(taskMemoryManager: TaskMemoryManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ import scala.util.control.NonFatal

import com.gemstone.gemfire.cache.EntryDestroyedException
import com.gemstone.gemfire.internal.cache.{BucketRegion, GemFireCacheImpl, LocalRegion, NonLocalRegionEntry, PartitionedRegion, RegionEntry, TXStateInterface}
import com.gemstone.gemfire.internal.shared.FetchRequest
import com.gemstone.gemfire.internal.shared.unsafe.UnsafeHolder
import com.gemstone.gemfire.internal.shared.{BufferAllocator, FetchRequest}
import com.koloboke.function.IntObjPredicate
import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer
import com.pivotal.gemfirexd.internal.impl.jdbc.EmbedConnection
import io.snappydata.collection.IntObjectHashMap
import io.snappydata.thrift.common.BufferedBlob

import org.apache.spark.memory.MemoryManagerCallback
import org.apache.spark.sql.execution.columnar.encoding.{ColumnDecoder, ColumnDeleteDecoder, ColumnEncoding, UpdatedColumnDecoder, UpdatedColumnDecoderBase}
import org.apache.spark.sql.execution.columnar.impl._
import org.apache.spark.sql.execution.row.PRValuesIterator
Expand Down Expand Up @@ -333,7 +333,7 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
val result = CompressionUtils.codecDecompressIfRequired(
buffer.order(ByteOrder.LITTLE_ENDIAN), allocator)
if (result ne buffer) {
UnsafeHolder.releaseIfDirectBuffer(buffer)
BufferAllocator.releaseBuffer(buffer)
// decompressed buffer will be ordered by LITTLE_ENDIAN while non-decompressed
// is returned with BIG_ENDIAN in order to distinguish the two cases
result
Expand Down Expand Up @@ -404,11 +404,11 @@ final class ColumnBatchIteratorOnRS(conn: Connection,
override def test(col: Int, buffer: ByteBuffer): Boolean = {
// release previous set of buffers immediately
if (buffer ne null) {
if (buffer.isDirect) UnsafeHolder.releaseDirectBuffer(buffer)
// release from accounting if decompressed buffer
else if (buffer.order() eq ByteOrder.LITTLE_ENDIAN) {
StoreCallbacksImpl.releaseStorageMemory(CompressionUtils.DECOMPRESSION_OWNER,
buffer.capacity(), offHeap = false)
if (!BufferAllocator.releaseBuffer(buffer) &&
(buffer.order() eq ByteOrder.LITTLE_ENDIAN)) {
MemoryManagerCallback.releaseExecutionMemory(buffer,
CompressionUtils.DECOMPRESSION_OWNER, releaseBuffer = false)
}
}
true
Expand Down
Loading

0 comments on commit 0190cc4

Please sign in to comment.