短信预约-IT技能 免费直播动态提醒
Sharding-JDBC源码解析与vivo的定制开发 本文源码基于Sharding-JDBC 4.1.1版本。
一、业务背景 随着业务并发请求和数据规模的不断扩大,单节点库表压力往往会成为系统的性能瓶颈。公司IT内部营销库存、交易订单、财经台账、考勤记录 等多领域的业务场景的日增数据量巨大,存在着数据库节点压力过大、连接过多、查询速度变慢 等情况,根据数据来源、时间、工号等信息来将没有联系的数据 尽量均分到不同的库表中,从而在不影响业务需求的前提下,减轻数据库节点压力,提升查询效率和系统稳定性。
二、技术选型 我们对比了几款比较常见的支持分库分表和读写分离的中间件。
Sharding-JDBC作为轻量化的增强版的JDBC框架,相较其他中间件性能更好,接入难度更低,其数据分片、读写分离功能也覆盖了我们的业务诉求,因此我们在业务中广泛使用了Sharding-JDBC。但在使用Sharding-JDBC的过程中,我们也发现了诸多问题,为了业务更便捷的使用Sharding-JDBC,我们对源码做了针对性的定制开发和组件封装来满足业务需求。
三、源码解析 3.1 引言 Sharding-JDBC作为基于JDBC的数据库中间件,实现了JDBC的标准api,Sharding-JDBC与原生JDBC的执行对比流程如下图所示:
相关执行流程的代码样例如下:
JDBC执行样例
//获取数据库连接
try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {
String sql = "SELECT * FROM t_user WHERE name = ?";
//预编译SQL
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
//参数设置与执行
preparedStatement.setString(1, "vivo");
preparedStatement.execute(sql);
//获取结果集
try (ResultSet resultSet = preparedStatement.getResultSet()) {
while (resultSet.next()) {
//处理结果
}
}
}
}
Sharding-JDBC 源码
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute
public boolean execute() throws SQLException {
try {
clearPrevious();
//解析+路由+重写 内部调用BasePrepareEngine#prepare方法
prepare();
initPreparedStatementExecutor();
//执行
return preparedStatementExecutor.execute();
} finally {
clearBatch();
}
}
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
public ExecutionContext prepare(final String sql, final List parameters) {
List clonedParameters = cloneParameters(parameters);
//解析+路由(executeRoute内部先进行解析再执行路由)
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
//重写
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
List resultSets = getResultSets();
//归并结果集
MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
}
return currentResultSet;
}
从对比的执行流程图可见:
【JDBC】:执行的主要流程是通过Datasource获取Connection,再注入SQL语句生成PreparedStatement对象,PreparedStatement设置占位符参数执行后得到结果集ResultSet。 【Sharding-JDBC】:主要流程基本一致,但Sharding基于PreparedStatement进行了实现与扩展,具体实现类 ShardingPreparedStatement中会抽象出解析、路由、重写、归并等引擎,从而实现分库分表、读写分离 等能力,每个引擎的作用说明如下表所示:
/
public String route(final SQLStatement sqlStatement) {
if (isMasterRoute(sqlStatement)) {
MasterVisitedManager.setMasterVisited();
return masterSlaveRule.getMasterDataSourceName();
}
return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
}
private boolean isMasterRoute(final SQLStatement sqlStatement) {
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}
private boolean containsLockSegment(final SQLStatement sqlStatement) {
return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
}
}
是否走主库的信息存在MasterVisitedManager中,MasterVisitedManager是通过ThreadLocal实现的,但这种实现会有一个问题,当我们使用事务先查询再更新/插入时,第一条查询SQL并不会走主库,而是走从库 ,如果业务需要事务的第一条查询也走主库,事务查询前需要手动调用一次MasterVisitedManager.setMasterVisited()。
MasterVisitedManager
public final class MasterVisitedManager {
private static final ThreadLocal MASTER_VISITED = ThreadLocal.withInitial(() -> false);
public static boolean isMasterVisited() {
return MASTER_VISITED.get();
}
public static void setMasterVisited() {
MASTER_VISITED.set(true);
}
public static void clear() {
MASTER_VISITED.remove();
}
}
3.3.2 引擎总结
路由引擎的作用是将SQL根据参数通过实现的策略算法计算出实际该在哪些库的哪些表执行,也就是路由结果。路由引擎有两种实现,分别是分片路由和主从路由,两者都提供了标准化的策略接口来让业务实现自己的路由策略,分片路由需要注意自身SQL场景和策略算法相匹配,主从路由中同一线程且同一数据库连接内,有写入操作后,之后的读操作会从主库读取,写入操作前的读操作不会走主库。
3.4 改写引擎 3.4.1 引擎解析
经过解析路由后虽然确定了执行的实际库表,但SQL中表名依旧是逻辑表,不能执行,改写引擎可以将逻辑表替换为物理表。同时,路由至多库表的SQL也需要拆分为多条SQL执行。
改写的入口仍旧在BasePrepareEngine中,创建重写上下文createSQLRewriteContext,再根据上下文进行改写rewrite,最终返回执行单元ExecutionUnit。
改写逻辑入口
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
private Collection executeRewrite(final String sql, final List parameters, final RouteContext routeContext) {
//注册重写装饰器
registerRewriteDecorator();
//创建 SQLRewriteContext
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
//重写
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
执行单元包含了数据源名称,改写后的SQL,以及对应的参数,SQL一样的两个SQLUnit会被视为相等。
ExecutionUnit
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
private final String dataSourceName;
private final SQLUnit sqlUnit;
}
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据sql判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
private String sql;
private final List parameters;
}
createSQLRewriteContext完成了两件事,一个是对SQL参数进行了重写,一个是生成了SQLToken。
createSQLRewriteContext
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
public SQLRewriteContext createSQLRewriteContext(final String sql, final List parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
//sql参数重写
decorate(decorators, result, routeContext);
//生成SQLToken
result.generateSQLTokens();
return result;
}
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate
public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
//参数重写
each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
}
}
//sqlTokenGenerators
sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
}
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens
public void generateSQLTokens() {
sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
}
ParameterRewriter中与分片相关的实现有两种。
/
private boolean skipSharding;
private boolean masterRoute;
public static boolean isSkipSharding() {
return SKIP_CONTEXT_HOLDER.get().skipSharding;
}
public static void setSkipSharding(boolean skipSharding) {
SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;
}
public static boolean isMasterRoute() {
return SKIP_CONTEXT_HOLDER.get().masterRoute;
}
public static void setMasterRoute(boolean masterRoute) {
SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;
}
public static void clear(){
SKIP_CONTEXT_HOLDER.remove();
}
}
判断SQL是否包含分片表
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext
// 判断是否可以跳过sharding,构造RuleContextManager的值
private void buildSkipContext(final String sql){
Set sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));
if (CollectionUtils.isNotEmpty(rules)) {
for (BaseRule baseRule : rules) {
//定制方法,ShardingRule实现,判断sqlTokenSet是否包含逻辑表即可
if(baseRule.hasContainShardingTable(sqlTokenSet)){
RuleContextManager.setSkipSharding(false);
break;
}else {
RuleContextManager.setSkipSharding(true);
}
}
}
}
org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTable
public Boolean hasContainShardingTable(Set sqlTokenSet) {
//logicTableNameList通过遍历TableRule可以得到
for (String logicTable : logicTableNameList) {
if (sqlTokenSet.contains(logicTable)) {
return true;
}
}
return false;
}
(2)跳过解析路由: 通过RuleContextManager中的skipSharding判断是否需要跳过Sharding解析路由,但为了兼容读写分离的场景,我们还需要知道这条SQL应该走主库还是从库,走主库的场景在后面强制路由主库部分有说明,SQL走主库实际上只有两种情况,一种是非SELECT语句,另一种就是SELECT语句带锁,如SELECT...FOR
UPDATE,因此整体实现的步骤如下:
如果标记了跳过Sharding且不为select语句,直接返回SkipShardingStatement,单独构造一个SkipShardingStatement的目的是为了能利用解析引擎中的缓存,缓存中不能放入null值。 如果是select语句需要继续解析,判断是否有锁后直接返回,避免后续解析造成语法不兼容,这里也曾尝试用反射获取lockClause来判断是否包含锁,但最终没有成功。 ShardingRouteDecorator根据 RuleContextManager.isSkipSharding判断是否跳过路由。
跳过解析路由
public class SkipShardingStatement implements SQLStatement{
@Override
public int getParameterCount() {
return 0;
}
}
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
private SQLStatement parse0(final String sql, final boolean useCache) {
if (useCache) {
Optional cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
SQLStatement result ;
if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){
RuleContextManager.setMasterRoute(true);
result = new SkipShardingStatement();
}else {
result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
}
if (useCache) {
cache.put(sql, result);
}
return result;
}
org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause
public ASTNode visitSelectClause(final SelectClauseContext ctx) {
SelectStatement result = new SelectStatement();
// 跳过sharding 只需要判断是否有锁来决定是否路由至主库即可
if(RuleContextManager.isSkipSharding()){
if (null != ctx.lockClause()) {
result.setLock((LockSegment) visit(ctx.lockClause()));
RuleContextManager.setMasterRoute(true);
}
return result;
}
//...后续解析
}
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext
private RouteContext createRouteContext(final String sql, final List parameters, final boolean useCache) {
SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
//如果需要跳过sharding 不进行后续的解析直接返回
if (RuleContextManager.isSkipSharding()) {
return new RouteContext(sqlStatement, parameters, new RouteResult());
}
//...解析
}
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
// 跳过sharding路由
if(RuleContextManager.isSkipSharding()){
return routeContext;
}
//...路由
}
(3)手动构造ExecutionUnit: ExecutionUnit中我们需要确定的内容就是datasourceName,这里我们认为跳过Sharding的SQL最终执行的库一定只有一个。如果只是跳过Sharding的情况,直接从元数据中获取数据源名称即可,如果存在读写分离的情况,主从路由的结果也一定是唯一的。创建完ExecutionUnit直接放入ExecutionContext返回即可,从而跳过后续的改写逻辑。
手动构造ExecutionUnit
public ExecutionContext prepare(final String sql, final List parameters) {
List clonedParameters = cloneParameters(parameters);
// 判断是否可以跳过sharding,构造RuleContextManager的值
buildSkipContext(sql);
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
// 跳过sharding的sql最后的路由结果一定只有一个库
if(RuleContextManager.isSkipSharding()){
log.debug("可以跳过sharding的场景 {}", sql);
if(!Objects.isNull(routeContext.getRouteResult())){
Collection allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();
int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();
if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){
throw new ShardingSphereException("可以跳过sharding,但是路由结果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());
}
Collection actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();
// 手动创建执行单元
String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();
ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));
result.getExecutionUnits().add(executionUnit);
//标记该结果需要跳过
result.setSkipShardingScenarioFlag(true);
}
}else {
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
}
if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
(4)跳过合并: 跳过查询结果的合并和影响行数计算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳过
跳过合并
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
List queryResults = preparedStatementExecutor.executeQuery();
List resultSets = preparedStatementExecutor.getResultSets();
// 定制开发,不分片跳过合并
if(executionContext.isSkipShardingScenarioFlag()){
return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
}
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
List resultSets = getResultSets();
// 定制开发,不分片跳过合并
if(executionContext.isSkipShardingScenarioFlag()){
return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
}
if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
}
return currentResultSet;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate
public boolean isAccumulate() {
//定制开发,不分片跳过计算
if(executionContext.isSkipShardingScenarioFlag()){
return false;
}
return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
}
(5)清空RuleContextManager: 查看一下Sharding-JDBC其他ThreadLocal的清空位置,对应的清空RuleContextManager就好。
清空ThreadLocal
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#close
public final void close() throws SQLException {
closed = true;
MasterVisitedManager.clear();
TransactionTypeHolder.clear();
RuleContextManager.clear();
int connectionSize = cachedConnections.size();
try {
forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());
} finally {
cachedConnections.clear();
rootInvokeHook.finish(connectionSize);
}
}
举个例子,比如Sharding-JDBC本身是不支持INSERT
INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE
col3 = ? 这种语法的,会报空指针异常。
经过我们上述改造验证后,非分片表是可以跳过语法限制执行如下的SQL的。
通过该功能的实现,业务可以更关注与分片表的SQL改造,而无需担心引入Sharding-JDBC造成所有SQL的验证改造,大幅减少改造成本和风险。
4.2 强制路由主库 Sharding-JDBC可以通过配置主从库数据源方便的实现读写分离的功能,但使用读写分离就必须面对主从延迟和从库失联的痛点,针对这一问题,我们实现了强制路由主库的动态配置,当主从延迟过大或从库失联时,通过修改配置来实现SQL语句强制走主库的不停机路由切换。
后面会说明了配置的动态生效的实现方式,这里只说明强制路由主库的实现,我们直接使用前文的RuleContextManager即可,在主从路由引擎里判断下是否开启了强制主库路由。
MasterSlaveRouteDecorator.decorate改造
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
if(properties.getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){
MasterVisitedManager.setMasterVisited();
}
//...路由逻辑
return routeContext;
}
为了兼容之前跳过Sharding的功能,我们需要同步修改下isMasterRoute方法,如果是跳过了Sharding路由需要通过RuleContextManager来判断是否走主库。
isMasterRoute改造
org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute
private boolean isMasterRoute(final SQLStatement sqlStatement) {
if(sqlStatement instanceof SkipShardingStatement){
// 优先以MasterVisitedManager中的值为准
return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();
}
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}
当然,更理想的状况是通过监控主从同步延迟和数据库拨测,当超过阈值时或从库失联时直接自动修改配置中心的库,实现自动切换主库,减少业务故障时间和运维压力。
4.3 配置动态生效 Sharding-JDBC中的ConfigurationPropertyKey中提供了许多配置属性,而Sharding-JDBCB并没有为这些配置提供在线修改的方法,而在实际的应用场景中,像SQL_SHOW这样控制SQL打印的开关配置,我们更希望能够在线修改配置值来控制SQL日志的打印,而不是修改完配置再重启服务。
以SQL打印为例,BasePrepareEngine中存在ConfigurationProperties对象,通过调用getValue方法来获取SQL_SHOW的值。
SQL 打印
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
public ExecutionContext prepare(final String sql, final List parameters) {
List clonedParameters = cloneParameters(parameters);
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
//sql打印
if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
ConfigurationProperties继承了抽象类TypedProperties,其getValue方法就是根据key获取对应的配置值,因此我们直接在TypedProperties中实现刷新缓存中的配置值的方法。
TypedProperties刷新配置
public abstract class TypedProperties {
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
@Getter
private final Properties props;
private final Map cache;
public TypedProperties(final Class keyClass, final Properties props) {
this.props = props;
cache = preload(keyClass);
}
private Map preload(final Class keyClass) {
E[] enumConstants = keyClass.getEnumConstants();
Map result = new HashMap<>(enumConstants.length, 1);
Collection errorMessages = new LinkedList<>();
for (E each : enumConstants) {
TypedPropertyValue value = null;
try {
value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());
} catch (final TypedPropertyValueException ex) {
errorMessages.add(ex.getMessage());
}
result.put(each, value);
}
if (!errorMessages.isEmpty()) {
throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));
}
return result;
}
@SuppressWarnings("unchecked")
public T getValue(final E key) {
return (T) cache.get(key).getValue();
}
public boolean refreshValue(String key, String value){
//获取配置类支持的配置项
E[] enumConstants = targetKeyClass.getEnumConstants();
for (E each : enumConstants) {
//遍历新的值
if(each.getKey().equals(key)){
try {
//空白value认为无效,取默认值
if(!StringUtils.isBlank(value)){
value = each.getDefaultValue();
}
//构造新属性
TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);
//替换缓存
cache.put(each, typedPropertyValue);
//原始属性也替换下,有可能会通过RuntimeContext直接获取Properties
props.put(key,value);
return true;
} catch (final TypedPropertyValueException ex) {
log.error("refreshValue error. key={} , value={}", key, value, ex);
}
}
}
return false;
}
}
实现了刷新方法后,我们还需要将该方法一步步暴露至一个外部可以调用的类中,以便在服务监听配置的方法中,能够调用这个刷新方法。ConfigurationProperties直接在asePrepareEngine的构造函数中传入,我们通过构造函数逐步反推最外层的这一对象调用来源,最终可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以获取到这个配置,而这个就是Sharding-JDBC实现的JDBC中Datasource接口的抽象类,我们直接在这个类中调用刚刚实现的refreshValue方法,剩下的就是监听配置,通过自己实现的AbstractDataSourceAdapter来调用这个方法就好了。
通过这一功能,我们可以方便的控制一些开关属性的在线修改,如SQL打印、强制路由主库等,业务无需重启服务即可做到配置的动态生效。
4.4 批量update语法支持 业务中存在使用foreach标签来批量update的语句,这种SQL在Sharding-JDBC中无法被正确路由,只会路由第一组参数,后面的无法被路由改写,原因是解析引擎无法将语句拆分解析。
批量update样例
update t_order set
status = 1,
updated_by = #{item.updatedBy}
WHERE created_by = #{item.createdBy};
我们通过将批量update按照;拆分为多个语句,然后分别路由,最后手动汇总路有结果生成执行单元。
为了能正确重写SQL,批量update拆分后的语句需要完全一样,这样就不能使用动态拼接set条件,而是使用ifnull语法或者字段值不发生变化时也将原来的值放入set中,只不过set前后的值保持一致,整体思路与实现如下。
prepareBatch实现
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch
private ExecutionContext prepareBatch(List splitSqlList, final List allParameters) {
//SQL去重
List sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());
if (sqlList.size() > 1) {
throw new ShardingSphereException("不支持多条SQL,请检查SQL," + sqlList.toString());
}
//以第一条SQL为标准
String sql = sqlList.get(0);
//所有的执行单元
Collection globalExecutionUnitList = new ArrayList<>();
//初始化最后的执行结果
ExecutionContext executionContextResult = null;
//根据所有参数数量和SQL语句数量 计算每组参数的数量
int eachSqlParameterCount = allParameters.size() / splitSqlList.size();
//平均分配每条SQL的参数
List> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);
for (List eachSqlParameterList : eachSqlParameterListList) {
//每条SQL参数不同 需要根据参数路由不同的结果 实际的SqlStatementContext 是一致的
RouteContext routeContext = executeRoute(sql, eachSqlParameterList);
//由于SQL一样 实际的SqlStatementContext 是一致的 只需初始化一次
if (executionContextResult == null) {
executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());
}
globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));
}
//排序打印日志
executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));
if (properties.getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.getValue(ConfigurationPropertyKey.SQL_SIMPLE),
executionContextResult.getSqlStatementContext(), (Collection) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));
}
return executionContextResult;
}
这里我们在ExecutionContext单独构造了一个了ExtendMap来存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判断ExecutionUnit中的SqlUnit只会根据SQL去重,批量update的SQL是一致的,但parameters不同,为了不影响原有的逻辑,单独使用了另外的变量来存放。
ExecutionContext改造
@RequiredArgsConstructor
@Getter
public class ExecutionContext {
private final SQLStatementContext sqlStatementContext;
private final Collection executionUnits = new LinkedHashSet<>();
private final Map extendMap = new HashMap<>();
@Setter
private boolean skipShardingScenarioFlag = false;
}
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
private final String dataSourceName;
private final SQLUnit sqlUnit;
}
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根据SQL判断是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
private String sql;
private final List parameters;
}
我们还需要改造下执行方法,在初始化执行器的时候,判断下ExtendMap中存在我们自定义的EXECUTION_UNIT_LIST是否存在,存在则使用生成InputGroup,同一个数据源下的ExecutionUnit会被放入同一个InputGroup中。
InputGroup改造
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
//兼容批量update 分库分表后同一张表的情况 判断是否存在EXECUTION_UNIT_LIST 存在则使用未去重的List进行后续的操作
if (MapUtils.isNotEmpty(executionContext.getExtendMap())){
Collection executionUnitCollection = (Collection) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);
if(CollectionUtils.isNotEmpty(executionUnitCollection)){
getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));
}
}else {
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
}
cacheStatements();
}
改造完成后,批量update中的每条SQL都可以被正确路由执行。
4.5 ShardingCondition去重 当where语句包括多个or条件时,而or条件不包含分片键时,会造成createShardingConditions方法生成重复的分片条件,导致重复调用doSharding方法。
如SELECT * FROM t_order WHERE created_by = ? and ( (status = ?) or (status = ?) or (status = ?) ) 这种SQL,存在三个or条件,分片键是created_by
,实际产生的shardingCondition会是三个一样的值,并会调用三次doSharding的方法。虽然实际执行还是只有一次(批量update那里说明过执行单元会去重),但为了减少方法的重复调用,我们还是对这里做了一次去重。
去重的方法也比较简单粗暴,我们对ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最终结果前加一次去重,从而避免生成重复的shardingCondition造成doSharding方法的重复调用。
createShardingConditions去重
org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions
private Collection createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection andPredicates, final List parameters) {
Collection result = new LinkedList<>();
for (AndPredicate each : andPredicates) {
Map> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);
if (routeValueMap.isEmpty()) {
return Collections.emptyList();
}
result.add(createShardingCondition(routeValueMap));
}
//去重
Collection distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));
return distinctResult;
}
4.6 全路由校验 分片表的SQL中如果没有携带分片键(或者带上了分片键结果没有被正确解析)将会导致全路由,产生性能问题,而这种SQL并不会报错,这就导致在实际的业务改造中,开发和测试很难保证百分百改造彻底。为此,我们在源码层面对这种情况做了额外的校验,当产生全路由,也就是ShardingConditions为空时,主动抛出异常,从而方便开发和测试能够快速发现全路由SQL。
实现方式也比较简单,校验下ShardingConditions是否为空即可,只不过需要额外兼容下Hint策略ShardingConditions始终为空的特殊情况。
全路由校验
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
//省略...
//获取 ShardingConditions
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);
//判断是否允许全路由
if (!properties.getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {
//如果不是Hint算法
if(!isHintAlgorithm(sqlStatementContext, shardingRule)){
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
if(shardingConditions.getConditions().isEmpty()) {
throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
}else {
if (sqlStatementContext instanceof InsertStatementContext) {
List routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());
if(CollectionUtils.isEmpty(routeValuesNotEmpty)){
throw new ShardingSphereException("SQL不包含分库分表键,请检查SQL");
}
}
}
}
}
}
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
//省略...
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
// 场景a 全局默认策略是否使用强制路由策略
if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy
|| shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){
return true;
}
for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
Optional tableRule = shardingRule.findTableRule(each);
//场景b 指定表是否使用强制路由策略
if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy
|| shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {
return true;
}
}
return false;
}
当然这块功能也可以在完善些,比如对分片路由结果中的数据源数量进行校验,从而避免跨库操作,我们这边没有实现也就不再赘述了。
4.7 组件封装 业务接入Sharding-JDBC的步骤是一样的,都需要通过Java创建数据源和配置对象或者使用SpringBoot进行配置,存在一定的熟悉成本和重复开发的问题,为此我们也对定制开发版本的Sharding-JDBC封装了一个公共组件,从而简化业务配置,减少重复开发,提升业务的开发效率,具体功能可见下。这块没有涉及源码的改造,只是在定制版本上包装的一个公共组件。
提供了默认的数据源与连接池配置 简化分库分表配置,业务配置逻辑表名和后缀,组件拼装行表达式和actual-data-nodes 封装常用的分片算法(时间、业务字段值等), 统一的配置监听与动态修改(SQL打印、强制主从切换等) 开源Sharding-JDBC配置
//数据源名称
spring.shardingsphere.datasource.names=ds0,ds1
//ds0配置
spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=
//ds1配置
spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=
//分表规则
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expressinotallow=t_order$->{order_id % 2}
spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expressinotallow=t_order_item$->{order_id % 2}
//默认分库规则
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expressinotallow=ds$->{user_id % 2}
组件简化配置
//数据源名称
vivo.it.sharding.datasource.names = ds0,ds1
//ds0配置
vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds0.username = root
vivo.it.sharding.datasource.ds0.password =
//ds1配置
vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds1.username = root
vivo.it.sharding.datasource.ds1.password =
//分表规则
vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]
//默认分库规则
vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}
五、使用建议 结合官方文档和业务实践经验,我们也梳理了部分使用Sharding-JDBC的建议供大家参考,实际具体如何优化SQL写法(比如子查询、分页、分组排序等)还需要结合业务的实际场景来进行测试和调优。
(1)强制等级
建议①:涉及分片表的SQL必须携带分片键
原因:无分片键会导致全路由,存在严重的性能隐患
建议②:禁止一条SQL中的分片值路由至不同的库
原因:跨库操作存在严重的性能隐患,事务操作会升级为分布式事务,增加业务复杂度
建议③:禁止对分片键使用运算表达式或函数操作
原因:无法提前计算表达式和函数获取分片值,导致全路由
说明:详见官方文档
建议④:禁止在子查询中使用分片表
原因:无法正常解析子查询中的分片表,导致业务错误
说明:虽然 官方文档中说有限支持子查询 ,但在实际的使用中发现4.1.1并不支持子查询, 可见官方 issue6164 | issue 6228 。
建议⑤:包含CASE WHEN、HAVING、UNION (ALL)语法的分片SQL,不支持路由至多数据节点
说明:详见官方文档
(2)建议等级
① 建议使用分布式id来保证分片表主键的全局唯一性
原因:方便判断数据的唯一性和后续的迁移扩容
说明:详见文章《vivo 自研鲁班分布式 ID 服务实践》
② 建议跨多表的分组SQL的分组字段与排序字段保证一致
原因:分组和排序字段不一致只能通过内存合并,大数据量时存在性能隐患
说明:详见官方文档
③ 建议通过全局递增的分布式id来优化分页查询
原因:Sharding-JDBC的分页优化侧重于结果集的流式合并来避免内存爆涨,但深度分页自身的性能问题并不能解决
说明:详见官方文档
六、总结 本文结合个人理解梳理了各个引擎的源码入口和关键逻辑,读者可以结合本文和官方文档更好的定位理解Sharding-JDBC的源码实现。定制开发的目的是为了降低业务接入成本 ,尽可能减少业务存量SQL的改造,部分改造思想其实与官方社区也存在差异,比如跳过语法解析,官方社区致力于通过优化解析引擎来适配各种语法,而不是跳过解析阶段,可参考官方issue 。源码分析和定制改造只涉及了Sharding-JDBC的数据分片和读写分离功能,定制开发的功能也在生产环境经过了考验,如有不足和优化建议,也欢迎大家批评指正。
免责声明:
① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。
② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)
难度 813人已做
查看 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析
难度 354人已做
查看 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析
难度 318人已做
查看 2024年上半年软考高项第一、二批次真题考点汇总(完整版)
难度 435人已做
查看 2024年上半年系统架构设计师考试综合知识真题
难度 224人已做
查看