当前位置: 代码网 > it编程>编程语言>Java > SpringBoot3集成Calcite多数据源查询的实战示例小结

SpringBoot3集成Calcite多数据源查询的实战示例小结

2026年02月09日 Java 我要评论
前言:跨库查询的痛,谁懂?凌晨两点被告警电话叫醒,订单查询接口超时,客服群炸开了锅。排查了一圈才发现问题的根源:订单数据在 mysql,用户信息在 postgresql,行为画像在 mongodb,访

前言:跨库查询的痛,谁懂?

凌晨两点被告警电话叫醒,订单查询接口超时,客服群炸开了锅。排查了一圈才发现问题的根源:订单数据在 mysql,用户信息在 postgresql,行为画像在 mongodb,访问日志存在 hive。四套 dao 层相互牵制,改动任何一个字段就像推倒多米诺骨牌一样引发连锁反应。

企业项目一旦数据源增多,"多数据源管理"很快就演变成维护黑洞。每新增一个数据源,就要新建一套 dao 配置、数据源连接池、事务管理。复用率低不说,还把业务逻辑和数据访问层死死绑在一起。最要命的是,不同数据源的字段命名规范不统一,user_levelcustomer_leveluser_grade 混着用,改一个字段名,订单系统、用户系统、风控系统、报表系统全线报错,谁改谁背锅。

传统方案要么做全量数据同步到数仓,要么写一堆分布式事务代码,要么在内存里手动 join。结果就是:延迟不可避免,查询结果拿到的往往是旧数据快照;性能是硬伤,mysql 擅长点查和小范围索引扫描,mongodb 天生文档检索,hive 批量扫描效率高,kafka 流式处理,一刀切的内存聚合方式很快就把 jvm 打爆了,查询延迟像筛子一样抖个不停。

一、重新认识 apache calcite:不只是数据库,更是查询大脑

很多人第一次听到 “apache calcite”,直觉反应是"又一个数据库"。这个理解完全错了。

calcite 本质上是一个动态数据管理框架,专注于提供 sql 解析、查询优化和跨数据源连接的基础能力,但不涉及数据的存储和处理。这种设计哲学让它具备了极其灵活的适配能力。

它主要做四件事:

1. sql 解析与验证

将用户提交的 sql 语句解析为抽象语法树(ast),并进行语义分析,验证表名、列名是否存在,数据类型是否匹配等。就像把一句自然语言翻译成结构化的指令树。

2. 查询优化

这是 calcite 的核心杀手锏。它提供基于规则优化(rbo)和基于代价优化(cbo)两种策略:

  • 规则优化:通过预设规则重写查询,比如谓词下推、投影裁剪、join 重排
  • 代价优化:根据统计信息估算不同执行计划的成本,选择最优方案

3. 数据源适配

calcite 通过适配器(adapter)机制连接各种数据源,包括:

  • 关系型数据库:mysql、postgresql、oracle、sql server
  • nosql 数据库:mongodb、cassandra、redis
  • 文件系统:csv、json、parquet、excel
  • 大数据引擎:hive、spark、kafka

甚至还可以自定义适配器,让任何数据源都能接入。

4. 跨数据源查询

能够连接不同类型的数据源,通过适配器统一抽象不同数据源的操作,将查询分解为各数据源可处理的子查询,然后合并结果。

一句话总结:calcite 把"数据在哪"和"怎么查"彻底拆开了。你写一句标准 sql,它负责解析、优化、拆分,最终把查询路由到各个数据源执行。

很多熟悉的系统都在用 calcite:flink sql 用它做解析与优化,hive 的 cbo 用它打底,drill、kylin、druid 都接入了它的能力。

二、spring boot 3 集成 calcite 实战指南

2.1 核心依赖引入

第一步是添加 maven 依赖。在 pom.xml 中引入 calcite 核心包、对应数据源的适配器,以及 mybatis plus 的核心依赖。

<!-- calcite 核心依赖 -->
<dependency>
    <groupid>org.apache.calcite</groupid>
    <artifactid>calcite-core</artifactid>
    <version>1.36.0</version>
