diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java index 680f403..37553e6 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaRowVertexOutputFormatConverter.java @@ -52,6 +52,7 @@ public String createValue(Row row, PolicyEnum policy) { } Object id = row.getField(idIndex); if (id == null) { + Log.error("wrong id, your id is null "); return null; } List vertexProps = new ArrayList<>(); @@ -69,7 +70,8 @@ public String createValue(Row row, PolicyEnum policy) { if (policy == null) { if (vidType == VidTypeEnum.STRING) { - formatId = NebulaUtils.mkString(formatId, "\"", "", "\""); + formatId = NebulaUtils.mkString(NebulaUtils.escapeUtil(String.valueOf(formatId)), + "\"", "", "\""); } else { assert (NebulaUtils.isNumeric(formatId)); } @@ -80,6 +82,5 @@ public String createValue(Row row, PolicyEnum policy) { return String.format(NebulaConstant.VERTEX_VALUE_TEMPLATE_WITH_POLICY, policy.policy(), formatId, String.join(",", vertexProps)); } - } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java index 36b0f19..13fd568 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaUtils.java @@ -20,13 +20,22 @@ public static List getHostAndPorts(String address) { List hostAndPortList = new ArrayList<>(); for (String addr : address.split(NebulaConstant.COMMA)) { String[] hostPort = addr.split(NebulaConstant.COLON); + if (hostPort.length < 2) { + throw new IllegalArgumentException("wrong address"); + } hostAndPortList.add(new HostAddress(hostPort[0], Integer.parseInt(hostPort[1]))); } return hostAndPortList; } public static boolean isNumeric(String str) { - for (char c : str.toCharArray()) { + String newStr = null; + if (str.startsWith("-")) { + newStr = str.substring(1); + } else { + newStr = str; + } + for (char c : newStr.toCharArray()) { if (!Character.isDigit(c)) { return false; } @@ -36,6 +45,9 @@ public static boolean isNumeric(String str) { public static String extraValue(Object value, int type) { + if (value == null) { + return null; + } switch (type) { case PropertyType.STRING: case PropertyType.FIXED_STRING: @@ -46,13 +58,23 @@ public static String extraValue(Object value, int type) { return "time(\"" + value + "\")"; case PropertyType.DATETIME: return "datetime(\"" + value + "\")"; - default: + case PropertyType.TIMESTAMP: { + if (isNumeric(String.valueOf(value))) { + return String.valueOf(value); + } else { + return "timestamp(\"" + value + "\")"; + } + } + default: { return String.valueOf(value); + + } + } } - private static String escapeUtil(String value) { + public static String escapeUtil(String value) { String s = value; if (s.contains("\\")) { s = s.replaceAll("\\\\", "\\\\\\\\"); @@ -83,7 +105,7 @@ public static String mkString(String value, String start, String sep, String end boolean first = true; builder.append(start); for (char c : value.toCharArray()) { - if (first = true) { + if (first) { builder.append(c); first = false; } else { diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java new file mode 100644 index 0000000..8e7a712 --- /dev/null +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaUtilsTest.java @@ -0,0 +1,56 @@ +package org.apache.flink.connector.nebula.utils; + +import com.vesoft.nebula.meta.PropertyType; +import junit.framework.TestCase; + +public class NebulaUtilsTest extends TestCase { + + public void testGetHostAndPorts() { + assert (NebulaUtils.getHostAndPorts("127.0.0.1:9669").size() == 1); + assert (NebulaUtils.getHostAndPorts("127.0.0.1:9669,127.0.0.1:9670").size() == 2); + try { + NebulaUtils.getHostAndPorts(null); + } catch (IllegalArgumentException e) { + assert (true); + } catch (Exception e) { + assert (false); + } + + try { + NebulaUtils.getHostAndPorts("127.0.0.1"); + } catch (IllegalArgumentException e) { + assert (true); + } catch (Exception e) { + assert (false); + } + } + + public void testIsNumeric() { + assert (NebulaUtils.isNumeric("123456")); + assert (NebulaUtils.isNumeric("0123456")); + assert (NebulaUtils.isNumeric("-123456")); + assert (NebulaUtils.isNumeric("000")); + assert (!NebulaUtils.isNumeric("aaa")); + assert (!NebulaUtils.isNumeric("0123aaa")); + assert (!NebulaUtils.isNumeric("123a8")); + } + + public void testExtraValue() { + assert (null == NebulaUtils.extraValue(null, PropertyType.STRING)); + assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.STRING))); + assert ("\"\"".equals(NebulaUtils.extraValue("", PropertyType.FIXED_STRING))); + assert ("1".equals(NebulaUtils.extraValue(1, PropertyType.INT8))); + assert ("timestamp(\"2021-01-01T12:12:12\")".equals( + NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.TIMESTAMP))); + assert ("datetime(\"2021-01-01T12:12:12\")".equals( + NebulaUtils.extraValue("2021-01-01T12:12:12", PropertyType.DATETIME))); + assert ("date(\"2021-01-01\")".equals(NebulaUtils.extraValue("2021-01-01", + PropertyType.DATE))); + assert ("time(\"12:12:12\")".equals(NebulaUtils.extraValue("12:12:12", PropertyType.TIME))); + } + + public void testMkString() { + assertEquals("\"test\"", NebulaUtils.mkString("test", "\"", "", "\"")); + assertEquals("\"t,e,s,t\"", NebulaUtils.mkString("test", "\"", ",", "\"")); + } +}