文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

mysql JDBC的三种查询(普通、流式、游标)

2023-09-14 20:25

关注

使用JDBC向mysql发送查询时,有三种方式:

1、常规查询

public static void normalQuery() throws SQLException {    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");    PreparedStatement statement = connection.prepareStatement(sql);    //statement.setFetchSize(100); //不起作用    ResultSet resultSet = statement.executeQuery();        while(resultSet.next()){        System.out.println(resultSet.getString(2));    }    resultSet.close();    statement.close();    connection.close();}

1)说明:

  1. 第四行设置featchSize不起作用。
  2. 第五行statement.executeQuery()执行查询会阻塞,因为需要等到所有数据返回并放到内存中;接下来每次执行resultSet.next()方法会从内存中获取数据。

2)将jvm内存设置较小(-Xms16m -Xmx16m),对于大数据的查询会产生OOM:

为了避免OOM,通常我们会使用分页查询,或者下面的两种方式。

2、流式查询

public static void streamQuery() throws Exception {     Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);        statement.setFetchSize(Integer.MIN_VALUE);     //或者通过 com.mysql.jdbc.StatementImpl    ((StatementImpl) statement).enableStreamingResults();        ResultSet rs = statement.executeQuery();    while (rs.next()) {        System.out.println(rs.getString(2));    }    rs.close();    statement.close();    connection.close();}

2.1)流式查询的条件:

随着大数据的到来,对于百万、千万的数据使用流式查询可以有效避免OOM。在执行statement.executeQuery()时不会从TCP响应流中读取完所有数据,当下面执行rs.next()时会按照需要从TCP响应流中读取部分数据。

  1. 创建Statement的时候需要制定ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
  2. 设置fetchSize位Integer.MIN_VALUE

或者通过com.mysql.jdbc.StatementImpl的enableStreamingResults()方法设置。二者是一致的。看mysql的jdbc(com.mysql.jdbc.StatementImpl)源码:

2.2)流式查询原理:

1)基本概念

我们要知道jdbc客户端和mysql服务器之间是通过TCP建立的通信,使用mysql协议进行传输数据。首先声明一个概念:在三次握手建立了TCP连接后,就可以在这个通道上进行通信了,直到关闭该连接。

在 TCP 中发送端和接收端**可以是客户端/服务端,也可以是服务器/客户端**,通信的双方在任意时刻既可以是接收数据也可以是发送数据(全双工)。在通信中,收发双方都不保持记录的边界,所以需要按照一定的协议进行表示。在mysql中会按照mysql协议来进行交互。

有了上面的概念,我们重新来定义这两种查询:

在执行st.executeQuery()时,jdbc驱动会通过connection对象和mysql服务器建立TCP连接,同时在这个链接通道中发送sql命令,并接受返回。二者的区别是:

  1. 普通查询:也叫批量查询,jdbc客户端会阻塞的一次性从TCP通道中读取完mysql服务的返回数据;
  2. 流式查询:分批的从TCP通道中读取mysql服务返回的数据,每次读取的数据量并不是一行(通常是一个package大小),jdbc客户端在调用rs.next()方法时会根据需要从TCP流通道中读取部分数据。(并不是每次读区一行数据,网上说的几乎都是错的!)

2)源码查看:

从statement.executeQuery()方法跟进去,主要的调用连如下:

protected ResultSetInternalMethods executeInternal(int maxRowsToRetrieve, Buffer sendPacket, boolean createStreamingResultSet, boolean queryIsSelectOnly,            Field[] metadataFromCache, boolean isBatch) throws SQLException {        synchronized (checkClosed().getConnectionMutex()) {            MySQLConnection locallyScopedConnection = this.connection;            rs = locallyScopedConnection.execSQL(this, null, maxRowsToRetrieve, sendPacket, this.resultSetType, this.resultSetConcurrency,createStreamingResultSet, this.currentCatalog, metadataFromCache, isBatch);            return rs;        }public ResultSetInternalMethods execSQL(StatementImpl callingStatement, String sql, int maxRows, Buffer packet, int resultSetType, int resultSetConcurrency,            boolean streamResults, String catalog, Field[] cachedMetadata, boolean isBatch) throws SQLException {        synchronized (getConnectionMutex()) {            return this.io.sqlQueryDirect(callingStatement, null, null, packet, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,                        cachedMetadata);        }}final ResultSetInternalMethods sqlQueryDirect(StatementImpl callingStatement, String query, String characterEncoding, Buffer queryPacket, int maxRows,            int resultSetType, int resultSetConcurrency, boolean streamResults, String catalog, Field[] cachedMetadata) throws Exception {        Buffer resultPacket = sendCommand(MysqlDefs.QUERY, null, queryPacket, false, null, 0);        ResultSetInternalMethods rs = readAllResults(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog, resultPacket,                    false, -1L, cachedMetadata);        return rs;}ResultSetImpl readAllResults(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency, boolean streamResults,            String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {        ResultSetImpl topLevelResultSet = readResultsForQueryOrUpdate(callingStatement, maxRows, resultSetType, resultSetConcurrency, streamResults, catalog,                resultPacket, isBinaryEncoded, preSentColumnCount, metadataFromCache);        return topLevelResultSet;}protected final ResultSetImpl readResultsForQueryOrUpdate(StatementImpl callingStatement, int maxRows, int resultSetType, int resultSetConcurrency,            boolean streamResults, String catalog, Buffer resultPacket, boolean isBinaryEncoded, long preSentColumnCount, Field[] metadataFromCache) throws SQLException {            com.mysql.jdbc.ResultSetImpl results = getResultSet(callingStatement, columnCount, maxRows, resultSetType, resultSetConcurrency, streamResults,                    catalog, isBinaryEncoded, metadataFromCache);            return results;        }}protected ResultSetImpl getResultSet(StatementImpl callingStatement, long columnCount, int maxRows, int resultSetType, int resultSetConcurrency,            boolean streamResults, String catalog, boolean isBinaryEncoded, Field[] metadataFromCache) throws SQLException {        Buffer packet; // The packet from the server        RowData rowData = null;        if (!streamResults) {            rowData = readSingleRowSet(columnCount, maxRows, resultSetConcurrency, isBinaryEncoded, (metadataFromCache == null) ? fields : metadataFromCache);        } else {            rowData = new RowDataDynamic(this, (int) columnCount, (metadataFromCache == null) ? fields : metadataFromCache, isBinaryEncoded);            this.streamingData = rowData;        }        ResultSetImpl rs = buildResultSetWithRows(callingStatement, catalog, (metadataFromCache == null) ? fields : metadataFromCache, rowData, resultSetType,                resultSetConcurrency, isBinaryEncoded);        return rs;}

说明:

  1. sqlQueryDirect()方法中的sendCommand会通过io发送sql命令请求到mysql服务器,并获取返回流mysqlOutput
  2. getResultSet()方法会判断是否是流式查询还是批量查询。MySQL驱动会根据不同的参数设置选择对应的ResultSet实现类,分别对应三种查询方式:

看上述代码(41行),对于批量查询:readSingleRowSet方法会循环掉用nextRow方法获取所有数据,然后放到jvm内存的rows中:

对于流式查询:直接创建RowDataDynamic对象返回。后面在掉用rs.next()获取数据时会根据需要从mysqlOutput流中读取数据。

2.3)流式查询的坑:

public static void streamQuery2() throws Exception {     Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false", "root", "123456");    //statement1    PreparedStatement statement = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);        statement.setFetchSize(Integer.MIN_VALUE);     ResultSet rs = statement.executeQuery();    if (rs.next()) {        System.out.println(rs.getString(2));    }    //statement2    PreparedStatement statement2 = connection.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);        statement2.setFetchSize(Integer.MIN_VALUE);     ResultSet rs2 = statement2.executeQuery();    if (rs2.next()) {        System.out.println(rs2.getString(2));    }//      rs.close();//      statement.close();//      connection.close();}

执行结果:

test1java.sql.SQLException: Streaming result set com.mysql.jdbc.RowDataDynamic@45c8e616 is still active. No statements may be issued when any streaming result sets are open and in use on a given connection. Ensure that you have called .close() on any active streaming result sets before attempting more queries.at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:869)at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:865)at com.mysql.jdbc.MysqlIO.checkForOutstandingStreamingData(MysqlIO.java:3217)at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2453)at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)at com.mysql.jdbc.StatementImpl.executeSimpleNonQuery(StatementImpl.java:1465)at com.mysql.jdbc.StatementImpl.setupStreamingTimeout(StatementImpl.java:726)at com.mysql.jdbc.PreparedStatement.executeQuery(PreparedStatement.java:1939)at com.tencent.clue_disp_api.MysqlTest.streamQuery2(MysqlTest.java:79)at com.tencent.clue_disp_api.MysqlTest.main(MysqlTest.java:25)

MySQL Connector/J 5.1 Developer Guide中原文:

There are some caveats with this approach. You must read all of the rows in the result set (or close it) before you can issue any other queries on the connection, or an exception will be thrown. 也就是说当通过流式查询获取一个ResultSet后,通过next迭代出所有元素之前或者调用close关闭它之前,不能使用同一个数据库连接去发起另外一个查询,否者抛出异常(第一次调用的正常,第二次的抛出异常)。

2.4)抓包验证:

查看3307 > 62169的包可以发现,ack都是1324,证明都是针对当时sql请求的返回数据。

3、游标查询

public static void cursorQuery() throws Exception {    Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3307/test?useSSL=false&useCursorFetch=true", "root", "123456");    ((JDBC4Connection) connection).setUseCursorFetch(true); //com.mysql.jdbc.JDBC4Connection    Statement statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);        statement.setFetchSize(2);        ResultSet rs = statement.executeQuery(sql);        while (rs.next()) {        System.out.println(rs.getString(2));        Thread.sleep(5000);    }        rs.close();    statement.close();    connection.close();}

1)说明:

2)抓包验证:

通过wireshark抓包,可以看到每执行一次rs.next() 就会向mysql服务发送一个请求,同时mysql服务返回两条数据:

3)游标查询需要注意的点:

由于MySQL方不知道客户端什么时候将数据消费完,而自身的对应表可能会有DML写入操作,此时MySQL需要建立一个临时空间来存放需要拿走的数据。因此对于当你启用useCursorFetch读取大表的时候会看到MySQL上的几个现象:

  1. IOPS飙升 (IOPS (Input/Output Per Second):磁盘每秒的读写次数)
  2. 磁盘空间飙升
  3. 客户端JDBC发起SQL后,长时间等待SQL响应数据,这段时间就是服务端在准备数据
  4. 在数据准备完成后,开始传输数据的阶段,网络响应开始飙升,IOPS由“读写”转变为“读取”。
  5. CPU和内存会有一定比例的上升

来源地址:https://blog.csdn.net/liuxiao723846/article/details/130726967

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