Fork me on GitHub

干货 | 百亿节点,毫秒级延迟,携程金融基于 nebula 的大规模图应用实践

作者简介

霖雾,携程数据开发工程师,关注图数据库等领域。

背景

2017年9月携程金融成立,在金融和风控业务中,有多种场景需要对图关系网络进行分析和实时查询,传统关系型数据库难以保证此类场景下的关联性能,且实现复杂性高,离线关联耗时过长,因此对图数据库的需求日益增加。携程金融从2020年开始引入大规模图存储和图计算技术,基于nebula构建了千亿级节点的图存储和分析平台,并取得了一些实际应用成果。本文主要分享nebula在携程金融的实践,希望能带给大家一些实践启发。

本文主要从以下几个部分进行分析:

  • 图基础介绍
  • 图平台建设
  • 内部应用案例分析
  • 痛点与优化
  • 总结规划

一、图基础

首先我们来简单介绍下图相关的概念:

1.1 什么是图

在计算机科学中,图就是一些顶点的集合,这些顶点通过一系列边结对(连接)。比如我们用一个图表示社交网络,每一个人就是一个顶点,互相认识的人之间通过边联系。
在图数据库中,我们使用 (起点,边类型,rank,终点) 表示一条边。起点和终点比较好理解,表示一条边两个顶点的出入方向。边类型则是用于区分异构图的不同边,如我关注了你,我向你转账,关注和转账就是两种不同种类的边。而rank是用来区分同起始点同终点的不同边,如A对B的多次转账记录,起点、终点、边类型是完全相同的 ,因此就需要如时间戳作为rank来区分不同的边。
同时,点边均可具有属性,如:A的手机号、银行卡、身份证号、籍贯等信息均可作为A的点属性存在,A对B转账这条边,也可以具有属性,如转账金额,转账地点等边属性。

图片

1.2 什么时候用图

(信息收集于开源社区、公开技术博客、文章、视频)

1)金融风控:

  • 诈骗电话的特征提取,如不在三步社交邻居圈内,被大量拒接等特征。实时识别拦截。(银行/网警等)
  • 转账实时拦截 (银行/支付宝等)
  • 实时欺诈检测,羊毛党的识别(电商)
  • 黑产群体识别,借贷记录良好用户关联,为用户提供更高额贷款、增加营收

2)股权穿透

影子集团、集团客户多层交叉持股、股权层层嵌套复杂关系的识别(天眼查/企查查)

3)数据血缘

在数据仓库开发过程中, 会因为数据跨表关联产生大量的中间表,使用图可直接根据关系模型表示出数据加工过程和数据流向,以及在依赖任务问题时快速定位上下游。

4)知识图谱

构建行业知识图谱

5)泛安全

ip关系等黑客攻击场景,计算机进程与线程等安全管理

6)社交推荐

  • 好友推荐,行为相似性,咨询传播路径,可能认识的人,大v粉丝共同关注,共同阅读文章等,商品相似性,实现好友商品或者咨询的精准推荐
  • 通过对用户画像、好友关系等,进行用户分群、实现用户群体精准管理

7)代码依赖分析

8)供应链上下游分析

如汽车供应链上下游可涉及上万零件及供应商,分析某些零件成本上涨/供应商单一/库存少等多维度的影响(捷豹)

1.3 谁在研发图,谁在使用图

(信息收集于开源社区、公开技术博客、文章、视频)

目前国内几家大公司都有各自研发的图数据库,主要满足内部应用的需求,大多数都是闭源的,开源的仅有百度的hugegraph。其他比较优秀的开源产品有Google Dgraph, vesoft的nebula 等,其中nebula在国内互联网公司应用非常广泛。结合我们的应用场景,以及外部公开的测试和内部压测,我们最终选择nebula构建金融图平台。

图片

二、图平台建设

图片

2.1. 图平台建设

我们的图平台早期只有1个3节点的nebula集群,随着图应用场景的不断扩充,需要满足实时检索、离线分析、数据同步与校验等功能,最终演化成上述架构图。

1)离线图:主要用于图构建阶段(建模、图算法分析),通过spark-connector同集团的大数据平台打通,此外我们还将Nebula提供的数10种常用图算法进行工具化包装,方便图分析人员在spark集群提交图算法作业。

2)线上图:经过离线图分析确定最终建模后,会通过spark-connector将数据导入线上图。通过对接qmq消息(集团内部的消息框架) 实时更新,对外提供实时检索服务。 同时也会有T+1的hive增量数据通过spark-connector按天写入。

