亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

PowerJob?AbstractSqlProcessor方法工作流程源碼解讀

 更新時(shí)間:2024年01月12日 09:45:25   作者:codecraft  
這篇文章主要為大家介紹了PowerJob?AbstractSqlProcessor方法工作流程源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

本文主要研究一下PowerJob的AbstractSqlProcessor

AbstractSqlProcessor

tech/powerjob/official/processors/impl/sql/AbstractSqlProcessor.java

@Slf4j
public abstract class AbstractSqlProcessor extends CommonBasicProcessor {
    /**
     * 默認(rèn)超時(shí)時(shí)間
     */
    protected static final int DEFAULT_TIMEOUT = 60;
    /**
     * name => SQL validator
     * 注意 :
     * - 返回 true 表示驗(yàn)證通過(guò)
     * - 返回 false 表示 SQL 非法,將被拒絕執(zhí)行
     */
    protected final Map<String, Predicate<String>> sqlValidatorMap = Maps.newConcurrentMap();
    /**
     * 自定義 SQL 解析器
     */
    protected SqlParser sqlParser;
    private static final Joiner JOINER = Joiner.on("|").useForNull("-");
    @Override
    public ProcessResult process0(TaskContext taskContext) {
        OmsLogger omsLogger = taskContext.getOmsLogger();
        // 解析參數(shù)
        SqlParams sqlParams = extractParams(taskContext);
        omsLogger.info("origin sql params: {}", JSON.toJSON(sqlParams));
        // 校驗(yàn)參數(shù)
        validateParams(sqlParams);
        StopWatch stopWatch = new StopWatch(this.getClass().getSimpleName());
        // 解析
        stopWatch.start("Parse SQL");
        if (sqlParser != null) {
            omsLogger.info("before parse sql: {}", sqlParams.getSql());
            String newSQL = sqlParser.parse(sqlParams.getSql(), taskContext);
            sqlParams.setSql(newSQL);
            omsLogger.info("after parse sql: {}", newSQL);
        }
        stopWatch.stop();
        // 校驗(yàn) SQL
        stopWatch.start("Validate SQL");
        validateSql(sqlParams.getSql(), omsLogger);
        stopWatch.stop();
        // 執(zhí)行
        stopWatch.start("Execute SQL");
        omsLogger.info("final sql params: {}", JSON.toJSON(sqlParams));
        executeSql(sqlParams, taskContext);
        stopWatch.stop();
        omsLogger.info(stopWatch.prettyPrint());
        String message = String.format("execute successfully, used time: %s millisecond", stopWatch.getTotalTimeMillis());
        return new ProcessResult(true, message);
    }
    abstract Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException;
    public void setSqlParser(SqlParser sqlParser) {
        this.sqlParser = sqlParser;
    }
    public void registerSqlValidator(String validatorName, Predicate<String> sqlValidator) {
        sqlValidatorMap.put(validatorName, sqlValidator);
        log.info("register sql validator({})' successfully.", validatorName);
    }
    //......
}
AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗(yàn),針對(duì)sqlParser不為null的會(huì)通過(guò)sqlParser進(jìn)行解析,接著通過(guò)validateSql校驗(yàn)sql,最后通過(guò)executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法

SqlParams

@Data
    public static class SqlParams {
        /**
         * 數(shù)據(jù)源名稱
         */
        private String dataSourceName;
        /**
         * 需要執(zhí)行的 SQL
         */
        private String sql;
        /**
         * 超時(shí)時(shí)間
         */
        private Integer timeout;
        /**
         * jdbc url
         * 具體格式可參考 https://www.baeldung.com/java-jdbc-url-format
         */
        private String jdbcUrl;
        /**
         * 是否展示 SQL 執(zhí)行結(jié)果
         */
        private boolean showResult;
    }
SqlParams定義了dataSourceName、sql、timeout、jdbcUrl、showResult屬性

validateSql

private void validateSql(String sql, OmsLogger omsLogger) {
        if (sqlValidatorMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Predicate<String>> entry : sqlValidatorMap.entrySet()) {
            Predicate<String> validator = entry.getValue();
            if (!validator.test(sql)) {
                omsLogger.error("validate sql by validator[{}] failed, skip to process!", entry.getKey());
                throw new IllegalArgumentException("illegal sql, can't pass the validation of " + entry.getKey());
            }
        }
    }
