文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

自定义SQL实现PostgreSQL安全审计

2024-12-02 04:38

关注

数据审计是一个跟踪表内容随时间变化的系统,在现在安全合规方面数据审计是必须要的功能之一。PostgreSQL作为一个强大现代的开源关系数据库,也有一个相关插件PGAudit可以提供审计功能。

关于PGAudit插件以后有机会可以详细介绍,本文我们介绍一个简单SQL语句实现的数据集审计功能。

概述

最终实现效果为:

创建一个示例表:

create extension supa_audit cascade;
create table public.account(
id int primary key,
name text not null
);

启用审计:

select audit.enable_tracking('public.account'::regclass);

增改删操作:

insert into public.account(id, name)
values (1, 'Chongchong');
update public.account set name = 'CC' where id = 1;
delete from public.account where id = 1;

清空表:

truncate table public.account;

查看审计日志:

select * from audit.record_history

请注意,record_id和old_record_id在更新行时保持不变,这样就可以轻松查询单行的历史记录。

要关闭审计追踪,只需执行:

select audit.disable_tracking('public.account'::regclass);

实现

首先创建一个名为audit schema为审计用:

create schema if not exists audit;

记录存储

接下来,需要一个表来跟踪插入、更新和删除。

传统上,使用audit schema并附加了一些元数据列,如提交的时间戳。

该解决方案存在一些维护挑战:

为此使用PostgreSQL的无模式JSONB数据类型将每条记录的数据存储在单个列中的。这种方法的另一个好处是允许将多个表的审计历史存储在一个审计表中。

create table audit.record_version(
id bigserial primary key,
record_id uuid,
old_record_id uuid,
op varchar(8) not null,
ts timestamptz not null default now(),
table_oid oid not null,
table_schema name not null,
table_name name not null,
record jsonb,
old_record jsonb
);

查询和索引

查询性能很重要,如果不能快速查询日志,则该审计日志没有多大实际意义。为了提高查询的性能,需要对最常用的查询涉及字段创建索引。

时间范围内查询

对于时间范围,需要一个索引ts。 由于审计表仅用于插入记录,其中ts列插入操作时间,其值ts自然是升序排列。PostgreSQL的内置BRIN索引可以利用值和物理位置之间的相关性来生成一个索引,该索引在规模上比默认值(BTREE索引)小数百倍,并且查找时间更快。

create index record_version_ts
on audit.record_version
using brin(ts);

对于表查询,包含了一个 table_oid跟踪PostgreSQL内部数字表标识符的列。可以为该列添加索引而不是table_schema和 able_name列,最小化索引大小并提供更好的性能。

create index record_version_table_oid
on audit.record_version
using btree(table_oid);

记录唯一标识

将每一行的数据存储为的缺点之一jsonb是基于列值的过滤变得非常低效。如果想快速查找一行的历史记录,需要为每一行提取和索引一个唯一标识符。

对于全局唯一标识符,使用以下结构:

[table_oid, primary_key_value_1, primary_key_value_2, ...]

并将该数组散列为UUID v5以获得有效的可索引UUID类型,以识别对数据更改具有鲁棒性的行。

使用一个实用函数来查找记录的主键列名:

create or replace function audit.primary_key_columns(entity_oid oid)
returns text[]
stable
security definer
language sql
as $$
-- Looks up the names of a table's primary key columns
select
coalesce(
array_agg(pa.attname::text order by pa.attnum),
array[]::text[]
) column_names
from
pg_index pi
join pg_attribute pa
on pi.indrelid = pa.attrelid
and pa.attnum = any(pi.indkey)
where
indrelid = $1
and indisprimary
$$;

另一个为table_oid和主键,将结果转换为记录的UUID。

create or replace function audit.to_record_id(
entity_oid oid,
pkey_cols text[],
rec jsonb
)
returns uuid
stable
language sql
as $$
select
case
when rec is null then null
-- if no primary key exists, use a random uuid
when pkey_cols = array[]::text[] then uuid_generate_v4()
else (
select
uuid_generate_v5(
'fd62bc3d-8d6e-43c2-919c-802ba3762271',
(
jsonb_build_array(to_jsonb($1))
|| jsonb_agg($3 ->> key_)
)::text
)
from
unnest($2) x(key_)
)
end
$$;

最后,索引record_id和old_record_id包含这些用于快速查询的唯一标识符的列。

create index record_version_record_id
on audit.record_version(record_id)
where record_id is not null;
create index record_version_old_record_id
on audit.record_version(record_id)
where old_record_id is not null;

触发器

为了让审计功能真正起作用,需要在最终用户不对其事务进行任何更改的情况下插入记录给审计表。为此,设置一个触发器在数据更改时触发,为每个插入/更新/删除的行为触发一次触发器。

create or replace function audit.insert_update_delete_trigger()
returns trigger
security definer
language plpgsql
as $$
declare
pkey_cols text[] = audit.primary_key_columns(TG_RELID);
record_jsonb jsonb = to_jsonb(new);
record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, record_jsonb);
old_record_jsonb jsonb = to_jsonb(old);
old_record_id uuid = audit.to_record_id(TG_RELID, pkey_cols, old_record_jsonb);
begin
insert into audit.record_version(
record_id,
old_record_id,
op,
table_oid,
table_schema,
table_name,
record,
old_record
)
select
record_id,
old_record_id,
TG_OP,
TG_RELID,
TG_TABLE_SCHEMA,
TG_TABLE_NAME,
record_jsonb,
old_record_jsonb;
return coalesce(new, old);
end;
$$;

API

将公开的用于对表启用审计的API:

select audit.enable_tracking('.
'::regclass);

禁用跟踪:

select audit.disable_tracking('.
'::regclass);

这些函数根据请求由表注册审计触发器:

create or replace function audit.enable_tracking(regclass)
returns void
volatile
security definer
language plpgsql
as $$
declare
statement_row text = format('
create trigger audit_i_u_d
before insert or update or delete
on %I
for each row
execute procedure audit.insert_update_delete_trigger();',
$1
);
pkey_cols text[] = audit.primary_key_columns($1);
begin
if pkey_cols = array[]::text[] then
raise exception 'Table % can not be audited because it has no primary key', $1;
end if;
if not exists(select 1 from pg_trigger where tgrelid = $1 and tgname = 'audit_i_u_d') then
execute statement_row;
end if;
end;
$$;
create or replace function audit.disable_tracking(regclass)
returns void
volatile
security definer
language plpgsql
as $$
declare
statement_row text = format(
'drop trigger if exists audit_i_u_d on %I;',
$1
);
begin
execute statement_row;
end;
$$;

性能开销

开启审计表后会降低插入、更新和删除的吞吐量。但是在吞吐量低于每秒1000次写入的情况下,其开销通常可以忽略不计。对于写入频率较高的表,建议使用pgAudit。

总结

通过简单纯sql语句就实现了Postgresql数据库的安全审计,总体上算起来实现才150行sql语句。大家可以自己手动尝试一下,主要是搞清楚其原理,如果生产环境中有需求还是建议用pgAudit。

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 资料下载
  • 历年真题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
咦!没有更多了?去看看其它编程学习网 内容吧