Skip to content

Commit

Permalink
Merge branch 'master' into rxsocks
Browse files Browse the repository at this point in the history
  • Loading branch information
youfanx committed Sep 2, 2024
2 parents c8271a2 + 1fe997f commit 83af5de
Show file tree
Hide file tree
Showing 29 changed files with 413 additions and 233 deletions.
140 changes: 135 additions & 5 deletions rxlib-x/src/main/java/org/rx/jdbc/JdbcUtil.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,36 @@
package org.rx.jdbc;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.fastjson2.JSONObject;
import com.mysql.jdbc.MySQLConnection;
import com.mysql.jdbc.StringUtils;
import com.zaxxer.hikari.HikariDataSource;
import com.zaxxer.hikari.pool.ProxyConnection;
import lombok.NonNull;
import lombok.SneakyThrows;
import org.rx.core.Linq;
import org.rx.core.Reflects;
import org.rx.core.*;
import org.rx.core.StringBuilder;
import org.rx.core.Strings;
import org.rx.exception.InvalidException;
import org.rx.io.EntityQueryLambda;
import org.rx.third.guava.CaseFormat;
import org.rx.util.function.BiAction;
import org.rx.util.function.BiFunc;
import org.rx.util.function.TripleFunc;

import javax.sql.DataSource;
import java.io.InputStream;
import java.io.Reader;
import java.lang.reflect.Field;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.sql.*;
import java.util.Collections;
import java.util.Map;
import java.util.*;

import static org.rx.core.Extends.as;
import static org.rx.core.Sys.fromJson;
import static org.rx.core.Sys.toJsonObject;
import static org.rx.io.EntityQueryLambda.TO_UNDERSCORE_COLUMN_MAPPING;
import static org.rx.io.EntityQueryLambda.TO_UNDERSCORE_TABLE_MAPPING;

public class JdbcUtil {
static final String HINT_PREFIX = "/*", HINT_SUFFIX = "*/";
Expand Down Expand Up @@ -195,4 +204,125 @@ public static void print(ResultSet resultSet) {
}
}
}

public static final BiFunc<String, String> TO_CAMEL_COLUMN_MAPPING = p -> CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, p);

public static <T> List<T> readAs(ResultSet resultSet, Type type) {
return readAs(resultSet, type, TO_CAMEL_COLUMN_MAPPING, null);
}

@SneakyThrows
public static <T> List<T> readAs(@NonNull ResultSet resultSet, @NonNull Type type, BiFunc<String, String> columnMapping, BiAction<Map<String, Object>> rowMapping) {
List<T> list = new ArrayList<>();
try (ResultSet rs = resultSet) {
ResultSetMetaData metaData = rs.getMetaData();
int colSize = metaData.getColumnCount();
List<String> columns = new ArrayList<>(colSize);
if (columnMapping != null) {
for (int i = 1; i <= colSize; i++) {
columns.add(columnMapping.invoke(metaData.getColumnLabel(i)));
}
} else {
for (int i = 1; i <= colSize; i++) {
columns.add(metaData.getColumnLabel(i));
}
}

JSONObject row = new JSONObject(colSize);
while (rs.next()) {
row.clear();
for (int i = 0; i < colSize; ) {
row.put(columns.get(i), rs.getObject(++i));
}
if (rowMapping != null) {
rowMapping.invoke(row);
}
list.add(fromJson(row, type));
}
}
return list;
}

public static <T> String buildInsertSql(T po) {
return buildInsertSql(po, TO_UNDERSCORE_TABLE_MAPPING, TO_UNDERSCORE_COLUMN_MAPPING, null);
}

public static <T> String buildInsertSql(@NonNull T po, BiFunc<Class<?>, String> tableMapping, BiFunc<String, String> columnMapping, TripleFunc<String, Object, Object> valueMapping) {
JSONObject row = toJsonObject(po);
if (row.isEmpty()) {
throw new InvalidException("Type {} hasn't any getters", po.getClass());
}

List<String> columns = new ArrayList<>(row.size()), values = new ArrayList<>(row.size());
if (columnMapping != null) {
for (String k : row.keySet()) {
String nk = columnMapping.apply(k);
if (nk == null) {
continue;
}
columns.add("`" + nk + "`");
Object val = row.get(k);
if (valueMapping != null) {
val = valueMapping.apply(nk, val);
}
values.add(EntityQueryLambda.toValueString(val));
}
} else {
for (String k : row.keySet()) {
columns.add("`" + k + "`");
Object val = row.get(k);
if (valueMapping != null) {
val = valueMapping.apply(k, val);
}
values.add(EntityQueryLambda.toValueString(val));
}
}

Class<?> poType = po.getClass();
return new StringBuilder(128)
.appendMessageFormat(Constants.SQL_INSERT,
tableMapping != null ? tableMapping.apply(poType) : poType.getSimpleName(),
String.join(",", columns), String.join(",", values)).toString();
}

