文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Sharding-JDBC源码解析与vivo的定制开发

2024-11-30 00:19

关注

本文源码基于Sharding-JDBC 4.1.1版本。

一、业务背景

随着业务并发请求和数据规模的不断扩大,单节点库表压力往往会成为系统的性能瓶颈。公司IT内部营销库存、交易订单、财经台账、考勤记录等多领域的业务场景的日增数据量巨大,存在着数据库节点压力过大、连接过多、查询速度变慢等情况,根据数据来源、时间、工号等信息来将没有联系的数据尽量均分到不同的库表中,从而在不影响业务需求的前提下,减轻数据库节点压力,提升查询效率和系统稳定性。

二、技术选型

我们对比了几款比较常见的支持分库分表和读写分离的中间件。

Sharding-JDBC作为轻量化的增强版的JDBC框架,相较其他中间件性能更好,接入难度更低,其数据分片、读写分离功能也覆盖了我们的业务诉求,因此我们在业务中广泛使用了Sharding-JDBC。但在使用Sharding-JDBC的过程中,我们也发现了诸多问题,为了业务更便捷的使用Sharding-JDBC,我们对源码做了针对性的定制开发和组件封装来满足业务需求。

三、源码解析

3.1 引言

Sharding-JDBC作为基于JDBC的数据库中间件,实现了JDBC的标准api,Sharding-JDBC与原生JDBC的执行对比流程如下图所示:


相关执行流程的代码样例如下:

JDBC执行样例

//获取数据库连接
try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {
    String sql = "SELECT * FROM  t_user WHERE name = ?";
    //预编译SQL
    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
        //参数设置与执行
        preparedStatement.setString(1, "vivo");
        preparedStatement.execute(sql);
        //获取结果集
        try (ResultSet resultSet = preparedStatement.getResultSet()) {
            while (resultSet.next()) {
                //处理结果
            }
        }
    }
}

Sharding-JDBC 源码

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute
    public boolean execute() throws SQLException {
        try {
            clearPrevious();
            //解析+路由+重写 内部调用BasePrepareEngine#prepare方法
            prepare();
            initPreparedStatementExecutor();
            //执行
            return preparedStatementExecutor.execute();
        } finally {
            clearBatch();
        }
    }
 
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
    public ExecutionContext prepare(final String sql, final List parameters) {
        List clonedParameters = cloneParameters(parameters);
        //解析+路由(executeRoute内部先进行解析再执行路由)
        RouteContext routeContext = executeRoute(sql, clonedParameters);
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        //重写
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
        if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
    }
 
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            List resultSets = getResultSets();
            //归并结果集
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }

从对比的执行流程图可见:

  • 【JDBC】:执行的主要流程是通过Datasource获取Connection,再注入SQL语句生成PreparedStatement对象,PreparedStatement设置占位符参数执行后得到结果集ResultSet。
  • 【Sharding-JDBC】:主要流程基本一致,但Sharding基于PreparedStatement进行了实现与扩展,具体实现类ShardingPreparedStatement中会抽象出解析、路由、重写、归并等引擎,从而实现分库分表、读写分离等能力,每个引擎的作用说明如下表所示:

/ public String route(final SQLStatement sqlStatement) { if (isMasterRoute(sqlStatement)) { MasterVisitedManager.setMasterVisited(); return masterSlaveRule.getMasterDataSourceName(); } return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource( masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())); } private boolean isMasterRoute(final SQLStatement sqlStatement) { return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly(); } private boolean containsLockSegment(final SQLStatement sqlStatement) { return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent(); } }

是否走主库的信息存在MasterVisitedManager中,MasterVisitedManager是通过ThreadLocal实现的,但这种实现会有一个问题,当我们使用事务先查询再更新/插入时,第一条查询SQL并不会走主库,而是走从库,如果业务需要事务的第一条查询也走主库,事务查询前需要手动调用一次MasterVisitedManager.setMasterVisited()。

MasterVisitedManager

public final class MasterVisitedManager {
     
