
Java ClickHouse Integration: Official Tutorial

2023-10-22 07:06


I. Development Environment

1. Supported Data Types

| Data type | Comment |
|---|---|
| AggregateFunction | Limited to groupBitmap; known to have issues with 64-bit bitmaps |
| Array(*) | |
| Bool | |
| Date* | |
| DateTime* | |
| Decimal* | On ClickHouse 21.9+, run SET output_format_decimal_trailing_zeros=1 for consistency |
| Enum* | Can be treated as both string and integer |
| Geo types | Point, Ring, Polygon, and MultiPolygon |
| Int*, UInt* | UInt64 is mapped to long |
| IPv* | |
| Map(*) | |
| Nested(*) | |
| Object('JSON') | |
| SimpleAggregateFunction | |
| *String | |
| Tuple(*) | |
| UUID | |
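Because UInt64 is mapped to Java's signed `long`, values above `Long.MAX_VALUE` (2^63 - 1) show up as negative numbers. Below is a minimal JDK-only sketch of reading such a value back as unsigned. It assumes you already hold the raw 64-bit value in a `long`, however your code obtained it from the client; the class name `UInt64Demo` is illustrative only.

```java
import java.math.BigInteger;

public class UInt64Demo {
    // 2^64, used to lift a negative signed long back into the unsigned range
    private static final BigInteger TWO_POW_64 = BigInteger.ONE.shiftLeft(64);

    // Interpret the raw 64-bit pattern of a Java long as an unsigned UInt64 value
    static BigInteger toUnsigned(long raw) {
        BigInteger value = BigInteger.valueOf(raw);
        return raw < 0 ? value.add(TWO_POW_64) : value;
    }

    // JDK built-in alternative that produces the same decimal text without BigInteger
    static String toUnsignedText(long raw) {
        return Long.toUnsignedString(raw);
    }

    public static void main(String[] args) {
        long raw = -1L; // bit pattern of the largest UInt64 value
        System.out.println(raw);                 // -1
        System.out.println(toUnsigned(raw));     // 18446744073709551615
        System.out.println(toUnsignedText(raw)); // 18446744073709551615
        // Comparisons must be unsigned too: UInt64 max is larger than 1
        System.out.println(Long.compareUnsigned(raw, 1L) > 0); // true
    }
}
```

If you only need ordering or printing, `Long.compareUnsigned` and `Long.toUnsignedString` avoid allocating a `BigInteger`.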

II. Using the Java Client

1. Add the dependency

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-http-client</artifactId>
    <version>0.4.0</version>
</dependency>

2. Connect to ClickHouse

Connection string format: protocol://host[:port][/database][?param[=value][&param[=value]]][#tag[,tag]]

Example:

ClickHouseNode server = ClickHouseNode.builder()
        .host(System.getProperty("chHost", "localhost"))
        .port(ClickHouseProtocol.HTTP, Integer.getInteger("chPort", 8123))
        // .port(ClickHouseProtocol.GRPC, Integer.getInteger("chPort", 9100))
        // .port(ClickHouseProtocol.TCP, Integer.getInteger("chPort", 9000))
        .database("system").credentials(ClickHouseCredentials.fromUserAndPassword(
                System.getProperty("chUser", "default"), System.getProperty("chPassword", "")))
        .build();

String table = "java_client_example_table";
try {
    dropAndCreateTable(server, table);
    System.out.println("Insert: " + insert(server, table));
    System.out.println("Query: " + query(server, table));
} catch (ClickHouseException e) {
    e.printStackTrace();
}
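To make the parts of the connection string format given at the start of this step concrete, here is a small hand-written parser. It is a hypothetical illustration: the `ConnectionStringDemo` class and `Endpoint` record are not part of clickhouse-java, the code uses only the JDK (Java 16+ for records), and it skips edge cases such as IPv6 host literals and URL-encoded values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ConnectionStringDemo {
    // Hypothetical holder for the pieces of protocol://host[:port][/database][?params][#tags]
    record Endpoint(String protocol, String host, Integer port, String database,
                    Map<String, String> params, List<String> tags) {}

    static Endpoint parse(String s) {
        int schemeEnd = s.indexOf("://");
        if (schemeEnd < 0) throw new IllegalArgumentException("missing protocol: " + s);
        String protocol = s.substring(0, schemeEnd);
        String rest = s.substring(schemeEnd + 3);

        // #tag[,tag] comes last, so strip it first
        List<String> tags = new ArrayList<>();
        int hash = rest.indexOf('#');
        if (hash >= 0) {
            for (String t : rest.substring(hash + 1).split(",")) {
                if (!t.isEmpty()) tags.add(t);
            }
            rest = rest.substring(0, hash);
        }

        // ?param[=value][&param[=value]]; a bare "param" is stored with an empty value
        Map<String, String> params = new LinkedHashMap<>();
        int q = rest.indexOf('?');
        if (q >= 0) {
            for (String kv : rest.substring(q + 1).split("&")) {
                if (kv.isEmpty()) continue;
                int eq = kv.indexOf('=');
                if (eq < 0) params.put(kv, "");
                else params.put(kv.substring(0, eq), kv.substring(eq + 1));
            }
            rest = rest.substring(0, q);
        }

        // host[:port][/database]
        String database = null;
        int slash = rest.indexOf('/');
        if (slash >= 0) {
            String db = rest.substring(slash + 1);
            database = db.isEmpty() ? null : db;
            rest = rest.substring(0, slash);
        }
        Integer port = null;
        int colon = rest.lastIndexOf(':');
        if (colon >= 0) {
            port = Integer.parseInt(rest.substring(colon + 1));
            rest = rest.substring(0, colon);
        }
        return new Endpoint(protocol, rest, port, database, params, tags);
    }

    public static void main(String[] args) {
        System.out.println(parse("http://localhost:8123/system?compress=0&ssl#dev,test"));
    }
}
```

The parts are peeled off from the right (tags, then parameters, then database, then port) because each later part is optional and introduced by its own delimiter.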

3. Query

static int query(ClickHouseNode server, String table) throws ClickHouseException {
    try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());
            ClickHouseResponse response = client.read(server)
                    // prefer to use RowBinaryWithNamesAndTypes as it's fully supported
                    // see details at https://github.com/ClickHouse/clickhouse-java/issues/928
                    .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                    .query("select * from " + table).execute().get()) {
        int count = 0;
        // or use stream API via response.stream()
        for (ClickHouseRecord r : response.records()) {
            count++;
        }
        return count;
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw ClickHouseException.forCancellation(e, server);
    } catch (ExecutionException e) {
        throw ClickHouseException.of(e, server);
    }
}

4. Streaming query

try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
        ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints)
                // you'll have to parse the response manually if using a different format
                .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                .query("select * from numbers(:limit)")
                .params(1000).executeAndWait()) {
    for (ClickHouseRecord r : response.records()) {
        int num = r.getValue(0).asInteger();
        // type conversion
        String str = r.getValue(0).asString();
        LocalDate date = r.getValue(0).asDate();
    }
}

5. Insert

static long insert(ClickHouseNode server, String table) throws ClickHouseException {
    try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
        ClickHouseRequest.Mutation request = client.connect(server).write().table(table)
                .format(ClickHouseFormat.RowBinary);
        ClickHouseConfig config = request.getConfig();
        CompletableFuture<ClickHouseResponse> future;
        // back-pressuring is not supported, you can adjust the first two arguments
        try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                .createPipedOutputStream(config, (Runnable) null)) {
            // in async mode, which is the default, execution happens in a worker thread
            future = request.data(stream.getInputStream()).execute();

            // writing happens in the main thread
            for (int i = 0; i < 10_000; i++) {
                BinaryStreamUtils.writeString(stream, String.valueOf(i % 16));
                BinaryStreamUtils.writeNonNull(stream);
                BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
            }
        }

        // the response should always be closed
        try (ClickHouseResponse response = future.get()) {
            ClickHouseResponseSummary summary = response.getSummary();
            return summary.getWrittenRows();
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw ClickHouseException.forCancellation(e, server);
    } catch (ExecutionException | IOException e) {
        throw ClickHouseException.of(e, server);
    }
}
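The insert loop writes each row of `(a String, b Nullable(String))` in RowBinary format. The sketch below builds the same byte layout by hand so you can see what goes over the wire. It assumes ClickHouse's documented RowBinary rules: a String is an unsigned LEB128 length followed by its UTF-8 bytes, and a Nullable value starts with one marker byte (1 = NULL, 0 = a value follows), which is presumably what `writeNonNull` emits in the example. `RowBinarySketch` is a hypothetical helper built on `ByteArrayOutputStream`, not the client's `BinaryStreamUtils`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class RowBinarySketch {
    // Unsigned LEB128 varint: 7 bits per byte, high bit set on every byte except the last
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // String: varint byte length, then the raw UTF-8 bytes
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    // Nullable(String): marker byte 1 for NULL; otherwise 0 followed by the value
    static void writeNullableString(ByteArrayOutputStream out, String s) {
        if (s == null) {
            out.write(1);
        } else {
            out.write(0);
            writeString(out, s);
        }
    }

    // One row of java_client_example_table: (a String, b Nullable(String))
    static byte[] encodeRow(String a, String b) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, a);
        writeNullableString(out, b);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        StringBuilder hex = new StringBuilder();
        for (byte v : encodeRow("1", "x")) hex.append(String.format("%02x ", v));
        System.out.println(hex.toString().trim()); // prints: 01 31 00 01 78
    }
}
```

Because RowBinary carries no column names or types, the byte order must match the table's column order exactly, which is why the example writes `a`, then the null marker, then `b`.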

6. Multiple statements

Execute multiple statements one after another, in the same session, on a worker thread:

CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.get(),
        "create database if not exists my_base",
        "use my_base",
        "create table if not exists test_table(s String) engine=Memory",
        "insert into test_table values('1')('2')('3')",
        "select * from test_table limit 1",
        "truncate table test_table",
        "drop table if exists test_table");
// block the current thread until the queries complete, then retrieve the summaries
List<ClickHouseResponseSummary> results = future.get();
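The key idea of `ClickHouseClient.send` here is that the statements run strictly in order on one worker thread while the caller receives a single future with one result per statement. Below is a JDK-only sketch of that pattern. `SequentialStatements` and the `runner` function are hypothetical stand-ins for a real server call, and the sketch does not model the shared session that makes `use my_base` apply to the later statements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class SequentialStatements {
    // Run every statement in order on the given worker and collect one result per statement.
    // `runner` is a hypothetical stand-in for whatever actually sends SQL to the server.
    static <R> CompletableFuture<List<R>> sendAll(ExecutorService worker,
                                                 Function<String, R> runner,
                                                 String... statements) {
        return CompletableFuture.supplyAsync(() -> {
            List<R> results = new ArrayList<>();
            for (String sql : statements) {
                // an exception here skips the remaining statements and fails the future
                results.add(runner.apply(sql));
            }
            return results;
        }, worker);
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<List<String>> future = sendAll(worker,
                    sql -> Thread.currentThread().getName() + " ran: " + sql,
                    "create table if not exists test_table(s String) engine=Memory",
                    "insert into test_table values('1')('2')('3')",
                    "drop table if exists test_table");
            // block until all statements have completed, like future.get() in the example above
            for (String line : future.join()) System.out.println(line);
        } finally {
            worker.shutdown();
        }
    }
}
```

Running the whole list inside one task, rather than one task per statement, is what guarantees ordering even on a multi-threaded executor.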

7. Table operations

static void dropAndCreateTable(ClickHouseNode server, String table) throws ClickHouseException {
    try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
        ClickHouseRequest<?> request = client.connect(server);
        // or use future chaining
        request.query("drop table if exists " + table).execute().get();
        request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a")
                .execute().get();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw ClickHouseException.forCancellation(e, server);
    } catch (ExecutionException e) {
        throw ClickHouseException.of(e, server);
    }
}
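The `// or use future chaining` comment refers to composing the two asynchronous calls instead of blocking on `get()` after each one. The sketch below does that with `CompletableFuture.thenCompose`. `FutureChaining.executeAsync` is a hypothetical stand-in for `request.query(sql).execute()` that only records the SQL, so the example runs without a server.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class FutureChaining {
    // Hypothetical async executor: records the SQL and completes with it.
    // In real code this would be request.query(sql).execute().
    static CompletableFuture<String> executeAsync(List<String> log, String sql) {
        return CompletableFuture.supplyAsync(() -> {
            log.add(sql);
            return sql;
        });
    }

    // Drop, then create: the second statement is only issued after the first completes
    static CompletableFuture<String> dropAndCreate(List<String> log, String table) {
        return executeAsync(log, "drop table if exists " + table)
                .thenCompose(ignored -> executeAsync(log,
                        "create table " + table
                                + "(a String, b Nullable(String)) engine=MergeTree() order by a"));
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        // join() blocks once at the end instead of after every statement
        dropAndCreate(log, "java_client_example_table").join();
        log.forEach(System.out::println);
    }
}
```

With the real client, each `ClickHouseResponse` produced along the chain should still be closed, as the insert example points out.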

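III. Using the JDBC Driver

clickhouse-jdbc implements the standard JDBC interface. It is built on top of clickhouse-client and adds features such as custom type mapping, transaction support, and standard synchronous UPDATE and DELETE statements, so it can easily be used with legacy applications and tools.
The clickhouse-jdbc API is synchronous and usually carries more overhead (for example, SQL parsing and type mapping/conversion). When performance is critical, or if you prefer more direct access to ClickHouse, consider using clickhouse-client instead.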

1. Add the dependency

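<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.4.0</version>
    <classifier>all</classifier>
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>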

2. Configuration

Driver class: com.clickhouse.jdbc.ClickHouseDriver

Connection string format: jdbc:(ch|clickhouse)[:<protocol>]://endpoint1[,endpoint2,...][/<database>][?param1=value1&param2=value2][#tag1,tag2,...]
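A rough way to check this JDBC URL layout is a regular expression with one group per optional part. This is a JDK-only sketch, not the driver's own parser, and the `JdbcUrlSketch` name is illustrative. It falls back to `http` when no protocol is given only because the connection example in the next step notes that HTTP is used by default, and it does not try to cover every form the driver may accept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JdbcUrlSketch {
    // jdbc:(ch|clickhouse)[:<protocol>]://endpoints[/<database>][?params][#tags]
    private static final Pattern URL = Pattern.compile(
            "^jdbc:(ch|clickhouse)(?::([a-zA-Z]+))?://([^/?#]+)(?:/([^?#]*))?(?:\\?([^#]*))?(?:#(.*))?$");

    record Parsed(String protocol, List<String> endpoints, String database, String query, String tags) {}

    static Parsed parse(String url) {
        Matcher m = URL.matcher(url);
        if (!m.matches()) throw new IllegalArgumentException("not a ClickHouse JDBC URL: " + url);
        // assumption: a missing protocol means HTTP
        String protocol = m.group(2) == null ? "http" : m.group(2).toLowerCase();
        // endpoints are comma-separated host[:port] entries
        List<String> endpoints = Arrays.asList(m.group(3).split(","));
        String db = (m.group(4) == null || m.group(4).isEmpty()) ? null : m.group(4);
        return new Parsed(protocol, endpoints, db, m.group(5), m.group(6));
    }

    public static void main(String[] args) {
        System.out.println(parse("jdbc:ch://my-server/system"));
        System.out.println(parse("jdbc:clickhouse:http://h1:8123,h2:8123/default?ssl=false#prod"));
    }
}
```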

Connection properties

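| Property | Default | Description |
|---|---|---|
| continueBatchOnError | false | Whether to continue batch processing when an error occurs |
| createDatabaseIfNotExist | false | Whether to create the database if it does not exist |
| custom_http_headers | | Comma-separated custom HTTP headers, for example: User-Agent=client1,X-Gateway-Id=123 |
| custom_http_params | | Comma-separated custom HTTP query parameters, for example: extremes=0,max_result_rows=100 |
| nullAsDefault | 0 | 0: treat null as is and throw an exception when inserting null into a non-nullable column; 1: treat null as is and disable the null check on insert; 2: replace null with the default value of the corresponding data type, for both queries and inserts |
| jdbcCompliance | true | Whether to support standard synchronous UPDATE/DELETE and fake transactions |
| typeMappings | | Custom mapping between ClickHouse data types and Java classes, which affects the results of both getColumnType() and getObject(Class<?>). For example: UInt128=java.lang.String,UInt256=java.lang.String |
| wrapperObject | false | Whether getObject() should return java.sql.Array / java.sql.Struct for Array / Tuple |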
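These properties can presumably be collected in a `java.util.Properties` object, which the connection example in the next step builds for `ClickHouseDataSource`. A minimal JDK-only sketch, assuming every value is given as a string; the `JdbcPropertiesSketch` name is hypothetical and the values are illustrations, not recommended settings.

```java
import java.util.Properties;

public class JdbcPropertiesSketch {
    // Build connection properties using names from the table above.
    // The values here are illustrative only.
    static Properties clickHouseProperties() {
        Properties props = new Properties();
        props.setProperty("continueBatchOnError", "false");
        props.setProperty("nullAsDefault", "2"); // replace NULL with the column type's default value
        props.setProperty("custom_http_headers", "User-Agent=client1,X-Gateway-Id=123");
        props.setProperty("typeMappings", "UInt128=java.lang.String,UInt256=java.lang.String");
        props.setProperty("wrapperObject", "true"); // Array/Tuple -> java.sql.Array/java.sql.Struct
        return props;
    }

    public static void main(String[] args) {
        Properties props = clickHouseProperties();
        // With the driver on the classpath, this object would be passed as
        // new ClickHouseDataSource(url, props)
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Properties left unset (such as `jdbcCompliance` here) keep the defaults listed in the table.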

3. Connect to ClickHouse

String url = "jdbc:ch://my-server/system"; // use http protocol and port 8123 by default
// String url = "jdbc:ch://my-server:8443/system?ssl=true&sslmode=strict&sslrootcert=/mine.crt";
Properties properties = new Properties();
// properties.setProperty("ssl", "true");
// properties.setProperty("sslmode", "NONE"); // NONE to trust all servers; STRICT for trusted only
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);
try (Connection conn = dataSource.getConnection("default", "password");
        Statement stmt = conn.createStatement()) {
    // use conn and stmt here
}

4. Query

try (Connection conn = dataSource.getConnection(...);
        Statement stmt = conn.createStatement()) {
    ResultSet rs = stmt.executeQuery("select * from numbers(50000)");
    while (rs.next()) {
        // ...
    }
}

5. Insert

Using the input table function

The recommended approach, with the best performance:

try (PreparedStatement ps = conn.prepareStatement(
        "insert into mytable select col1, col2 from input('col1 String, col2 DateTime64(3), col3 Int32')")) {
    // the column definition will be parsed so the driver knows there are 3 parameters: col1, col2 and col3
    ps.setString(1, "test"); // col1
    ps.setObject(2, LocalDateTime.now()); // col2, setTimestamp is slow and not recommended
    ps.setInt(3, 123); // col3
    ps.addBatch(); // parameters are written into a buffered stream immediately, in binary format
    ...
    ps.executeBatch(); // stream everything on hand into ClickHouse
}
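The comment in this example says the driver parses the `input()` column definition to learn how many parameters to expect. A naive comma split would miscount types whose arguments contain commas, such as `Decimal(10, 2)` or `AggregateFunction(groupBitmap, UInt32)`. The sketch below splits only on top-level commas. It is a hypothetical JDK-only illustration (`InputColumnsSketch` is not the driver's parser) and does not handle parentheses inside quoted strings.

```java
import java.util.ArrayList;
import java.util.List;

public class InputColumnsSketch {
    // Split "col1 String, col2 DateTime64(3)" on top-level commas only,
    // so commas inside type arguments like Decimal(10, 2) stay within one column
    static List<String> splitColumns(String definition) {
        List<String> columns = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        int depth = 0;
        for (char c : definition.toCharArray()) {
            if (c == '(') depth++;
            else if (c == ')') depth--;
            if (c == ',' && depth == 0) {
                columns.add(current.toString().trim());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        if (!current.toString().isBlank()) columns.add(current.toString().trim());
        return columns;
    }

    // Number of '?' parameters expected for this input() definition
    static int parameterCount(String definition) {
        return splitColumns(definition).size();
    }

    public static void main(String[] args) {
        String def = "col1 String, col2 DateTime64(3), col3 Int32";
        System.out.println(splitColumns(def));   // [col1 String, col2 DateTime64(3), col3 Int32]
        System.out.println(parameterCount(def)); // 3
        System.out.println(parameterCount("amount Decimal(10, 2), note Nullable(String)")); // 2
    }
}
```

This is also why the statement can select only `col1, col2` while still binding three parameters: the parameter count follows the `input()` definition, not the select list.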

Insert with type inference

Easier to use, but slower than the input function:

try (PreparedStatement ps = conn.prepareStatement("insert into mytable(* except (description))")) {
    // the driver will issue the query "select * except (description) from mytable where 0" for type inference
    // since the description column is excluded, we know there are only two parameters: col1 and col2
    ps.setString(1, "test"); // id
    ps.setObject(2, LocalDateTime.now()); // timestamp
    ps.addBatch(); // parameters are written into a buffered stream immediately, in binary format
    ...
    ps.executeBatch(); // stream everything on hand into ClickHouse
}

Insert with placeholders

Not recommended, because it builds one large SQL statement:

// Note: "insert into mytable values(?,?,?)" is treated as "insert into mytable"
try (PreparedStatement ps = conn.prepareStatement("insert into mytable values(trim(?),?,?)")) {
    ps.setString(1, "test"); // id
    ps.setObject(2, LocalDateTime.now()); // timestamp
    ps.setString(3, null); // description
    ps.addBatch(); // append parameters to the query
    ...
    ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)
}

6. Advanced usage

Handling date-time values and time zones

Use java.time.LocalDateTime or java.time.OffsetDateTime instead of java.sql.Timestamp, and java.time.LocalDate instead of java.sql.Date.

try (PreparedStatement ps = conn.prepareStatement("select date_time from mytable where date_time > ?")) {
    ps.setObject(1, LocalDateTime.now()); // the query has a single placeholder, so its index is 1
    ResultSet rs = ps.executeQuery();
    while (rs.next()) {
        LocalDateTime dateTime = (LocalDateTime) rs.getObject(1);
    }
    ...
}
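Part of the reason to prefer `java.time` is that the time zone becomes explicit: a `LocalDateTime` is only a wall-clock value, while an `OffsetDateTime` pins down an instant. The JDK-only sketch below turns an instant into the wall-clock time of a given zone. It assumes you know which zone the server or column uses; `Asia/Shanghai` and the `DateTimeSketch` name are just examples.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class DateTimeSketch {
    // Convert an instant with an explicit offset into the wall-clock time of a target zone.
    // Assumption: the zone used by the DateTime column (or the server) is known.
    static LocalDateTime toZoneLocal(OffsetDateTime value, ZoneId zone) {
        return value.atZoneSameInstant(zone).toLocalDateTime();
    }

    public static void main(String[] args) {
        OffsetDateTime utc = OffsetDateTime.of(2023, 10, 22, 0, 0, 0, 0, ZoneOffset.UTC);
        // The same instant has a different wall-clock value in each zone
        System.out.println(toZoneLocal(utc, ZoneId.of("Asia/Shanghai"))); // 2023-10-22T08:00
        System.out.println(toZoneLocal(utc, ZoneOffset.UTC));              // 2023-10-22T00:00
    }
}
```

Passing a bare `LocalDateTime.now()` as in the example above uses the JVM's default zone for the wall-clock value, so it only lines up with the server when both use the same zone.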

7. Handling aggregate functions

// batch insert using input function
try (ClickHouseConnection conn = newConnection(props);
        Statement s = conn.createStatement();
        PreparedStatement stmt = conn.prepareStatement(
                "insert into test_batch_input select id, name, value from input('id Int32, name Nullable(String), desc Nullable(String), value AggregateFunction(groupBitmap, UInt32)')")) {
    s.execute("drop table if exists test_batch_input;"
            + "create table test_batch_input(id Int32, name Nullable(String), value AggregateFunction(groupBitmap, UInt32))engine=Memory");
    Object[][] objs = new Object[][] {
            new Object[] { 1, "a", "aaaaa", ClickHouseBitmap.wrap(1, 2, 3, 4, 5) },
            new Object[] { 2, "b", null, ClickHouseBitmap.wrap(6, 7, 8, 9, 10) },
            new Object[] { 3, null, "33333", ClickHouseBitmap.wrap(11, 12, 13) }
    };
    for (Object[] v : objs) {
        stmt.setInt(1, (int) v[0]);
        stmt.setString(2, (String) v[1]);
        stmt.setString(3, (String) v[2]);
        stmt.setObject(4, v[3]);
        stmt.addBatch();
    }
    int[] results = stmt.executeBatch();
    ...
}

// use bitmap as query parameter
try (PreparedStatement stmt = conn.prepareStatement(
        "SELECT bitmapContains(my_bitmap, toUInt32(1)) as v1, bitmapContains(my_bitmap, toUInt32(2)) as v2 from {tt 'ext_table'}")) {
    stmt.setObject(1, ClickHouseExternalTable.builder().name("ext_table")
            .columns("my_bitmap AggregateFunction(groupBitmap,UInt32)").format(ClickHouseFormat.RowBinary)
            .content(new ByteArrayInputStream(ClickHouseBitmap.wrap(1, 3, 5).toBytes()))
            .asTempTable()
            .build());
    ResultSet rs = stmt.executeQuery();
    Assert.assertTrue(rs.next());
    Assert.assertEquals(rs.getInt(1), 1);
    Assert.assertEquals(rs.getInt(2), 0);
    Assert.assertFalse(rs.next());
}
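The final query expects `v1 = 1` and `v2 = 0` because the bitmap passed in holds {1, 3, 5}: 1 is a member and 2 is not. The sketch below mirrors that membership check with `java.util.BitSet`. It is a stand-in for illustration only: `BitmapSketch` is hypothetical, it does not produce ClickHouse's bitmap serialization (which `ClickHouseBitmap.toBytes()` handles in the example), and it cannot represent UInt32 values above `Integer.MAX_VALUE`.

```java
import java.util.BitSet;

public class BitmapSketch {
    // Build a bitmap from non-negative int values; BitSet stands in for a groupBitmap state
    static BitSet wrap(int... values) {
        BitSet bits = new BitSet();
        for (int v : values) bits.set(v);
        return bits;
    }

    // Analogue of bitmapContains(bitmap, x): 1 if x is present, 0 otherwise
    static int bitmapContains(BitSet bitmap, int value) {
        return bitmap.get(value) ? 1 : 0;
    }

    public static void main(String[] args) {
        BitSet myBitmap = wrap(1, 3, 5);
        System.out.println(bitmapContains(myBitmap, 1)); // 1
        System.out.println(bitmapContains(myBitmap, 2)); // 0
    }
}
```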

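Hi everyone, I'm Sinbad from the Doker brand. Likes and comments are welcome; your encouragement is what keeps us updating! You are also welcome to add me on WeChat to join our tech group chat!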

Source: https://blog.csdn.net/leesinbad/article/details/130016795
