您现在的位置是:首页 > 文章详情

Redkale 2.8.0 发布,Java 分布式微服务框架

日期:2024-12-30点击:125

Redkale, 一个 Java 分布式微服务框架,2.2M 的 jar 可以代替传统几十 M 的第三方。包含 TCP/UDP、HTTP、RPC、依赖注入、序列化与反序列化、数据库操作、WebSocket、配置中心、消息队列等功能。 极大的简化业务开发代码,在模块高度整合的同时暴露大量底层,方便二次框架开发。

Java 并不臃肿, 臃肿的是你自己的设计思维!

本次更新主要内容

1、【新增】FilterNode 增加 Lambda 表达式

2、【新增】Convert 增加字段值转化功能,可用于脱敏

3、【新增】Convert 增加 protobuf 实现

4、【新增】DataSqlSource 增加 DataSqlMapper 和 sql 模板功能

5、【新增】增加 sql 解析器 DataNativeSqlParser 功能

6、【新增】CacheSource 增加订阅消息功能

7、【新增】CacheSource 增加令牌桶算法限流接口

8、【新增】增加方法定时任务功能 @Scheduled

9、【新增】增加方法缓存功能 @Cached

10、【新增】增加方法 MQ 消息功能 @Messaged

11、【新增】增加方法非阻塞模式 @NonBlocking

12、【新增】支持 JDK21 虚拟线程池

13、【新增】增加结果码公共类 RetCodes

14、【新增】支持 yaml 配置格式

15、【优化】重构框架模块化 ModuleEngine

16、【优化】大量包和类名的优化和调整

17、【优化】Convert 兼容 setter 返回值是 void 或类本身对象

18、【优化】HTTP 支持 Expect:100-continue

19、【优化】对象复制类 Copier 代替 Reproduce

20、【优化】javax.annotation 包迁移到 org.redkale.annotation

21、【优化】javax.persistence 包迁移到 org.redkale.persistence

22、【修复】修复 ArrayEncoder 并发下 componentEncoder 偶尔为 null 的 bug

更新详情介绍

DataSqlMapper 和 SQL 模板:

Redkale 有自己独特设计的 ORM 模块,单表、简单关联表、简单分库分表的增删改查都不需要写 sql,使用 DataSource 可满足大部分需求,同时兼容类似 Mongodb、ElasticSearch 非关系型数据库。 针对复杂的 sql,新增的 DataSqlMapper 支持 sql 模板, 不需要像 MyBatis 那样写 xml,也不需要写很多 if/else 判断语句, 框架会根据参数是否存在动态生成 sql 语句, 查询结果时带下划线的 sql 字段名和类中的驼峰字段名会自动匹配。

SELECT * FROM user_info WHERE user_id IN #{bean.userIds} OR user_name = #{bean.userName}

当 bean.userIds=null,bean.userName='hello' 时,sql 语句转换成:

  SELECT * FROM user_info WHERE user_name = 'hello'

当 bean.userIds=[1,2,3],bean.userName=null 时,sql 语句转换成:

SELECT * FROM user_info WHERE user_id IN (1,2,3)

IN 或者 NOT IN 语句当参数不为 null 而是空数组或者空 List 会进行特殊处理,IN 语句会转化成 1=1, NOT IN 语句会转化成 1=2。 当 bean.userIds = 空数组,bean.userName='hello' 时,sql 语句转换成:

 SELECT * FROM user_info WHERE 1=2 OR user_name = 'hello'

有些场景要求参数是必需的,就需要使用 ##{} 来校验参数是否必需

DELETE FROM user_info WHERE user_name = ##{bean.userName}

当 bean=null 或者 bean.userName=null 时,执行 sql 会报错 Missing parameter bean.userName

参数支持默认值 , 用逗号隔开,非 String 参数类型主要指定数据类型,目前支持 (int)、(long)、(short)、(float)、(double), 参数不存在会使用默认值:

DELETE FROM user_info WHERE user_name = #{bean.userName,aaa}