    private static final ThreadLocal MASTER_VISITED = ThreadLocal.withInitial(() -> false);
     
    
    public static boolean isMasterVisited() {
        return MASTER_VISITED.get();
    }
     
    
    public static void setMasterVisited() {
        MASTER_VISITED.set(true);
    }
     
    
    public static void clear() {
        MASTER_VISITED.remove();
    }
}

3.3.2 引擎总结

路由引擎的作用是将SQL根据参数通过实现的策略算法计算出实际该在哪些库的哪些表执行,也就是路由结果。路由引擎有两种实现,分别是分片路由和主从路由,两者都提供了标准化的策略接口来让业务实现自己的路由策略,分片路由需要注意自身SQL场景和策略算法相匹配,主从路由中同一线程且同一数据库连接内,有写入操作后,之后的读操作会从主库读取,写入操作前的读操作不会走主库。

3.4 改写引擎

3.4.1 引擎解析

经过解析路由后虽然确定了执行的实际库表,但SQL中表名依旧是逻辑表,不能执行,改写引擎可以将逻辑表替换为物理表。同时,路由至多库表的SQL也需要拆分为多条SQL执行。

改写的入口仍旧在BasePrepareEngine中,创建重写上下文createSQLRewriteContext,再根据上下文进行改写rewrite,最终返回执行单元ExecutionUnit。

改写逻辑入口

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
    private Collection executeRewrite(final String sql, final List parameters, final RouteContext routeContext) {
        //注册重写装饰器
        registerRewriteDecorator();
        //创建 SQLRewriteContext
        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
        //重写
        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
    }

执行单元包含了数据源名称,改写后的SQL,以及对应的参数,SQL一样的两个SQLUnit会被视为相等。

ExecutionUnit

@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据sql判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List parameters;
 
}

createSQLRewriteContext完成了两件事,一个是对SQL参数进行了重写,一个是生成了SQLToken。

createSQLRewriteContext

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
    public SQLRewriteContext createSQLRewriteContext(final String sql, final List parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
        //sql参数重写
        decorate(decorators, result, routeContext);
        //生成SQLToken
        result.generateSQLTokens();
        return result;
    }
 
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate
    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
                //参数重写
                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
            }
        }
        //sqlTokenGenerators
        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
    }
 
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens
    public void generateSQLTokens() {
        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
    }

ParameterRewriter中与分片相关的实现有两种。

/ private boolean skipSharding; private boolean masterRoute; public static boolean isSkipSharding() { return SKIP_CONTEXT_HOLDER.get().skipSharding; } public static void setSkipSharding(boolean skipSharding) { SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding; } public static boolean isMasterRoute() { return SKIP_CONTEXT_HOLDER.get().masterRoute; } public static void setMasterRoute(boolean masterRoute) { SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute; } public static void clear(){ SKIP_CONTEXT_HOLDER.remove(); } }

判断SQL是否包含分片表

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext
// 判断是否可以跳过sharding,构造RuleContextManager的值
private void buildSkipContext(final String sql){
    Set sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));
        if (CollectionUtils.isNotEmpty(rules)) {
            for (BaseRule baseRule : rules) {
                //定制方法,ShardingRule实现,判断sqlTokenSet是否包含逻辑表即可
                if(baseRule.hasContainShardingTable(sqlTokenSet)){
                    RuleContextManager.setSkipSharding(false);
                    break;
                }else {
                    RuleContextManager.setSkipSharding(true);
                }
            }
        }
}
 
org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTable
public Boolean hasContainShardingTable(Set sqlTokenSet) {
      //logicTableNameList通过遍历TableRule可以得到
       for (String logicTable : logicTableNameList) {
            if (sqlTokenSet.contains(logicTable)) {
                return true;
            }
        }
        return false;
    }

