Skip to content

Commit

Permalink
Add for other spark versions.
Browse files Browse the repository at this point in the history
  • Loading branch information
Imbruced committed Oct 11, 2024
1 parent 5b88709 commit 5fa9331
Show file tree
Hide file tree
Showing 28 changed files with 3,144 additions and 19 deletions.
18 changes: 18 additions & 0 deletions spark/common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,24 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.20.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>1.20.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.minio</groupId>
<artifactId>minio</artifactId>
<version>8.5.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata.GeoParquetMetadataDataSource
org.apache.sedona.sql.datasources.shapefile.ShapefileDataSource
org.apache.sedona.sql.datasources.shapefile.ShapefileDataSource
org.apache.sedona.sql.datasources.geopackage.GeoPackageDataSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.model.GeoPackageOptions
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util.Locale
import scala.jdk.CollectionConverters._
import scala.util.Try

/**
 * Spark DataSource V2 entry point for GeoPackage files, registered under the short name
 * `geopackage`.
 *
 * Options read by this class:
 *  - the input location(s), as resolved by `getPaths` (at least one non-empty path is required);
 *  - `showMetadata` (boolean, default `false`): when `true` the `gpkg_contents` table is read
 *    and `tableName` is ignored;
 *  - `tableName`: the table to read, mandatory unless `showMetadata` is `true`.
 */
class GeoPackageDataSource extends FileDataSourceV2 with DataSourceRegister {

  /** Returns `null`: this source does not name a V1 `FileFormat` to fall back to. */
  override def fallbackFileFormat: Class[_ <: FileFormat] = null

  /**
   * Builds the [[GeoPackageTable]] for the given options.
   *
   * @param options user supplied read options
   * @throws IllegalArgumentException when no path, or no table name, is specified
   */
  override protected def getTable(options: CaseInsensitiveStringMap): Table = {
    GeoPackageTable(
      "",
      sparkSession,
      options,
      getPaths(options),
      None,
      fallbackFileFormat,
      getLoadOptions(options))
  }

  /**
   * Validates the user options and converts them into [[GeoPackageOptions]].
   *
   * @param options user supplied read options
   * @return the table to read together with the `showMetadata` flag
   * @throws IllegalArgumentException when no path is given, or when `tableName` is missing
   *                                  while `showMetadata` is `false`
   */
  private def getLoadOptions(options: CaseInsensitiveStringMap): GeoPackageOptions = {
    // `options.get("path")` yields null for a missing key, so dereferencing it directly fails
    // with a NullPointerException (for instance when only several paths were supplied).
    // Validate against the resolved path list instead.
    if (getPaths(options).forall(_.isEmpty)) {
      throw new IllegalArgumentException("GeoPackage path is not specified")
    }

    val showMetadata = options.getBoolean("showMetadata", false)

    val tableName =
      if (showMetadata) {
        // Metadata mode always reads the contents table, whatever `tableName` says.
        "gpkg_contents"
      } else {
        Option(options.get("tableName")).getOrElse(
          throw new IllegalArgumentException("Table name is not specified"))
      }

    GeoPackageOptions(tableName = tableName, showMetadata = showMetadata)
  }

  override def shortName(): String = "geopackage"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.TableType.{FEATURES, METADATA, TILES, UNKNOWN}
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageReadOptions, PartitionOptions, TileRowMetadata}
import org.apache.sedona.sql.datasources.geopackage.transform.ValuesMapper
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.util.SerializableConfiguration

import java.io.File
import java.sql.ResultSet

/**
 * Partition reader that returns the rows of one GeoPackage table from every file assigned to
 * the partition, reading the files one after another.
 *
 * @param rs              cursor over the table in the file currently being read; it is
 *                        replaced whenever the reader moves on to the next file
 * @param options         read options: table name, partition options, the set of files of the
 *                        partition, the first file and its local counterpart
 * @param broadcastedConf Hadoop configuration used to fetch the following files
 * @param currentTempFile local file backing `rs`
 * @param copying         whether the initial `currentTempFile` is a local copy that has to be
 *                        deleted once it is no longer needed
 */