validateSql遍歷sqlValidatorMap,挨個(gè)執(zhí)行test方法,驗(yàn)證不通過(guò)拋出IllegalArgumentException

executeSql

@SneakyThrows
    private void executeSql(SqlParams sqlParams, TaskContext ctx) {
        OmsLogger omsLogger = ctx.getOmsLogger();
        boolean originAutoCommitFlag ;
        try (Connection connection = getConnection(sqlParams, ctx)) {
            originAutoCommitFlag = connection.getAutoCommit();
            connection.setAutoCommit(false);
            try (Statement statement = connection.createStatement()) {
                statement.setQueryTimeout(sqlParams.getTimeout() == null ? DEFAULT_TIMEOUT : sqlParams.getTimeout());
                statement.execute(sqlParams.getSql());
                connection.commit();
                if (sqlParams.showResult) {
                    outputSqlResult(statement, omsLogger);
                }
            } catch (Throwable e) {
                omsLogger.error("execute sql failed, try to rollback", e);
                connection.rollback();
                throw e;
            } finally {
                connection.setAutoCommit(originAutoCommitFlag);
            }
        }
    }
executeSql通過(guò)getConnection獲取連接,設(shè)置為手動(dòng)提交,然后創(chuàng)建Statement,設(shè)置queryTimeout,執(zhí)行,最后提交,針對(duì)showResult的執(zhí)行outputSqlResult

outputSqlResult

private void outputSqlResult(Statement statement, OmsLogger omsLogger) throws SQLException {
        omsLogger.info("====== SQL EXECUTE RESULT ======");

        for (int index = 0; index < Integer.MAX_VALUE; index++) {

            // 某一個(gè)結(jié)果集
            ResultSet resultSet = statement.getResultSet();
            if (resultSet != null) {
                try (ResultSet rs = resultSet) {
                    int columnCount = rs.getMetaData().getColumnCount();
                    List<String> columnNames = Lists.newLinkedList();
                    //column – the first column is 1, the second is 2, ...
                    for (int i = 1; i <= columnCount; i++) {
                        columnNames.add(rs.getMetaData().getColumnName(i));
                    }
                    omsLogger.info("[Result-{}] [Columns] {}" + System.lineSeparator(), index, JOINER.join(columnNames));
                    int rowIndex = 0;
                    List<Object> row = Lists.newLinkedList();
                    while (rs.next()) {
                        for (int i = 1; i <= columnCount; i++) {
                            row.add(rs.getObject(i));
                        }
                        omsLogger.info("[Result-{}] [Row-{}] {}" + System.lineSeparator(), index, rowIndex++, JOINER.join(row));
                    }
                }
            } else {
                int updateCount = statement.getUpdateCount();
                if (updateCount != -1) {
                    omsLogger.info("[Result-{}] update count: {}", index, updateCount);
                }
            }
            if (((!statement.getMoreResults()) && (statement.getUpdateCount() == -1))) {
                break;
            }
        }
        omsLogger.info("====== SQL EXECUTE RESULT ======");
    }
outputSqlResult從statement獲取resultSet,然后打印columnName,在打印每行數(shù)據(jù),對(duì)于更新操作則打印updateCount

SqlParser

@FunctionalInterface
    public interface SqlParser {
        /**
         * 自定義 SQL 解析邏輯
         *
         * @param sql         原始 SQL 語(yǔ)句
         * @param taskContext 任務(wù)上下文
         * @return 解析后的 SQL
         */
        String parse(String sql, TaskContext taskContext);
    }
SqlParser接口定義了parse方法

DynamicDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/DynamicDatasourceSqlProcessor.java

