Problem description
Normally, getting MybatisPlus's saveBatch method to perform a true batch insert only takes two things: append &rewriteBatchedStatements=true to the database connection string, and make sure the MySQL driver version is 5.0.18 or above.
In this case, however, the batch insert did not take effect: the list of rows was split into several groups and saved group by group instead of as one batch, which could be verified by debugging and by inspecting the database logs.
So the configuration was correct, the driver was correct, and the data being saved was correct, yet the batch insert still did not work.
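As a concrete illustration of that setup, here is a minimal connection-string sketch. The Spring Boot property key is an assumption for illustration; only the rewriteBatchedStatements flag on the URL matters here:

```properties
# rewriteBatchedStatements=true lets Connector/J rewrite batched single-row
# inserts into one multi-value INSERT (hypothetical host/schema names).
spring.datasource.url=jdbc:mysql://localhost:3306/demo?rewriteBatchedStatements=true
```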
How batch saving works
The framework itself is rarely the culprit, so let's look at how MybatisPlus implements batch saving, i.e. what actually happens after saveBatch is called.
ServiceImpl.saveBatch()
@Transactional(rollbackFor = Exception.class)
@Override
public boolean saveBatch(Collection<T> entityList, int batchSize) {
// ${className}.insert
String sqlStatement = getSqlStatement(SqlMethod.INSERT_ONE);
// functional style: BiConsumer<T, U>
return executeBatch(entityList, batchSize, (sqlSession, entity) -> sqlSession.insert(sqlStatement, entity));
}
Implementing the BiConsumer interface with a lambda here is equivalent to:
@Override
public boolean saveBatch(Collection entityList, int batchSize) {
BiConsumer<SqlSession, Object> consumer = new EntityConsumer();
return executeBatch(entityList, batchSize, consumer);
}
class EntityConsumer implements BiConsumer<SqlSession, Object>{
@Override
public void accept(SqlSession sqlSession, Object object) {
sqlSession.insert("${className}.insert", object);
}
}
SqlHelper.executeBatch()
Defines the template method for batch saving:
public static <E> boolean executeBatch(Class<?> entityClass, Log log, Collection<E> list, int batchSize, BiConsumer<SqlSession, E> consumer) {
Assert.isFalse(batchSize < 1, "batchSize must not be less than one");
return !CollectionUtils.isEmpty(list) && executeBatch(entityClass, log, sqlSession -> {
int size = list.size();
int i = 1;
for (E element : list) {
// calls sqlSession.insert("${className}.insert", object);
// the row ends up stored in StatementImpl.batchedArgs for the later batch execution
consumer.accept(sqlSession, element);
if ((i % batchSize == 0) || i == size) {
// batch-executes the rows accumulated in StatementImpl.batchedArgs
sqlSession.flushStatements();
}
i++;
}
});
}
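The flushing cadence of this template can be replayed without any database. The sketch below is a simplification with invented names: it applies the same "flush every batchSize rows, and once more at the end" condition to show how often flushStatements() would fire:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class BatchTemplateDemo {
    // Mimics SqlHelper.executeBatch: buffer each row, flush every batchSize
    // rows and once more at the end. Returns how many flushes happened.
    public static int executeBatch(List<String> rows, int batchSize,
                                   BiConsumer<List<String>, String> consumer) {
        List<String> buffer = new ArrayList<>();
        int flushes = 0;
        int size = rows.size();
        int i = 1;
        for (String row : rows) {
            consumer.accept(buffer, row);          // like sqlSession.insert(...)
            if (i % batchSize == 0 || i == size) { // same condition as the template
                buffer.clear();                    // like sqlSession.flushStatements()
                flushes++;
            }
            i++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3", "r4", "r5");
        // 5 rows with batchSize 2 flush at i = 2, 4 and 5
        System.out.println(executeBatch(rows, 2, List::add)); // 3
    }
}
```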
MybatisBatchExecutor.doUpdate()
Adds the pending row to the matching Statement. You can think of this as grouping the batch data, based on two criteria:
if (sql.equals(currentSql) && ms.equals(currentStatement))
● the SQL text of the rows must be exactly identical, including table name and columns
● the same MappedStatement is used, i.e. the same Mapper method
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
final Configuration configuration = ms.getConfiguration();
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
final BoundSql boundSql = handler.getBoundSql();
final String sql = boundSql.getSql();
final Statement stmt;
// grouping check: same SQL text and same MappedStatement as the current group
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
stmt = statementList.get(last);
applyTransactionTimeout(stmt);
handler.parameterize(stmt);//fix Issues 322
BatchResult batchResult = batchResultList.get(last);
batchResult.addParameterObject(parameterObject);
} else {
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
if (stmt == null) {
return 0;
}
handler.parameterize(stmt); //fix Issues 322
currentSql = sql;
currentStatement = ms;
statementList.add(stmt);
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
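The effect of this grouping check can be reproduced in a few lines of plain Java. Note that the executor only compares against the current SQL, so alternating SQL strings keep opening new groups. The sketch below (a simplification, not the real executor; the MappedStatement check is elided) counts the Statement groups a sequence of SQL strings would produce:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchGroupingDemo {
    // Mimics MybatisBatchExecutor.doUpdate's check: a row joins the current
    // group only if its SQL equals the *current* SQL; otherwise a new
    // Statement group is opened.
    public static int countGroups(List<String> sqls) {
        List<String> groups = new ArrayList<>(); // stands in for statementList
        String currentSql = null;
        for (String sql : sqls) {
            if (!sql.equals(currentSql)) {
                currentSql = sql;
                groups.add(sql);
            }
        }
        return groups.size();
    }

    public static void main(String[] args) {
        String full = "INSERT INTO t (a, b) VALUES (?, ?)";
        String partial = "INSERT INTO t (a) VALUES (?)"; // row with a missing column
        // Identical SQL: one group, one real batch.
        System.out.println(countGroups(List.of(full, full, full)));    // 1
        // Alternating SQL: every switch opens a new group, batching is lost.
        System.out.println(countGroups(List.of(full, partial, full))); // 3
    }
}
```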
PreparedStatement.addBatch()
Appends the row's parameters to StatementImpl.batchedArgs; this completes the first phase.
public void addBatch() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
if (this.batchedArgs == null) {
this.batchedArgs = new ArrayList<Object>();
}
for (int i = 0; i < this.parameterValues.length; i++) {
checkAllParametersSet(this.parameterValues[i], this.parameterStreams[i], i);
}
this.batchedArgs.add(new BatchParams(this.parameterValues, this.parameterStreams, this.isStream, this.streamLengths, this.isNull));
}
}
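The important detail here is that addBatch snapshots the current parameter values (that is what BatchParams wraps) rather than keeping a live reference, so the next round of parameter binding does not overwrite earlier rows. A minimal imitation with invented names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchArgsDemo {
    private final Object[] parameterValues; // current bindings, reused per row
    private List<Object[]> batchedArgs;     // like StatementImpl.batchedArgs

    public BatchArgsDemo(int paramCount) {
        this.parameterValues = new Object[paramCount];
    }

    public void setParameter(int index, Object value) {
        parameterValues[index] = value;
    }

    // Like PreparedStatement.addBatch: lazily create the list and store a COPY
    // of the bindings; storing the live array would alias every batched row.
    public void addBatch() {
        if (batchedArgs == null) {
            batchedArgs = new ArrayList<>();
        }
        batchedArgs.add(Arrays.copyOf(parameterValues, parameterValues.length));
    }

    public int batchSize() {
        return batchedArgs == null ? 0 : batchedArgs.size();
    }

    public static void main(String[] args) {
        BatchArgsDemo stmt = new BatchArgsDemo(2);
        stmt.setParameter(0, "alice"); stmt.setParameter(1, 30); stmt.addBatch();
        stmt.setParameter(0, "bob");   stmt.setParameter(1, 25); stmt.addBatch();
        System.out.println(stmt.batchSize()); // 2
    }
}
```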
MybatisBatchExecutor.doFlushStatements()
Iterates over the Statement list and calls executeBatch() on each:
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<>();
if (isRollback) {
return Collections.emptyList();
}
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
List<Object> parameterObjects = batchResult.getParameterObjects();
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// Close statement to close cursor #1109
closeStatement(stmt);
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
PreparedStatement.executeBatchInternal()
This is where the batch is finally executed, and where the rewriteBatchedStatements parameter is checked:
@Override
protected long[] executeBatchInternal() throws SQLException {
synchronized (checkClosed().getConnectionMutex()) {
if (this.connection.isReadOnly()) {
throw new SQLException(Messages.getString("PreparedStatement.25") + Messages.getString("PreparedStatement.26"),
SQLError.SQL_STATE_ILLEGAL_ARGUMENT);
}
if (this.batchedArgs == null || this.batchedArgs.size() == 0) {
return new long[0];
}
// we timeout the entire batch, not individual statements
int batchTimeout = this.timeoutInMillis;
this.timeoutInMillis = 0;
resetCancelledState();
try {
statementBegins();
clearWarnings();
// check the rewriteBatchedStatements parameter
if (!this.batchHasPlainStatements && this.connection.getRewriteBatchedStatements()) {
if (canRewriteAsMultiValueInsertAtSqlLevel()) {
return executeBatchedInserts(batchTimeout);
}
if (this.connection.versionMeetsMinimum(4, 1, 0) && !this.batchHasPlainStatements && this.batchedArgs != null
&& this.batchedArgs.size() > 3 ) {
return executePreparedBatchAsMultiStatement(batchTimeout);
}
}
return executeBatchSerially(batchTimeout);
} finally {
this.statementExecuting.set(false);
clearBatch();
}
}
}
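When the rewrite path is taken, executeBatchedInserts effectively collapses N single-row inserts into one multi-value INSERT. A rough sketch of the rewritten SQL shape (a simplification of what Connector/J really builds):

```java
public class RewriteDemo {
    // Approximates the multi-value rewrite: repeat the VALUES group per row.
    public static String rewrite(String prefix, String valuesGroup, int rows) {
        StringBuilder sb = new StringBuilder(prefix);
        for (int i = 0; i < rows; i++) {
            if (i > 0) sb.append(",");
            sb.append(valuesGroup);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // One round trip instead of three single-row inserts.
        String sql = rewrite("INSERT INTO t (a, b) VALUES ", "(?, ?)", 3);
        System.out.println(sql); // INSERT INTO t (a, b) VALUES (?, ?),(?, ?),(?, ?)
    }
}
```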
Troubleshooting
The key nodes in the flow are:
● MybatisBatchExecutor: 1. runs the grouping logic; 2. iterates over the Statements and triggers the batch save.
● StatementImpl: stores batchedArgs
● PreparedStatement: executes the final batch save
As you can see, the bottom (driver/JDK) layer only executes the final save, so if batchedArgs does not end up holding the whole batch, the grouping logic in MybatisPlus must be at fault. Debugging traced the problem to
if (sql.equals(currentSql) && ms.equals(currentStatement))
Some rows left certain columns unset (columns without default values), so the generated INSERT statements did not share the same column list; those rows were added to different Statements, and the batch execution was effectively defeated.
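One possible fix, assuming MyBatis-Plus 3.x, is to force the nullable columns to always appear in the generated INSERT so every row produces identical SQL; the annotation and enum names below should be verified against your MyBatis-Plus version. Alternatively, populate default values in Java before calling saveBatch:

```java
// Hypothetical entity for illustration; field names are made up.
// FieldStrategy.ALWAYS keeps the column in the generated INSERT even when the
// Java field is null, so all rows share one SQL string and one Statement group.
import com.baomidou.mybatisplus.annotation.FieldStrategy;
import com.baomidou.mybatisplus.annotation.TableField;

public class User {
    private Long id;
    private String name;

    @TableField(insertStrategy = FieldStrategy.ALWAYS)
    private String remark; // nullable column that previously dropped out of the SQL
}
```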
Summary
The whole batch process breaks down into two phases:
- 1. add the rows to StatementImpl.batchedArgs;
- 2. call statement.executeBatch to run the batch.
On top of these two basic steps, MybatisPlus:
- 1. defines a template method for batch operations;
- 2. inspects the rows in the collection, adding rows whose SQL is exactly identical to the same Statement;
- 3. handles generated keys: after the batch runs, Jdbc3KeyGenerator.processBatch writes the database-generated keys back onto the parameter objects.
Note that the single insert inside the batch template goes through sqlSession.insert(), which does not execute anything by itself; it only stores the row in StatementImpl.batchedArgs. A regular single insert, by contrast, goes through baseMapper.insert(), which is implemented via dynamic proxies.
The flow passes through the four familiar layers: MybatisPlus -> Mybatis -> MySQL connector -> JDK. At the bottom it is still PreparedStatement doing the batch work, the same principle as batching with raw JDBC; everything above is framework encapsulation.
The above reflects personal experience; I hope it serves as a useful reference.