3)全量校验:虽然 Nebula Graph 通过 TOSS 保证了正反边的插入一致性,但仍不支持事务,随着数据持续更新,实时图和离线(hive数据)可能会存在不一致的情况,因此我们需要定期进行全量数据的校验(把图读取到Hive,和Hive表存储的图数据进行比对,找出差异、修复),保证数据的最终一致性。

4)集群规模:为了满足千亿节点的图业务需求,实时集群采用三台独立部署的高性能机器,每台机器64core / 320GB / 12TB SSD ,版本为nebulav2.5,跨机房部署。离线集群64core /320GB /3.6TB SSD * 12 ,测试集群 48core/ 188GB/5T HDD * 4.

2.2. 遇到的问题

在nebula应用过程中,也发现一些问题,期待逐步完善:

1)资源隔离问题,目前nebula没有资源分组隔离功能 ,不同业务会相互影响;如业务图A在导数据,业务图B线上延迟就非常高。

2)版本升级问题:

  • nebula在版本升级过程中需要停止服务,无法实现热更新;对于类似实时风控等对可靠性要求非常高的场景非常不友好。此种情况下如需保证在线升级,就需要配备主备集群,每个集群切量后挨个升级,增加服务复杂性和运维成本。
  • 客户端不兼容,客户端需要跟着服务端一起升级版本。对于已有多个应用使用的nebula集群,想要协调各应用方同时升级客户端是比较困难的。

三、内部应用案例分析

3.1 数据血缘图

数据治理是近年来比较热的一个话题,他是解决数仓无序膨胀的有效手段,其中数据血缘是数据有效治理的重要依据,金融借助nebula构建了数据血缘图,以支撑数据治理的系统建设。

图片

图片

数据血缘就是数据产生的链路,记录数据加工的流向,经过了哪些过程和阶段;主要解决 ETL 过程中可能产出几十甚至几百个中间表导致的复杂表关系,借用数据血缘可以清晰地记录数据源头到最终数据的生成过程。

图 a 是数据血缘的关系图,采用库名 + 表名作为图的顶点来保证点的唯一性,点属性则是分开的库名和表名,以便通过库名或者表名进行属性查询。在两张表之间会建立一条边,边的属性主要存放任务的产生运行情况,比如说:任务开始时间,结束时间、用户 ID等等同任务相关的信息。

图 b 是实际查询中的一张关系图,箭头的方向表示了表的加工方向,通过上游或者下游表我们可以快速地找到它的依赖, 清晰明了地显示从上游到下游的每一个链路。

如果要表达复杂的血缘依赖关系图,通过传统的关系型数据库需要复杂的SQL实现(循环嵌套),性能也比较差,而通过图数据库实现,则可直接按数据依赖关系存储,读取也快于传统DB,非常简洁。目前,数据血缘也是金融BU在图数据库上的一个经典应用。

3.2 风控关系人图

关系人图常用于欺诈识别等场景,它是通过 ID、设备、手机标识以及其他介质信息关联不同用户的关系网络。比如说,用户 A 和用户 B 共享一个 WiFi,他们便是局域网下的关系人;用户 C 和用户 D 相互下过单,他们便是下单关系人。简言之,系统通过多种维度的数据关联不同的用户,这便是关系人图。

构建模型时,通常要查询某个时点(比如欺诈事件发生前)的关系图,对当时的图进行模型抽取和特征构建,我们称这个过程为图回溯。随着回溯时间点的不同,返回的图数据也是动态变化的;比如某人上午,下午各自打了一通电话, 需要回溯此人中午时间点时的图关系,只会出现上午的电话记录,具体到图,则每类边都具有此类时间特性,每一次查询都需要对时间进行限制。

对于图回溯场景,最初我们尝试通过HIVE SQL实现,发现对于二阶及以上的图回溯,SQL表达会非常复杂,而且性能不可接受(比如二阶回溯 Hive需要跑数小时,三阶回溯Hive几乎不能实现);因此尝试借助图数据库来实现,把时间作为边rank进行建模,再根据边关系进行筛选来实现回溯。这种回溯方式更直观、简洁,使用简单的API即可完成,在性能上相比Hive也有1个数量级以上的提升(二阶回溯,图节点:百亿级,待回溯节点:10万级)。

图片

下面用一个例子说明:如图(a),点A分别在 t0 、t1、 t2 时刻建立了一条边 ,t0、t1、t2为边rank值,需要返回tx时的的图关系数据,只能返回 t0、 t1 对应的 点 B、C ,因为当回溯到tx时间点时候,t2还没有发生;最终返回的图关系为 t0 和 t1 时候,VertexA ->VertexB 、 VertexA -> VertexC (见图 (c) )。这个例子是用一种边进行回溯,实际查询中可能会涉及到 2~3 跳,且存在异构边(打电话是一种边,点外卖又是一种边,下单酒店机票是一种边,都是不同类型的边),而这种异构图的数据都具有回溯特征,因此实际的关系人图回溯查询也会变得复杂。

