本篇内容介绍了“PostgreSQL聚合函数的实现方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一、数据结构
AggState
聚合函数执行时状态结构体,内含AggStatePerAgg等结构体
// Structs private to nodeAgg.c. Only pointer typedefs are declared here;
// the pointed-to struct bodies are not visible in this excerpt.
typedef struct AggStatePerAggData *AggStatePerAgg;
typedef struct AggStatePerTransData *AggStatePerTrans;
typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
// Run-time state of an aggregation node.
//
// Field order matters: each FIELDNO_AGGSTATE_* macro below equals the
// zero-based position of the field declared right after it, so adding,
// removing or reordering fields requires updating those macros as well.
typedef struct AggState
{
// Must come first; its own first field is the NodeTag (inherited via ScanState)
ScanState ss;
// All Aggref nodes found in the targetlist and quals
List *aggs;
// Length of the aggs list (may be 0)
int numaggs;
// Number of pertrans entries
int numtrans;
// Aggregation strategy
AggStrategy aggstrategy;
// Agg-splitting mode, see nodes.h
AggSplit aggsplit;
// Pointer to the current phase's data
AggStatePerPhase phase;
// Number of phases (phase 0 included)
int numphases;
// Current phase number
int current_phase;
// Per-Aggref information
AggStatePerAgg peragg;
// Per-transition-state information
AggStatePerTrans pertrans;
// ExprContext for long-lived data (hash table)
ExprContext *hashcontext;
// ExprContexts for long-lived data (one per grouping set)
ExprContext **aggcontexts;
// ExprContext for input expressions
ExprContext *tmpcontext;
#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
// Currently active aggcontext
ExprContext *curaggcontext;
// Currently active aggregate, if any
AggStatePerAgg curperagg;
#define FIELDNO_AGGSTATE_CURPERTRANS 16
// Currently active trans state
AggStatePerTrans curpertrans;
// Has the input been exhausted?
bool input_done;
// Has the Agg scan finished?
bool agg_done;
// Last grouping set (presumably the last one projected, going by the field name)
int projected_set;
#define FIELDNO_AGGSTATE_CURRENT_SET 20
// Current grouping set being evaluated
int current_set;
// Grouping columns for the current projection
Bitmapset *grouped_cols;
// List of all grouped columns, in reverse order
List *all_grouped_cols;
// -------- The fields below hold grouping-set phase data
// Largest number of sets in any phase
int maxsets;
// Array of all phases
AggStatePerPhase phases;
// Sorted input for phases > 1
Tuplesortstate *sort_in;
// Input is copied here for the next phase
Tuplesortstate *sort_out;
// Slot for sort results
TupleTableSlot *sort_slot;
// ------- The fields below are used in AGG_PLAIN and AGG_SORTED modes:
// Array of per-group pointers, indexed by grouping set number
AggStatePerGroup *pergroups;
// Copy of the first tuple of the current group
HeapTuple grp_firstTuple;
// --------- The fields below are used in AGG_HASHED and AGG_MIXED modes:
// Has the hash table been filled?
bool table_filled;
// Number of hash tables.
// NOTE(review): the earlier comment read "number of hash buckets?"; this looks
// like a count of hash tables (entries in perhash) instead - confirm in nodeAgg.c.
int num_hashes;
// Array of per-hashtable data
AggStatePerHash perhash;
// Array of per-group pointers, indexed by grouping set number
AggStatePerGroup *hash_pergroup;
// ---------- Support for evaluating agg input expressions
#define FIELDNO_AGGSTATE_ALL_PERGROUPS 34
// ->pergroups first, then hash_pergroup
AggStatePerGroup *all_pergroups;
// Projection machinery
ProjectionInfo *combinedproj;
} AggState;
/* Primitive options supported by nodeAgg.c; each one is a distinct single bit. */
#define AGGSPLITOP_COMBINE (1 << 0)
#define AGGSPLITOP_SKIPFINAL (1 << 1)
#define AGGSPLITOP_SERIALIZE (1 << 2)
#define AGGSPLITOP_DESERIALIZE (1 << 3)