public static <T> String buildUpdateSql(T po, EntityQueryLambda<T> query) {
return buildUpdateSql(po, query, TO_UNDERSCORE_TABLE_MAPPING, TO_UNDERSCORE_COLUMN_MAPPING, null);
}

public static <T> String buildUpdateSql(@NonNull T po, @NonNull EntityQueryLambda<T> query, BiFunc<Class<?>, String> tableMapping, BiFunc<String, String> columnMapping, TripleFunc<String, Object, Object> valueMapping) {
JSONObject row = toJsonObject(po);
if (row.isEmpty()) {
throw new InvalidException("Type {} hasn't any getters", po.getClass());
}
query.setColumnMapping(columnMapping);

List<String> colVals = new ArrayList<>(row.size());
if (columnMapping != null) {
for (String k : row.keySet()) {
String nk = columnMapping.apply(k);
if (nk == null) {
continue;
}
Object val = row.get(k);
if (valueMapping != null) {
val = valueMapping.apply(nk, val);
}
colVals.add("`" + nk + "`=" + EntityQueryLambda.toValueString(val));
}
} else {
for (String k : row.keySet()) {
Object val = row.get(k);
if (valueMapping != null) {
val = valueMapping.apply(k, val);
}
colVals.add("`" + k + "`=" + EntityQueryLambda.toValueString(val));
}
}

Class<?> poType = po.getClass();
return new StringBuilder(128)
.appendMessageFormat(Constants.SQL_UPDATE, tableMapping != null ? tableMapping.apply(poType) : poType.getSimpleName(),
String.join(",", colVals))
.append(query).toString();
}
}
6 changes: 5 additions & 1 deletion rxlib-x/src/main/java/org/rx/redis/RedisCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ public int size() {
}

public RedisCache(String redisUrl) {
client = create(redisUrl);
this(create(redisUrl));
}

public RedisCache(@NonNull RedissonClient redissonClient) {
client = redissonClient;
}

@Override
Expand Down
43 changes: 39 additions & 4 deletions rxlib-x/src/test/java/org/rx/jdbc/TestJdbc.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,48 @@
package org.rx.jdbc;

import lombok.Data;
import org.junit.jupiter.api.Test;
import org.rx.bean.DateTime;
import org.rx.io.EntityQueryLambda;

import java.util.Date;

public class TestJdbc {
@Data
public static class PoUser {
Long id;
String userName;
String pwd;
Integer age;
Date createAt;
Date modifyAt;
}

@Test
public void jdbcExec() {
JdbcExecutor d = new JdbcExecutor("jdbc:mysql://", "", "bG1hbG1#");
JdbcUtil.print(d.executeQuery("select * from emr.t_third_api_record\n" +
"# where third_order_id = 'A01202402201715030375693'\n" +
"order by updated_time desc"));
// JdbcExecutor d = new JdbcExecutor("jdbc:mysql://", "", "bG1hbG1#");
// JdbcUtil.print(d.executeQuery("select * from emr.t_third_api_record\n" +
// "# where third_order_id = 'A01202402201715030375693'\n" +
// "order by updated_time desc"));

PoUser po = new PoUser();
po.setId(1L);
po.setUserName("rocky");
po.setAge(16);
po.setCreateAt(DateTime.now());
po.setModifyAt(new Date());
System.out.println(JdbcUtil.buildInsertSql(po, t -> t.getSimpleName().toUpperCase(), c -> {
switch (c) {
case "id":
return "_id";
}
return c;
}, (c, v) -> {
if (v instanceof Date) {
return "2024-01-01";
}
return v;
}));
System.out.println(JdbcUtil.buildUpdateSql(po, new EntityQueryLambda<>(PoUser.class).eq(PoUser::getId, 1024).eq(PoUser::getUserName, "wyf")));
}
}
29 changes: 10 additions & 19 deletions rxlib/src/main/java/org/rx/bean/CircularBlockingQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@

import lombok.Getter;
import org.rx.core.*;
import org.rx.util.function.TripleFunc;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

import static org.rx.core.Extends.ifNull;