3.3 实时反欺诈图

图片

用户下单时,会进入一个快速风控的阶段:通过基于关系型数据库和图数据库的规则进行模型特征计算,来判断这个用户是不是风险用户,要不要对该用户进行下单拦截(实时反欺诈)。

我们可以根据图关系配合模型规则,用来挖掘欺诈团伙。比如说,已知某个 uid 是犯欺团伙的一员,根据图关联来判断跟他关系紧密的用户是不是存在欺诈行为。为了避免影响正常用户的下单流程,风控阶段需要快速响应,因此对图查询的性能要求非常高(P95 <15ms)。我们基于nebula构建了百亿级的反欺诈图,在查询性能的优化方面进行了较多思考。

图片

此图 Schema 为脱敏过后的部分图模型,当中隐藏很多建模信息。这里简单讲解下部分的查询流程和关联信息。

如上图为一次图查询流程,每一次图查询由多个起始点如用户uid、用户mobile等用户信息同时开始,每条线为一次关联查询,因此一次图查询由几十次点边查询组成,由起始点经过一跳查询和2跳查询,最终将结果集返回给风控引擎。

系统会将用户的信息,转化为该用户的标签。在图查询的时候,根据这些标签,如 uid、mobile 进行独立查询。举个例子,根据某个 uid 进行一跳查询,查询出它关联的 5 个手机号。再根据这 5 个手机号进行独立的 2 跳查询,可能会出来 25 个 uid,查询会存在数据膨胀的情况。因此,系统会做一个查询限制。去查看这 5 个手机号关联的 uid 是不是超过了系统设定的热点值。如果说通过 mobile 查询出来关联的手机号、uid 过多的话,系统就会判断其为热点数据,不进行边结果返回。(二阶/三阶回溯,图点边:百亿级)。

四、痛点及优化

在上述应用场景中,对于风控关系人图和反欺诈图,由于图规模比较大(百亿点边),查询较多,且对时延要求较高,遇到了一些典型问题,接下来简单介绍一下。

4.1 查询性能问题

为了满足实时场景2跳查询p95 15ms需求,我们针对图schema和连接池以及查询端做了一些优化:

4.1.1 牺牲写性能换取读性能

图片

首先,我们来看看这样的一个需求: 查询id关联的手机号 ,需要满足对于这个手机号关联边不超过3个。这里解释下为什么要限制关联边数量, 因为我们正常个体关联边数量是有限的,会有一个对于大多数人的p95这样的阈值边数量,超过这个阈值就是脏数据。为了这个阈值校验, 就需要对每次查询的结果再多查询一跳。

如图(a)所示,我们需要进行2次查询,第一跳查询是为了查询用户id关联的手机号, 第二跳查询是为了保证我们的结果值是合法的(阈值内),这样每跳查询最终需要进行2跳查询来满足。如图给出了图查询的gsql 2步伪码,这种情况下无法满足我们的高时效性。如何优化呢?看下图(b) :

图片

我们可以将热点查询固定在点属性上,这样一跳查询时就可以知道该点有多少关联边, 避免进行图 a 中(2)语句验证。还是以图 (a)为例,从一个用户 ID 开始查询,查询他的手机号关联,此时因为手机号关联的边已经变成了点属性(修改了 schema),图(a) 2 条查询语句实现的功能就可以变成一条查询 go from id over edgeName where $手机号.用户id边数据 <5 | limit 5。

这种设计的好处就是,在读的时候可以加速验证过程, 节约了一跳查询。带来的成本是:每写一条边,同时需要更新2个点属性来记录点的关联边情况,而且需要保证幂等(保证重复提交不会叠加属性+1),当插入一条边的时,先去图里面查询边是否存在,不存在才会进行写边以及点属性 +1 的操作。也就是我们牺牲了写性能,来换取读性能,并通过定期check保证数据一致。

4.1.2 池化连接降低时延

第二个优化手段是通过池化连接降低时延。Nebula 官方连接池每次进行查询均需要进行建立初始化连接-执行查询任务-关闭连接。而在高频(QPS 会达到几千)的查询场景中,频繁的创建、关闭连接非常影响系统的性能和稳定性。且建立连接过程耗时平均需要6ms, 比实际查询时长1.5ms左右高出几倍,这是不可接受的。因此我们对官方客户端进行了二次封装,实现连接的复用和共享。最后将查询p95从 20ms 降低到了 4ms。通过合理控制并发,我们最终将 2跳查询性能控制在p95 15ms 。