/* Supported operating modes, each composed from the option bits above. */
typedef enum AggSplit
{
	/* Basic, non-split aggregation: no option bits set. */
	AGGSPLIT_SIMPLE = 0,
	/* Initial step of partial aggregation, with serialization. */
	AGGSPLIT_INITIAL_SERIAL = AGGSPLITOP_SERIALIZE | AGGSPLITOP_SKIPFINAL,
	/* Final step of partial aggregation, with deserialization. */
	AGGSPLIT_FINAL_DESERIAL = AGGSPLITOP_DESERIALIZE | AGGSPLITOP_COMBINE
} AggSplit;

/* Test which primitive options an AggSplit value selects; each yields 1 or 0. */
#define DO_AGGSPLIT_COMBINE(as) (((as) & AGGSPLITOP_COMBINE) ? 1 : 0)
#define DO_AGGSPLIT_SKIPFINAL(as) (((as) & AGGSPLITOP_SKIPFINAL) ? 1 : 0)
#define DO_AGGSPLIT_SERIALIZE(as) (((as) & AGGSPLITOP_SERIALIZE) ? 1 : 0)
#define DO_AGGSPLIT_DESERIALIZE(as) (((as) & AGGSPLITOP_DESERIALIZE) ? 1 : 0)
二、源码解读
N/A
三、跟踪分析
跟踪分析数据结构中的相关信息.
测试数据:
-- 禁用并行
set max_parallel_workers_per_gather=0;
select bh,avg(c1),min(c1),max(c2) from t_agg_simple group by bh;
aggstate是聚合运算的运行状态.
1536 AggState *node = castNode(AggState, pstate);
(gdb) n
1537 TupleTableSlot *result = NULL;
(gdb) p *node
$1 = {ss = {ps = {type = T_AggState, plan = 0x120ba30, state = 0x12eb428, ExecProcNode = 0x6ee438 <ExecAgg>,
ExecProcNodeReal = 0x6ee438 <ExecAgg>, instrument = 0x0, worker_instrument = 0x0, worker_jit_instrument = 0x0,
qual = 0x0, lefttree = 0x12ebbb0, righttree = 0x0, initPlan = 0x0, subPlan = 0x0, chgParam = 0x0,
ps_ResultTupleSlot = 0x12ec7b0, ps_ExprContext = 0x12ebaf0, ps_ProjInfo = 0x12ec8f0, scandesc = 0x12ebf00},
ss_currentRelation = 0x0, ss_currentScanDesc = 0x0, ss_ScanTupleSlot = 0x12ec458}, aggs = 0x12ece00, numaggs = 3,
numtrans = 3, aggstrategy = AGG_HASHED, aggsplit = AGGSPLIT_SIMPLE, phase = 0x12ecef8, numphases = 1, current_phase = 0,
peragg = 0x13083e0, pertrans = 0x130a3f0, hashcontext = 0x12eba30, aggcontexts = 0x12eb858, tmpcontext = 0x12eb878,
curaggcontext = 0x12eba30, curperagg = 0x0, curpertrans = 0x0, input_done = false, agg_done = false, projected_set = -1,
current_set = 0, grouped_cols = 0x0, all_grouped_cols = 0x12ed090, maxsets = 1, phases = 0x12ecef8, sort_in = 0x0,
sort_out = 0x0, sort_slot = 0x0, pergroups = 0x0, grp_firstTuple = 0x0, table_filled = false, num_hashes = 1,
perhash = 0x12ecf50, hash_pergroup = 0x13085f8, all_pergroups = 0x13085f8, combinedproj = 0x0}
aggstate->phase/phases是当前阶段/所有阶段的信息.
在本例中,由于只有一个阶段:AGG_HASHED,因此两者是一样的.
(gdb) p *node->phase
$2 = {aggstrategy = AGG_HASHED, numsets = 1, gset_lengths = 0x12ecfe8, grouped_cols = 0x12ed008, eqfunctions = 0x0,
aggnode = 0x120ba30, sortnode = 0x0, evaltrans = 0x1315800}
(gdb) p node->phases[0]
$27 = {aggstrategy = AGG_HASHED, numsets = 1, gset_lengths = 0x12ecfe8, grouped_cols = 0x12ed008, eqfunctions = 0x0,
aggnode = 0x120ba30, sortnode = 0x0, evaltrans = 0x1315800}
aggstate->phase->evaltrans是该阶段用于执行转换函数的表达式状态(ExprState).
在本例中,其evalfunc为ExecInterpExprStillValid,实际的解释执行函数(evalfunc_private)是ExecInterpExpr(通过执行一系列的步骤得到值)
(gdb) p *node->phase->evaltrans
$3 = {tag = {type = T_ExprState}, flags = 6 '\006', resnull = false, resvalue = 0, resultslot = 0x0, steps = 0x1315ac0,
evalfunc = 0x6cd882 <ExecInterpExprStillValid>, expr = 0x12eb640, evalfunc_private = 0x6cb43e <ExecInterpExpr>,
steps_len = 16, steps_alloc = 16, parent = 0x12eb640, ext_params = 0x0, innermost_caseval = 0x0,
innermost_casenull = 0x0, innermost_domainval = 0x0, innermost_domainnull = 0x0}
aggstate->phase->aggnode是该阶段的聚合节点,即T_Agg.
(gdb) p *node->phase->aggnode
$9 = {plan = {type = T_Agg, startup_cost = 24.800000000000001, total_cost = 27.300000000000001, plan_rows = 200,
plan_width = 98, parallel_aware = false, parallel_safe = false, plan_node_id = 0, targetlist = 0x12fbf10, qual = 0x0,
lefttree = 0x12fb9d0, righttree = 0x0, initPlan = 0x0, extParam = 0x0, allParam = 0x0}, aggstrategy = AGG_HASHED,
aggsplit = AGGSPLIT_SIMPLE, numCols = 1, grpColIdx = 0x12fbcc0, grpOperators = 0x12fbca0, numGroups = 200,
aggParams = 0x0, groupingSets = 0x0, chain = 0x0}
aggstate->peragg存储的是per-Aggref信息,亦即每一个聚合对应一个,每一个peragg->aggref对应一个聚合信息.
第1个是max:
(gdb) p *node->peragg
$10 = {aggref = 0x12fc458, transno = 0, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p *node->peragg->aggref
$11 = {xpr = {type = T_Aggref}, aggfnoid = 2116, aggtype = 23, aggcollid = 0, inputcollid = 0, aggtranstype = 23,
aggargtypes = 0x12fc518, aggdirectargs = 0x0, args = 0x12fc628, aggorder = 0x0, aggdistinct = 0x0, aggfilter = 0x0,
aggstar = false, aggvariadic = false, aggkind = 110 'n', agglevelsup = 0, aggsplit = AGGSPLIT_SIMPLE, location = 26}
########
testdb=# select oid,proname from pg_proc where oid in (2116,768);
oid | proname
------+------------
768 | int4larger
2116 | max
(2 rows)
########
第2个是min:
(gdb) p node->peragg[1]
$38 = {aggref = 0x12fc1d0, transno = 1, finalfn_oid = 0, finalfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = 4, resulttypeByVal = true, shareable = true}
(gdb) p *node->peragg[1]->aggref
$39 = {xpr = {type = T_Aggref}, aggfnoid = 2132, aggtype = 23, aggcollid = 0, inputcollid = 0, aggtranstype = 23,
aggargtypes = 0x12fc290, aggdirectargs = 0x0, args = 0x12fc3a0, aggorder = 0x0, aggdistinct = 0x0, aggfilter = 0x0,
aggstar = false, aggvariadic = false, aggkind = 110 'n', agglevelsup = 0, aggsplit = AGGSPLIT_SIMPLE, location = 18}
############
testdb=# select oid,proname from pg_proc where oid in (2132,769);
oid | proname
------+-------------
769 | int4smaller
2132 | min
(2 rows)
###########
第3个是avg:
(gdb) p node->peragg[2]
$40 = {aggref = 0x12fbf48, transno = 2, finalfn_oid = 1964, finalfn = {fn_addr = 0x978251 <int8_avg>, fn_oid = 1964,
fn_nargs = 1, fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x12eb310,
fn_expr = 0x1315698}, numFinalArgs = 1, aggdirectargs = 0x0, resulttypeLen = -1, resulttypeByVal = false,
shareable = true}
(gdb) p *node->peragg[2]->aggref
$41 = {xpr = {type = T_Aggref}, aggfnoid = 2101, aggtype = 1700, aggcollid = 0, inputcollid = 0, aggtranstype = 1016,
aggargtypes = 0x12fc008, aggdirectargs = 0x0, args = 0x12fc118, aggorder = 0x0, aggdistinct = 0x0, aggfilter = 0x0,
aggstar = false, aggvariadic = false, aggkind = 110 'n', agglevelsup = 0, aggsplit = AGGSPLIT_SIMPLE, location = 10}
#####
testdb=# select oid,proname from pg_proc where oid in (2101,1963);
oid | proname
------+----------------
1963 | int4_avg_accum
2101 | avg
(2 rows)
#####
aggstate->pertrans保存的是转换函数.
第1/2/3个分别是int4larger/int4smaller/int4_avg_accum
(gdb) p node->pertrans[0]
$19 = {aggref = 0x12fc458, aggshared = false, numInputs = 1, numTransInputs = 1, transfn_oid = 768, serialfn_oid = 0,
deserialfn_oid = 0, aggtranstype = 23, transfn = {fn_addr = 0x93e877 <int4larger>, fn_oid = 768, fn_nargs = 2,
fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x12eb310, fn_expr = 0x1315458},
serialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000',
fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, deserialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
aggCollation = 0, numSortCols = 0, numDistinctCols = 0, sortColIdx = 0x0, sortOperators = 0x0, sortCollations = 0x0,
sortNullsFirst = 0x0, equalfnOne = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false,
fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, equalfnMulti = 0x0, initValue = 0,
initValueIsNull = true, inputtypeLen = 0, transtypeLen = 4, inputtypeByVal = false, transtypeByVal = true,
sortslot = 0x0, uniqslot = 0x0, sortdesc = 0x0, sortstates = 0x1308620, transfn_fcinfo = {flinfo = 0x130a418,
context = 0x12eb640, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 2, arg = {0 <repeats 100 times>},
argnull = {false <repeats 100 times>}}, serialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0,
fncollation = 0, isnull = false, nargs = 0, arg = {0 <repeats 100 times>}, argnull = {false <repeats 100 times>}},
deserialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = {
0 <repeats 100 times>}, argnull = {false <repeats 100 times>}}}
(gdb) p node->pertrans[1]
$20 = {aggref = 0x12fc1d0, aggshared = false, numInputs = 1, numTransInputs = 1, transfn_oid = 769, serialfn_oid = 0,
deserialfn_oid = 0, aggtranstype = 23, transfn = {fn_addr = 0x93e8a3 <int4smaller>, fn_oid = 769, fn_nargs = 2,
fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x12eb310, fn_expr = 0x13155a8},
serialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000',
fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, deserialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
aggCollation = 0, numSortCols = 0, numDistinctCols = 0, sortColIdx = 0x0, sortOperators = 0x0, sortCollations = 0x0,
sortNullsFirst = 0x0, equalfnOne = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false,
fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, equalfnMulti = 0x0, initValue = 0,
initValueIsNull = true, inputtypeLen = 0, transtypeLen = 4, inputtypeByVal = false, transtypeByVal = true,
sortslot = 0x0, uniqslot = 0x0, sortdesc = 0x0, sortstates = 0x1308640, transfn_fcinfo = {flinfo = 0x130b060,
context = 0x12eb640, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 2, arg = {0 <repeats 100 times>},
argnull = {false <repeats 100 times>}}, serialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0,
fncollation = 0, isnull = false, nargs = 0, arg = {0 <repeats 100 times>}, argnull = {false <repeats 100 times>}},
deserialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = {
0 <repeats 100 times>}, argnull = {false <repeats 100 times>}}}
(gdb) p node->pertrans[2]
$21 = {aggref = 0x12fbf48, aggshared = false, numInputs = 1, numTransInputs = 1, transfn_oid = 1963, serialfn_oid = 0,
deserialfn_oid = 0, aggtranstype = 1016, transfn = {fn_addr = 0x977d8f <int4_avg_accum>, fn_oid = 1963, fn_nargs = 2,
fn_strict = true, fn_retset = false, fn_stats = 2 '\002', fn_extra = 0x0, fn_mcxt = 0x12eb310, fn_expr = 0x1315a68},
serialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false, fn_stats = 0 '\000',
fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, deserialfn = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0,
fn_strict = false, fn_retset = false, fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0},
aggCollation = 0, numSortCols = 0, numDistinctCols = 0, sortColIdx = 0x0, sortOperators = 0x0, sortCollations = 0x0,
sortNullsFirst = 0x0, equalfnOne = {fn_addr = 0x0, fn_oid = 0, fn_nargs = 0, fn_strict = false, fn_retset = false,
fn_stats = 0 '\000', fn_extra = 0x0, fn_mcxt = 0x0, fn_expr = 0x0}, equalfnMulti = 0x0, initValue = 20010920,
initValueIsNull = false, inputtypeLen = 0, transtypeLen = -1, inputtypeByVal = false, transtypeByVal = false,
sortslot = 0x0, uniqslot = 0x0, sortdesc = 0x0, sortstates = 0x13156f0, transfn_fcinfo = {flinfo = 0x130bca8,
context = 0x12eb640, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 2, arg = {0 <repeats 100 times>},
argnull = {false <repeats 100 times>}}, serialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0,
fncollation = 0, isnull = false, nargs = 0, arg = {0 <repeats 100 times>}, argnull = {false <repeats 100 times>}},
deserialfn_fcinfo = {flinfo = 0x0, context = 0x0, resultinfo = 0x0, fncollation = 0, isnull = false, nargs = 0, arg = {
0 <repeats 100 times>}, argnull = {false <repeats 100 times>}}}
aggstate->perhash存储的是per-hashtable数据.perhash->hashslot存储的是最小化Tuple.
本例只有一个grouping set,因此对应的hash表只有一个.
(gdb) p node->perhash[0]
$30 = {hashtable = 0x1308890, hashiter = {cur = 0, end = 0, done = false}, hashslot = 0x12ed238, hashfunctions = 0x12ed2d0,
eqfuncoids = 0x1308700, numCols = 1, numhashGrpCols = 1, largestGrpColIdx = 1, hashGrpColIdxInput = 0x1308660,
hashGrpColIdxHash = 0x1308680, aggnode = 0x120ba30}
(gdb) p node->perhash[0]->hashslot[0]
$32 = {type = T_TupleTableSlot, tts_isempty = true, tts_shouldFree = false, tts_shouldFreeMin = false, tts_slow = false,
tts_tuple = 0x0, tts_tupleDescriptor = 0x12ed120, tts_mcxt = 0x12eb310, tts_buffer = 0, tts_nvalid = 0,
tts_values = 0x12ed298, tts_isnull = 0x12ed2a0, tts_mintuple = 0x0, tts_minhdr = {t_len = 0, t_self = {ip_blkid = {
bi_hi = 0, bi_lo = 0}, ip_posid = 0}, t_tableOid = 0, t_data = 0x0}, tts_off = 0, tts_fixedTupleDescriptor = true}
其他的数据结构.
aggstate->hash_pergroup/all_pergroups/combinedproj
(gdb) p node->hash_pergroup
$35 = (AggStatePerGroup *) 0x13085f8
(gdb) p *node->hash_pergroup
$36 = (AggStatePerGroup) 0x0
(gdb) p *node->all_pergroups
$37 = (AggStatePerGroup) 0x0
(gdb) p *node->combinedproj
Cannot access memory at address 0x0
简单来说,整个过程大体如下:
每个Group的列信息会存储在aggstate->perhash中,按阶段(aggstate->phases)逐个执行.
扫描数据表,在遍历tuple的时候,通过hash函数比对(Key为最小化tuple)在aggstate->perhash(hash table)中找到或创建相应的Group,提取tuple中相应的列值作为参数输入到aggstate->pertrans中定义的转换函数中,解析执行结果并存储,最后执行投影操作,把最终结果返回给客户端.
“PostgreSQL聚合函数的实现方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!