文章详情

短信预约信息系统项目管理师 报名、考试、查分时间动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

ClickHouse源码笔记1:聚合函数的实现

2018-06-25 13:23

关注

ClickHouse源码笔记1:聚合函数的实现

由于工作的需求,后续笔者工作需要和开源的OLAP数据库ClickHouse打交道。ClickHouse是Yandex在2016年6月15日开源了一个分析型数据库,以强悍的单机处理能力被称道
笔者在实际测试ClickHouse和阅读ClickHouse的源码过程之中,对"战斗民族"开发的数据库十分欣赏。ClickHouse不仅是一个很好的数据库学习材料,而且同时应用了大量的CPP17的新特性进行开发,也是一个大型的Modern CPP的教导资料。
笔者接下来会陆续将阅读ClickHouse的部分心得体会与通过源码阅读笔记的方式和大家分享,坦白说,这种源码阅读笔记很难写啊。(多一分繁琐,少一分就模糊了~~)
第一篇文章,我们就从聚合函数的实现开始聊起~~ 上车!

1.基础知识的梳理

什么是聚合函数?

聚合函数: 顾名思义就是对一组数据执行聚合计算并返回结果的函数。
这类函数在数据库之中很常见,如:count, max, min, sum等等。

ClickHouse的实现接口
  
    virtual void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena) const = 0;

    /// Merges state (on which place points to) with other state of current aggregation function.
    virtual void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena * arena) const = 0;

    /// Serializes state (to transmit it over the network, for example).
    virtual void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const = 0;

    /// Deserializes state. This function is called only for empty (just created) states.
    virtual void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena * arena) const = 0;
    // 
    virtual void addBatch(size_t batch_size, AggregateDataPtr * places, size_t place_offset, const IColumn ** columns, Arena * arena) const = 0;

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }
template 
class IAggregateFunctionHelper : public IAggregateFunction
{
private:
    static void addFree(const IAggregateFunction * that, AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena * arena)
    {
        static_cast(*that).add(place, columns, row_num, arena);
    }

public:
    IAggregateFunctionHelper(const DataTypes & argument_types_, const Array & parameters_)
        : IAggregateFunction(argument_types_, parameters_) {}

    AddFunc getAddressOfAddFunction() const override { return &addFree; }
class AggregateFunctionFactory final : private boost::noncopyable, public IFactoryWithAliases
{
public:

    static AggregateFunctionFactory & instance();

    /// Register a function by its name.
    /// No locking, you must register all functions before usage of get.
    void registerFunction(
        const String & name,
        Creator creator,
        CaseSensitiveness case_sensitiveness = CaseSensitive);

    /// Throws an exception if not found.
    AggregateFunctionPtr get(
        const String & name,
        const DataTypes & argument_types,
        const Array & parameters = {},
        int recursion_level = 0) const;

2.聚合函数的注册流程

有了上述的背景知识,我们接下来举个栗子。来看看一个聚合函数的实现细节,以及它是如何被使用的。

AggregateFunctionSum

笔者这里选取了一个很简单的聚合算子Sum,我们来看看它实现的代码细节。
这里我们可以看到AggregateFunctionSum是个final类,无法被继承了。而它继承了上面提到的IAggregateFunctionHelp类的子类IAggregateFunctionDataHelper类。

这里我们就重点看,这个类override了getName方法,返回了对应的名字sum。并且实现了我们上文提到的四个核心的方法。

template 
class AggregateFunctionSum final : public IAggregateFunctionDataHelper>
{
public:
    using ResultDataType = std::conditional_t, DataTypeDecimal, DataTypeNumber>;
    using ColVecType = std::conditional_t, ColumnDecimal, ColumnVector>;
    using ColVecResult = std::conditional_t, ColumnDecimal, ColumnVector>;

    String getName() const override { return "sum"; }