这里贴下代码供参考:

public class SessionPool {

    /**
     * 创建连接池
     *
     * @param maxCountSession 默认创建连接数
     * @param minCountSession 最大创建连接数
     * @param hostAndPort     机器端口列表
     * @param userName        用户名
     * @param passWord        密码
     * @throws UnknownHostException
     * @throws NotValidConnectionException
     * @throws IOErrorException
     * @throws AuthFailedException
     */
    public SessionPool(int maxCountSession, int minCountSession, String hostAndPort, String userName, String passWord) throws UnknownHostException, NotValidConnectionException, IOErrorException, AuthFailedException {
        this.minCountSession = minCountSession;
        this.maxCountSession = maxCountSession;
        this.userName = userName;
        this.passWord = passWord;
        this.queue = new LinkedBlockingQueue<>(minCountSession);
        this.pool = this.initGraphClient(hostAndPort, maxCountSession, minCountSession);
        initSession();
    }

    public Session borrow() {
        Session se = queue.poll();
        if (se != null) {
            return se;
        }
        try {
            return this.pool.getSession(userName, passWord, true);
        } catch (Exception e) {
            log.error("execute borrow session fail, detail: ", e);
            throw new RuntimeException(e);
        }
    }

    public void release(Session se) {
        if (se != null) {
            boolean success = queue.offer(se);
            if (!success) {
                se.release();
            }
        }
    }

    public void close() {
        this.pool.close();
    }

    private void initSession() throws NotValidConnectionException, IOErrorException, AuthFailedException {
        for (int i = 0; i < minCountSession; i++) {
            queue.offer(this.pool.getSession(userName, passWord, true));
        }
    }

    private NebulaPool initGraphClient(String hostAndPort, int maxConnSize, int minCount) throws UnknownHostException {
        List<HostAddress> hostAndPorts = getGraphHostPort(hostAndPort);
        NebulaPool pool = new NebulaPool();
        NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
        nebulaPoolConfig = nebulaPoolConfig.setMaxConnSize(maxConnSize);
        nebulaPoolConfig = nebulaPoolConfig.setMinConnSize(minCount);
        nebulaPoolConfig = nebulaPoolConfig.setIdleTime(1000 * 600);
        pool.init(hostAndPorts, nebulaPoolConfig);
        return pool;
    }

    private List<HostAddress> getGraphHostPort(String hostAndPort) {
        String[] split = hostAndPort.split(",");
        return Arrays.stream(split).map(item -> {
            String[] splitList = item.split(":");
            return new HostAddress(splitList[0], Integer.parseInt(splitList[1]));
        }).collect(Collectors.toList());
    }

    private Queue<Session> queue;

    private String userName;

    private String passWord;

    private int minCountSession;

    private int maxCountSession;

    private NebulaPool pool;

}

4.1.3 查询端优化

对于查询端,像3.3中的例图,每一次图查询由多个起始点开始,可拆解为几十次点边查询,需要让每一层的查询尽可能地并发进行,降低最终时延。我们可以先对 1 跳查询并发(约十几次查询),再对结果进行分类合并,进行第二轮的迭代并发查询(十几到几十次查询),通过合理地控制并发,可将一次组合图查询的 P95 控制在 15 ms 以内。

4.2 边热点问题

在图查询过程中,存在部分用户id 关联过多信息,如黄牛用户关联过多信息,这部分异常用户会在每一次查询时被过滤掉,不会继续参与下一次查询,避免结果膨胀。而判断是否为异常用户,则依赖于数据本身设定的阈值,异常数据不会流入下一阶段对模型计算造成干扰。

4.3 一致性问题

Nebula Graph 本身是没有事务的,对于上文写边以及点属性 +1 的操作,如何保证这些操作的一致性,上文提到过,我们会定期对全量HIVE表数据和图数据库进行check,以 HIVE 数据为准对线上图进行修正,来实现最终一致性。目前来说,图数据库和 HIVE 表不一致的情况还是比较少的。

五、总结与展望

基于nebula的图业务应用,完成了对数据血缘、对关系人网络、反欺诈等场景的支持,并将持续应用在金融更多场景下,助力金融业务。我们将持续跟进社区,结合自身应用场景推进图平台建设 ;同时也期待社区版能提供热升级、资源隔离、更丰富易用的算法包、更强大的studio 等功能。


本文地址:https://www.6aiq.com/article/1657360619214
本文版权归作者和AIQ共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出