public class CircularBlockingQueue<T> extends LinkedBlockingQueue<T> implements EventPublisher<CircularBlockingQueue<T>> {
private static final long serialVersionUID = 4685018531330571106L;
public final Delegate<CircularBlockingQueue<T>, NEventArgs<T>> onConsume = Delegate.create();
public TripleFunc<CircularBlockingQueue<T>, T, Boolean> onFull;
public final Delegate<CircularBlockingQueue<T>, T> onConsume = Delegate.create();
public final Delegate<CircularBlockingQueue<T>, NEventArgs<T>> onFull = Delegate.create();
final ReentrantLock pLock = Reflects.readField(this, "putLock");
TimeoutFuture<?> consumeTimer;
@Getter
Expand All @@ -34,10 +31,8 @@ public synchronized void setConsumePeriod(long consumePeriod) {
}
consumeTimer = Tasks.timer().setTimeout(() -> {
T t;
NEventArgs<T> e = new NEventArgs<>();
while ((t = poll()) != null) {
e.setValue(t);
raiseEvent(onConsume, e);
raiseEvent(onConsume, t);
}
}, d -> consumePeriod, null, Constants.TIMER_PERIOD_FLAG);
} else {
Expand All @@ -48,26 +43,20 @@ public synchronized void setConsumePeriod(long consumePeriod) {
}

public CircularBlockingQueue(int capacity) {
this(capacity, null);
onFull = (q, t) -> {
super(capacity);
onFull.combine((q, t) -> {
pLock.lock();
try {
boolean ok;
do {
q.poll();
ok = q.innerOffer(t);
ok = q.innerOffer(t.getValue());
}
while (!ok);
return true;
} finally {
pLock.unlock();
}
};
}

public CircularBlockingQueue(int capacity, TripleFunc<CircularBlockingQueue<T>, T, Boolean> onFull) {
super(capacity);
this.onFull = onFull;
});
}

//Full会抛异常
Expand All @@ -80,7 +69,9 @@ public CircularBlockingQueue(int capacity, TripleFunc<CircularBlockingQueue<T>,
public boolean offer(T t) {
boolean r = super.offer(t);
if (!r && onFull != null) {
return ifNull(onFull.apply(this, t), false);
NEventArgs<T> e = new NEventArgs<>(t);
raiseEvent(onFull, e);
return !e.isCancel();
}
return r;
}
Expand Down
13 changes: 6 additions & 7 deletions rxlib/src/main/java/org/rx/bean/DataTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.h2.jdbc.JdbcResultSet;
import org.h2.result.LocalResult;
import org.h2.value.ValueToObjectConverter;
import org.rx.core.Arrays;
import org.rx.core.StringBuilder;
import org.rx.core.*;
import org.rx.exception.InvalidException;
Expand All @@ -20,10 +21,7 @@
import java.lang.reflect.Type;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.*;

import static org.rx.core.Extends.as;
import static org.rx.core.Extends.tryAs;
Expand Down Expand Up @@ -183,13 +181,14 @@ public <T> List<T> toList(@NonNull Type type, boolean toLowerCamelColumn) {
List<T> list = new ArrayList<>();
int colSize = columns.size();
Iterator<DataRow> rows = getRows();
Map<String, Object> row = new HashMap<>(colSize);
while (rows.hasNext()) {
row.clear();
List<Object> cells = rows.next().items;
JSONObject j = new JSONObject(colSize);
for (int i = 0; i < colSize; i++) {
j.put(columns.get(i).columnName, cells.get(i));
row.put(columns.get(i).columnName, cells.get(i));
}
list.add(fromJson(j, type));
list.add(fromJson(row, type));
}
return list;
}
Expand Down
16 changes: 1 addition & 15 deletions rxlib/src/main/java/org/rx/bean/IntWaterMark.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,12 @@

import java.io.Serializable;

import static org.rx.core.Extends.require;

@Getter
@Setter
@ToString
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@NoArgsConstructor
@AllArgsConstructor
public class IntWaterMark implements Serializable {
private static final long serialVersionUID = -6996645790082139283L;
private int low, high;

public void setLow(int low) {
require(low, low < high);

this.low = low;
}

public void setHigh(int high) {
require(high, high > low);

this.high = high;
}
}
8 changes: 8 additions & 0 deletions rxlib/src/main/java/org/rx/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,15 @@ enum MetricName {
/**
* do not edit
*/
// String SQL_INSERT = "INSERT INTO ${table} (${columns}) VALUES (${values})";
// String SQL_UPDATE = "UPDATE ${table} SET ${columnValues} WHERE ";
String SQL_INSERT = "INSERT INTO {} ({}) VALUES ({})";
String SQL_UPDATE = "UPDATE {} SET {} WHERE ";

FlagsEnum<TimeoutFlag> TIMER_PERIOD_FLAG = TimeoutFlag.PERIOD.flags();
FlagsEnum<TimeoutFlag> TIMER_SINGLE_FLAG = TimeoutFlag.SINGLE.flags();
FlagsEnum<TimeoutFlag> TIMER_REPLACE_FLAG = TimeoutFlag.REPLACE.flags();

FlagsEnum<EventPublisher.EventFlags> EVENT_DYNAMIC_FLAG = EventPublisher.EventFlags.DYNAMIC_ATTACH.flags();
FlagsEnum<EventPublisher.EventFlags> EVENT_ALL_FLAG = EventPublisher.EventFlags.DYNAMIC_ATTACH.flags(EventPublisher.EventFlags.QUIETLY);
}
Loading

0 comments on commit 83af5de

Please sign in to comment.