文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

Embedding空间中的时序异常检测,你学会了吗?

2024-11-29 20:18

关注

2.示例

假设某业务场景下,用户有100个来源渠道,用户使用产品时,有10种不同的操作方式,对于用户的行为,我们可以简单的撮取出PV、UV、失败率等指标。那么我们可以建立这样一个监控:

监控的维度:来源渠道 * 操作方式 = 100 * 10 = 1000个维度

监控的指标:PV、UV、失败率...

统计周期: 小时

然后针对每个维度、时刻、指标,收集过去30天的数据做为训练样本,训练异常检测模型(如EllipticEnvelope等),然后对当前时刻的指标值,进行异常检测。

上面的方法,通过合理的拆分监控维度,一方面可以有效的提高检测的灵敏度,避免较少的异常流量淹没在大盘监控在随机波动中;另一方面,也可以对异常流量进行快速的定位,便于及时处理。

3.问题

上面的方法也存在诸多的限制,比如:

显然,不是所有的业务场景都能满足上述的要求。即便是能满足上述要求的业务场景中,随着对攻击者的对抗不断深入,攻击者会尝试降低攻击的规模,并尽量将攻击行为分散到更多的维度中,从而躲避我们的检测手段。

4.解决思路

那么,能否不依赖业务维度拆分,直接对指标进行异常检测呢?

首先,我们需要把待检测的每一条日志、数据当做一个独立的样本。接下来,不难联想到,这些样本都可以映射到某个高维空间中,我们把这个空间叫做样本空间。可以通过向量化、Embedding等方法,得到样本在这个空间中的坐标。

样本在这个空间中的分布必然不是完全随机的,而是会存在一定的特点(分布特征)。若当前时刻样本在这个空间中的分布特征与历史数据中的分布特征不一致,则说明当前样本存在异常。而分布在差异最大的区域中的样本,则可以认为是异常样本。

接下来的问题就变成了如何对这种分布特征进行建模?

最先想到的是,我们可以通过聚类算法,来对样本进行划分,再对每个Cluster,提取出统计特征。但在具体实现时还需要考虑以下问题:

再进一步,其实我们不需要执行完整的聚类算法,我们只需要对样本空间设置足够多的采样点进行采样,计算出采样点附近的样本的统计特征做为采集采样点的分布特征,再对采样点的特征进行时间维度的异常检测,即可完成对整个样本空间的异常检测了。

图片

5.算法实验

5.1 数据准备

取某业务场景下近30天的用户行为日志,约160万条,利用其中的UserAgent信息,对其进行向量化处理。每条日志的向量长度为128维。

向量化算法:

def to_vector(ua):
    if isinstance(ua, (list, tuple)):
        return [to_vector(c) for c in ua]
    else:
        vec = np.zeros(128)
        for c in ua:
            vec[ord(c) % 128] += 1  # UserAgent中的字符绝大多数都是Ascll字符,所以取余128
        l2 = np.sqrt(np.sum(vec * vec))
        if l2 != 0:
            vec /= l2
        return vec.tolist()

将清洗好的数据保存到向量DB中备用:

for day in days:
    for hour in hours:
        event_day = day.strftime("%Y%m%d")
        event_hour = "{:02d}".format(hour)
        collection = chroma_client.get_or_create_collection(
            name="{}_{}_{}".format(name_prefix, event_day, event_hour)
        )
        sub_df = df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)]
        ids = [hashlib.md5(bytes(str(row), "utf-8")).hexdigest() for _, row in sub_df.iterrows()]
        docs = [row.ua for _, row in sub_df.iterrows()]
        metadatas = [{"pv": row.pv} for _, row in sub_df.iterrows()]
        embeddings = [to_vector(row.ua) for _, row in sub_df.iterrows()]
        batch_size = 10000
        for batch_id in range(0, len(docs), batch_size):
            collection.upsert(
                ids=ids[batch_id : batch_id + batch_size],
                documents=docs[batch_id : batch_id + batch_size],
                metadatas=metadatas[batch_id : batch_id + batch_size],
                embeddings=embeddings[batch_id : batch_id + batch_size],
            )
            print("{:>8d} / {}".format(batch_id + batch_size, len(docs)))
        collections[event_day + event_hour] = collection

为了更方便的验证算法的有效性,在数据集中,人工构造了一些异常样本,包括:

5.2 算法实现

随机生成采样点:

query_ua_list = (
    df_ua_pv[(df_ua_pv.event_day == event_day) & (df_ua_pv.event_hour == event_hour)].sample(100)["ua"].to_list()
)

在样本空间进行邻近采样:

results = []
query_ua_vec = to_vector(query_ua_list)
for day in days:
    for hour in hours:
        res = get_collection(day, hour).query(query_embeddings=query_ua_vec, n_results=n_results)
        for i in range(len(query_ua_list)):
            for j in range(n_results):
                row = [
                    query_ua_list[i],
                    res["metadatas"][i][j]["event_day"],
                    res["metadatas"][i][j]["event_hour"],
                    res["documents"][i][j],
                    res["metadatas"][i][j]["pv"],
                    res["distances"][i][j],
                ]
                if extra_fields:
                    for field in extra_fields:
                        row.append(res["metadatas"][i][j].get(field))
                results.append(row)
cols = ["ua", "day", "hour", "doc", "pv", "dist"]
if extra_fields:
    cols += extra_fields
df_results = pd.DataFrame(results, columns=cols)

定义要检测的字段:

AREA_EXP = [0, 2, 8]
MODEL_FIELDS = ["pv", "dist"]
MODEL_FIELDS += [f"dens_{i}" for i in AREA_EXP]
MODEL_FIELDS += ["dens_s"]
MODEL_AGGS = {}
for col in MODEL_FIELDS:
    MODEL_AGGS[f"{col}_mean"] = (col, "mean")
    MODEL_AGGS[f"{col}_std"] = (col, "std")

进行天维度的异常检测:

df_query_results["dens_s"] = 1 / (df_query_results["dist"] ** 0.5 + 1)
df_res_agg = df_query_results.groupby(["ua", "day"], as_index=False).agg(
    pv=("pv", "sum"),
    dist=("dist", "mean"),
    dens_s=("dens_s", "mean"),
)
for i in AREA_EXP:
    df_res_agg["area_{}".format(i)] = (df_res_agg["dist"] * 10) ** i
    df_res_agg["dens_{}".format(i)] = df_res_agg["pv"] / df_res_agg["area_{}".format(i)]
df_model = df_res_agg[df_res_agg.day <= last_event_day].groupby("ua").agg(**MODEL_AGGS)
df_check = df_res_agg.join(df_model, notallow="ua")
for col in MODEL_FIELDS:
    df_check[f"{col}_sigma"] = (df_check[col] - df_check[f"{col}_mean"]) / df_check[f"{col}_std"]
df_check["dens_avg_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].mean(axis=1)
df_check["dens_max_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].max(axis=1)
df_check["dens_min_sigma"] = df_check[["dens_s_sigma"] + [f"dens_{i}_sigma" for i in AREA_EXP]].min(axis=1)

6.实验效果

6.1 实验一

个别随机UA,PV增长:10%, 20%, 50%, 100%, 200%, 500%,1000%;数量:5;min_pv=100。

异常样本与原始样本的异常置信度分布对比如下图,由上到下分别为:

图片

天级检测不同阈值下的准召情况:

图片

小时级检测不同阈值下的准召情况:

图片

6.2 实验二

部分相似UA,PV增长:5%,10%,20%, 50%, 100%;数量:5, 10, 20;  min_pv=10。

异常样本与原始样本的异常置信度分布对比如下图,由上到下分别为:

图片

天级检测不同阈值下的准召情况:

图片

小时级检测不同阈值下的准召情况:

图片

6.3 实验三

生成相似UA,PV同比增长,数量:5, 10, 20, 50, 100。

异常样本与原始样本的异常置信度分布对比如下图,由上到下分别为:

图片

天级检测不同阈值下的准召情况:

图片

小时级检测不同阈值下的准召情况:

图片

6.4 实验四

生成相似UA,整体PV不增长,数量:10, 20, 50, 100;min_pv=1。

异常样本与原始样本的异常置信度分布对比如下图,由上到下分别为:

图片

天级检测不同阈值下的准召情况:

图片

小时级检测不同阈值下的准召情况:

图片

7.总结与展望

通过实验,验证了该算法的有效性,但在后续的工程化应用中,还需要结合具体的应用场景进行适当的调整。比如采样点的数量、采样点的选取方法、样本Embedding方法、距离计算方法等。

此外,在实践中,若要发挥出异常检测的真正价值,还需要考虑以下问题:

来源:百度Geek说内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