(2)跳过解析路由:通过RuleContextManager中的skipSharding判断是否需要跳过Sharding解析路由,但为了兼容读写分离的场景,我们还需要知道这条SQL应该走主库还是从库,走主库的场景在后面强制路由主库部分有说明,SQL走主库实际上只有两种情况,一种是非SELECT语句,另一种就是SELECT语句带锁,如SELECT...FOR UPDATE,因此整体实现的步骤如下:

  • 如果标记了跳过Sharding且不为select语句,直接返回SkipShardingStatement,单独构造一个SkipShardingStatement的目的是为了能利用解析引擎中的缓存,缓存中不能放入null值。
  • 如果是select语句需要继续解析,判断是否有锁后直接返回,避免后续解析造成语法不兼容,这里也曾尝试用反射获取lockClause来判断是否包含锁,但最终没有成功。
  • ShardingRouteDecorator根据

   RuleContextManager.isSkipSharding判断是否跳过路由。

跳过解析路由

public class SkipShardingStatement implements SQLStatement{
    @Override
    public int getParameterCount() {
        return 0;
    }
}
 
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
    private SQLStatement parse0(final String sql, final boolean useCache) {
        if (useCache) {
            Optional cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {
                return cachedSQLStatement.get();
            }
        }
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
        
        SQLStatement result ;
        if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){
            RuleContextManager.setMasterRoute(true);
            result = new SkipShardingStatement();
        }else {
            result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
        }
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }
 
org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause
    public ASTNode visitSelectClause(final SelectClauseContext ctx) {
        SelectStatement result = new SelectStatement();
        // 跳过sharding 只需要判断是否有锁来决定是否路由至主库即可
        if(RuleContextManager.isSkipSharding()){
            if (null != ctx.lockClause()) {
                result.setLock((LockSegment) visit(ctx.lockClause()));
                RuleContextManager.setMasterRoute(true);
            }
            return result;
        }
        //...后续解析
    }
 
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext
    private RouteContext createRouteContext(final String sql, final List parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
        //如果需要跳过sharding 不进行后续的解析直接返回
        if (RuleContextManager.isSkipSharding()) {
            return new RouteContext(sqlStatement, parameters, new RouteResult());
        }
        //...解析
    }
 
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        // 跳过sharding路由
        if(RuleContextManager.isSkipSharding()){
            return routeContext;
        }
        //...路由
    }

(3)手动构造ExecutionUnit:ExecutionUnit中我们需要确定的内容就是datasourceName,这里我们认为跳过Sharding的SQL最终执行的库一定只有一个。如果只是跳过Sharding的情况,直接从元数据中获取数据源名称即可,如果存在读写分离的情况,主从路由的结果也一定是唯一的。创建完ExecutionUnit直接放入ExecutionContext返回即可,从而跳过后续的改写逻辑。

手动构造ExecutionUnit

public ExecutionContext prepare(final String sql, final List parameters) {
    List clonedParameters = cloneParameters(parameters);
    // 判断是否可以跳过sharding,构造RuleContextManager的值
    buildSkipContext(sql);  
    RouteContext routeContext = executeRoute(sql, clonedParameters);
    ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
    // 跳过sharding的sql最后的路由结果一定只有一个库
    if(RuleContextManager.isSkipSharding()){
        log.debug("可以跳过sharding的场景 {}", sql);
        if(!Objects.isNull(routeContext.getRouteResult())){
            Collection allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();
            int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();
            
            if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){
                throw new ShardingSphereException("可以跳过sharding,但是路由结果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());
            }
            Collection actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();
            // 手动创建执行单元
            String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();
            ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));
            result.getExecutionUnits().add(executionUnit);
            //标记该结果需要跳过
            result.setSkipShardingScenarioFlag(true);
        }
    }else {
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
    }
    if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
        SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
    }
    return result;
}

(4)跳过合并:跳过查询结果的合并和影响行数计算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳过

跳过合并

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            List queryResults = preparedStatementExecutor.executeQuery();
            List resultSets = preparedStatementExecutor.getResultSets();
        // 定制开发,不分片跳过合并
            if(executionContext.isSkipShardingScenarioFlag()){
                return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
            }
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        List resultSets = getResultSets();
        // 定制开发,不分片跳过合并
        if(executionContext.isSkipShardingScenarioFlag()){
            return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
        }
 
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate
    public boolean isAccumulate() {
        //定制开发,不分片跳过计算
        if(executionContext.isSkipShardingScenarioFlag()){
            return false;
        }
        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
    }