public class DynamicDatasourceSqlProcessor extends AbstractSqlProcessor {
    @Override
    protected void validateParams(SqlParams sqlParams) {
        if (StringUtils.isEmpty(sqlParams.getJdbcUrl())) {
            throw new IllegalArgumentException("jdbcUrl can't be empty in DynamicDatasourceSqlProcessor!");
        }
    }
    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
        JSONObject params = JSONObject.parseObject(CommonUtils.parseParams(taskContext));
        Properties properties = new Properties();
        // normally at least a "user" and "password" property should be included
        params.forEach((k, v) -> properties.setProperty(k, String.valueOf(v)));
        return DriverManager.getConnection(sqlParams.getJdbcUrl(), properties);
    }
    @Override
    protected String getSecurityDKey() {
        return SecurityUtils.ENABLE_DYNAMIC_SQL_PROCESSOR;
    }
}
DynamicDatasourceSqlProcessor繼承了AbstractSqlProcessor,其validateParams要求jdbcUrl不能為空,其getConnection方法會(huì)從taskContext提取properties作為DriverManager.getConnection的屬性,其getSecurityDKey返回的是powerjob.official-processor.dynamic-datasource.enable配置

SpringDatasourceSqlProcessor

tech/powerjob/official/processors/impl/sql/SpringDatasourceSqlProcessor.java

@Slf4j
public class SpringDatasourceSqlProcessor extends AbstractSqlProcessor {
    /**
     * 默認(rèn)的數(shù)據(jù)源名稱
     */
    private static final String DEFAULT_DATASOURCE_NAME = "default";
    /**
     * name => data source
     */
    private final Map<String, DataSource> dataSourceMap;
    /**
     * 指定默認(rèn)的數(shù)據(jù)源
     *
     * @param defaultDataSource 默認(rèn)數(shù)據(jù)源
     */
    public SpringDatasourceSqlProcessor(DataSource defaultDataSource) {
        dataSourceMap = Maps.newConcurrentMap();
        registerDataSource(DEFAULT_DATASOURCE_NAME, defaultDataSource);
    }
    @Override
    Connection getConnection(SqlParams sqlParams, TaskContext taskContext) throws SQLException {
        return dataSourceMap.get(sqlParams.getDataSourceName()).getConnection();
    }
    /**
     * 校驗(yàn)參數(shù),如果校驗(yàn)不通過(guò)直接拋異常
     *
     * @param sqlParams SQL 參數(shù)信息
     */
    @Override
    protected void validateParams(SqlParams sqlParams) {
        // 檢查數(shù)據(jù)源
        if (StringUtils.isEmpty(sqlParams.getDataSourceName())) {
            // use the default data source when current data source name is empty
            sqlParams.setDataSourceName(DEFAULT_DATASOURCE_NAME);
        }
        dataSourceMap.computeIfAbsent(sqlParams.getDataSourceName(), dataSourceName -> {
            throw new IllegalArgumentException("can't find data source with name " + dataSourceName);
        });
    }
    /**
     * 注冊(cè)數(shù)據(jù)源
     *
     * @param dataSourceName 數(shù)據(jù)源名稱
     * @param dataSource     數(shù)據(jù)源
     */
    public void registerDataSource(String dataSourceName, DataSource dataSource) {
        Objects.requireNonNull(dataSourceName, "DataSource name must not be null");
        Objects.requireNonNull(dataSource, "DataSource must not be null");
        dataSourceMap.put(dataSourceName, dataSource);
        log.info("register data source({})' successfully.", dataSourceName);
    }
    /**
     * 移除數(shù)據(jù)源
     *
     * @param dataSourceName 數(shù)據(jù)源名稱
     */
    public void removeDataSource(String dataSourceName) {
        DataSource remove = dataSourceMap.remove(dataSourceName);
        if (remove != null) {
            log.warn("remove data source({})' successfully.", dataSourceName);
        }
    }
}
SpringDatasourceSqlProcessor繼承了AbstractSqlProcessor,其構(gòu)造器注冊(cè)名為default的DataSource,其getConnection根據(jù)sqlParams的dataSourceName來(lái)獲取連接,validateParams會(huì)先校驗(yàn)指定的dataSource是否存在;它提供了registerDataSource、removeDataSource方法

小結(jié)