IN 参数也支持默认值:

    DELETE FROM user_info WHERE type IN #{types,(1, 2, 3)}
    DELETE FROM user_info WHERE type IN (1, 2, #{type, (int)3})
    DELETE FROM user_info WHERE user_name IN #{name, ('aa','bb','cc')}
    DELETE FROM user_info WHERE user_name ('aa', 'bb', #{type,cc})

Service 调用原生 SQL 模板示例:

public class ForumInfoService extends AbstractService {

    //查询单个记录的sql
    private static final String findSql = "SELECT f.forum_groupid, s.forum_section_color "
            + "FROM forum_info f, forum_section s "
            + " WHERE f.forumid = s.forumid AND "
            + "s.forum_sectionid = #{bean.forumSectionid} AND "
            + "f.forumid = #{bean.forumid} AND s.forum_section_color = #{bean.forumSectionColor}";

    //查询列表记录的sql
    private static final String querySql = "SELECT f.forum_groupid, s.forum_section_color "
            + "FROM forum_info f, forum_section s "
            + " WHERE f.forumid = s.forumid AND "
            + "s.forum_sectionid = #{bean.forumSectionid} AND "
            + "f.forumid = #{bean.forumid} AND s.forum_section_color = #{bean.forumSectionColor}";

    @Resource
    private DataSqlSource source;

    public ForumResult findForumResultOne(ForumBean bean) {
        return source.nativeQueryOne(ForumResult.class, findSql, Map.of("bean", bean));
    }

    public CompletableFuture> queryForumResultListAsync(ForumBean bean) {
        return source.nativeQueryListAsync(ForumResult.class, querySql, Map.of("bean", bean));
    }
    
    //翻页查询
    public Sheet queryForumResult(RowBound bound, ForumBean bean){
        return source.nativeQuerySheet(ForumResult.class, querySql, round, Map.of("bean", bean));
    }
}

也可使用 DataSqlMapper:

public interface ForumInfoMapper extends BaseMapper<ForumInfo> {

    @Sql("SELECT f.forum_groupid, s.forum_section_color "
            + "FROM forum_info f, forum_section s "
            + "WHERE f.forumid = s.forumid "
            + "AND s.forum_sectionid = #{bean.forumSectionid} "
            + "AND f.forumid = #{bean.forumid} "
            + "AND s.forum_section_color = #{bean.forumSectionColor}")
    public ForumResult findForumResultOne(ForumBean bean);

    @Sql("SELECT f.forum_groupid, s.forum_section_color "
            + "FROM forum_info f, forum_section s "
            + "WHERE f.forumid = s.forumid AND "
            + "s.forum_sectionid = #{bean.forumSectionid} "
            + "AND f.forumid = #{bean.forumid} "
            + "AND s.forum_section_color = #{bean.forumSectionColor}")
    public CompletableFuture findForumResultOneAsync(ForumBean bean);

    //翻页查询
    @Sql("SELECT f.forum_groupid, s.forum_section_color "
            + "FROM forum_info f, forum_section s "
            + "WHERE f.forumid = s.forumid AND "
            + "s.forum_sectionid = #{bean.forumSectionid} "
            + "AND f.forumid = #{bean.forumid} "
            + "AND s.forum_section_color = #{bean.forumSectionColor}")
    public Sheet queryForumResult(RowBound bound, ForumBean bean);

    @Sql("UPDATE forum_section s "
            + " SET s.forum_sectionid = '' "
            + " WHERE s.forum_section_color = ##{bean.forumSectionColor}")
    public int updateForumResult(@Param("bean") ForumBean bean0);
}

支持多数据源:

public class ForumInfoService implements Service {

    //基于默认数据源创建的DataSqlMapper
    @Resource
    private ForumInfoMapper forumInfoMapper;
    
    //基于platf数据源创建的DataSqlMapper
    @Resource(name = "platf")
    private ForumInfoMapper forumInfoMapper2;
}

方法缓存 Cached (只能标记在 Service 非 final/static 的 protoectd 或 public 的方法上):

将结果进行本地缓存 30 秒且远程缓存 60 秒:

    @Cached(key = "name", localExpire = "30", remoteExpire = "60")
    public String getName() {
        return "haha";
    }

以参数 code 为 key 将结果进行本地缓存 (时长由环境变量 env.cache.expire 配置,没配置采用默认值 30 秒):

    @Cached(name = "name", key = "#{code}", localExpire = "${env.cache.expire:30}")
    public CompletableFuture getNameAsync(String code) {
        return redis.getStringAsync(code);
    }

以参数 code+map.id 为 key 将结果进行远程缓存 60 毫秒:


    @Resource
    private CachedManager cachedManager;

    //实时修改远程缓存的key值
    public void updateName(String code, Map<String, Long> map) {
        String key = code + "_" + map.get("id");
        cachedManager.remoteSetString("name", key, code + "-" + map, Duration.ofMillis(60));
    }

    @Cached(name = "name", key = "#{code}_#{map.id}", remoteExpire = "60", timeUnit = TimeUnit.MILLISECONDS)
    public String getName(String code, Map<String, Long> map) {
        return code + "-" + map;
    }

动态调整缓存时长

    @Resource
    private CachedManager cachedManager;

    @Cached(name = "name", key = "#{code}_#{map.id}", remoteExpire = "60", timeUnit = TimeUnit.MILLISECONDS)
    public String getName(String code, Map<String, Long> map) {
        return code + "-" + map;
    }

    public void updateExpire() {
        Duration expire = Duration.ofMillis(600);
        cachedManager.acceptCachedAction("name", action -> {
            //将缓存时长改成600毫秒,并开启本地缓存
            action.setLocalExpire(expire);
            action.setRemoteExpire(expire);
        });
    }

配置多个多缓存器, 使用 @Resource 注入多个 CachedManager

    <cached name="" enabled="true" remote="redis_1" broadcastable="true"/>
    <cached name="backup" enabled="true" remote="redis_2" broadcastable="true"/>
    //第一个缓存器
    @Resource
    private CachedManager cachedManager;

    //第二个缓存器
    @Resource(name = "backup")
    private CachedManager cachedManager2;

    //第一个缓存器实时修改远程缓存的key值
    public void updateName(String code, Map<String, Long> map) {
        String key = code + "_" + map.get("id");
        cachedManager.remoteSetString("name", key, code + "_" + map, Duration.ofMillis(60));
    }

    //使用第一个缓存器
    @Cached(name = "name", key = "#{code}_#{map.id}", remoteExpire = "60", timeUnit = TimeUnit.MILLISECONDS)
    public String getName(String code, Map<String, Long> map) {
        return code + "-" + map;
    }

    //使用第二个缓存器
    @Cached(manager = "backup", name = "name2", key = "#{code}_#{map.id}_2", remoteExpire = "60", timeUnit = TimeUnit.MILLISECONDS)
    public String getName2(String code, Map<String, Long> map) {
        return code + "-" + map;
    }

定时任务 Scheduled:

每秒执行:

    @Scheduled(cron = "0/1 * * * * ?")
    public void task1() {
        System.out.println(Times.nowMillis() + "执行一次");
    }

数值配置 , 系统启动后延迟 10 分钟后每 60 分钟执行一次:

    @Scheduled(fixedDelay = "10", fixedRate = "60", timeUnit = TimeUnit.MINUTES)
    private void task3() {
        System.out.println(Times.nowMillis() + "执行一次");
    }

环境配置 , 定时间隔时间由环境变量 env.scheduled.fixedRate 配置,没配置采用默认值 60 秒):

    @Scheduled(fixedRate = "${env.scheduled.fixedRate:60}")
    public String task2() {
        System.out.println(Times.nowMillis() + "执行一次");
        return "";
    }

使用 Xxl-Job

Scheduled 可以采用第三方实现,官方扩展包 redkale-plugins 提供了 xxl-job 实现,且不依赖 xxl-job 包。

pom 依赖:

    <dependency>
        <groupId>org.redkalexgroupId>
        <artifactId>redkale-pluginsartifactId>
        <version>2.8.0version>
    dependency> 

配置文件:

    "true">    
        <xxljob addresses="http://localhost:8080/xxl-job-admin" 
                executorName="redkale-examples" 
                ip="127.0.0.1" -- 可选 -->
                port="5678"    
                accessToken="default_token" />
    scheduled>

使用方法:

    @Scheduled(name = "testTask")
    public void runTask(ScheduledEvent event) {
        System.out.println("xxl-job参数param: " + event.getString("param"));
        System.out.println("xxl-job参数index: " + event.getInteger("index"));
        System.out.println("xxl-job参数total: " + event.getInteger("total"));
    }

MQ 消息生成消费:

MessageAgent 是消息中心抽象接口。

配置参数

<mq name="mymq" type="kafka">
    <servers value="127.0.0.1:9092"/>
    <consumer autoload="true"/>
mq>

消息消费

通过 @ResourceConsumer MessageConsumer 接口实现消费

@ResourceConsumer(mq = "mymq", topics = "test_bean_topic")
public class TestMessageConsumer implements MessageConsumer<TestBean> {

    @Override
    public void init(AnyValue config) {
        System.out.println("执行 TestMessageConsumer.init");
    }

    @Override
    public void onMessage(MessageConext context, TestBean message) {
        System.out.println("消费消息, message: " + message);
    }

    @Override
    public void destroy(AnyValue config) {
        System.out.println("执行 TestMessageConsumer.destroy");
    }
}

通过 Service 里标记 @Messaged 的方法实现消费,方法只能是 protected public, 不能是 finalstatic

public class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    protected void runMessage(TestBean message) {
        System.out.println("消费消息,  message: " + message);
    }
}