(5)清空RuleContextManager:查看一下Sharding-JDBC其他ThreadLocal的清空位置,对应的清空RuleContextManager就好。

清空ThreadLocal

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#close
public final void close() throws SQLException {
        closed = true;
        MasterVisitedManager.clear();
        TransactionTypeHolder.clear();
        RuleContextManager.clear();
        int connectionSize = cachedConnections.size();
        try {
            forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());
        } finally {
            cachedConnections.clear();
            rootInvokeHook.finish(connectionSize);
        }
    }

举个例子,比如Sharding-JDBC本身是不支持INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ?    这种语法的,会报空指针异常。

经过我们上述改造验证后,非分片表是可以跳过语法限制执行如下的SQL的。

通过该功能的实现,业务可以更关注与分片表的SQL改造,而无需担心引入Sharding-JDBC造成所有SQL的验证改造,大幅减少改造成本和风险。

4.2 强制路由主库

Sharding-JDBC可以通过配置主从库数据源方便的实现读写分离的功能,但使用读写分离就必须面对主从延迟和从库失联的痛点,针对这一问题,我们实现了强制路由主库的动态配置,当主从延迟过大或从库失联时,通过修改配置来实现SQL语句强制走主库的不停机路由切换。

后面会说明了配置的动态生效的实现方式,这里只说明强制路由主库的实现,我们直接使用前文的RuleContextManager即可,在主从路由引擎里判断下是否开启了强制主库路由。

MasterSlaveRouteDecorator.decorate改造

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        
        if(properties.getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){
            MasterVisitedManager.setMasterVisited();
        }
        //...路由逻辑
        return routeContext;
    }

为了兼容之前跳过Sharding的功能,我们需要同步修改下isMasterRoute方法,如果是跳过了Sharding路由需要通过RuleContextManager来判断是否走主库。

isMasterRoute改造

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        if(sqlStatement instanceof SkipShardingStatement){
            // 优先以MasterVisitedManager中的值为准
            return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();
        }
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }

当然,更理想的状况是通过监控主从同步延迟和数据库拨测,当超过阈值时或从库失联时直接自动修改配置中心的库,实现自动切换主库,减少业务故障时间和运维压力。

4.3 配置动态生效

Sharding-JDBC中的ConfigurationPropertyKey中提供了许多配置属性,而Sharding-JDBCB并没有为这些配置提供在线修改的方法,而在实际的应用场景中,像SQL_SHOW这样控制SQL打印的开关配置,我们更希望能够在线修改配置值来控制SQL日志的打印,而不是修改完配置再重启服务。

以SQL打印为例,BasePrepareEngine中存在ConfigurationProperties对象,通过调用getValue方法来获取SQL_SHOW的值。

SQL 打印

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
    
    public ExecutionContext prepare(final String sql, final List parameters) {
        List clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
        //sql打印
        if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
    }

ConfigurationProperties继承了抽象类TypedProperties,其getValue方法就是根据key获取对应的配置值,因此我们直接在TypedProperties中实现刷新缓存中的配置值的方法。

TypedProperties刷新配置

public abstract class TypedProperties {
     
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
     
    @Getter
    private final Properties props;
     
    private final Map cache;
     
    public TypedProperties(final Class keyClass, final Properties props) {
        this.props = props;
        cache = preload(keyClass);
    }
     
