Skip to content

Commit

Permalink
[Feature][Connector-V2] Add phoenix connector sink (apache#2499)
Browse files Browse the repository at this point in the history
* [Feature][Connector-V2] Add phoenix connector sink

* fix doc style

Co-authored-by: 毕博 <bibo@mafengwo.com>
  • Loading branch information
2 people authored and MRYOG committed Sep 8, 2022
1 parent e04e822 commit cb8cf66
Show file tree
Hide file tree
Showing 13 changed files with 822 additions and 0 deletions.
41 changes: 41 additions & 0 deletions docs/en/connector-v2/sink/Phoenix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Phoenix

> Phoenix sink connector
## Description
Write Phoenix data through [Jdbc connector](Jdbc.md).
Supports both batch mode and streaming mode. The tested Phoenix versions are 4.xx and 5.xx.
Under the hood, the connector uses the Phoenix JDBC driver to execute the upsert statement and write data to HBase.
There are two ways to connect to Phoenix with Java JDBC: one connects to ZooKeeper through JDBC, and the other connects to the Query Server through the JDBC thin client.

> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
> Tips: Exactly-once semantics are not supported (XA transactions are not yet supported in Phoenix).
## Options

### driver [string]
If you use the Phoenix (thick) driver, the value is `org.apache.phoenix.jdbc.PhoenixDriver`; if you use the (thin) driver, the value is `org.apache.phoenix.queryserver.client.Driver`.

### url [string]
If you use the Phoenix (thick) driver, the value is `jdbc:phoenix:localhost:2182/hbase`; if you use the (thin) driver, the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`.

## Example
Use the thick client driver:
```
Jdbc {
driver = org.apache.phoenix.jdbc.PhoenixDriver
url = "jdbc:phoenix:localhost:2182/hbase"
query = "upsert into test.sink(age, name) values(?, ?)"
}
```

Use the thin client driver:
```
Jdbc {
driver = org.apache.phoenix.queryserver.client.Driver
url = "jdbc:phoenix:thin:url=http://spark_e2e_phoenix_sink:8765;serialization=PROTOBUF"
query = "upsert into test.sink(age, name) values(?, ?)"
}
```
39 changes: 39 additions & 0 deletions docs/en/connector-v2/source/Phoenix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Phoenix

> Phoenix source connector
## Description
Read Phoenix data through [Jdbc connector](Jdbc.md).
Supports both batch mode and streaming mode. The tested Phoenix versions are 4.xx and 5.xx.
Under the hood, the connector uses the Phoenix JDBC driver to execute the select statement and read data from HBase.
There are two ways to connect to Phoenix with Java JDBC: one connects to ZooKeeper through JDBC, and the other connects to the Query Server through the JDBC thin client.

> Tips: By default, the (thin) driver jar is used. If you want to use the (thick) driver or other versions of Phoenix (thin) driver, you need to recompile the jdbc connector module
## Options

### driver [string]
If you use the Phoenix (thick) driver, the value is `org.apache.phoenix.jdbc.PhoenixDriver`; if you use the (thin) driver, the value is `org.apache.phoenix.queryserver.client.Driver`.

### url [string]
If you use the Phoenix (thick) driver, the value is `jdbc:phoenix:localhost:2182/hbase`; if you use the (thin) driver, the value is `jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF`.

## Example
Use the thick client driver:
```
Jdbc {
driver = org.apache.phoenix.jdbc.PhoenixDriver
url = "jdbc:phoenix:localhost:2182/hbase"
query = "select age, name from test.source"
}
```

Use the thin client driver:
```
Jdbc {
driver = org.apache.phoenix.queryserver.client.Driver
url = "jdbc:phoenix:thin:url=http://spark_e2e_phoenix_sink:8765;serialization=PROTOBUF"
query = "select age, name from test.source"
}
```
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@
<elasticsearch-rest-client.version>7.5.1</elasticsearch-rest-client.version>
<checker.qual.version>3.10.0</checker.qual.version>
<iotdb.version>0.13.1</iotdb.version>
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<awaitility.version>4.2.0</awaitility.version>
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
</properties>
Expand Down Expand Up @@ -964,6 +965,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
<version>${phoenix.version}</version>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@
<artifactId>postgresql</artifactId>
</dependency>

<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-shaded-thin-client</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

/**
 * {@link JdbcDialect} implementation for Phoenix.
 *
 * <p>Hands out the Phoenix-specific row converter and type mapper; a fresh instance of each
 * is created on every call.
 */
public class PhoenixDialect implements JdbcDialect {

    /** Value reported by {@link #dialectName()}. */
    private static final String DIALECT_NAME = "Phoenix";

    /**
     * @return the fixed dialect name, {@code "Phoenix"}
     */
    @Override
    public String dialectName() {
        return DIALECT_NAME;
    }

    /**
     * @return a new {@link PhoenixJdbcRowConverter}
     */
    @Override
    public JdbcRowConverter getRowConverter() {
        final JdbcRowConverter converter = new PhoenixJdbcRowConverter();
        return converter;
    }

    /**
     * @return a new {@link PhoenixTypeMapper}
     */
    @Override
    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
        final JdbcDialectTypeMapper typeMapper = new PhoenixTypeMapper();
        return typeMapper;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;
import lombok.NonNull;

/**
 * Factory that produces {@link PhoenixDialect} instances for Phoenix JDBC URLs.
 *
 * <p>Registered as a {@link JdbcDialectFactory} service through {@code @AutoService}.
 */
@AutoService(JdbcDialectFactory.class)
public class PhoenixDialectFactory implements JdbcDialectFactory {

    /** URL prefix that identifies a Phoenix JDBC connection string. */
    private static final String PHOENIX_URL_PREFIX = "jdbc:phoenix:";

    /**
     * Tells whether this factory handles the given JDBC URL.
     *
     * @param url JDBC URL to inspect; must not be null
     * @return {@code true} when the URL begins with {@code jdbc:phoenix:}
     */
    @Override
    public boolean acceptsURL(@NonNull String url) {
        final boolean isPhoenixUrl = url.startsWith(PHOENIX_URL_PREFIX);
        return isPhoenixUrl;
    }

    /**
     * @return a new {@link PhoenixDialect}
     */
    @Override
    public JdbcDialect create() {
        final JdbcDialect dialect = new PhoenixDialect();
        return dialect;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

/**
 * Row converter for the Phoenix dialect.
 *
 * <p>All conversion logic is inherited from {@link AbstractJdbcRowConverter}; this class only
 * supplies the converter name.
 */
public class PhoenixJdbcRowConverter extends AbstractJdbcRowConverter {

    /** Value reported by {@link #converterName()}. */
    private static final String CONVERTER_NAME = "Phoenix";

    /**
     * @return the fixed converter name, {@code "Phoenix"}
     */
    @Override
    public String converterName() {
        return CONVERTER_NAME;
    }

    /**
     * Converts the current row of the result set by delegating, unchanged, to the parent
     * implementation.
     *
     * @param rs result set to read from
     * @param metaData metadata of {@code rs}
     * @param typeInfo SeaTunnel row type describing the target row
     * @return the row produced by the parent implementation
     * @throws SQLException if the parent implementation fails while reading the result set
     */
    @Override
    public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
        final SeaTunnelRow row = super.toInternal(rs, metaData, typeInfo);
        return row;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Locale;

/**
 * Maps the column type names reported by Phoenix JDBC metadata to SeaTunnel data types.
 */
public class PhoenixTypeMapper implements JdbcDialectTypeMapper {

    private static final Logger LOG = LoggerFactory.getLogger(PhoenixTypeMapper.class);

    // ============================data types=====================

    private static final String PHOENIX_UNKNOWN = "UNKNOWN";
    private static final String PHOENIX_BOOLEAN = "BOOLEAN";
    private static final String PHOENIX_ARRAY = "ARRAY";

    // -------------------------number----------------------------
    private static final String PHOENIX_TINYINT = "TINYINT";
    private static final String PHOENIX_UNSIGNED_TINYINT = "UNSIGNED_TINYINT";
    private static final String PHOENIX_SMALLINT = "SMALLINT";
    private static final String PHOENIX_UNSIGNED_SMALLINT = "UNSIGNED_SMALLINT";
    private static final String PHOENIX_UNSIGNED_INT = "UNSIGNED_INT";
    private static final String PHOENIX_INTEGER = "INTEGER";
    private static final String PHOENIX_BIGINT = "BIGINT";
    private static final String PHOENIX_UNSIGNED_LONG = "UNSIGNED_LONG";
    private static final String PHOENIX_DECIMAL = "DECIMAL";
    private static final String PHOENIX_FLOAT = "FLOAT";
    private static final String PHOENIX_UNSIGNED_FLOAT = "UNSIGNED_FLOAT";
    private static final String PHOENIX_DOUBLE = "DOUBLE";
    private static final String PHOENIX_UNSIGNED_DOUBLE = "UNSIGNED_DOUBLE";

    // -------------------------string----------------------------
    private static final String PHOENIX_CHAR = "CHAR";
    private static final String PHOENIX_VARCHAR = "VARCHAR";

    // ------------------------------time-------------------------
    private static final String PHOENIX_DATE = "DATE";
    private static final String PHOENIX_TIME = "TIME";
    private static final String PHOENIX_TIMESTAMP = "TIMESTAMP";
    private static final String PHOENIX_DATE_UNSIGNED = "UNSIGNED_DATE";
    private static final String PHOENIX_TIME_UNSIGNED = "UNSIGNED_TIME";
    private static final String PHOENIX_TIMESTAMP_UNSIGNED = "UNSIGNED_TIMESTAMP";

    // ------------------------------blob-------------------------
    private static final String PHOENIX_BINARY = "BINARY";
    private static final String PHOENIX_VARBINARY = "VARBINARY";

    /**
     * Resolves the SeaTunnel data type for one column of a Phoenix result set.
     *
     * <p>The column type name is upper-cased before matching, so the lookup is
     * case-insensitive. Signed and unsigned variants of a Phoenix type map to the same
     * SeaTunnel type.
     *
     * @param metadata metadata of the result set being read
     * @param colIndex column index, passed unchanged to the metadata accessors
     * @return the SeaTunnel type matching the column's Phoenix type
     * @throws SQLException if reading the column metadata fails
     * @throws UnsupportedOperationException if the type is {@code UNKNOWN}, {@code ARRAY},
     *     or any name not listed in this mapper
     */
    @SuppressWarnings("checkstyle:MagicNumber")
    @Override
    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
        // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with a
        // Turkish locale "tinyint" would otherwise become "TİNYİNT" and match no case below.
        String phoenixType = metadata.getColumnTypeName(colIndex).toUpperCase(Locale.ROOT);
        switch (phoenixType) {
            case PHOENIX_BOOLEAN:
                return BasicType.BOOLEAN_TYPE;
            case PHOENIX_TINYINT:
            case PHOENIX_UNSIGNED_TINYINT:
                return BasicType.BYTE_TYPE;
            case PHOENIX_UNSIGNED_INT:
            case PHOENIX_INTEGER:
                return BasicType.INT_TYPE;
            case PHOENIX_UNSIGNED_SMALLINT:
            case PHOENIX_SMALLINT:
                return BasicType.SHORT_TYPE;
            case PHOENIX_BIGINT:
            case PHOENIX_UNSIGNED_LONG:
                return BasicType.LONG_TYPE;
            case PHOENIX_DECIMAL:
                // Precision and scale are only meaningful for DECIMAL, so they are read here
                // rather than for every column.
                return new DecimalType(metadata.getPrecision(colIndex), metadata.getScale(colIndex));
            case PHOENIX_FLOAT:
                return BasicType.FLOAT_TYPE;
            case PHOENIX_UNSIGNED_FLOAT:
                LOG.warn("{} will probably cause value overflow.", PHOENIX_UNSIGNED_FLOAT);
                return BasicType.FLOAT_TYPE;
            case PHOENIX_DOUBLE:
                return BasicType.DOUBLE_TYPE;
            case PHOENIX_UNSIGNED_DOUBLE:
                LOG.warn("{} will probably cause value overflow.", PHOENIX_UNSIGNED_DOUBLE);
                return BasicType.DOUBLE_TYPE;
            case PHOENIX_CHAR:
            case PHOENIX_VARCHAR:
                return BasicType.STRING_TYPE;
            case PHOENIX_DATE:
            case PHOENIX_DATE_UNSIGNED:
                return LocalTimeType.LOCAL_DATE_TYPE;
            case PHOENIX_TIME:
            case PHOENIX_TIME_UNSIGNED:
                return LocalTimeType.LOCAL_TIME_TYPE;
            case PHOENIX_TIMESTAMP_UNSIGNED:
            case PHOENIX_TIMESTAMP:
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            case PHOENIX_VARBINARY:
            case PHOENIX_BINARY:
                return PrimitiveByteArrayType.INSTANCE;
            // Not supported yet
            case PHOENIX_UNKNOWN:
            case PHOENIX_ARRAY:
            default:
                final String jdbcColumnName = metadata.getColumnName(colIndex);
                throw new UnsupportedOperationException(
                    String.format(
                        "Doesn't support PHOENIX type '%s' on column '%s' yet.",
                        phoenixType, jdbcColumnName));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public void executeBatch() throws SQLException {
}
st.executeBatch();
batch.clear();

// cache commit
st.getConnection().commit();
st.clearParameters();
st.clearBatch();

}
}

Expand Down
Loading

0 comments on commit cb8cf66

Please sign in to comment.