通过 @Component 的 Service 里标记 @Messaged 的方法实现消费,方法只能是 public

@Component
public final class TestMessageService extends AbstractService {

    @Messaged(mq = "mymq", topics = "test_bean_topic")
    public int runMessage(TestBean message) {
        System.out.println("消费消息,  message: " + message);
        return 0;
    }
}

消息生成

通过 @Component 的 Service 里标记 @Messaged 的方法实现消费,方法只能是 public

@Component
public class TestMessageService extends AbstractService {

    @ResourceProducer(mq = "mymq")
    private MessageProducer producer;

    public void sendMessage() {
        TestBean bean = new TestBean(12345, "this is a message");
        System.out.println("生产消息: " + bean);
        producer.sendMessage("test_bean_topic", bean);
    }
}

Topic 管理

通过 MessageManager 操作 topic。

public class TestMessageManager extends AbstractService {

    @Resource(name = "mymq")
    private MessageManager manager;

    // 创建topic
    public void initTopic() {
        manager.createTopic("topic_1", "topic_2").join();
    }

    // 删除topic
    public void deleteTopic() {
        manager.deleteTopic("topic_1", "topic_2").join();
    }

    // 查询topic
    public void printTopic() {
        List topics = manager.queryTopic().join();
    }
}
原文链接:https://www.oschina.net/news/327168/redkale-2-8-0-released
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章