    private Map preload(final Class keyClass) {
        E[] enumConstants = keyClass.getEnumConstants();
        Map result = new HashMap<>(enumConstants.length, 1);
        Collection errorMessages = new LinkedList<>();
        for (E each : enumConstants) {
            TypedPropertyValue value = null;
            try {
                value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());
            } catch (final TypedPropertyValueException ex) {
                errorMessages.add(ex.getMessage());
            }
            result.put(each, value);
        }
        if (!errorMessages.isEmpty()) {
            throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));
        }
        return result;
    }
     
    
    @SuppressWarnings("unchecked")
    public  T getValue(final E key) {
        return (T) cache.get(key).getValue();
    }
 
    
    public boolean refreshValue(String key, String value){
        //获取配置类支持的配置项
        E[] enumConstants = targetKeyClass.getEnumConstants();
        for (E each : enumConstants) {
            //遍历新的值
            if(each.getKey().equals(key)){
                try {
                    //空白value认为无效,取默认值
                    if(!StringUtils.isBlank(value)){
                        value = each.getDefaultValue();
                    }
                    //构造新属性
                    TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);
                    //替换缓存
                    cache.put(each, typedPropertyValue);
                    //原始属性也替换下,有可能会通过RuntimeContext直接获取Properties
                    props.put(key,value);
                    return true;
                } catch (final TypedPropertyValueException ex) {
                    log.error("refreshValue error. key={} , value={}", key, value, ex);
                }
            }
        }
        return false;
    }
}

实现了刷新方法后,我们还需要将该方法一步步暴露至一个外部可以调用的类中,以便在服务监听配置的方法中,能够调用这个刷新方法。ConfigurationProperties直接在asePrepareEngine的构造函数中传入,我们通过构造函数逐步反推最外层的这一对象调用来源,最终可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以获取到这个配置,而这个就是Sharding-JDBC实现的JDBC中Datasource接口的抽象类,我们直接在这个类中调用刚刚实现的refreshValue方法,剩下的就是监听配置,通过自己实现的AbstractDataSourceAdapter来调用这个方法就好了。

通过这一功能,我们可以方便的控制一些开关属性的在线修改,如SQL打印、强制路由主库等,业务无需重启服务即可做到配置的动态生效。

4.4 批量update语法支持

业务中存在使用foreach标签来批量update的语句,这种SQL在Sharding-JDBC中无法被正确路由,只会路由第一组参数,后面的无法被路由改写,原因是解析引擎无法将语句拆分解析。

批量update样例


        
               update t_order set
               status = 1,
               updated_by = #{item.updatedBy}
               WHERE created_by = #{item.createdBy};
        
    

我们通过将批量update按照;拆分为多个语句,然后分别路由,最后手动汇总路有结果生成执行单元。

为了能正确重写SQL,批量update拆分后的语句需要完全一样,这样就不能使用动态拼接set条件,而是使用ifnull语法或者字段值不发生变化时也将原来的值放入set中,只不过set前后的值保持一致,整体思路与实现如下。

prepareBatch实现

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch
   private ExecutionContext prepareBatch(List splitSqlList, final List allParameters) {
       //SQL去重
       List sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());
       if (sqlList.size() > 1) {
           throw new ShardingSphereException("不支持多条SQL,请检查SQL," + sqlList.toString());
       }
       //以第一条SQL为标准
       String sql = sqlList.get(0);
       //所有的执行单元
       Collection globalExecutionUnitList = new ArrayList<>();
       //初始化最后的执行结果
       ExecutionContext executionContextResult = null;
       //根据所有参数数量和SQL语句数量 计算每组参数的数量
       int eachSqlParameterCount = allParameters.size() / splitSqlList.size();
       //平均分配每条SQL的参数
       List> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);
       for (List eachSqlParameterList : eachSqlParameterListList) {
           //每条SQL参数不同 需要根据参数路由不同的结果  实际的SqlStatementContext 是一致的
           RouteContext routeContext = executeRoute(sql, eachSqlParameterList);
           //由于SQL一样  实际的SqlStatementContext 是一致的 只需初始化一次
           if (executionContextResult == null) {
               executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());
           }
           globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));
       }
       //排序打印日志
       executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));
       if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
           SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE),
                   executionContextResult.getSqlStatementContext(), (Collection) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));
       }
       return executionContextResult;
   }

这里我们在ExecutionContext单独构造了一个了ExtendMap来存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判断ExecutionUnit中的SqlUnit只会根据SQL去重,批量update的SQL是一致的,但parameters不同,为了不影响原有的逻辑,单独使用了另外的变量来存放。

ExecutionContext改造

