Filters
Let's look at some commonly used filters:
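Filter | Description |
---|---|
RowFilter | Filters rows by RowKey |
FilterList | Combines other filters |
ValueFilter | Filters cells by value |
PrefixFilter | Filters rows whose RowKey starts with a given prefix |
QualifierFilter | Filters columns by column name (qualifier) |
ColumnPrefixFilter | Filters columns whose name starts with a given prefix |
SingleColumnValueFilter | Filters rows by the value of one specified column |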
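HBase provides many filters, but only a few are used regularly. The most common are:
- RowFilter, which filters by RowKey
- PrefixFilter, which filters by RowKey prefix
- SingleColumnValueFilter, which filters by the value of a specified column
- FilterList, which combines other filters
As shown in the figure above, these are some of the filters that HBase provides.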
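Column name filter
As shown in the figure above, the column name filter QualifierFilter selects the columns with a given name in every column family; the figure retrieves every column named value.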
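Column name prefix filter
As shown in the figure above, the column prefix filter ColumnPrefixFilter selects the columns whose names start with a given prefix in every column family; the figure retrieves every column whose name starts with level.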
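Column family filter
As shown in the figure above, the column family filter FamilyFilter selects the data in a given column family; the figure retrieves all columns in the addr column family.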
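The three column-side filters above all work cell by cell: each stored cell is kept or dropped based on its family or column name, and the row key plays no part. The plain-JDK sketch below models that behaviour; it is not HBase code. The class ColumnFilterModel, its methods, and the sample row are hypothetical; values are written as strings for readability (the weather test table actually stores the level columns as 4-byte ints), and the column names follow that table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-JDK model (not HBase code) of QualifierFilter, ColumnPrefixFilter and FamilyFilter.
public class ColumnFilterModel {

    // One stored cell: row key, column family, column name (qualifier), value.
    static final class Cell {
        final String row, family, qualifier, value;

        Cell(String row, String family, String qualifier, String value) {
            this.row = row;
            this.family = family;
            this.qualifier = qualifier;
            this.value = value;
        }

        @Override
        public String toString() {
            return row + "/" + family + ":" + qualifier + "=" + value;
        }
    }

    // Keeps only the cells that pass the test, the way a cell-level filter drops cells.
    static List<Cell> keep(List<Cell> cells, Predicate<Cell> test) {
        List<Cell> out = new ArrayList<>();
        for (Cell c : cells) {
            if (test.test(c)) {
                out.add(c);
            }
        }
        return out;
    }

    // Roughly QualifierFilter(EQUAL, BinaryComparator(name)): exact name match in every family.
    static Predicate<Cell> qualifierEquals(String name) {
        return c -> c.qualifier.equals(name);
    }

    // Roughly ColumnPrefixFilter(prefix): name starts with the prefix, in every family.
    static Predicate<Cell> qualifierPrefix(String prefix) {
        return c -> c.qualifier.startsWith(prefix);
    }

    // Roughly FamilyFilter(EQUAL, BinaryComparator(family)): every column of one family.
    static Predicate<Cell> familyEquals(String family) {
        return c -> c.family.equals(family);
    }

    // Hypothetical row shaped like the weather table.
    static List<Cell> sampleRow() {
        String r = "20200107161557";
        List<Cell> cells = new ArrayList<>();
        cells.add(new Cell(r, "addr", "province", "SC"));
        cells.add(new Cell(r, "addr", "city", "CD"));
        cells.add(new Cell(r, "temperature", "level_temperature", "23"));
        cells.add(new Cell(r, "temperature", "value", "temperature_23"));
        cells.add(new Cell(r, "pm25", "level_pm25", "7"));
        cells.add(new Cell(r, "pm25", "value", "pm25_7"));
        return cells;
    }

    public static void main(String[] args) {
        List<Cell> row = sampleRow();
        System.out.println(keep(row, qualifierEquals("value"))); // the two "value" columns
        System.out.println(keep(row, qualifierPrefix("level"))); // the two "level_*" columns
        System.out.println(keep(row, familyEquals("addr")));     // province and city
    }
}
```

Note that none of these filters looks at the row key, so every row that has at least one matching cell is still returned.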
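Row Key filter
As shown in the figure above, the row key filter RowFilter selects rows by RowKey; the figure retrieves all columns in all column families of the row whose RowKey is 20200107161557. With CompareOperator.EQUAL it returns only that row; the rowFilter test in the code uses CompareOperator.GREATER and therefore returns the rows whose keys sort after it.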
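RowFilter with a BinaryComparator compares row keys as raw bytes in unsigned lexicographic order, not as numbers. The plain-JDK sketch below (the class RowKeyOrder is hypothetical) models CompareOperator.GREATER and shows why the user-table example zero-pads its keys with %03d: without padding, the key "9" would sort after "050".

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-JDK sketch of row-key ordering under RowFilter + BinaryComparator; not HBase code.
public class RowKeyOrder {

    // Unsigned lexicographic comparison of the raw bytes, the order HBase uses for row keys.
    static int compareKeys(String a, String b) {
        return Arrays.compareUnsigned(
                a.getBytes(StandardCharsets.UTF_8),
                b.getBytes(StandardCharsets.UTF_8));
    }

    // Models RowFilter(CompareOperator.GREATER, new BinaryComparator(bound)):
    // a row is kept when its key sorts strictly after the bound.
    static boolean keptByGreater(String rowKey, String bound) {
        return compareKeys(rowKey, bound) > 0;
    }

    public static void main(String[] args) {
        System.out.println(keptByGreater("051", "050")); // true
        System.out.println(keptByGreater("100", "050")); // true
        System.out.println(keptByGreater("9", "050"));   // true: '9' > '0' in the first byte
        System.out.println(keptByGreater("009", "050")); // false
    }
}
```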
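Row Key prefix filter
As shown in the figure above, the row key prefix filter PrefixFilter selects rows whose RowKey starts with a given prefix; the figure retrieves all columns in all column families of the rows whose RowKey starts with 2020.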
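A PrefixFilter only decides which rows to keep; if no start row is set, the scan still begins at the first row of the table. A common complement is to bound the scan to [prefix, stop row), where the stop row is the prefix with its last non-0xFF byte incremented. The plain-JDK sketch below computes that stop row; the class PrefixStopRow and its method are hypothetical, and in HBase 2.x the two bounds could then be passed to Scan.withStartRow and Scan.withStopRow.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Plain-JDK sketch: exclusive stop row for a prefix scan. Not an HBase API.
public class PrefixStopRow {

    // Returns the smallest key that sorts after every key starting with prefix,
    // or null if none exists (prefix is all 0xFF bytes): then scan to the end of the table.
    static byte[] stopRowForPrefix(byte[] prefix) {
        int len = prefix.length;
        // Trailing 0xFF bytes cannot be incremented without a carry, so drop them.
        while (len > 0 && prefix[len - 1] == (byte) 0xFF) {
            len--;
        }
        if (len == 0) {
            return null;
        }
        byte[] stop = Arrays.copyOf(prefix, len);
        stop[len - 1]++; // increment the last byte that is not 0xFF
        return stop;
    }

    public static void main(String[] args) {
        byte[] stop = stopRowForPrefix("2020".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(stop, StandardCharsets.UTF_8)); // 2021
    }
}
```

Every key that starts with 2020 satisfies 2020 <= key < 2021, and no key outside that range starts with 2020, so the bounded scan and the PrefixFilter select the same rows.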
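Value filters
There are three main value filters:
- ValueFilter
- SingleColumnValueFilter
- SingleColumnValueExcludeFilter
They differ in what they test and what they return. ValueFilter tests the value of every cell, whatever its column, and returns only the matching cells. SingleColumnValueFilter tests the value of one specified column and, when it matches, returns the whole row. SingleColumnValueExcludeFilter applies the same test as SingleColumnValueFilter but leaves the tested column out of the returned row; it does not exclude the rows whose column has the given value. For both single-column filters, a row that lacks the tested column is returned by default; call setFilterIfMissing(true) to drop such rows.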
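The difference between the three value filters is easiest to see on a single row. The plain-JDK model below is not HBase's implementation: it represents a row as a map from "family:qualifier" to a string value, and the class ValueFilterModel, its methods, and the sample row are hypothetical. The filterIfMissing flag mirrors setFilterIfMissing.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Plain-JDK model (not HBase code) of what each value filter returns for one row.
public class ValueFilterModel {

    // ValueFilter: cell level. Only matching cells are returned, from any column;
    // an empty result means the row is not returned at all.
    static Map<String, String> valueFilter(Map<String, String> row, Predicate<String> matches) {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : row.entrySet()) {
            if (matches.test(e.getValue())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }

    // SingleColumnValueFilter (excludeTested = false) and SingleColumnValueExcludeFilter (true):
    // row level. The tested column decides whether the whole row is returned.
    static Map<String, String> singleColumnValue(Map<String, String> row, String column,
            Predicate<String> matches, boolean filterIfMissing, boolean excludeTested) {
        String tested = row.get(column);
        // A row without the tested column is kept unless filterIfMissing is set.
        boolean keepRow = (tested == null) ? !filterIfMissing : matches.test(tested);
        Map<String, String> out = new LinkedHashMap<>();
        if (keepRow) {
            out.putAll(row);
            if (excludeTested) {
                out.remove(column);
            }
        }
        return out;
    }

    // Hypothetical weather row.
    static Map<String, String> sampleRow() {
        Map<String, String> row = new LinkedHashMap<>();
        row.put("addr:province", "XJ");
        row.put("addr:city", "CD");
        row.put("temperature:value", "temperature_50");
        return row;
    }

    public static void main(String[] args) {
        Map<String, String> row = sampleRow();
        Predicate<String> isXJ = "XJ"::equals;
        System.out.println(valueFilter(row, v -> v.contains("50")));                    // only temperature:value
        System.out.println(singleColumnValue(row, "addr:province", isXJ, true, false)); // the whole row
        System.out.println(singleColumnValue(row, "addr:province", isXJ, true, true));  // row minus addr:province
    }
}
```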
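Combining filters
FilterList is itself a Filter, but its job is to combine other filters: with FilterList.Operator.MUST_PASS_ALL every filter must pass (AND), and with FilterList.Operator.MUST_PASS_ONE at least one must pass (OR).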
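The boolean effect of a FilterList can be modelled with java.util.function.Predicate: MUST_PASS_ALL behaves like AND and MUST_PASS_ONE like OR. The sketch below applies the two conditions from filterListTest (sex == 0 and total >= 1100) to hypothetical in-memory rows; FilterListModel and UserRow are hypothetical names, and the model ignores how HBase actually evaluates filters cell by cell on the region server.

```java
import java.util.List;
import java.util.function.Predicate;

// Plain-JDK model (not HBase code) of FilterList's two operators.
public class FilterListModel {

    // A user row from the example table: sex (0 or 1) and total consumption.
    static final class UserRow {
        final int sex;
        final int total;

        UserRow(int sex, int total) {
            this.sex = sex;
            this.total = total;
        }
    }

    // MUST_PASS_ALL: a row survives only if every filter accepts it (logical AND).
    static <T> Predicate<T> mustPassAll(List<Predicate<T>> filters) {
        return row -> filters.stream().allMatch(f -> f.test(row));
    }

    // MUST_PASS_ONE: a row survives if at least one filter accepts it (logical OR).
    static <T> Predicate<T> mustPassOne(List<Predicate<T>> filters) {
        return row -> filters.stream().anyMatch(f -> f.test(row));
    }

    public static void main(String[] args) {
        // The two conditions from filterListTest: profile:sex == 0 and consume:total >= 1100.
        List<Predicate<UserRow>> conditions = List.of(r -> r.sex == 0, r -> r.total >= 1100);
        UserRow row = new UserRow(1, 1500); // hypothetical row
        System.out.println(mustPassAll(conditions).test(row)); // false: sex is not 0
        System.out.println(mustPassOne(conditions).test(row)); // true: total >= 1100
    }
}
```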
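Comparison operators
A comparison needs a comparison operator:
Enum constant | Meaning |
---|---|
LESS | Less than |
EQUAL | Equal to |
NO_OP | No operation; excludes everything |
GREATER | Greater than |
NOT_EQUAL | Not equal to |
LESS_OR_EQUAL | Less than or equal to |
GREATER_OR_EQUAL | Greater than or equal to |
The corresponding Java type is the enum org.apache.hadoop.hbase.CompareOperator.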
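Comparators
Class | Description |
---|---|
BitComparator | Bitwise match (AND, OR or XOR) |
NullComparator | Matches null |
BinaryComparator | Compares the complete byte array |
SubstringComparator | Case-insensitive substring match; only EQUAL and NOT_EQUAL are valid |
RegexStringComparator | Regular-expression match; only EQUAL and NOT_EQUAL are valid |
BinaryPrefixComparator | Compares a byte-array prefix |
Of these, the substring comparator SubstringComparator and the byte-prefix comparator BinaryPrefixComparator are used most often.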
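To see how a comparator and an operator combine, the sketch below models BinaryComparator, BinaryPrefixComparator and SubstringComparator in plain Java. It follows their documented behaviour (unsigned byte order, prefix comparison, case-insensitive substring search) rather than HBase's source; the class ComparatorModel and its own Op enum are hypothetical stand-ins for the real comparator classes and CompareOperator. The operator reads as "stored value OP comparand", which is why RowFilter(CompareOperator.GREATER, new BinaryComparator(...)) keeps the rows that sort after the given key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;

// Plain-JDK model (not HBase code) of comparator + operator matching.
public class ComparatorModel {

    // Hypothetical stand-in for org.apache.hadoop.hbase.CompareOperator (NO_OP omitted).
    enum Op { LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER }

    // storedVsComparand < 0 means the stored value sorts before the comparand.
    static boolean matches(Op op, int storedVsComparand) {
        switch (op) {
            case LESS: return storedVsComparand < 0;
            case LESS_OR_EQUAL: return storedVsComparand <= 0;
            case EQUAL: return storedVsComparand == 0;
            case NOT_EQUAL: return storedVsComparand != 0;
            case GREATER_OR_EQUAL: return storedVsComparand >= 0;
            case GREATER: return storedVsComparand > 0;
            default: throw new IllegalArgumentException(op.name());
        }
    }

    // BinaryComparator: the whole stored array against the whole comparand, unsigned bytes.
    static int binary(byte[] stored, byte[] comparand) {
        return Arrays.compareUnsigned(stored, comparand);
    }

    // BinaryPrefixComparator: only the first comparand.length bytes of the stored value count.
    static int binaryPrefix(byte[] stored, byte[] comparand) {
        int n = Math.min(stored.length, comparand.length);
        return Arrays.compareUnsigned(stored, 0, n, comparand, 0, comparand.length);
    }

    // SubstringComparator: case-insensitive "contains"; 0 means found, so only
    // EQUAL and NOT_EQUAL give meaningful answers.
    static int substring(byte[] stored, String needle) {
        String haystack = new String(stored, StandardCharsets.UTF_8).toLowerCase(Locale.ROOT);
        return haystack.contains(needle.toLowerCase(Locale.ROOT)) ? 0 : 1;
    }

    static byte[] b(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(matches(Op.GREATER, binary(b("051"), b("050"))));              // true
        System.out.println(matches(Op.EQUAL, binaryPrefix(b("level_pm25"), b("level")))); // true
        System.out.println(matches(Op.EQUAL, substring(b("XJ"), "xj")));                  // true
        System.out.println(matches(Op.EQUAL, substring(b("SC"), "xj")));                  // false
    }
}
```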
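Note: HBase stores byte arrays, so comparisons are made on the bytes, not on the values they encode.
To compare numbers, HBase provides two numeric comparators, LongComparator and BigDecimalComparator (package org.apache.hadoop.hbase.filter). To compare Integer, double, or other types you need a custom Comparator; for custom HBase Comparators, see the article "Filtering numeric values with a custom HBase Comparator" (HBase自定义Comparator过滤数值).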
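Because comparisons are byte-wise, numbers only compare correctly when their encoding happens to preserve order. The sketch below reproduces the layout of Bytes.toBytes(int) (4 bytes, big-endian, two's complement) with java.nio.ByteBuffer: non-negative ints such as the user table's totals (1000 to 1999) compare correctly, but a negative int sorts after every positive one. Flipping the sign bit before encoding is one common order-preserving fix; it is shown as an illustration, not as the custom Comparator approach of the article referenced above, and it only helps if the data is also written with that encoding. The class NumericBytesOrder and its methods are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Plain-JDK sketch of why byte-wise comparison breaks for signed ints.
public class NumericBytesOrder {

    // Same layout as HBase's Bytes.toBytes(int): 4 bytes, big-endian, two's complement.
    static byte[] toBytes(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // Unsigned byte-wise comparison, as BinaryComparator performs it.
    static int compareBytes(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    // Flipping the sign bit makes unsigned byte order agree with signed int order.
    static byte[] toSortableBytes(int v) {
        return toBytes(v ^ Integer.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Non-negative ints of the same width compare correctly.
        System.out.println(compareBytes(toBytes(1500), toBytes(1100)) > 0);            // true
        // -1 is 0xFFFFFFFF, so it sorts after 1 (0x00000001).
        System.out.println(compareBytes(toBytes(-1), toBytes(1)) > 0);                 // true
        // With the sign bit flipped, -1 sorts before 1.
        System.out.println(compareBytes(toSortableBytes(-1), toSortableBytes(1)) < 0); // true
    }
}
```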
Common filter examples
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class HBaseFilterTest {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "127.0.0.1:2181";
    private static final String TABLE_NAME_STR = "user";
    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
    private static final Configuration configuration = HBaseConfiguration.create();
    private static Connection connection;
    private static Admin admin;
    private static Table table;

    static {
        configuration.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            table = connection.getTable(TABLE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Test
    public void init() throws Exception {
        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME); // a table must be disabled before it can be deleted
            admin.deleteTable(TABLE_NAME);  // delete the table
        }
        createTable();
        puts();
    }

    private void createTable() throws Exception {
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
        ColumnFamilyDescriptor profile = ColumnFamilyDescriptorBuilder.of("profile");
        ColumnFamilyDescriptor consume = ColumnFamilyDescriptorBuilder.of("consume");
        tableDescriptorBuilder.setColumnFamily(profile);
        tableDescriptorBuilder.setColumnFamily(consume);
        TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
        admin.createTable(tableDescriptor); // create the table
    }

    private void puts() throws Exception {
        try (Table userTable = connection.getTable(TABLE_NAME)) {
            userTable.put(getPuts());
        }
    }

    private static List<Put> getPuts() {
        List<Put> puts = new LinkedList<>();
        LocalDateTime localDate = LocalDateTime.now();
        DateTimeFormatter pattern = DateTimeFormatter.ofPattern("yyyy-MM-dd");
        Random random = new Random();
        byte[] profileFamily = Bytes.toBytes("profile");
        byte[] sexesColumn = Bytes.toBytes("sex");
        byte[] birthdayColumn = Bytes.toBytes("birthday");
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        for (int i = 0; i < 100; i++) {
            // Row keys "000".."099": zero-padded so that string order matches numeric order.
            byte[] rowKey = Bytes.toBytes(String.format("%03d", i));
            Put put = new Put(rowKey);
            int sex = random.nextInt(2);             // 0 or 1
            String birthday = localDate.plusDays(random.nextInt(100)).format(pattern);
            int total = random.nextInt(1000) + 1000; // 1000..1999
            // Numbers are stored as 4-byte ints, strings as UTF-8 bytes.
            put.addColumn(profileFamily, sexesColumn, Bytes.toBytes(sex));
            put.addColumn(profileFamily, birthdayColumn, Bytes.toBytes(birthday));
            put.addColumn(consumeFamily, totalColumn, Bytes.toBytes(total));
            puts.add(put);
        }
        return puts;
    }

    @Test
    public void rowKeyPrefixFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose key starts with "05", i.e. 050..059.
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("05"));
        scan.setFilter(prefixFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void rowFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose key sorts after "050", i.e. 051..099.
        Filter rowFilter = new RowFilter(CompareOperator.GREATER, new BinaryComparator(Bytes.toBytes("050")));
        scan.setFilter(rowFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void singleColumnValueFilter() throws IOException {
        Scan scan = new Scan();
        byte[] family = Bytes.toBytes("profile");
        byte[] column = Bytes.toBytes("sex");
        // Rows whose profile:sex equals the int 0. The byte[] argument is wrapped in a
        // BinaryComparator; another comparator can be passed instead, e.g.
        // new BinaryPrefixComparator(Bytes.toBytes(0)) or
        // new BitComparator(Bytes.toBytes(1), BitComparator.BitwiseOp.XOR).
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                family,
                column,
                CompareOperator.EQUAL,
                Bytes.toBytes(0));
        singleColumnValueFilter.setFilterIfMissing(true); // also drop rows without profile:sex
        scan.setFilter(singleColumnValueFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printResultScanner(resultScanner);
        }
    }

    @Test
    public void filterListTest() throws IOException {
        // FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); // at least one condition
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);    // all conditions
        byte[] family = Bytes.toBytes("profile");
        byte[] column = Bytes.toBytes("sex");
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                family,
                column,
                CompareOperator.EQUAL,
                Bytes.toBytes(0));
        filterList.addFilter(singleColumnValueFilter);
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        // Byte-wise comparison works here because every total is a non-negative 4-byte int.
        singleColumnValueFilter = new SingleColumnValueFilter(
                consumeFamily,
                totalColumn,
                CompareOperator.GREATER_OR_EQUAL,
                Bytes.toBytes(1100));
        filterList.addFilter(singleColumnValueFilter);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printResultScanner(resultScanner);
        }
    }

    @Test
    public void testGet() throws IOException {
        List<Get> gets = new LinkedList<>();
        gets.add(new Get(Bytes.toBytes("050")));
        gets.add(new Get(Bytes.toBytes("051")));
        Result[] results = table.get(gets);
        printResult(results);
    }

    @Test
    public void testGetFilter() throws IOException {
        List<Get> gets = new LinkedList<>();
        gets.add(new Get(Bytes.toBytes("050")));
        // Row 051 is returned only if its profile:sex equals 0.
        Get get = new Get(Bytes.toBytes("051"));
        byte[] family = Bytes.toBytes("profile");
        byte[] column = Bytes.toBytes("sex");
        SingleColumnValueFilter singleColumnValueFilter =
                new SingleColumnValueFilter(family, column, CompareOperator.EQUAL, Bytes.toBytes(0));
        get.setFilter(singleColumnValueFilter);
        gets.add(get);
        Result[] results = table.get(gets);
        printResult(results);
    }

    @Test
    public void printAll() throws IOException {
        Scan s = new Scan();
        try (ResultScanner resultScanner = table.getScanner(s)) {
            printResultScanner(resultScanner);
        }
    }

    // Prints every cell of every row returned by a scan.
    private static void printCells(ResultScanner resultScanner) {
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }

    private static void printResultScanner(ResultScanner resultScanner) {
        for (Result result : resultScanner) {
            printRow(result);
        }
    }

    private static void printResult(Result[] results) {
        for (Result result : results) {
            // A Get whose filter rejected the row comes back as an empty Result.
            if (result.isEmpty()) {
                continue;
            }
            printRow(result);
        }
    }

    // Decodes the three columns of a user row; every row written by getPuts() has all of them.
    private static void printRow(Result result) {
        byte[] profileFamily = Bytes.toBytes("profile");
        byte[] sexesColumn = Bytes.toBytes("sex");
        byte[] birthdayColumn = Bytes.toBytes("birthday");
        byte[] consumeFamily = Bytes.toBytes("consume");
        byte[] totalColumn = Bytes.toBytes("total");
        byte[] sex = result.getValue(profileFamily, sexesColumn);
        byte[] birthday = result.getValue(profileFamily, birthdayColumn);
        byte[] total = result.getValue(consumeFamily, totalColumn);
        System.out.println(String.format("rowkey:%s,sex:%d,birthday:%s,total:%d",
                Bytes.toString(result.getRow()), Bytes.toInt(sex), Bytes.toString(birthday), Bytes.toInt(total)));
    }
}
```
More test examples
```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;

public class HBaseFilterTest {
    private static final String ZK_CONNECT_KEY = "hbase.zookeeper.quorum";
    private static final String ZK_CONNECT_VALUE = "127.0.0.1:2181";
    private static final String TABLE_NAME_STR = "weather";
    private static final TableName TABLE_NAME = TableName.valueOf(TABLE_NAME_STR);
    private static final Configuration configuration = HBaseConfiguration.create();
    private static Connection connection;
    private static Admin admin;
    private static Table table;

    static {
        configuration.set(ZK_CONNECT_KEY, ZK_CONNECT_VALUE);
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
            table = connection.getTable(TABLE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Test
    public void init() throws Exception {
        if (admin.tableExists(TABLE_NAME)) {
            admin.disableTable(TABLE_NAME); // a table must be disabled before it can be deleted
            admin.deleteTable(TABLE_NAME);
        }
        createTable();
        puts();
    }

    private void createTable() throws Exception {
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
        ColumnFamilyDescriptor addr = ColumnFamilyDescriptorBuilder.of("addr");               // province, city
        ColumnFamilyDescriptor temperature = ColumnFamilyDescriptorBuilder.of("temperature"); // level, value
        ColumnFamilyDescriptor windForce = ColumnFamilyDescriptorBuilder.of("wind_force");    // level, value
        ColumnFamilyDescriptor pm25 = ColumnFamilyDescriptorBuilder.of("pm25");               // level, value
        tableDescriptorBuilder.setColumnFamily(addr);
        tableDescriptorBuilder.setColumnFamily(temperature);
        tableDescriptorBuilder.setColumnFamily(windForce);
        tableDescriptorBuilder.setColumnFamily(pm25);
        TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
        admin.createTable(tableDescriptor);
    }

    private void puts() throws Exception {
        try (Table weatherTable = connection.getTable(TABLE_NAME)) {
            weatherTable.put(getPuts());
        }
    }

    private static List<Put> getPuts() {
        List<Put> puts = new LinkedList<>();
        LocalDateTime localDate = LocalDateTime.now();
        DateTimeFormatter pattern = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        Random random = new Random();
        String[] provinces = {"SC", "XJ"};
        String[] cities = {"CD", "BJ", "SH", "CQ"};
        int level;
        String value;
        for (int i = 0; i <= 60; i++) {
            // localDate is reassigned, so the gaps grow: row i lies 0+1+...+i days after row 0.
            localDate = localDate.plusDays(i);
            // Row key: a timestamp such as 20200107161557.
            byte[] rowKey = Bytes.toBytes(localDate.format(pattern));
            Put put = new Put(rowKey);
            String province = provinces[random.nextInt(provinces.length)];
            String city = cities[random.nextInt(cities.length)];
            put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("province"), Bytes.toBytes(province));
            put.addColumn(Bytes.toBytes("addr"), Bytes.toBytes("city"), Bytes.toBytes(city));
            level = random.nextInt(100);
            value = "temperature_" + level;
            put.addColumn(Bytes.toBytes("temperature"), Bytes.toBytes("level_temperature"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("temperature"), Bytes.toBytes("value"), Bytes.toBytes(value));
            level = random.nextInt(100);
            value = "wind_force_" + level;
            put.addColumn(Bytes.toBytes("wind_force"), Bytes.toBytes("level_wind_force"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("wind_force"), Bytes.toBytes("value"), Bytes.toBytes(value));
            level = random.nextInt(100);
            value = "pm25_" + level;
            put.addColumn(Bytes.toBytes("pm25"), Bytes.toBytes("level_pm25"), Bytes.toBytes(level));
            put.addColumn(Bytes.toBytes("pm25"), Bytes.toBytes("value"), Bytes.toBytes(value));
            puts.add(put);
        }
        return puts;
    }

    @Test
    public void qualifierFilter() throws Exception {
        Scan scan = new Scan();
        // Every column named "value", in every column family.
        BinaryComparator qualifierComparator = new BinaryComparator(Bytes.toBytes("value"));
        Filter qualifierFilter = new QualifierFilter(CompareOperator.EQUAL, qualifierComparator);
        scan.setFilter(qualifierFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void columnPrefixFilter() throws IOException {
        Scan scan = new Scan();
        // Every column whose name starts with "level".
        ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes("level"));
        scan.setFilter(columnPrefixFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void familyFilter() throws IOException {
        Scan scan = new Scan();
        // Every column in the addr family.
        Filter familyFilter = new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("addr")));
        scan.setFilter(familyFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void rowKeyPrefixFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose key starts with "2020".
        PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes("2020"));
        scan.setFilter(prefixFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void rowFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose key sorts after 20200107161557; CompareOperator.EQUAL would return only that row.
        Filter rowFilter = new RowFilter(CompareOperator.GREATER, new BinaryComparator(Bytes.toBytes("20200107161557")));
        scan.setFilter(rowFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void singleColumnValueExcludeFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose addr:province is "SC", returned without the addr:province column itself.
        BinaryComparator binaryComparator = new BinaryComparator(Bytes.toBytes("SC"));
        SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
                Bytes.toBytes("addr"),     // family
                Bytes.toBytes("province"), // column
                CompareOperator.EQUAL,
                binaryComparator);
        singleColumnValueExcludeFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueExcludeFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            printCells(resultScanner);
        }
    }

    @Test
    public void singleColumnValueFilter() throws IOException {
        Scan scan = new Scan();
        // Rows whose addr:province contains "XJ" (SubstringComparator ignores case).
        SubstringComparator substringComparator = new SubstringComparator("XJ");
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("addr"),
                Bytes.toBytes("province"),
                CompareOperator.EQUAL,
                substringComparator);
        singleColumnValueFilter.setFilterIfMissing(true);
        scan.setFilter(singleColumnValueFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("province"))));
            }
        }
    }

    @Test
    public void valueFilter() throws IOException {
        Scan scan = new Scan();
        // Only cells whose value contains "50"; columns that did not match print as null.
        Filter valueFilter = new ValueFilter(CompareOperator.EQUAL, new SubstringComparator("50"));
        scan.setFilter(valueFilter);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("temperature"), Bytes.toBytes("value"))));
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("wind_force"), Bytes.toBytes("value"))));
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("pm25"), Bytes.toBytes("value"))));
            }
        }
    }

    @Test
    public void filterListTest() throws IOException {
        // FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE); // at least one condition
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);    // all conditions
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("addr"),
                Bytes.toBytes("province"),
                CompareOperator.EQUAL,
                Bytes.toBytes("SC"));
        filterList.addFilter(singleColumnValueFilter);
        singleColumnValueFilter = new SingleColumnValueFilter(
                Bytes.toBytes("addr"),
                Bytes.toBytes("city"),
                CompareOperator.EQUAL,
                Bytes.toBytes("SH"));
        filterList.addFilter(singleColumnValueFilter);
        Scan scan = new Scan();
        scan.setFilter(filterList);
        try (ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("province"))));
                System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("addr"), Bytes.toBytes("city"))));
            }
        }
    }

    // Prints every cell of every row returned by a scan.
    private static void printCells(ResultScanner resultScanner) {
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) {
                System.out.println(cell);
            }
        }
    }
}
```