</dependency>
<!-- mysql 适配器 -->
<dependency>
    <groupid>org.apache.calcite</groupid>
    <artifactid>calcite-mysql</artifactid>
    <version>1.36.0</version>
</dependency>
<!-- mongodb 适配器 -->
<dependency>
    <groupid>org.apache.calcite</groupid>
    <artifactid>calcite-mongodb</artifactid>
    <version>1.36.0</version>
</dependency>
<!-- mybatis plus 核心依赖(需要适配 spring boot 3) -->
<dependency>
    <groupid>com.baomidou</groupid>
    <artifactid>mybatis-plus-boot-starter</artifactid>
    <version>3.5.5</version>
</dependency>
<!-- 数据源连接池 -->
<dependency>
    <groupid>com.alibaba</groupid>
    <artifactid>druid-spring-boot-starter</artifactid>
    <version>1.2.20</version>
</dependency>

三个避坑点必须强调:

  1. calcite 所有组件版本必须统一。核心包和适配器版本要一致,否则容易出现类加载异常。
  2. mybatis plus 要选适配 spring boot 3 的版本。必须是 3.5.3 及以上版本,旧版本不支持。
  3. 一定要加连接池依赖。mybatis plus 需要连接池支持才能正常管理 calcite 数据源。

2.2 编写 calcite 模型文件

模型文件是 calcite 识别数据源的关键,通常使用 json 格式,放在 resources 目录下,命名为 calcite-model.json

下面是一个适配 mysql 和 mongodb 双数据源的示例:

{
  "version": "1.0",
  "defaultschema": "ecommerce",
  "schemas": [
    {
      "name": "ecommerce",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.jdbc.jdbcschema$factory",
      "operand": {
        "jdbcurl": "jdbc:mysql://localhost:3306/ecommerce_order?usessl=false&servertimezone=utc",
        "username": "root",
        "password": "123456",
        "driver": "com.mysql.cj.jdbc.driver"
      }
    },
    {
      "name": "user_mongo",
      "type": "custom",
      "factory": "org.apache.calcite.adapter.mongodb.mongoschema$factory",
      "operand": {
        "host": "localhost",
        "port": 27017,
        "database": "user_db",
        "collection": "user_info"
      }
    }
  ]
}

关键配置说明:

  • defaultschema:默认查询的 schema,可省略。如果省略,查询时需要指定 schema 名称(如 ecommerce.orderuser_mongo.user_info)。
  • factory:对应数据源的适配器工厂类。calcite 已为主流数据源提供现成工厂,自定义数据源需要实现自己的工厂类。
  • operand:数据源连接参数,根据数据源类型不同配置不同参数(如 mysql 的 jdbcurl、mongodb 的 host/port)。

2.3 spring boot 集成 calcite + mybatis plus 核心配置

这一步是集成的核心,主要分两步走:

  1. 配置好 calcite 数据源
  2. 让 mybatis plus 使用这个数据源,并配置 mapper 扫描、分页插件等基础参数