case class GeoPackagePartitionReader(
    var rs: ResultSet,
    options: GeoPackageReadOptions,
    broadcastedConf: Broadcast[SerializableConfiguration],
    var currentTempFile: File,
    copying: Boolean = false)
    extends PartitionReader[InternalRow] {

  private var values: Seq[Any] = Seq.empty
  private var currentFile = options.currentFile
  private val partitionedFiles = options.partitionedFiles
  // Whether `currentTempFile` is a throw-away copy. Tracked per file because every call to
  // `copyToLocal` reports it separately.
  private var currentTempFileCopied: Boolean = copying

  /**
   * Advances to the next row, moving on to the following files of the partition when the
   * current one is exhausted. Files whose table holds no rows are skipped instead of ending
   * the scan early.
   *
   * @return `true` when a row is available through `get()`, `false` when all files are done
   */
  override def next(): Boolean = {
    var hasRow = rs.next()
    while (!hasRow && openNextFile()) {
      hasRow = rs.next()
    }

    if (hasRow) {
      values = ValuesMapper.mapValues(adjustPartitionOptions, rs)
    }

    hasRow
  }

  /**
   * Drops the exhausted file from the pending set and opens a cursor on the next one.
   *
   * @return `false` when no file is left, `true` when `rs` now points at a new file
   */
  private def openNextFile(): Boolean = {
    partitionedFiles.remove(currentFile)

    if (partitionedFiles.isEmpty) {
      false
    } else {
      rs.close()

      currentFile = partitionedFiles.head
      val (tempFile, copied) = FileSystemUtils.copyToLocal(
        options = broadcastedConf.value.value,
        file = new Path(currentFile.filePath))

      // The previous local copy is not needed any more; remove it now rather than waiting
      // for JVM shutdown.
      discardTempFile(currentTempFile, currentTempFileCopied)

      currentTempFile = tempFile
      currentTempFileCopied = copied

      rs = GeoPackageConnectionManager.getTableCursor(currentTempFile.getPath, options.tableName)
      true
    }
  }

  /** Deletes `file` when it is a local copy; falls back to `deleteOnExit` if that fails. */
  private def discardTempFile(file: File, copied: Boolean): Unit = {
    if (copied && file.exists() && !file.delete()) {
      file.deleteOnExit()
    }
  }

  /**
   * Returns the partition options to map the current row with. For tile tables the tile
   * coordinates (`zoom_level`, `tile_column`, `tile_row`) of the current row are attached;
   * every other table type uses the options unchanged.
   */
  private def adjustPartitionOptions: PartitionOptions = {
    options.partitionOptions.tableType match {
      case TILES =>
        val tileRowMetadata = TileRowMetadata(
          zoomLevel = rs.getInt("zoom_level"),
          tileColumn = rs.getInt("tile_column"),
          tileRow = rs.getInt("tile_row"))

        options.partitionOptions.withTileRowMetadata(tileRowMetadata)
      case FEATURES | METADATA | UNKNOWN => options.partitionOptions
    }
  }

  /** Returns the row produced by the last successful call to `next()`. */
  override def get(): InternalRow = {
    InternalRow.fromSeq(values)
  }

  /** Closes the cursor and removes the local copies this reader is responsible for. */
  override def close(): Unit = {
    try {
      rs.close()
    } finally {
      // Clean up even when closing the cursor fails.
      discardTempFile(currentTempFile, currentTempFileCopied)
      if (copying) {
        // Kept for callers that pass an `options.tempFile` different from the initial
        // `currentTempFile`; a no-op when the file is already gone.
        options.tempFile.delete()
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.hadoop.fs.Path
import org.apache.sedona.sql.datasources.geopackage.connection.{FileSystemUtils, GeoPackageConnectionManager}
import org.apache.sedona.sql.datasources.geopackage.model.TableType.TILES
import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageOptions, GeoPackageReadOptions, PartitionOptions, TableType}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

/**
 * Creates [[GeoPackagePartitionReader]] instances for file partitions.
 *
 * @param sparkSession    active Spark session
 * @param broadcastedConf Hadoop configuration used to fetch the files of a partition
 * @param loadOptions     table name and `showMetadata` flag requested by the user
 * @param dataSchema      schema the table in each file is expected to have
 */
case class GeoPackagePartitionReaderFactory(
    sparkSession: SparkSession,
    broadcastedConf: Broadcast[SerializableConfiguration],
    loadOptions: GeoPackageOptions,
    dataSchema: StructType)
    extends PartitionReaderFactory {

  /**
   * Opens the first file of the partition, checks that its table schema matches `dataSchema`
   * and returns a reader positioned before the first row.
   *
   * @param partition partition to read; has to be a `FilePartition` with at least one file
   * @throws IllegalArgumentException when the partition has an unexpected type, holds no
   *                                  files, or the table schema differs from `dataSchema`
   */
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val partitionFiles = partition match {
      case filePartition: FilePartition => filePartition.files
      case _ =>
        throw new IllegalArgumentException(
          s"Unexpected partition type: ${partition.getClass.getCanonicalName}")
    }

    val firstFile = partitionFiles.headOption.getOrElse(
      throw new IllegalArgumentException("Partition does not contain any files"))

    val (tempFile, copied) = FileSystemUtils.copyToLocal(
      options = broadcastedConf.value.value,
      file = new Path(firstFile.filePath))

    try {
      val tableType = if (loadOptions.showMetadata) {
        TableType.METADATA
      } else {
        GeoPackageConnectionManager.findFeatureMetadata(tempFile.getPath, loadOptions.tableName)
      }

      val schema = GeoPackageConnectionManager.getSchema(tempFile.getPath, loadOptions.tableName)

      // Built once and reused for both the comparison and the error message.
      val actualSchema = StructType(schema.map(_.toStructField(tableType)))
      if (actualSchema != dataSchema) {
        throw new IllegalArgumentException(
          s"Schema mismatch: expected $dataSchema, got $actualSchema")
      }

      val tileMetadata = tableType match {
        case TILES =>
          Some(
            GeoPackageConnectionManager.findTilesMetadata(tempFile.getPath, loadOptions.tableName))
        case _ => None
      }

      // The cursor is opened last so that a failed validation cannot leave it open.
      val rs =
        GeoPackageConnectionManager.getTableCursor(
          tempFile.getAbsolutePath,
          loadOptions.tableName)

      GeoPackagePartitionReader(
        rs = rs,
        options = GeoPackageReadOptions(
          tableName = loadOptions.tableName,
          tempFile = tempFile,
          partitionOptions =
            PartitionOptions(tableType = tableType, columns = schema, tile = tileMetadata),
          partitionedFiles = scala.collection.mutable.HashSet(partitionFiles: _*),
          currentFile = firstFile),
        broadcastedConf = broadcastedConf,
        currentTempFile = tempFile,
        copying = copied)
    } catch {
      case scala.util.control.NonFatal(e) =>
        // No reader was handed out, so nobody else will remove the local copy.
        if (copied) {
          tempFile.delete()
        }
        throw e
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.sql.datasources.geopackage

import org.apache.sedona.sql.datasources.geopackage.model.{GeoPackageLoadOptions, GeoPackageOptions}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.jdk.CollectionConverters._

/**
 * File scan over GeoPackage files.
 *
 * @param dataSchema          schema of the table being read
 * @param sparkSession        active Spark session
 * @param fileIndex           index of the files to scan
 * @param readDataSchema      data columns requested by the query
 * @param readPartitionSchema partition columns requested by the query
 * @param options             user supplied read options
 * @param loadOptions         table name and `showMetadata` flag derived from `options`
 */
case class GeoPackageScan(
    dataSchema: StructType,
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    readDataSchema: StructType,
    readPartitionSchema: StructType,
    options: CaseInsensitiveStringMap,
    loadOptions: GeoPackageOptions)
    extends FileScan {

  /** This scan carries no partition filters. */
  override def partitionFilters: Seq[Expression] = Seq.empty

  /** This scan carries no data filters. */
  override def dataFilters: Seq[Expression] = Seq.empty

  /**
   * Broadcasts a Hadoop configuration built from the session state plus the user options and
   * returns the factory that creates the partition readers.
   */
  override def createReaderFactory(): PartitionReaderFactory = {
    // Hadoop configuration keys are case sensitive, so the options have to be taken with their
    // original key casing; iterating the case-insensitive view yields lower-cased keys.
    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
    val broadcastedConf =
      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

    GeoPackagePartitionReaderFactory(sparkSession, broadcastedConf, loadOptions, dataSchema)
  }
}
Loading

0 comments on commit 5fa9331

Please sign in to comment.