@RequiredArgsConstructor
@Getter
public class ExecutionContext {
 
    private final SQLStatementContext sqlStatementContext;
 
    private final Collection executionUnits = new LinkedHashSet<>();
 
    
    private final Map extendMap = new HashMap<>();
 
    
    @Setter
    private boolean skipShardingScenarioFlag = false;
}
 
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据SQL判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List parameters;
 
}

我们还需要改造下执行方法,在初始化执行器的时候,判断下ExtendMap中存在我们自定义的EXECUTION_UNIT_LIST是否存在,存在则使用生成InputGroup,同一个数据源下的ExecutionUnit会被放入同一个InputGroup中。

InputGroup改造

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        //兼容批量update 分库分表后同一张表的情况 判断是否存在EXECUTION_UNIT_LIST 存在则使用未去重的List进行后续的操作
        if (MapUtils.isNotEmpty(executionContext.getExtendMap())){
            Collection executionUnitCollection = (Collection) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);
            if(CollectionUtils.isNotEmpty(executionUnitCollection)){
                getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));
            }
        }else {
            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
        }
        cacheStatements();
    }

改造完成后,批量update中的每条SQL都可以被正确路由执行。

4.5 ShardingCondition去重

当where语句包括多个or条件时,而or条件不包含分片键时,会造成createShardingConditions方法生成重复的分片条件,导致重复调用doSharding方法。

SELECT * FROM t_order  WHERE created_by = ? and (   (status = ?) or  (status = ?) or  (status = ?) )这种SQL,存在三个or条件,分片键是created_by ,实际产生的shardingCondition会是三个一样的值,并会调用三次doSharding的方法。虽然实际执行还是只有一次(批量update那里说明过执行单元会去重),但为了减少方法的重复调用,我们还是对这里做了一次去重。

去重的方法也比较简单粗暴,我们对ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最终结果前加一次去重,从而避免生成重复的shardingCondition造成doSharding方法的重复调用。

createShardingConditions去重

org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions
    private Collection createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection andPredicates, final List parameters) {
        Collection result = new LinkedList<>();
        for (AndPredicate each : andPredicates) {
            Map> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);
            if (routeValueMap.isEmpty()) {
                return Collections.emptyList();
            }
            result.add(createShardingCondition(routeValueMap));
        }
        //去重
        Collection distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));
        return distinctResult;
    }
4.6  全路由校验

分片表的SQL中如果没有携带分片键(或者带上了分片键结果没有被正确解析)将会导致全路由,产生性能问题,而这种SQL并不会报错,这就导致在实际的业务改造中,开发和测试很难保证百分百改造彻底。为此,我们在源码层面对这种情况做了额外的校验,当产生全路由,也就是ShardingConditions为空时,主动抛出异常,从而方便开发和测试能够快速发现全路由SQL。

实现方式也比较简单,校验下ShardingConditions是否为空即可,只不过需要额外兼容下Hint策略ShardingConditions始终为空的特殊情况。

全路由校验

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        //省略...
        //获取 ShardingConditions
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);
        //判断是否允许全路由
        if (!properties.getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {
            //如果不是Hint算法
            if(!isHintAlgorithm(sqlStatementContext, shardingRule)){
                
                if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
                    if(shardingConditions.getConditions().isEmpty()) {
                        throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
                    }else {
                        if (sqlStatementContext instanceof InsertStatementContext) {
                            List routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());
                            if(CollectionUtils.isEmpty(routeValuesNotEmpty)){
                                throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
                            }
                        }
                    }
                }
            }
        }
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        //省略...
        return new RouteContext(sqlStatementContext, parameters, routeResult);
    }
 
private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
        // 场景a 全局默认策略是否使用强制路由策略
        if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy
                || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){
            return true;
        }
        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
            Optional tableRule = shardingRule.findTableRule(each);
            //场景b 指定表是否使用强制路由策略
            if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy
                    || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {
                return true;
            }
        }
        return false;
    }