    AggregateFunctionSum(const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper>(argument_types_, {})
        , scale(0)
    {}

    AggregateFunctionSum(const IDataType & data_type, const DataTypes & argument_types_)
        : IAggregateFunctionDataHelper>(argument_types_, {})
        , scale(getDecimalScale(data_type))
    {}

    DataTypePtr getReturnType() const override
    {
        if constexpr (IsDecimalNumber)
            return std::make_shared(ResultDataType::maxPrecision(), scale);
        else
            return std::make_shared();
    }

    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
    {
        const auto & column = static_cast(*columns[0]);
        this->data(place).add(column.getData()[row_num]);
    }

    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
    {
        this->data(place).merge(this->data(rhs));
    }

    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
    {
        this->data(place).write(buf);
    }

    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
    {
        this->data(place).read(buf);
    }

    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
    {
        auto & column = static_cast(to);
        column.getData().push_back(this->data(place).get());
    }

private:
    UInt32 scale;
};

接下来,ClickHouse实现了两种聚合计算:AggregateFunctionSumDataAggregateFunctionSumKahanData。后者是用Kahan算法避免float类型精度损失的,我们可以暂时不细看。直接看SumData的实现。这是个模板类,之前我们讲到AggregateFunction的函数就是通过AggregateDataPtr指针来获取AggregateFunctionSumData的地址,来调用add实现聚合算子的。我们可以看到AggregateFunctionSumData实现了前文提到的add, merge, write,read四大方法,正好和接口一一对应上了。

template 
struct AggregateFunctionSumData
{
    T sum{};

    void add(T value)
    {
        sum += value;
    }

    void merge(const AggregateFunctionSumData & rhs)
    {
        sum += rhs.sum;
    }

    void write(WriteBuffer & buf) const
    {
        writeBinary(sum, buf);
    }

    void read(ReadBuffer & buf)
    {
        readBinary(sum, buf);
    }

    T get() const
    {
        return sum;
    }
};

ClickHouse在Server启动时。main函数之中会调用registerAggregateFunction的初始化函数注册所有的聚合函数。
然后调用到下面的函数:

void registerAggregateFunctionSum(AggregateFunctionFactory & factory)
{
    factory.registerFunction("sum", createAggregateFunctionSum, AggregateFunctionFactory::CaseInsensitive);
    factory.registerFunction("sumWithOverflow", createAggregateFunctionSum);
    factory.registerFunction("sumKahan", createAggregateFunctionSum);
}

这里又调用了 factory.registerFunction("sum", createAggregateFunctionSum, AggregateFunctionFactory::CaseInsensitive);来进行上述我们看到的聚合函数的注册。这里有一点很恶心的模板代码,笔者这里简化了一下,把注册的部分函数拉出来:

createAggregateFunctionSum(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
    AggregateFunctionPtr res;
    DataTypePtr data_type = argument_types[0];
    if (isDecimal(data_type))
        res.reset(createWithDecimalType(*data_type, *data_type, argument_types));
    else
        res.reset(createWithNumericType(*data_type, argument_types));
    return res;

这里的Function模板就是上面的AggregateFunctionSumSimple, 而它又是下面的模板类型:

template  using AggregateFunctionSumSimple = typename SumSimple::Function;

template 
struct SumSimple
{
    /// @note It uses slow Decimal128 (cause we need such a variant). sumWithOverflow is faster for Decimal32/64
    using ResultType = std::conditional_t, Decimal128, NearestFieldType>;
    using AggregateDataType = AggregateFunctionSumData;
    using Function = AggregateFunctionSum;
};

不知道读者被绕晕了没,最终绕回来还是new出来这个AggregateFunctionSum
也就是完成了这个求和算子的注册,后续我们get出来就可以愉快的调用啦。(这里这部分的模板变化比较复杂,如果看不明白可以回到源码梳理一下~~~)

3. 小结

好了,关于聚合函数的基础信息,和它是如何实现并且通过工厂方法注册获取的流程算是搞明白了。
关于其他的聚合算子,也是大同小异的方式。笔者就不再赘述了,感兴趣的可以回到源码之中继续一探究竟。讲完了聚合函数的实现,下一篇笔者就要继续给探究聚合函数究竟在ClickHouse之中是如何和列存结合使用,并实现向量化的~~。
笔者是一个ClickHouse的初学者,对ClickHouse有兴趣的同学,也欢迎和笔者多多指教,交流。

4. 参考资料

官方文档
ClickHouse源代码

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-数据库
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