ShardingSphere如何進(jìn)行sql重寫示例詳解
序
本文主要研究一下ShardingSphere進(jìn)行sql重寫的原理
prepareStatement
org/apache/shardingsphere/driver/jdbc/core/connection/ShardingSphereConnection.java
public final class ShardingSphereConnection extends AbstractConnectionAdapter { @Override public PreparedStatement prepareStatement(final String sql) throws SQLException { return new ShardingSpherePreparedStatement(this, sql); } //...... }
ShardingSphereConnection的prepareStatement創(chuàng)建的是ShardingSpherePreparedStatement
ShardingSpherePreparedStatement
org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
public final class ShardingSpherePreparedStatement extends AbstractPreparedStatementAdapter { @Getter private final ShardingSphereConnection connection; public ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql) throws SQLException { this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT, false, null); } private ShardingSpherePreparedStatement(final ShardingSphereConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability, final boolean returnGeneratedKeys, final String[] columns) throws SQLException { if (Strings.isNullOrEmpty(sql)) { throw new EmptySQLException().toSQLException(); } this.connection = connection; metaDataContexts = connection.getContextManager().getMetaDataContexts(); SQLParserRule sqlParserRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(SQLParserRule.class); hintValueContext = sqlParserRule.isSqlCommentParseEnabled() ? new HintValueContext() : SQLHintUtils.extractHint(sql).orElseGet(HintValueContext::new); this.sql = sqlParserRule.isSqlCommentParseEnabled() ? sql : SQLHintUtils.removeHint(sql); statements = new ArrayList<>(); parameterSets = new ArrayList<>(); SQLParserEngine sqlParserEngine = sqlParserRule.getSQLParserEngine( DatabaseTypeEngine.getTrunkDatabaseTypeName(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType())); sqlStatement = sqlParserEngine.parse(this.sql, true); sqlStatementContext = SQLStatementContextFactory.newInstance(metaDataContexts.getMetaData(), sqlStatement, connection.getDatabaseName()); parameterMetaData = new ShardingSphereParameterMetaData(sqlStatement); statementOption = returnGeneratedKeys ? new StatementOption(true, columns) : new StatementOption(resultSetType, resultSetConcurrency, resultSetHoldability); executor = new DriverExecutor(connection); JDBCExecutor jdbcExecutor = new JDBCExecutor(connection.getContextManager().getExecutorEngine(), connection.getDatabaseConnectionManager().getConnectionContext()); batchPreparedStatementExecutor = new BatchPreparedStatementExecutor(metaDataContexts, jdbcExecutor, connection.getDatabaseName()); kernelProcessor = new KernelProcessor(); statementsCacheable = isStatementsCacheable(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getRuleMetaData()); trafficRule = metaDataContexts.getMetaData().getGlobalRuleMetaData().getSingleRule(TrafficRule.class); selectContainsEnhancedTable = sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsEnhancedTable(); statementManager = new StatementManager(); } //...... }
ShardingSpherePreparedStatement繼承了AbstractPreparedStatementAdapter,其構(gòu)造器主要是通過(guò)SQLParserEngine解析sql得到SQLStatement,創(chuàng)建DriverExecutor、BatchPreparedStatementExecutor、KernelProcessor、StatementManager;這里即使useServerPrepStmts=true,也不會(huì)觸發(fā)mysql server的prepare操作
executeUpdate
public int executeUpdate() throws SQLException { try { if (statementsCacheable && !statements.isEmpty()) { resetParameters(); return statements.iterator().next().executeUpdate(); } clearPrevious(); QueryContext queryContext = createQueryContext(); trafficInstanceId = getInstanceIdAndSet(queryContext).orElse(null); if (null != trafficInstanceId) { JDBCExecutionUnit executionUnit = createTrafficExecutionUnit(trafficInstanceId, queryContext); return executor.getTrafficExecutor().execute(executionUnit, (statement, sql) -> ((PreparedStatement) statement).executeUpdate()); } executionContext = createExecutionContext(queryContext); if (hasRawExecutionRule()) { Collection<ExecuteResult> executeResults = executor.getRawExecutor().execute(createRawExecutionGroupContext(), executionContext.getQueryContext(), new RawSQLExecutorCallback()); return accumulate(executeResults); } return isNeedImplicitCommitTransaction(connection, executionContext) ? executeUpdateWithImplicitCommitTransaction() : useDriverToExecuteUpdate(); // CHECKSTYLE:OFF } catch (final RuntimeException ex) { // CHECKSTYLE:ON handleExceptionInTransaction(connection, metaDataContexts); throw SQLExceptionTransformEngine.toSQLException(ex, metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType().getType()); } finally { clearBatch(); } } private void clearPrevious() { statements.clear(); parameterSets.clear(); generatedValues.clear(); } private ExecutionContext createExecutionContext(final QueryContext queryContext) { ShardingSphereRuleMetaData globalRuleMetaData = metaDataContexts.getMetaData().getGlobalRuleMetaData(); ShardingSphereDatabase currentDatabase = metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()); SQLAuditEngine.audit(queryContext.getSqlStatementContext(), queryContext.getParameters(), globalRuleMetaData, currentDatabase, null, queryContext.getHintValueContext()); ExecutionContext result = kernelProcessor.generateExecutionContext( queryContext, currentDatabase, globalRuleMetaData, metaDataContexts.getMetaData().getProps(), connection.getDatabaseConnectionManager().getConnectionContext()); findGeneratedKey(result).ifPresent(optional -> generatedValues.addAll(optional.getGeneratedValues())); return result; }
這里executeUpdate會(huì)先執(zhí)行clearPrevious方法,清空statements、parameterSets、generatedValues,然后createExecutionContext,這里有一步是kernelProcessor.generateExecutionContext
KernelProcessor
generateExecutionContext
shardingsphere-infra-context-5.4.0-sources.jar!/org/apache/shardingsphere/infra/connection/kernel/KernelProcessor.java
public ExecutionContext generateExecutionContext(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props, final ConnectionContext connectionContext) { RouteContext routeContext = route(queryContext, database, globalRuleMetaData, props, connectionContext); SQLRewriteResult rewriteResult = rewrite(queryContext, database, globalRuleMetaData, props, routeContext, connectionContext); ExecutionContext result = createExecutionContext(queryContext, database, routeContext, rewriteResult); logSQL(queryContext, props, result); return result; }
KernelProcessor的generateExecutionContext方法先創(chuàng)建routeContext,然后執(zhí)行rewrite,最后執(zhí)行createExecutionContext
rewrite
private SQLRewriteResult rewrite(final QueryContext queryContext, final ShardingSphereDatabase database, final ShardingSphereRuleMetaData globalRuleMetaData, final ConfigurationProperties props, final RouteContext routeContext, final ConnectionContext connectionContext) { SQLRewriteEntry sqlRewriteEntry = new SQLRewriteEntry(database, globalRuleMetaData, props); return sqlRewriteEntry.rewrite(queryContext.getSql(), queryContext.getParameters(), queryContext.getSqlStatementContext(), routeContext, connectionContext, queryContext.getHintValueContext()); }
rewrite主要是通過(guò)SQLRewriteEntry的rewrite方法進(jìn)行的
SQLRewriteEntry
shardingsphere-infra-rewrite-5.4.0-sources.jar!/org/apache/shardingsphere/infra/rewrite/SQLRewriteEntry.java
/** * Rewrite. * * @param sql SQL * @param params SQL parameters * @param sqlStatementContext SQL statement context * @param routeContext route context * @param connectionContext connection context * @param hintValueContext hint value context * * @return route unit and SQL rewrite result map */ public SQLRewriteResult rewrite(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext, final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) { SQLRewriteContext sqlRewriteContext = createSQLRewriteContext(sql, params, sqlStatementContext, routeContext, connectionContext, hintValueContext); SQLTranslatorRule rule = globalRuleMetaData.getSingleRule(SQLTranslatorRule.class); DatabaseType protocolType = database.getProtocolType(); Map<String, DatabaseType> storageTypes = database.getResourceMetaData().getStorageTypes(); return routeContext.getRouteUnits().isEmpty() ? new GenericSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext) : new RouteSQLRewriteEngine(rule, protocolType, storageTypes).rewrite(sqlRewriteContext, routeContext); } private SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> params, final SQLStatementContext sqlStatementContext, final RouteContext routeContext, final ConnectionContext connectionContext, final HintValueContext hintValueContext) { SQLRewriteContext result = new SQLRewriteContext(database.getName(), database.getSchemas(), sqlStatementContext, sql, params, connectionContext, hintValueContext); decorate(decorators, result, routeContext, hintValueContext); result.generateSQLTokens(); return result; } private void decorate(final Map<ShardingSphereRule, SQLRewriteContextDecorator> decorators, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final HintValueContext hintValueContext) { if (hintValueContext.isSkipSQLRewrite()) { return; } for (Entry<ShardingSphereRule, SQLRewriteContextDecorator> entry : decorators.entrySet()) { entry.getValue().decorate(entry.getKey(), props, sqlRewriteContext, routeContext); } }
SQLRewriteEntry的rewrite方法,先通過(guò)createSQLRewriteContext來(lái)創(chuàng)建SQLRewriteContext,這里通過(guò)decorate方法遍歷decorators,挨個(gè)執(zhí)行SQLRewriteContextDecorator的decorate方法;最后通過(guò)GenericSQLRewriteEngine或者RouteSQLRewriteEngine進(jìn)行rewrite
SQLRewriteContextDecorator
org/apache/shardingsphere/infra/rewrite/context/SQLRewriteContextDecorator.java
@SingletonSPI public interface SQLRewriteContextDecorator<T extends ShardingSphereRule> extends OrderedSPI<T> { /** * Decorate SQL rewrite context. * * @param rule rule * @param props ShardingSphere properties * @param sqlRewriteContext SQL rewrite context to be decorated * @param routeContext route context */ void decorate(T rule, ConfigurationProperties props, SQLRewriteContext sqlRewriteContext, RouteContext routeContext); }
SQLRewriteContextDecorator定義了decorate方法,它有諸如ShardingSQLRewriteContextDecorator、EncryptSQLRewriteContextDecorator的實(shí)現(xiàn)類
EncryptSQLRewriteContextDecorator
org/apache/shardingsphere/encrypt/rewrite/context/EncryptSQLRewriteContextDecorator.java
/** * SQL rewrite context decorator for encrypt. */ public final class EncryptSQLRewriteContextDecorator implements SQLRewriteContextDecorator<EncryptRule> { @Override public void decorate(final EncryptRule encryptRule, final ConfigurationProperties props, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) { SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext(); if (!containsEncryptTable(encryptRule, sqlStatementContext)) { return; } Collection<EncryptCondition> encryptConditions = createEncryptConditions(encryptRule, sqlRewriteContext); if (!sqlRewriteContext.getParameters().isEmpty()) { Collection<ParameterRewriter> parameterRewriters = new EncryptParameterRewriterBuilder(encryptRule, sqlRewriteContext.getDatabaseName(), sqlRewriteContext.getSchemas(), sqlStatementContext, encryptConditions).getParameterRewriters(); rewriteParameters(sqlRewriteContext, parameterRewriters); } Collection<SQLTokenGenerator> sqlTokenGenerators = new EncryptTokenGenerateBuilder(encryptRule, sqlStatementContext, encryptConditions, sqlRewriteContext.getDatabaseName()).getSQLTokenGenerators(); sqlRewriteContext.addSQLTokenGenerators(sqlTokenGenerators); } private Collection<EncryptCondition> createEncryptConditions(final EncryptRule encryptRule, final SQLRewriteContext sqlRewriteContext) { SQLStatementContext sqlStatementContext = sqlRewriteContext.getSqlStatementContext(); if (!(sqlStatementContext instanceof WhereAvailable)) { return Collections.emptyList(); } Collection<WhereSegment> whereSegments = ((WhereAvailable) sqlStatementContext).getWhereSegments(); Collection<ColumnSegment> columnSegments = ((WhereAvailable) sqlStatementContext).getColumnSegments(); return new EncryptConditionEngine(encryptRule, sqlRewriteContext.getSchemas()) .createEncryptConditions(whereSegments, columnSegments, sqlStatementContext, sqlRewriteContext.getDatabaseName()); } private boolean containsEncryptTable(final EncryptRule encryptRule, final SQLStatementContext sqlStatementContext) { for (String each : sqlStatementContext.getTablesContext().getTableNames()) { if (encryptRule.findEncryptTable(each).isPresent()) { return true; } } return false; } private void rewriteParameters(final SQLRewriteContext sqlRewriteContext, final Collection<ParameterRewriter> parameterRewriters) { for (ParameterRewriter each : parameterRewriters) { each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters()); } } @Override public int getOrder() { return EncryptOrder.ORDER; } @Override public Class<EncryptRule> getTypeClass() { return EncryptRule.class; } }
rewriteParameters是通過(guò)ParameterRewriter進(jìn)行rewrite,主要是修改ParameterBuilder;而具體sql語(yǔ)句的修改則通過(guò)sqlTokenGenerators進(jìn)行
SQLToken
@RequiredArgsConstructor @Getter public abstract class SQLToken implements Comparable<SQLToken> { private final int startIndex; @Override public final int compareTo(final SQLToken sqlToken) { return startIndex - sqlToken.startIndex; } }
SQLToken它有諸如InsertValuesToken、SubstitutableColumnNameToken、InsertColumnsToken之類的實(shí)現(xiàn)類
RouteSQLRewriteEngine
/** * Rewrite SQL and parameters. * * @param sqlRewriteContext SQL rewrite context * @param routeContext route context * @return SQL rewrite result */ public RouteSQLRewriteResult rewrite(final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext) { Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits = new LinkedHashMap<>(routeContext.getRouteUnits().size(), 1F); for (Entry<String, Collection<RouteUnit>> entry : aggregateRouteUnitGroups(routeContext.getRouteUnits()).entrySet()) { Collection<RouteUnit> routeUnits = entry.getValue(); if (isNeedAggregateRewrite(sqlRewriteContext.getSqlStatementContext(), routeUnits)) { sqlRewriteUnits.put(routeUnits.iterator().next(), createSQLRewriteUnit(sqlRewriteContext, routeContext, routeUnits)); } else { addSQLRewriteUnits(sqlRewriteUnits, sqlRewriteContext, routeContext, routeUnits); } } return new RouteSQLRewriteResult(translate(sqlRewriteContext.getSqlStatementContext().getSqlStatement(), sqlRewriteUnits)); } private void addSQLRewriteUnits(final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits, final SQLRewriteContext sqlRewriteContext, final RouteContext routeContext, final Collection<RouteUnit> routeUnits) { for (RouteUnit each : routeUnits) { sqlRewriteUnits.put(each, new SQLRewriteUnit(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeContext, each))); } } private Map<RouteUnit, SQLRewriteUnit> translate(final SQLStatement sqlStatement, final Map<RouteUnit, SQLRewriteUnit> sqlRewriteUnits) { Map<RouteUnit, SQLRewriteUnit> result = new LinkedHashMap<>(sqlRewriteUnits.size(), 1F); for (Entry<RouteUnit, SQLRewriteUnit> entry : sqlRewriteUnits.entrySet()) { DatabaseType storageType = storageTypes.get(entry.getKey().getDataSourceMapper().getActualName()); String sql = translatorRule.translate(entry.getValue().getSql(), sqlStatement, protocolType, storageType); SQLRewriteUnit sqlRewriteUnit = new SQLRewriteUnit(sql, entry.getValue().getParameters()); result.put(entry.getKey(), sqlRewriteUnit); } return result; }
addSQLRewriteUnits是往sqlRewriteUnits添加SQLRewriteUnit,最后translate方法構(gòu)建SQLRewriteUnit;SQLRewriteUnit包含了更改之后的sql以及對(duì)應(yīng)改動(dòng)后的參數(shù)
useDriverToExecuteUpdate
org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
private int useDriverToExecuteUpdate() throws SQLException { ExecutionGroupContext<JDBCExecutionUnit> executionGroupContext = createExecutionGroupContext(); cacheStatements(executionGroupContext.getInputGroups()); return executor.getRegularExecutor().executeUpdate(executionGroupContext, executionContext.getQueryContext(), executionContext.getRouteContext().getRouteUnits(), createExecuteUpdateCallback()); } private ExecutionGroupContext<JDBCExecutionUnit> createExecutionGroupContext() throws SQLException { DriverExecutionPrepareEngine<JDBCExecutionUnit, Connection> prepareEngine = createDriverExecutionPrepareEngine(); return prepareEngine.prepare(executionContext.getRouteContext(), executionContext.getExecutionUnits(), new ExecutionGroupReportContext(connection.getDatabaseName())); } private void cacheStatements(final Collection<ExecutionGroup<JDBCExecutionUnit>> executionGroups) throws SQLException { for (ExecutionGroup<JDBCExecutionUnit> each : executionGroups) { each.getInputs().forEach(eachInput -> { statements.add((PreparedStatement) eachInput.getStorageResource()); parameterSets.add(eachInput.getExecutionUnit().getSqlUnit().getParameters()); }); } replay(); } private void replay() throws SQLException { replaySetParameter(); for (Statement each : statements) { getMethodInvocationRecorder().replay(each); } } private void replaySetParameter() throws SQLException { for (int i = 0; i < statements.size(); i++) { replaySetParameter(statements.get(i), parameterSets.get(i)); } } protected final void replaySetParameter(final PreparedStatement preparedStatement, final List<Object> params) throws SQLException { setParameterMethodInvocations.clear(); addParameters(params); for (PreparedStatementInvocationReplayer each : setParameterMethodInvocations) { each.replayOn(preparedStatement); } } private void addParameters(final List<Object> params) { int i = 0; for (Object each : params) { int index = ++i; setParameterMethodInvocations.add(preparedStatement -> preparedStatement.setObject(index, each)); } }
useDriverToExecuteUpdate方法會(huì)執(zhí)行createExecutionGroupContext(會(huì)執(zhí)行prepare方法
),cacheStatements這里主要是把eachInput.getStorageResource()真正的PrepareStatement賦值到ShardingSpherePreparedStatement的statements變量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()賦值到parameterSets,然后執(zhí)行replay方法通過(guò)PreparedStatementInvocationReplayer把修改后的變量replay到真正的PrepareStatement
該方法委托給executor.getRegularExecutor().executeUpdate,最后一個(gè)參數(shù)為callback,即createExecuteUpdateCallback
DriverExecutionPrepareEngine.prepare
org/apache/shardingsphere/infra/executor/sql/prepare/AbstractExecutionPrepareEngine.java
public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Collection<ExecutionUnit> executionUnits, final ExecutionGroupReportContext reportContext) throws SQLException { return prepare(routeContext, Collections.emptyMap(), executionUnits, reportContext); } public final ExecutionGroupContext<T> prepare(final RouteContext routeContext, final Map<String, Integer> connectionOffsets, final Collection<ExecutionUnit> executionUnits, final ExecutionGroupReportContext reportContext) throws SQLException { Collection<ExecutionGroup<T>> result = new LinkedList<>(); for (Entry<String, List<SQLUnit>> entry : aggregateSQLUnitGroups(executionUnits).entrySet()) { String dataSourceName = entry.getKey(); List<SQLUnit> sqlUnits = entry.getValue(); List<List<SQLUnit>> sqlUnitGroups = group(sqlUnits); ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY; result.addAll(group(dataSourceName, connectionOffsets.getOrDefault(dataSourceName, 0), sqlUnitGroups, connectionMode)); } return decorate(routeContext, result, reportContext); } protected List<ExecutionGroup<T>> group(final String dataSourceName, final int connectionOffset, final List<List<SQLUnit>> sqlUnitGroups, final ConnectionMode connectionMode) throws SQLException { List<ExecutionGroup<T>> result = new LinkedList<>(); List<C> connections = databaseConnectionManager.getConnections(dataSourceName, connectionOffset, sqlUnitGroups.size(), connectionMode); int count = 0; for (List<SQLUnit> each : sqlUnitGroups) { result.add(createExecutionGroup(dataSourceName, each, connections.get(count++), connectionMode)); } return result; } private ExecutionGroup<T> createExecutionGroup(final String dataSourceName, final List<SQLUnit> sqlUnits, final C connection, final ConnectionMode connectionMode) throws SQLException { List<T> result = new LinkedList<>(); for (SQLUnit each : sqlUnits) { result.add((T) sqlExecutionUnitBuilder.build(new ExecutionUnit(dataSourceName, each), statementManager, connection, connectionMode, option, databaseTypes.get(dataSourceName))); } return new ExecutionGroup<>(result); }
group方法調(diào)用遍歷SQLUnit執(zhí)行createExecutionGroup,而后者則執(zhí)行sqlExecutionUnitBuilder.build;這里databaseConnectionManager.getConnections獲取的connection是通過(guò)真正driver獲取的connection(com.mysql.jdbc.Driver)
PreparedStatementExecutionUnitBuilder
org/apache/shardingsphere/infra/executor/sql/prepare/driver/jdbc/builder/PreparedStatementExecutionUnitBuilder.java
public JDBCExecutionUnit build(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { PreparedStatement preparedStatement = createPreparedStatement( executionUnit, statementManager, connection, connectionMode, option, databaseType); return new JDBCExecutionUnit(executionUnit, connectionMode, preparedStatement); } private PreparedStatement createPreparedStatement(final ExecutionUnit executionUnit, final ExecutorJDBCStatementManager statementManager, final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { return (PreparedStatement) statementManager.createStorageResource(executionUnit, connection, connectionMode, option, databaseType); }
PreparedStatementExecutionUnitBuilder的build方法這里才真正創(chuàng)建PreparedStatement
StatementManager
org/apache/shardingsphere/driver/jdbc/core/statement/StatementManager.java
public Statement createStorageResource(final ExecutionUnit executionUnit, final Connection connection, final ConnectionMode connectionMode, final StatementOption option, final DatabaseType databaseType) throws SQLException { Statement result = cachedStatements.get(new CacheKey(executionUnit, connectionMode)); if (null == result || result.isClosed() || result.getConnection().isClosed()) { String sql = executionUnit.getSqlUnit().getSql(); if (option.isReturnGeneratedKeys()) { result = null == option.getColumns() || 0 == option.getColumns().length ? connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS) : connection.prepareStatement(sql, option.getColumns()); } else { result = connection.prepareStatement(sql, option.getResultSetType(), option.getResultSetConcurrency(), option.getResultSetHoldability()); } cachedStatements.put(new CacheKey(executionUnit, connectionMode), result); } return result; }
createStorageResource則是通過(guò)connection.prepareStatement來(lái)創(chuàng)建真正的PrepareStatement,而此時(shí)傳入的sql也是經(jīng)過(guò)重寫之后的sql
createExecuteUpdateCallback
org/apache/shardingsphere/driver/jdbc/core/statement/ShardingSpherePreparedStatement.java
private JDBCExecutorCallback<Integer> createExecuteUpdateCallback() { boolean isExceptionThrown = SQLExecutorExceptionHandler.isExceptionThrown(); return new JDBCExecutorCallback<Integer>(metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getProtocolType(), metaDataContexts.getMetaData().getDatabase(connection.getDatabaseName()).getResourceMetaData().getStorageTypes(), sqlStatement, isExceptionThrown) { @Override protected Integer executeSQL(final String sql, final Statement statement, final ConnectionMode connectionMode, final DatabaseType storageType) throws SQLException { return ((PreparedStatement) statement).executeUpdate(); } @Override protected Optional<Integer> getSaneResult(final SQLStatement sqlStatement, final SQLException ex) { return Optional.empty(); } }; }
createExecuteUpdateCallback創(chuàng)建的JDBCExecutorCallback,其executeSQL方法則是通過(guò)((PreparedStatement) statement).executeUpdate()來(lái)執(zhí)行,即委托給了真正的PreparedStatement
小結(jié)
- ShardingSphereConnection的prepareStatement創(chuàng)建的是ShardingSpherePreparedStatement,它在ShardingSpherePreparedStatement的executeUpdate的時(shí)候進(jìn)行sql重寫,然后prepare,最后執(zhí)行的時(shí)候是通過(guò)JDBCExecutorCallback,其executeSQL方法則是通過(guò)((PreparedStatement) statement).executeUpdate()來(lái)執(zhí)行,即委托給了真正的PreparedStatement
- rewriteParameters是通過(guò)ParameterRewriter進(jìn)行rewrite,主要是修改ParameterBuilder;而具體sql語(yǔ)句的修改則通過(guò)sqlTokenGenerators進(jìn)行
- PreparedStatementExecutionUnitBuilder的build方法這里才真正創(chuàng)建PreparedStatement:它通過(guò)StatementManager.createStorageResource則是通過(guò)connection.prepareStatement來(lái)創(chuàng)建真正的PrepareStatement,而此時(shí)傳入的sql也是經(jīng)過(guò)重寫之后的sql
useDriverToExecuteUpdate方法會(huì)執(zhí)行createExecutionGroupContext(
會(huì)執(zhí)行prepare方法
),cacheStatements這里主要是把eachInput.getStorageResource()真正的PrepareStatement賦值到ShardingSpherePreparedStatement的statements變量中,把eachInput.getExecutionUnit().getSqlUnit().getParameters()賦值到parameterSets,然后執(zhí)行replay方法通過(guò)PreparedStatementInvocationReplayer把修改后的變量replay到真正的PrepareStatementShardingSpherePreparedStatement實(shí)現(xiàn)了java.sql.PreparedStatement接口,其sql屬性是用戶傳入的sql,即未經(jīng)過(guò)重寫的sql,而實(shí)際execute的時(shí)候,會(huì)觸發(fā)sql重寫(包括重寫sql語(yǔ)句及參數(shù)),最后會(huì)通過(guò)connection.prepareStatement(傳入重寫之后的sql)來(lái)創(chuàng)建真正的PrepareStatement,然后有一步replay操作,把重寫后的參數(shù)作用到真正的PrepareStatement,最后通過(guò)((PreparedStatement) statement).executeUpdate()來(lái)觸發(fā)執(zhí)行至此我們可以得到sql重寫的一個(gè)基本思路:通過(guò)實(shí)現(xiàn)java.sql.PreparedStatement接口偽裝一個(gè)PreparedStatement類,其創(chuàng)建和set參數(shù)先內(nèi)存緩存起來(lái),之后在execute的時(shí)候進(jìn)行sql重寫,創(chuàng)建真正的PreparedStatement,replay參數(shù),執(zhí)行execute方法
以上就是ShardingSphere如何進(jìn)行sql重寫示例詳解的詳細(xì)內(nèi)容,更多關(guān)于ShardingSphere重寫sql的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
MyBatisPlus報(bào)錯(cuò):Failed to process,please exclud
這篇文章主要介紹了MyBatisPlus報(bào)錯(cuò):Failed to process,please exclude the tableName or statementId問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08Mybatis調(diào)用SQL?Server存儲(chǔ)過(guò)程的實(shí)現(xiàn)示例
在軟件開發(fā)過(guò)程中,經(jīng)常會(huì)使用到存儲(chǔ)過(guò)程,本文就來(lái)介紹一下Mybatis調(diào)用SQL?Server存儲(chǔ)過(guò)程的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01詳解JDK 5 Annotation 注解之@Target的用法介紹
這篇文章主要介紹了詳解JDK 5 Annotation 注解之@Target的用法介紹,需要的朋友可以參考下2016-02-02springboot啟動(dòng)的注意事項(xiàng)之不同包下有同樣名字的class類問(wèn)題
這篇文章主要介紹了springboot啟動(dòng)的注意事項(xiàng)之不同包下有同樣名字的class類問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-06-06利用SpringDataJPA開啟審計(jì)功能,自動(dòng)保存操作人操作時(shí)間
這篇文章主要介紹了利用SpringDataJPA開啟審計(jì)功能,自動(dòng)保存操作人操作時(shí)間,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12Java實(shí)現(xiàn)ftp上傳下載、刪除文件及在ftp服務(wù)器上傳文件夾的方法
這篇文章主要介紹了Java實(shí)現(xiàn)ftp上傳下載、刪除文件及在ftp服務(wù)器上傳文件夾的方法,需要的朋友可以參考下2015-11-11java?Springboot對(duì)接開發(fā)微信支付詳細(xì)流程
最近要做一個(gè)微信小程序,需要微信支付,所以研究了下怎么在java上集成微信支付功能,下面這篇文章主要給大家介紹了關(guān)于java?Springboot對(duì)接開發(fā)微信支付的相關(guān)資料,需要的朋友可以參考下2024-08-08Java之SpringCloudAlibaba Sentinel組件案例講解
這篇文章主要介紹了Java之SpringCloudAlibaba Sentinel組件案例講解,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-07-07