当然这块功能也可以在完善些,比如对分片路由结果中的数据源数量进行校验,从而避免跨库操作,我们这边没有实现也就不再赘述了。

4.7 组件封装

业务接入Sharding-JDBC的步骤是一样的,都需要通过Java创建数据源和配置对象或者使用SpringBoot进行配置,存在一定的熟悉成本和重复开发的问题,为此我们也对定制开发版本的Sharding-JDBC封装了一个公共组件,从而简化业务配置,减少重复开发,提升业务的开发效率,具体功能可见下。这块没有涉及源码的改造,只是在定制版本上包装的一个公共组件。

  • 提供了默认的数据源与连接池配置
  • 简化分库分表配置,业务配置逻辑表名和后缀,组件拼装行表达式和actual-data-nodes
  • 封装常用的分片算法(时间、业务字段值等),
  • 统一的配置监听与动态修改(SQL打印、强制主从切换等)

开源Sharding-JDBC配置

//数据源名称
spring.shardingsphere.datasource.names=ds0,ds1
//ds0配置
spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=
//ds1配置
spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=
//分表规则
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expressinotallow=t_order$->{order_id % 2}
spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expressinotallow=t_order_item$->{order_id % 2}
//默认分库规则
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expressinotallow=ds$->{user_id % 2}

组件简化配置

//数据源名称
vivo.it.sharding.datasource.names = ds0,ds1
//ds0配置
vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds0.username = root
vivo.it.sharding.datasource.ds0.password =
//ds1配置
vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds1.username = root
vivo.it.sharding.datasource.ds1.password =
//分表规则
vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]
//默认分库规则
vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}

五、使用建议

结合官方文档和业务实践经验,我们也梳理了部分使用Sharding-JDBC的建议供大家参考,实际具体如何优化SQL写法(比如子查询、分页、分组排序等)还需要结合业务的实际场景来进行测试和调优。

(1)强制等级

建议①:涉及分片表的SQL必须携带分片键

原因:无分片键会导致全路由,存在严重的性能隐患

建议②:禁止一条SQL中的分片值路由至不同的库

原因:跨库操作存在严重的性能隐患,事务操作会升级为分布式事务,增加业务复杂度

建议③:禁止对分片键使用运算表达式或函数操作

原因:无法提前计算表达式和函数获取分片值,导致全路由

说明:详见官方文档

建议④:禁止在子查询中使用分片表

原因:无法正常解析子查询中的分片表,导致业务错误

说明:虽然官方文档中说有限支持子查询 ,但在实际的使用中发现4.1.1并不支持子查询,可见官方issue6164 | issue 6228

建议⑤:包含CASE WHEN、HAVING、UNION (ALL)语法的分片SQL,不支持路由至多数据节点

说明:详见官方文档

(2)建议等级

① 建议使用分布式id来保证分片表主键的全局唯一性

原因:方便判断数据的唯一性和后续的迁移扩容

说明:详见文章《vivo 自研鲁班分布式 ID 服务实践》

② 建议跨多表的分组SQL的分组字段与排序字段保证一致

原因:分组和排序字段不一致只能通过内存合并,大数据量时存在性能隐患

说明:详见官方文档

③ 建议通过全局递增的分布式id来优化分页查询

原因:Sharding-JDBC的分页优化侧重于结果集的流式合并来避免内存爆涨,但深度分页自身的性能问题并不能解决

说明:详见官方文档

六、总结

本文结合个人理解梳理了各个引擎的源码入口和关键逻辑,读者可以结合本文和官方文档更好的定位理解Sharding-JDBC的源码实现。定制开发的目的是为了降低业务接入成本,尽可能减少业务存量SQL的改造,部分改造思想其实与官方社区也存在差异,比如跳过语法解析,官方社区致力于通过优化解析引擎来适配各种语法,而不是跳过解析阶段,可参考官方issue。源码分析和定制改造只涉及了Sharding-JDBC的数据分片和读写分离功能,定制开发的功能也在生产环境经过了考验,如有不足和优化建议,也欢迎大家批评指正。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容
咦!没有更多了?去看看其它编程学习网 内容吧