文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

SpringBoot如何集成MaxCompute

2023-07-06 12:52

关注

这篇文章主要介绍“SpringBoot如何集成MaxCompute”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“SpringBoot如何集成MaxCompute”文章能帮助大家解决问题。

1、SDK方式集成

1.1、依赖引入odps-sdk-core
<properties>    <java.version>1.8</java.version>    <!--maxCompute sdk 版本号-->    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version></properties><dependencies>  <!--max compute sdk-->  <dependency>    <groupId>com.aliyun.odps</groupId>    <artifactId>odps-sdk-core</artifactId>    <version>${max-compute-sdk.version}</version></dependency></dependencies>
1.2、编写连接工具类

编写MaxComputeSdkUtil以SDK方式连接MaxCompute

1.2.1、重要类和方法说明

连接参数类:

@Datapublic class MaxComputeSdkConnParam {        private String aliyunAccessId;        private String aliyunAccessKey;        private String maxComputeEndpoint;        private String projectName;}

查询表元数据信息实体

主要是字段:tableName, comment。还可以自己添加其他字段

@Data@NoArgsConstructor@AllArgsConstructorpublic class TableMetaInfo {        private String tableName;        private String comment;}

公共方法(初始化)

private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";private static final String FULL_SCAN_CONFIG = "odps.sql.allow.fullscan";private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";private final Odps odps;private final MaxComputeSdkConnParam connParam;public MaxComputeSdkUtil(MaxComputeSdkConnParam param){    this.connParam = param;    // 构建odps客户端    this.odps = buildOdps();}private Odps buildOdps() {    // 阿里云账号密码  AccessId 和 AccessKey    final String aliyunAccessId = connParam.getAliyunAccessId();    final String aliyunAccessKey = connParam.getAliyunAccessKey();    // 创建阿里云账户    final AliyunAccount aliyunAccount = new AliyunAccount(aliyunAccessId, aliyunAccessKey);    // 使用阿里云账户创建odps客户端    final Odps odps = new Odps(aliyunAccount);    // 传入了的话就是用传入的 没有传入使用默认的    final String endpoint = connParam.getMaxComputeEndpoint();    try {        odps.setEndpoint(ObjectUtils.isEmpty(endpoint) ? defaultEndpoint : endpoint);    } catch (Exception e) {        // 端点格式不正确        throw new BizException(ResultCode.MAX_COMPUTE_ENDPOINT_ERR);    }    // 设置项目    odps.setDefaultProject(connParam.getProjectName());    return odps;}

查询表信息

public List<TableMetaInfo> getTableInfos(){    final Tables tables = odps.tables();    List<TableMetaInfo> resultTables = new ArrayList<>();    try {        for (Table table : tables) {            // tableName            final String name = table.getName();            // 描述            final String comment = table.getComment();            final TableMetaInfo info = new TableMetaInfo(name, comment);            resultTables.add(info);        }    } catch (Exception e) {        e.printStackTrace();        final String errMsg = ObjectUtils.isEmpty(e.getMessage()) ? "" : e.getMessage();        if (errMsg.contains("ODPS-0410051:Invalid credentials")){            throw new BizException(ResultCode.MAX_COMPUTE_UNAME_ERR);        }        if (errMsg.contains("ODPS-0410042:Invalid signature value")){            throw new BizException(ResultCode.MAX_COMPUTE_PWD_ERR);        }        if (errMsg.contains("ODPS-0420095: Access Denied")){            throw new BizException(ResultCode.MAX_COMPUTE_PROJECT_ERR);        }    }    return resultTables;}

执行SQL封装

public List<Map<String, Object>> queryData(String querySql, boolean fullScan){    try {        // 配置全表扫描吗        configFullScan(fullScan);        // 使用任务执行SQL        final Instance instance = SQLTask.run(odps, querySql);        // 等待执行成功        instance.waitForSuccess();        // 封装返回结果        List<Record> records = SQLTask.getResult(instance);        // 结果转换为Map        return buildMapByRecords(records);    } catch (OdpsException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);    }}private void configFullScan(boolean fullScan) {        if (fullScan){        // 开启全表扫描配置        Map<String, String> config = new HashMap<>();        log.info("===>>开启全表扫描, 查询多个分区数据");        config.put(FULL_SCAN_CONFIG, "true");        odps.setGlobalSettings(config);        }else {        // 移除全表扫描配置        odps.getGlobalSettings().remove(FULL_SCAN_CONFIG);        }    }private List<Map<String, Object>> buildMapByRecords(List<Record> records) {        List<Map<String, Object>> listMap = new ArrayList<>();        for (Record record : records) {        Column[] columns = record.getColumns();        Map<String, Object> map = new LinkedHashMap<>();        for (Column column : columns) {        String name = column.getName();        Object value = record.get(name);        // maxCompute里面的空返回的是使用\n        if ("\\N".equalsIgnoreCase(String.valueOf(value))) {        map.put(name, "");        } else {        map.put(name, value);        }        }        listMap.add(map);        }        return listMap;    }

分页查询分装

public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size, boolean fullScan){    // 重写SQl,添加limit offset, limit    // 1、替换分号    querySql = querySql.replaceAll(";", "");    // 2、格式化SQL    Integer offset = (page - 1 ) * size;    // 得到执行sql    final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);    log.info("=======>>>执行分页sql为:{}", execSql);    // 调用执行SQL数据    return queryData(execSql, fullScan);}public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){    // 1、替换分号    querySql = querySql.replaceAll(";", "");    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);    log.info("=======>>>执行分页统计总数sql为:{}", countSql);    // 查询总数    final List<Map<String, Object>> countMap = queryData(countSql, false);    if (CollectionUtils.isEmpty(countMap)){        return new PageResult<>(0L, new ArrayList<>());    }    long count = 0L;    for (Object value : countMap.get(0).values()) {        count = Long.parseLong(String.valueOf(value));    }    if (count == 0){        return new PageResult<>(0L, new ArrayList<>());    }    // 执行分页查询 开启全表扫描    final List<Map<String, Object>> resultList = queryData(querySql, page, size, true);    return new PageResult<>(count, resultList);}public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);    List<T> rows = new ArrayList<>();    for (Map<String, Object> row : result.getRows()) {        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);        rows.add(t);    }    return new PageResult<>(result.getTotal(), rows);}
1.2.2 工具类测试

使用测试数据测试工具类

public static void main(String[] args) {    // 构建连接参数    final MaxComputeSdkConnParam connParam = new MaxComputeSdkConnParam();    connParam.setAliyunAccessId("您的阿里云账号accessId");    connParam.setAliyunAccessKey("您的阿里云账号accessKey");    connParam.setProjectName("项目名");    // 实例化工具类    final MaxComputeSdkUtil sdkUtil = new MaxComputeSdkUtil(connParam);    // 查询所有表    final List<TableMetaInfo> tableInfos = sdkUtil.getTableInfos();    for (TableMetaInfo tableInfo : tableInfos) {        System.out.println(tableInfo.getTableName());    }    // 分页查询数据    final PageResult<Map<String, Object>> page = sdkUtil.pageQueryMap("select * from ods_cust;", 2, 10);    System.out.println(page.getTotal());    for (Map<String, Object> map : page.getRows()) {        System.out.println(JSONObject.toJSONString(map));    }}
1.2.3 为什么要开启全表扫描

maxCompute存在使用限制如下:

当使用select语句时,屏显最多只能显示10000行结果。当select语句作为子句时则无此限制,select子句会将全部结果返回给上层查询。
select语句查询分区表时默认禁止全表扫描。
自2018年1月10日20:00:00后,在新创建的项目上执行SQL语句时,默认情况下,针对该项目里的分区表不允许执行全表扫描操作。在查询分区表数据时必须指定分区,由此减少SQL的不必要I/O,从而减少计算资源的浪费以及按量计费模式下不必要的计算费用。

如果您需要对分区表进行全表扫描,可以在全表扫描的SQL语句前加上命令set odps.sql.allow.fullscan=true;,并和SQL语句一起提交执行。假设sale_detail表为分区表,需要同时执行如下语句进行全表查询:

2、JDBC方式集成

使用odps-jdbc集成, 官方文档地址MaxCompute Java JDBC介绍

<properties>    <java.version>1.8</java.version>    <!--maxCompute-jdbc-版本号-->    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version></properties><dependencies>  <!--max compute jdbc-->  <dependency>    <groupId>com.aliyun.odps</groupId>    <artifactId>odps-jdbc</artifactId>    <version>${max-compute-jdbc.version}</version>    <classifier>jar-with-dependencies</classifier>  </dependency></dependencies>
2.2、编写连接工具类

编写MaxComputeSdkUtil以JDBC方式连接MaxCompute

2.2.1、重要类和方法说明

连接参数类:

@Datapublic class MaxComputeJdbcConnParam {    private String aliyunAccessId;    private String aliyunAccessKey;    private String endpoint;    private String projectName;}

公共方法(初始化)

private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";private static final String SELECT_ALL_TABLE_SQL = "select table_name, table_comment from Information_Schema.TABLES";private static final String SELECT_FIELD_BY_TABLE_SQL = "select column_name, column_comment from Information_Schema.COLUMNS where table_name = '%s'";private static final String PAGE_SELECT_TEMPLATE_SQL = "select z.* from (%s) z limit %s, %s;";private static final String PAGE_COUNT_TEMPLATE_SQL = "select count(1) from (%s) z;";private final Connection conn;private final MaxComputeJdbcConnParam connParam;public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {    this.connParam = connParam;    this.conn = buildConn();}private Connection buildConn() {    try {        Class.forName(DRIVER_NAME);    } catch (ClassNotFoundException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);    }    try {        // JDBCURL连接模板        String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";        // 使用驱动管理器连接获取连接        return DriverManager.getConnection(                String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()),                connParam.getAliyunAccessId(), connParam.getAliyunAccessKey());    } catch (SQLException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_JDBC_DRIVE_LOAD_ERR);    }}

查询表信息

public List<TableMetaInfo> getTableInfos(){    List<TableMetaInfo> resultList = new ArrayList<>();    Statement statement = null;    ResultSet resultSet = null;    try {        // 创建statement 使用SQL直接查询        statement = conn.createStatement();        // 执行查询语句        resultSet = statement.executeQuery(SELECT_ALL_TABLE_SQL);        while (resultSet.next()){            final String tableName = resultSet.getString("table_name");            final String tableComment = resultSet.getString("table_comment");            final TableMetaInfo info = new TableMetaInfo(tableName, tableComment);            resultList.add(info);        }        return resultList;    } catch (SQLException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);    } finally {        // 关闭resultSet        closeResultSet(resultSet);        // 关闭statement        closeStatement(statement);    }}

执行SQL封装

public List<Map<String, Object>> queryData(String querySql){    List<Map<String, Object>> resultList = new ArrayList<>();    Statement statement = null;    ResultSet resultSet = null;    try {        // 创建statement        statement = conn.createStatement();        // 执行查询语句        resultSet = statement.executeQuery(querySql);        // 构建结果返回        buildMapByRs(resultList, resultSet);        return resultList;    } catch (SQLException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);    } finally {        // 关闭resultSet        closeResultSet(resultSet);        // 关闭statement        closeStatement(statement);    }}private void buildMapByRs(List<Map<String, Object>> resultList, ResultSet resultSet) throws SQLException {    // 获取元数据    ResultSetMetaData metaData = resultSet.getMetaData();    while (resultSet.next()) {        // 获取列数        int columnCount = metaData.getColumnCount();        Map<String, Object> map = new HashMap<>();        for (int i = 0; i < columnCount; i++) {            String columnName = metaData.getColumnName(i + 1);            Object object = resultSet.getObject(columnName);            // maxCompute里面的空返回的是使用\n            if ("\\N".equalsIgnoreCase(String.valueOf(object))) {                map.put(columnName, "");            } else {                map.put(columnName, object);            }        }        resultList.add(map);    }}private void closeStatement(Statement statement){    if (statement != null){        try {            statement.close();        } catch (SQLException e) {            e.printStackTrace();        }    }}private void closeResultSet(ResultSet resultSet){    if (resultSet != null){        try {            resultSet.close();        } catch (SQLException e) {            e.printStackTrace();        }    }}

分页查询分装

public List<Map<String, Object>> queryData(String querySql, Integer page, Integer size){    List<Map<String, Object>> resultList = new ArrayList<>();    Statement statement = null;    ResultSet resultSet = null;    try {        // 1、替换分号        querySql = querySql.replaceAll(";", "");        // 创建statement        statement = conn.createStatement();        // 2、格式化SQL        int offset = (page - 1 ) * size;        final String execSql = String.format(PAGE_SELECT_TEMPLATE_SQL, querySql, offset, size);        log.info("=======>>>执行分页sql为:{}", execSql);        // 执行查询语句        resultSet = statement.executeQuery(execSql);        // 构建结果返回        buildMapByRs(resultList, resultSet);        return resultList;    } catch (SQLException e) {        e.printStackTrace();        throw new BizException(ResultCode.MAX_COMPUTE_SQL_EXEC_ERR);    } finally {        // 关闭resultSet        closeResultSet(resultSet);        // 关闭statement        closeStatement(statement);    }}public PageResult<Map<String, Object>> pageQueryMap(String querySql, Integer page, Integer size){    // 1、替换分号    querySql = querySql.replaceAll(";", "");    String countSql = String.format(PAGE_COUNT_TEMPLATE_SQL, querySql);    log.info("=======>>>执行分页统计总数sql为:{}", countSql);    // 查询总数    final List<Map<String, Object>> countMap = queryData(countSql);    if (CollectionUtils.isEmpty(countMap)){        return new PageResult<>(0L, new ArrayList<>());    }    long count = 0L;    for (Object value : countMap.get(0).values()) {        count = Long.parseLong(String.valueOf(value));    }    if (count == 0){        return new PageResult<>(0L, new ArrayList<>());    }    // 执行分页查询 开启全表扫描    final List<Map<String, Object>> resultList = queryData(querySql, page, size);    return new PageResult<>(count, resultList);}public <T>PageResult<T> pageQuery(String querySql, Integer page, Integer size, Class<T> clazz){    final PageResult<Map<String, Object>> result = pageQueryMap(querySql, page, size);    List<T> rows = new ArrayList<>();    for (Map<String, Object> row : result.getRows()) {        final T t = JSONObject.parseObject(JSONObject.toJSONString(row), clazz);        rows.add(t);    }    return new PageResult<>(result.getTotal(), rows);}
2.2.2 工具类测试

使用测试数据测试工具类

public static void main(String[] args) {    final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();    connParam.setAliyunAccessId("您的阿里云账号accessId");    connParam.setAliyunAccessKey("您的阿里云账号accessKey");    connParam.setProjectName("项目名");    connParam.setEndpoint("http://service.cn-hangzhou.maxcompute.aliyun.com/api");    final MaxComputeJdbcUtil jdbcUtil = new MaxComputeJdbcUtil(connParam);    // 获取表信息    final List<TableMetaInfo> tableInfos = jdbcUtil.getTableInfos();    for (TableMetaInfo tableInfo : tableInfos) {        System.out.println(tableInfo);    }    // 获取字段信息    final String tableName = tableInfos.get(new Random().nextInt(tableInfos.size())).getTableName();    final List<TableColumnMetaInfo> fields = jdbcUtil.getFieldByTableName(tableName);    for (TableColumnMetaInfo field : fields) {        System.out.println(field.getFieldName() + "-" + field.getComment());    }    // 执行查询    final List<Map<String, Object>> list = jdbcUtil.queryData("select * from ods_cust;");    for (Map<String, Object> map : list) {        System.out.println(JSONObject.toJSONString(map));    }    // 执行分页查询    final List<Map<String, Object>> list2 = jdbcUtil.queryData("select * from ods_cust;", 2, 10);    for (Map<String, Object> map : list2) {        System.out.println(JSONObject.toJSONString(map));    }    // 执行分页查询 并返回count    final PageResult<Map<String, Object>> list3 = jdbcUtil.pageQueryMap("select * from ods_cust;", 2, 10);    System.out.println(list3.getTotal());    for (Map<String, Object> map : list3.getRows()) {        System.out.println(JSONObject.toJSONString(map));    }    jdbcUtil.close();}

关于“SpringBoot如何集成MaxCompute”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网行业资讯频道,小编每天都会为大家更新不同的知识点。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     807人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     351人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     314人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     433人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     221人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