From 085ab3baca1464dc08d39bfaafb90624624826a9 Mon Sep 17 00:00:00 2001 From: jiayang Date: Thu, 26 Oct 2023 19:51:34 +0800 Subject: [PATCH 1/2] [improve] update package name --- .../datasource/AbstractDataSourceClient.java | 155 ++++++++++-------- 1 file changed, 91 insertions(+), 64 deletions(-) diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java index 34fb2850e..0225b7038 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java @@ -31,6 +31,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import lombok.NonNull; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import java.io.File; @@ -43,6 +45,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static com.google.common.base.Preconditions.checkNotNull; @@ -50,17 +53,17 @@ public abstract class AbstractDataSourceClient implements DataSourceService { private static final String ST_WEB_BASEDIR_PATH = "ST_WEB_BASEDIR_PATH"; // private ClassLoader datasourceClassLoader; // thradlocal - private ThreadLocal datasourceClassLoader = new ThreadLocal<>(); + private final ThreadLocal datasourceClassLoader = new ThreadLocal<>(); - private Map supportedDataSourceInfo = new HashMap<>(); + private final Map supportedDataSourceInfo = new HashMap<>(); - private Map supportedDataSourceIndex = new HashMap<>(); + private final Map supportedDataSourceIndex = new HashMap<>(); - protected List supportedDataSources = new ArrayList<>(); + private final List supportedDataSources = new ArrayList<>(); - private List dataSourceChannels = new ArrayList<>(); + private final List dataSourceChannels = new ArrayList<>(); - private Map classLoaderChannel = new HashMap<>(); + private final Map classLoaderChannel = new HashMap<>(); protected AbstractDataSourceClient() { AtomicInteger dataSourceIndex = new AtomicInteger(); @@ -99,7 +102,7 @@ protected AbstractDataSourceClient() { .get(pluginName.toUpperCase()) .toString()); } catch (Exception e) { - log.warn("datasource " + pluginName + "is error" + ExceptionUtils.getMessage(e)); + log.warn("datasource " + pluginName + " is error " + ExceptionUtils.getMessage(e)); } Thread.currentThread().setContextClassLoader(contextClassLoader); } @@ -108,15 +111,19 @@ protected AbstractDataSourceClient() { } } + public Boolean isVirtualTableDatasource(String pluginName) { + log.info("pluginName: {}", pluginName); + return supportedDataSourceInfo.get(pluginName.toUpperCase()).getSupportVirtualTables(); + } + @Override public Boolean checkDataSourceConnectivity( String pluginName, Map dataSourceParams) { - updateClassLoader(pluginName); - boolean isConnect = - getDataSourceChannel(pluginName) - .checkDataSourceConnectivity(pluginName, dataSourceParams); - classLoaderRestore(); - return isConnect; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .checkDataSourceConnectivity(pluginName, dataSourceParams)); } @Override @@ -126,27 +133,28 @@ public List listAllDataSources() { protected DataSourceChannel getDataSourceChannel(String pluginName) { checkNotNull(pluginName, "pluginName cannot be null"); - + // Integer index = supportedDataSourceIndex.get(pluginName.toUpperCase()); + // if (index == null) { + // throw new DataSourceSDKException( + // "The %s plugin is not supported or plugin not exist.", pluginName); + // } return DatasourceLoadConfig.classLoaderChannel.get(pluginName.toUpperCase()); } @Override public OptionRule queryDataSourceFieldByName(String pluginName) { - updateClassLoader(pluginName); - OptionRule dataSourceOptions = - getDataSourceChannel(pluginName).getDataSourceOptions(pluginName); - classLoaderRestore(); - return dataSourceOptions; + return executeByCustomerClassLoader( + pluginName, + () -> getDataSourceChannel(pluginName).getDataSourceOptions(pluginName)); } @Override public OptionRule queryMetadataFieldByName(String pluginName) { - updateClassLoader(pluginName); - OptionRule datasourceMetadataFieldsByDataSourceName = - getDataSourceChannel(pluginName) - .getDatasourceMetadataFieldsByDataSourceName(pluginName); - classLoaderRestore(); - return datasourceMetadataFieldsByDataSourceName; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .getDatasourceMetadataFieldsByDataSourceName(pluginName)); } @Override @@ -155,21 +163,18 @@ public List getTables( String databaseName, Map requestParams, Map options) { - updateClassLoader(pluginName); - List tables = - getDataSourceChannel(pluginName) - .getTables(pluginName, requestParams, databaseName, options); - classLoaderRestore(); - return tables; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .getTables(pluginName, requestParams, databaseName, options)); } @Override public List getDatabases(String pluginName, Map requestParams) { - updateClassLoader(pluginName); - List databases = - getDataSourceChannel(pluginName).getDatabases(pluginName, requestParams); - classLoaderRestore(); - return databases; + return executeByCustomerClassLoader( + pluginName, + () -> getDataSourceChannel(pluginName).getDatabases(pluginName, requestParams)); } @Override @@ -178,12 +183,12 @@ public List getTableFields( Map requestParams, String databaseName, String tableName) { - updateClassLoader(pluginName); - List tableFields = - getDataSourceChannel(pluginName) - .getTableFields(pluginName, requestParams, databaseName, tableName); - classLoaderRestore(); - return tableFields; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .getTableFields( + pluginName, requestParams, databaseName, tableName)); } @Override @@ -192,12 +197,12 @@ public Map> getTableFields( Map requestParams, String databaseName, List tableNames) { - updateClassLoader(pluginName); - Map> tableFields = - getDataSourceChannel(pluginName) - .getTableFields(pluginName, requestParams, databaseName, tableNames); - classLoaderRestore(); - return tableFields; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .getTableFields( + pluginName, requestParams, databaseName, tableNames)); } @Override @@ -207,17 +212,16 @@ public Pair getTableSyncMaxValue( String databaseName, String tableName, String updateFieldType) { - updateClassLoader(pluginName); - Pair tableSyncMaxValue = - getDataSourceChannel(pluginName) - .getTableSyncMaxValue( - pluginName, - requestParams, - databaseName, - tableName, - updateFieldType); - classLoaderRestore(); - return tableSyncMaxValue; + return executeByCustomerClassLoader( + pluginName, + () -> + getDataSourceChannel(pluginName) + .getTableSyncMaxValue( + pluginName, + requestParams, + databaseName, + tableName, + updateFieldType)); } private ClassLoader getCustomClassloader(String pluginName) { @@ -225,12 +229,20 @@ private ClassLoader getCustomClassloader(String pluginName) { log.info("ST_WEB_BASEDIR_PATH is : " + getenv); String libPath = StringUtils.isEmpty(getenv) ? "/datasource" : (getenv + "/datasource"); + // String libPath = "/root/apache-seatunnel-web-2.4.7-WS-SNAPSHOT/datasource/"; File jarDirectory = new File(libPath); File[] jarFiles = jarDirectory.listFiles( (dir, name) -> { String pluginUpperCase = pluginName.toUpperCase(); String nameLowerCase = name.toLowerCase(); + String pluginJar = + DatasourceLoadConfig.classLoaderJarName.get(pluginUpperCase); + if (StringUtils.isEmpty(pluginJar)) { + log.warn( + "classLoaderJarName get pluginUpperCase jar name is null : {} ", + pluginUpperCase); + } if (pluginUpperCase.equals("KAFKA")) { return !nameLowerCase.contains("kingbase") && nameLowerCase.startsWith( @@ -294,10 +306,25 @@ private void classLoaderRestore() { @Override public Connection getConnection(String pluginName, Map requestParams) { - updateClassLoader(pluginName); - Connection connection = - getDataSourceChannel(pluginName).getConnection(pluginName, requestParams); - classLoaderRestore(); - return connection; + return executeByCustomerClassLoader( + pluginName, + () -> getDataSourceChannel(pluginName).getConnection(pluginName, requestParams)); + } + + /** + * Execute the given {@code Callable} within the {@link ClassLoader} of the current thread. + * + * @param supplier + * @param + * @return + */ + @SneakyThrows + private T executeByCustomerClassLoader(String pluginName, @NonNull Supplier supplier) { + try { + updateClassLoader(pluginName); + return supplier.get(); + } finally { + classLoaderRestore(); + } } } From b395773a79bf82981c56bd0d0b32087b1fd9d998 Mon Sep 17 00:00:00 2001 From: jiayang Date: Mon, 13 Nov 2023 11:23:48 +0800 Subject: [PATCH 2/2] [feature] merge main --- .../seatunnel/datasource/AbstractDataSourceClient.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java index 0225b7038..a49c9d705 100644 --- a/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java +++ b/seatunnel-datasource/seatunnel-datasource-client/src/main/java/org/apache/seatunnel/datasource/AbstractDataSourceClient.java @@ -133,11 +133,6 @@ public List listAllDataSources() { protected DataSourceChannel getDataSourceChannel(String pluginName) { checkNotNull(pluginName, "pluginName cannot be null"); - // Integer index = supportedDataSourceIndex.get(pluginName.toUpperCase()); - // if (index == null) { - // throw new DataSourceSDKException( - // "The %s plugin is not supported or plugin not exist.", pluginName); - // } return DatasourceLoadConfig.classLoaderChannel.get(pluginName.toUpperCase()); }