一、开发环境
- OpenJDK版本 >= 17
- ClickHouse:20.7+
1、支持的数据类型
Format | Support | Comment |
---|---|---|
AggregatedFunction | ❌ | limited to groupBitmap , and known to have issue with 64bit bitmap |
Array(*) | ✅ | |
Bool | ✅ | |
Date* | ✅ | |
DateTime* | ✅ | |
Decimal* | ✅ | SET output_format_decimal_trailing_zeros=1 in 21.9+ for consistency |
Enum* | ✅ | can be treated as both string and integer |
Geo Types | ✅ | Point, Ring, Polygon, and MultiPolygon |
Int*, UInt* | ✅ | UInt64 is mapped to long |
IPv* | ✅ | |
Map(*) | ✅ | |
Nested(*) | ✅ | |
Object('JSON') | ✅ | |
SimpleAggregateFunction | ✅ | |
*String | ✅ | |
Tuple(*) | ✅ | |
UUID | ✅ |
二、Java客户端方式
1、引入依赖
com.clickhouse clickhouse-http-client 0.4.0
2、连接ClickHouse
连接字符串: protocol://host[:port][/database][?param[=value][¶m[=value]][#tag[,tag]]
示例:
http://localhost:8443?ssl=true&sslmode=NONE
https://explorer@play.clickhouse.com:443
tcp://localhost?!auto_discovery#experimental),(grpc://localhost#experimental)?failover=3#test
ClickHouseNode server = ClickHouseNode.builder() .host(System.getProperty("chHost", "localhost")) .port(ClickHouseProtocol.HTTP, Integer.getInteger("chPort", 8123)) // .port(ClickHouseProtocol.GRPC, Integer.getInteger("chPort", 9000)) // .port(ClickHouseProtocol.TCP, Integer.getInteger("chPort", 9100)) .database("system").credentials(ClickHouseCredentials.fromUserAndPassword( System.getProperty("chUser", "default"), System.getProperty("chPassword", ""))) .build(); String table = "java_client_example_table"; try { dropAndCreateTable(server, table); System.out.println("Insert: " + insert(server, table)); System.out.println("Query: " + query(server, table)); } catch (ClickHouseException e) { e.printStackTrace(); }
3、查询
static int query(ClickHouseNode server, String table) throws ClickHouseException { try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol()); ClickHouseResponse response = client.read(server) // prefer to use RowBinaryWithNamesAndTypes as it's fully supported // see details at https://github.com/ClickHouse/clickhouse-java/issues/928 .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query("select * from " + table).execute().get()) { int count = 0; // or use stream API via response.stream() for (ClickHouseRecord r : response.records()) { count++; } return count; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw ClickHouseException.forCancellation(e, server); } catch (ExecutionException e) { throw ClickHouseException.of(e, server); } }
4、流式查询
ClickHouseResponse response = client.connect(endpoint) // or client.connect(endpoints) // you'll have to parse response manually if using a different format .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) .query("select * from numbers(:limit)") .params(1000).executeAndWait()) { for (ClickHouseRecord r : response.records()) { int num = r.getValue(0).asInteger(); // type conversion String str = r.getValue(0).asString(); LocalDate date = r.getValue(0).asDate(); }
5、Insert
static long insert(ClickHouseNode server, String table) throws ClickHouseException { try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { ClickHouseRequest.Mutation request = client.connect(server).write().table(table) .format(ClickHouseFormat.RowBinary); ClickHouseConfig config = request.getConfig(); CompletableFuture future; // back-pressuring is not supported, you can adjust the first two arguments try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() .createPipedOutputStream(config, (Runnable) null)) { // in async mode, which is default, execution happens in a worker thread future = request.data(stream.getInputStream()).execute(); // writing happens in main thread for (int i = 0; i < 10_000; i++) { BinaryStreamUtils.writeString(stream, String.valueOf(i % 16)); BinaryStreamUtils.writeNonNull(stream); BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString()); } } // response should be always closed try (ClickHouseResponse response = future.get()) { ClickHouseResponseSummary summary = response.getSummary(); return summary.getWrittenRows(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw ClickHouseException.forCancellation(e, server); } catch (ExecutionException | IOException e) { throw ClickHouseException.of(e, server); } }
6、多语句操作
在同一会话中一个接一个地在工作线程中执行多个语句:
CompletableFuture> future = ClickHouseClient.send(servers.get(), "create database if not exists my_base", "use my_base", "create table if not exists test_table(s String) engine=Memory", "insert into test_table values('1')('2')('3')", "select * from test_table limit 1", "truncate table test_table", "drop table if exists test_table");// block current thread until queries completed, and then retrieve summariesList results = future.get();
7、表操作
static void dropAndCreateTable(ClickHouseNode server, String table) throws ClickHouseException { try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) { ClickHouseRequest> request = client.connect(server); // or use future chaining request.query("drop table if exists " + table).execute().get(); request.query("create table " + table + "(a String, b Nullable(String)) engine=MergeTree() order by a") .execute().get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw ClickHouseException.forCancellation(e, server); } catch (ExecutionException e) { throw ClickHouseException.of(e, server); } }
三、JDBC Driver方式
clickhouse jdbc实现了标准jdbc接口。它构建在clickhouse客户端之上,提供了自定义类型映射、事务支持、标准同步UPDATE和DELETE语句等附加功能,因此可以轻松地与遗留应用程序和工具一起使用。
clickhouse jdbc API是同步的,通常会有更多的开销(例如SQL解析和类型映射/转换等)。当性能至关重要时,或者如果您喜欢更直接的访问clickhouse的方式,请考虑使用clickhouse客户端。
1、引入依赖
com.clickhouse clickhouse-jdbc 0.4.0 all * *
2、配置
驱动程序类:com.clickhouse.jdbc.ClickHouseDriver
连接字符串:例如:jdbc:(ch|clickhouse)[:
jdbc:ch://localhost
与jdbc:clickhouse:http://localhost:8123
jdbc:ch:https://localhost
与jdbc:clickhouse:http://localhost:8443?ssl=true&sslmode=STRICT
jdbc:ch:grpc://localhost
与jdbc:clickhouse:grpc://localhost:9100
连接属性:
属性 | 违约 | 描述 |
---|---|---|
continueBatchOnError | false | 发生错误时是否继续批处理 |
createDatabaseIfNotExist | false | 如果数据库不存在,是否创建数据库 |
custom_http_headers | 逗号分隔的自定义 HTTP 标头,例如:User-Agent=client1,X-Gateway-Id=123 | |
custom_http_params | 逗号分隔的自定义 HTTP 标头,例如:User-Agent=client1,X-Gateway-Id=123 | |
nullAsDefault | 0 | 0-将null值按原样处理,并在将null插入不可为null的列时引发异常;1-按原样处理null值,并禁用插入时的null检查;2-将null替换为查询和插入的相应数据类型的默认值 |
jdbcCompliance | true | 是否支持标准同步 UPDATE/DELETE 和fake transaction |
typeMappings | 自定义ClickHouse数据类型和Java类之间的映射,这将影响getColumnType()和getObject(class<?>)的结果。例如:UInt128=java.lang.String,UInt256=java.lang.String | |
wrapperObject | false | getObject()是否应为Array/Tuple返回java.sql.Array/java.sql.Struct。 |
3、连接到ClickHouse
String url = "jdbc:ch://my-server/system"; // use http protocol and port 8123 by default// String url = "jdbc:ch://my-server:8443/system?ssl=true&sslmode=strict&&sslrootcert=/mine.crt";Properties properties = new Properties();// properties.setProperty("ssl", "true");// properties.setProperty("sslmode", "NONE"); // NONE to trust all servers; STRICT for trusted onlyClickHouseDataSource dataSource = new ClickHouseDataSource(url, new Properties());try (Connection conn = dataSource.getConnection("default", "password"); Statement stmt = conn.createStatement()) {}
4、Query
try (Connection conn = dataSource.getConnection(...); Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery("select * from numbers(50000)"); while(rs.next()) { // ... }}
5、Insert
使用 input table函数
具有最佳性能的推荐方式
try (PreparedStatement ps = conn.prepareStatement( "insert into mytable select col1, col2 from input('col1 String, col2 DateTime64(3), col3 Int32')")) { // the column definition will be parsed so the driver knows there are 3 parameters: col1, col2 and col3 ps.setString(1, "test"); // col1 ps.setObject(2, LocalDateTime.now()); // col2, setTimestamp is slow and not recommended ps.setInt(3, 123); // col3 ps.addBatch(); // parameters will be write into buffered stream immediately in binary format ... ps.executeBatch(); // stream everything on-hand into ClickHouse}
Insert
它更易于使用,但与输入函数相比,性能更慢
try (PreparedStatement ps = conn.prepareStatement("insert into mytable(* except (description))")) { // the driver will issue query "select * except (description) from mytable where 0" for type inferring // since description column is excluded, we know there are only two parameters: col1 and col2 ps.setString(1, "test"); // id ps.setObject(2, LocalDateTime.now()); // timestamp ps.addBatch(); // parameters will be write into buffered stream immediately in binary format ... ps.executeBatch(); // stream everything on-hand into ClickHouse}
使用占位符新增
不推荐,因为它基于大型 SQL
// Note: "insert into mytable values(?,?,?)" is treated as "insert into mytable"try (PreparedStatement ps = conn.prepareStatement("insert into mytable values(trim(?),?,?)")) { ps.setString(1, "test"); // id ps.setObject(2, LocalDateTime.now()); // timestamp ps.setString(3, null); // description ps.addBatch(); // append parameters to the query ... ps.executeBatch(); // issue the composed query: insert into mytable values(...)(...)...(...)}
6、高级接口
处理日期时间和时区
请使用java.time.LocalDateTime或java.time.OffsetDateTime代替java.sql.Timestamp,并使用java.time_LocalDate代替java.sql.Date。
try (PreparedStatement ps = conn.prepareStatement("select date_time from mytable where date_time > ?")) { ps.setObject(2, LocalDateTime.now()); ResultSet rs = ps.executeQuery(); while(rs.next()) { LocalDateTime dateTime = (LocalDateTime) rs.getObject(1); } ...}
7、处理聚合函数
// batch insert using input functiontry (ClickHouseConnection conn = newConnection(props); Statement s = conn.createStatement(); PreparedStatement stmt = conn.prepareStatement( "insert into test_batch_input select id, name, value from input('id Int32, name Nullable(String), desc Nullable(String), value AggregateFunction(groupBitmap, UInt32)')")) { s.execute("drop table if exists test_batch_input;" + "create table test_batch_input(id Int32, name Nullable(String), value AggregateFunction(groupBitmap, UInt32))engine=Memory"); Object[][] objs = new Object[][] { new Object[] { 1, "a", "aaaaa", ClickHouseBitmap.wrap(1, 2, 3, 4, 5) }, new Object[] { 2, "b", null, ClickHouseBitmap.wrap(6, 7, 8, 9, 10) }, new Object[] { 3, null, "33333", ClickHouseBitmap.wrap(11, 12, 13) } }; for (Object[] v : objs) { stmt.setInt(1, (int) v[0]); stmt.setString(2, (String) v[1]); stmt.setString(3, (String) v[2]); stmt.setObject(4, v[3]); stmt.addBatch(); } int[] results = stmt.executeBatch(); ...}// use bitmap as query parametertry (PreparedStatement stmt = conn.prepareStatement( "SELECT bitmapContains(my_bitmap, toUInt32(1)) as v1, bitmapContains(my_bitmap, toUInt32(2)) as v2 from {tt 'ext_table'}")) { stmt.setObject(1, ClickHouseExternalTable.builder().name("ext_table") .columns("my_bitmap AggregateFunction(groupBitmap,UInt32)").format(ClickHouseFormat.RowBinary) .content(new ByteArrayInputStream(ClickHouseBitmap.wrap(1, 3, 5).toBytes())) .asTempTable() .build()); ResultSet rs = stmt.executeQuery(); Assert.assertTrue(rs.next()); Assert.assertEquals(rs.getInt(1), 1); Assert.assertEquals(rs.getInt(2), 0); Assert.assertFalse(rs.next());}
大家好,我是Doker品牌的Sinbad,欢迎点赞和评论,您的鼓励是我们持续更新的动力!欢迎加微信进入技术群聊!
来源地址:https://blog.csdn.net/leesinbad/article/details/130016795