import com.baomidou.mybatisplus.annotation.dbtype;
import com.baomidou.mybatisplus.extension.plugins.mybatisplusinterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.paginationinnerinterceptor;
import org.apache.calcite.jdbc.calciteconnection;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
import org.mybatis.spring.annotation.mapperscan;
import org.springframework.core.io.support.pathmatchingresourcepatternresolver;
import javax.sql.datasource;
import java.sql.connection;
import java.sql.drivermanager;
import java.util.properties;
@configuration
@mapperscan(basepackages = "com.example.calcite.mapper")
public class calcitemybatisplusconfig {
    // 1. 配置 calcite 数据源
    @bean
    public datasource calcitedatasource() throws exception {
        properties props = new properties();
        props.setproperty("model", "classpath:calcite-model.json");
        connection connection = drivermanager.getconnection("jdbc:calcite:", props);
        calciteconnection calciteconnection = connection.unwrap(calciteconnection.class);
        return calciteconnection.getdatasource();
    }
    // 2. 配置 mybatis plus 的 sqlsessionfactory,指定使用 calcite 数据源
    @bean
    public sqlsessionfactory sqlsessionfactory(datasource calcitedatasource) throws exception {
        mybatissqlsessionfactorybean sessionfactory = new mybatissqlsessionfactorybean();
        // 注入 calcite 数据源
        sessionfactory.setdatasource(calcitedatasource);
        // 配置 mapper.xml 文件路径
        sessionfactory.setmapperlocations(
            new pathmatchingresourcepatternresolver().getresources("classpath:mapper/*.xml")
        );
        // 配置 mybatis plus 全局参数
        org.apache.ibatis.session.configuration configuration = 
            new org.apache.ibatis.session.configuration();
        configuration.setmapunderscoretocamelcase(true); // 下划线转驼峰
        sessionfactory.setconfiguration(configuration);
        // 注入 mybatis plus 插件
        sessionfactory.setplugins(mybatisplusinterceptor());
        return sessionfactory.getobject();
    }
    // 3. mybatis plus 分页插件
    @bean
    public mybatisplusinterceptor mybatisplusinterceptor() {
        mybatisplusinterceptor interceptor = new mybatisplusinterceptor();
        interceptor.addinnerinterceptor(
            new paginationinnerinterceptor(dbtype.mysql) // 适配 calcite 兼容的 mysql 语法
        );
        return interceptor;
    }
    // 4. 配置事务管理器
    @bean
    public platformtransactionmanager transactionmanager(datasource calcitedatasource) {
        return new datasourcetransactionmanager(calcitedatasource);
    }
}

核心逻辑梳理:

先通过 calcite 创建统一的数据源,再把它注入到 mybatis plus 的 sqlsessionfactory 里。这样一来,后续写代码完全是 mybatis plus 的熟悉风格,无论是 mapper 接口还是 xml 映射文件,都能直接用,跨数据源查询的复杂逻辑全交给 calcite 处理。

2.4 核心查询实现

定义实体类

使用 lombok 注解定义 vo 类,包含订单和用户信息:

import lombok.data;
@data
public class userordervo {
    // 订单信息
    private string orderid;
    private string ordertime;
    private double amount;
    // 用户信息
    private string username;
    private string phone;
    private string userid;
}

定义 mapper 接口

继承 basemapper 获得 mybatis plus 基础 crud 能力,使用 @select 注解编写跨数据源关联 sql:

import org.apache.ibatis.annotations.mapper;
import org.apache.ibatis.annotations.select;
import java.util.list;
@mapper
public interface userordermapper extends basemapper<userordervo> {
    // 注解方式编写跨数据源关联 sql
    @select("select " +
            "  o.order_id as orderid, " +
            "  o.order_time as ordertime, " +
            "  o.amount, " +
            "  u.user_name as username, " +
            "  u.phone " +
            "from ecommerce.orders o " +
            "join user_mongo.user_info u on o.user_id = u.user_id " +
            "where o.user_id = #{userid}")
    list<userordervo> queryuserorderbyuserid(string userid);
}

如果使用 xml 方式:

resources/mapper/userordermapper.xml 中编写:

<?xml version="1.0" encoding="utf-8"?>
<!doctype mapper public "-//mybatis.org//dtd mapper 3.0//en" 
    "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.calcite.mapper.userordermapper">
    <select id="queryuserorderbyuseridwithxml" resulttype="com.example.calcite.entity.userordervo">
        select
            o.order_id as orderid,
            o.order_time as ordertime,
            o.amount,
            u.user_name as username,
            u.phone
        from ecommerce.orders o
        join user_mongo.user_info u on o.user_id = u.user_id
        where o.user_id = #{userid}
    </select>
</mapper>

编写 service 层

继承 serviceimpl 实现业务逻辑:

import com.baomidou.mybatisplus.extension.service.impl.serviceimpl;
import org.springframework.stereotype.service;
import java.util.list;
@service
public class userorderservice extends serviceimpl<userordermapper, userordervo> {
    public list<userordervo> getuserorderbyuserid(string userid) {
        // 直接调用 mapper 接口方法
        return this.basemapper.queryuserorderbyuserid(userid);
    }
    // 如果使用 xml 方式:
    public list<userordervo> getuserorderbyuseridwithxml(string userid) {
        return this.basemapper.queryuserorderbyuseridwithxml(userid);
    }
}

编写 controller 层

import org.springframework.beans.factory.annotation.autowired;
import org.springframework.web.bind.annotation.getmapping;
import org.springframework.web.bind.annotation.pathvariable;
import org.springframework.web.bind.annotation.restcontroller;
import java.util.list;
@restcontroller
public class crossdatasourcequerycontroller {
    @autowired
    private userorderservice userorderservice;
    @getmapping("/user/order/{userid}")
    public list<userordervo> queryuserorder(@pathvariable string userid) {
        // 调用 service 方法,返回跨数据源查询结果
        return userorderservice.getuserorderbyuserid(userid);
    }
}

三个关键点:

  1. 实体类字段要和查询结果列名对应。使用别名适配下划线转驼峰更省心(如 o.order_id as orderid)。
  2. mapper 接口继承 basemapper 后,mybatis plus 的分页、条件构造器这些功能都能正常使用。
  3. sql 查询要带 schema 前缀。比如 ecommerce.ordersuser_mongo.user_info,明确指定数据源。

三、经典使用场景深度解析

3.1 多系统数据融合查询

场景痛点:

大型企业里数据分散在不同系统,订单系统用 mysql,用户系统用 mongodb 存行为数据,库存系统用 postgresql。传统做法需要分别调用三个系统接口,在内存中手动整合数据,效率低且容易出错。一旦某个系统字段变更,所有调用方都要改。

calcite 解决方案:

用 calcite 分别适配三个数据源后,只要写一套标准 sql 就能实现跨数据源关联查询。业务层完全不用管数据存在哪,专注核心业务逻辑。

价值收益:

  • 开发效率提升 50% 以上。不用写重复的接口调用和数据整合代码。
  • calcite 的查询优化器会自动优化关联逻辑。先把时间过滤推给 mysql 做索引扫描,再把用户 id 集合下推给 mongodb 做 in 查询,最后在本地完成 join 和聚合。
  • 查询效率也能跟上。谓词下推、投影裁剪、join 重排这些优化都是自动的,不用手动抠。

3.2 实时数据与离线数据联动查询

场景需求:

运营需要实时查看今日订单加近 30 天历史订单的汇总数据。实时订单数据存在 kafka 里,历史订单数据存在 hive 里。

传统方案:

需要开发两套查询逻辑,分别从 kafka 和 hive 拉数据,再在内存中合并。开发成本高,而且数据同步有延迟,合并后的结果未必是实时的。

calcite 解决方案:

用 calcite 的 kafka 适配器和 hive 适配器,把实时流数据和离线数据放到同一个查询体系里。写一条 sql 就能实现实时加离线数据的联合查询:

select 
  product_id,
  sum(amount) as total_amount
from (
  -- 实时数据(kafka)
  select product_id, amount from realtime.orders where dt = current_date
  union all
  -- 离线数据(hive)
  select product_id, amount from hive.orders where dt >= current_date - interval '30' day
) combined
group by product_id

价值收益:

  • 省去了数据同步成本。不用把 kafka 数据实时同步到 hive,也不用把 hive 数据预聚合出来。
  • 兼顾实时性和准确性。kafka 的实时数据 + hive 的历史数据,联合查询结果既有实时性又有完整性。

3.3 自定义数据源适配

场景需求:

企业里有很多 csv、excel、parquet 格式的文件数据,需要集成到业务系统中查询。

传统方案:

先把这些文件导入数据库(如 mysql),然后再提供查询接口。数据迁移成本高,而且数据更新后需要重新导入。

calcite 解决方案:

calcite 内置了文件适配器,支持直接查询这些文件数据,根本不用导入数据库。结合 spring boot 3 的文件上传功能,还能实现文件上传后直接用 sql 查询的需求。

模型配置示例:

{
  "name": "files",
  "type": "custom",
  "factory": "org.apache.calcite.adapter.csv.csvschemafactory",
  "operand": {
    "directory": "/data/files",
    "flavor": "mysql"
  }
}

查询示例:

-- 直接查询 csv 文件
select * from files.sales_data where region = 'east'
-- 与其他数据源关联
select 
  f.product_id,
  p.product_name,
  f.sales_amount
from files.sales_data f
join ecommerce.products p on f.product_id = p.id

四、避坑指南:集成注意事项

4.1 版本一致性

问题:

calcite 核心依赖和各数据源适配器的版本必须一致,不然很容易出现类加载异常。

避坑技巧:

  • 推荐使用 calcite 1.36.0 版本。这个版本稳定性好,适配器齐全。
  • 不要混用不同版本。比如 calcite-core 1.36.0 + calcite-mysql 1.35.0,这种组合极易出问题。
  • mvn dependency:tree 检查依赖冲突。

4.2 模型文件配置规范

问题:

schema 名称、表名要清晰,别重复。数据源的地址、端口、账号密码这些连接参数一定要准确。

避坑技巧:

  • 按业务域划分 schema。比如 retail.orderscrm.users,一眼能看懂归属。
  • 表名使用有意义的名称。避免用 table1table2 这种模糊命名。
  • 连接参数做好环境隔离。开发、测试、生产环境用不同的配置文件。

4.3 数据源性能考虑

问题:

跨数据源查询的性能取决于最慢的那个数据源。如果 mysql 查询很快,但 mongodb 慢,整体查询还是会被拖慢。

避坑技巧:

  • 确保每个数据源自身性能没问题。定期检查慢查询,优化索引。
  • 合理使用谓词下推。calcite 会自动把过滤条件推到数据源执行,但要检查执行计划,确保推下去了。
  • 对小表考虑广播到内存,避免大表大表硬碰硬。

4.4 sql 方言差异

问题:

不同数据源支持的 sql 语法有差异。比如 grouping sets、窗口函数、子查询某些源不完全支持。

避坑技巧:

  • 查询前先测试。开发阶段用 calcite 的 explain 功能看执行计划。
  • 不支持的语法要降级实现。比如某个源不支持窗口函数,就改用子查询。
  • 定期查看 calcite 和各数据源的官方文档,了解语法支持情况。

五、优化小技巧:让查询更快更稳

5.1 启用 calcite 缓存

元数据缓存:

避免每次查询都重新加载 schema 结构,减少重复解析和元数据查询的时间。

props.setproperty("calcite.metadatacachesize", "1000");

查询计划缓存:

对相同 sql 查询,缓存执行计划,提升重复查询效率。

props.setproperty("calcite.parser.factory", 
    "org.apache.calcite.sql.parser.impl.sqlparserimpl#factory");

5.2 优化 sql 写法

能下推就下推:

尽量避免复杂的多表关联在 calcite 层执行,把过滤条件下推到数据源。

示例:

-- 不好的写法:先全量扫描再过滤
select * from ecommerce.orders o join user_mongo.users u 
  on o.user_id = u.user_id 
  where o.create_time >= '2025-01-01'
-- 好的写法:时间过滤下推到 mysql
select * from (
  select * from ecommerce.orders where create_time >= '2025-01-01'
) o 
join user_mongo.users u on o.user_id = u.user_id

选择必要字段:

避免 select *,只查询需要的字段,减少数据传输量。

5.3 自定义优化规则

如果是特别复杂的业务场景,可以自己实现 reloptrule 接口,写自定义的查询优化规则。

示例场景:

强制某些过滤条件先执行,或者把某个维表标成可广播。

代码示例:

import org.apache.calcite.rel.relnode;
import org.apache.calcite.rel.rules.reloptrule;
public class customrule extends reloptrule {
    public customrule() {
        super(operand(filterrel.class, any()));
    }
    @override
    public boolean matches(reloptrulecall call) {
        // 自定义匹配逻辑
        return true;
    }
    @override
    public void onmatch(reloptrulecall call) {
        // 自定义优化逻辑
        filterrel filter = call.rel(0);
        // ... 重写查询计划
    }
}

5.4 开启 explain 分析

定期分析慢查询的执行计划,找出性能瓶颈:

explain plan for 
select * from ecommerce.orders o 
join user_mongo.user_info u on o.user_id = u.user_id 
where o.user_id = '123456'

重点关注:

  • 拉回来的行数
  • 下推的过滤条件
  • join 顺序
  • 是否使用了索引

六、边界与选型:什么时候不该用 calcite

6.1 不适合的场景

强事务 oltp 场景:

calcite 不适合替代 oltp 数据库的核心业务场景。对于强事务需求,如转账扣减、库存冻结,仍需使用传统数据库,跨库分布式事务别硬凑。

大规模分布式计算:

如果要的是超大规模分布式计算,直接开 trino、presto 集群。calcite 更像"查询大脑",把解析和优化做好,执行靠接出来的源,或者你自己写执行器。

高频点查场景:

对于高频单表点查,直接用数据源自身的驱动可能更高效,没必要经过 calcite 这一层。

6.2 适合的场景

  • 数据中台:作为统一查询层
  • 多系统整合:跨部门、跨系统的数据融合
  • 实时分析:流批一体化分析
  • 数据虚拟化:免 etl,直接查询原始数据源

6.3 与其他方案的对比

方案优点缺点适用场景
calcite嵌入式、自定义规则强、轻量级分布式能力弱spring boot 应用内集成、规则定制化需求强
trino/presto分布式、大规模计算学习曲线陡、集群运维成本高数据仓库、大规模分析
etl稳定、数据一致性好实时性差、开发量大离线数仓、数据同步为主
bi 平台易用、可视化强依赖工具、定制化受限业务分析、报表展示

七、团队协作与治理

7.1 统一 sql 编码规范

  • 强制使用 schema.table 格式访问表,避免歧义。
  • 禁用 select *,只查询必要字段。
  • 统一字段命名规范,下划线和驼峰明确约定。

7.2 建立查询评审机制

  • 复杂查询需经过架构评审。
  • 评审重点关注:查询逻辑、执行计划、数据量预估。
  • 评审通过才能上线。

7.3 监控与告警

  • 查询性能监控:对慢查询(超过 1 秒)进行告警。
  • 跨源查询数据量监控:记录每次查询涉及的行数,及时发现异常。
  • 连接池监控:监控各数据源连接池使用情况,避免连接耗尽。

7.4 定期清理与维护

  • 跨域配置按季度清理:没用的数据源配置就摘掉,避免"历史债务"不断叠加。
  • 元数据定期更新:数据源表结构变更后,及时更新模型文件。
  • 规则库维护:自定义优化规则要定期review,避免规则过时。

八、总结

spring boot 3 集成 apache calcite,最大的变化是:

  • 业务写 sql,不用管数据在哪
  • 适配器管"去哪儿查",自动路由到对应数据源
  • 优化器管"怎么查得更省",自动下推、自动优化
  • 工程师把精力放回业务规则,而不是堆 dao、抠连接

它不是万能钥匙,但在多源并存的公司里,它确实把"跨库地狱"挪走了一大半。

用对地方,别把它硬拽去当分布式 oltp,剩下的,交给监控、规范、演练和一点点耐心。

技术选型的本质,不是找最先进的工具,而是找最合适的工具。

参考资料:

  • apache calcite 官方文档:https://calcite.apache.org/
  • spring boot 3 官方文档
  • mybatis plus 官方文档
  • apache flink、hive 等大数据系统的 calcite 集成案例

到此这篇关于springboot3集成calcite多数据源查询实战笔记-7407466045的文章就介绍到这了,更多相关springboot3 calcite多数据源查询内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2026  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com