AbstractSqlProcessor繼承了CommonBasicProcessor,其process0先將入?yún)⒔馕鰹镾qlParams,然后調(diào)用validateParams進(jìn)行參數(shù)校驗(yàn),針對(duì)sqlParser不為null的會(huì)通過(guò)sqlParser進(jìn)行解析,接著通過(guò)validateSql校驗(yàn)sql,最后通過(guò)executeSql執(zhí)行sql;它定義了getConnection抽象方法,提供了setSqlParser、registerSqlValidator方法。它有兩個(gè)實(shí)現(xiàn)類分別是DynamicDatasourceSqlProcessor(通過(guò)jdbcUrl來(lái)構(gòu)造連接)、SpringDatasourceSqlProcessor(通過(guò)給定的dataSource獲取連接)。

以上就是PowerJob AbstractSqlProcessor方法工作流程源碼解讀的詳細(xì)內(nèi)容,更多關(guān)于PowerJob AbstractSqlProcessor的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java selenium處理極驗(yàn)滑動(dòng)驗(yàn)證碼示例

    Java selenium處理極驗(yàn)滑動(dòng)驗(yàn)證碼示例

    本篇文章主要介紹了Java selenium處理極驗(yàn)滑動(dòng)驗(yàn)證碼示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2017-10-10
  • Spring?MVC?請(qǐng)求映射路徑的配置實(shí)現(xiàn)前后端交互

    Spring?MVC?請(qǐng)求映射路徑的配置實(shí)現(xiàn)前后端交互

    在Spring?MVC中,請(qǐng)求映射路徑是指與特定的請(qǐng)求處理方法關(guān)聯(lián)的URL路徑,這篇文章主要介紹了Spring?MVC?請(qǐng)求映射路徑的配置,實(shí)現(xiàn)前后端交互,需要的朋友可以參考下
    2023-09-09
  • Springboot @Configuration與自動(dòng)配置詳解

    Springboot @Configuration與自動(dòng)配置詳解

    這篇文章主要介紹了SpringBoot中的@Configuration自動(dòng)配置,在進(jìn)行項(xiàng)目編寫前,我們還需要知道一個(gè)東西,就是SpringBoot對(duì)我們的SpringMVC還做了哪些配置,包括如何擴(kuò)展,如何定制,只有把這些都搞清楚了,我們?cè)谥笫褂貌艜?huì)更加得心應(yīng)手
    2022-07-07
  • Java基礎(chǔ)教程之封裝與接口

    Java基礎(chǔ)教程之封裝與接口

    這篇文章主要介紹了Java基礎(chǔ)教程之封裝與接口,本文用淺顯易懂的語(yǔ)言講解了Java中的封裝與接口,很形象的說(shuō)明了這兩個(gè)面向?qū)ο笮g(shù)語(yǔ),需要的朋友可以參考下
    2014-08-08
  • Java的接口調(diào)用時(shí)的權(quán)限驗(yàn)證功能的實(shí)現(xiàn)

    Java的接口調(diào)用時(shí)的權(quán)限驗(yàn)證功能的實(shí)現(xiàn)

    這篇文章主要介紹了Java的接口調(diào)用時(shí)的權(quán)限驗(yàn)證功能的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • 如何在JAVA中使用Synchronized

    如何在JAVA中使用Synchronized

    這篇文章主要介紹了如何在JAVA中使用Synchronized,文中代碼非常詳細(xì),對(duì)大家的學(xué)習(xí)有所幫助,感興趣的朋友可以參考下
    2020-06-06
  • java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法

    java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法

    這篇文章主要介紹了java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09
  • Spring Boot使用Allatori代碼混淆的方法

    Spring Boot使用Allatori代碼混淆的方法

    這篇文章主要介紹了Spring Boot使用Allatori代碼混淆的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2018-03-03
  • 詳解JAVA 反射機(jī)制

    詳解JAVA 反射機(jī)制

    這篇文章主要介紹了JAVA 反射機(jī)制的相關(guān)知識(shí),文中講解的非常細(xì)致,代碼幫助大家更好的理解學(xué)習(xí),感興趣的朋友可以了解下
    2020-06-06
  • 淺談Java引用和Threadlocal的那些事

    淺談Java引用和Threadlocal的那些事

    這篇文章主要介紹了Java引用和Threadlocal的那些事,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧
    2019-03-03

最新評(píng)論