首页 文章 精选 留言 我的

精选列表

搜索[整合],共10003篇文章
优秀的个人博客,低调大师

Flink 1.10 SQL、HiveCatalog 与事件时间整合示例

Flink 1.10 与 1.9 相比又是个创新版本,在我们感兴趣的很多方面都有改进,特别是 Flink SQL。本文用根据埋点日志计算 PV、UV 的简单示例来体验 Flink 1.10 的两个重要新特性: 一是 SQL DDL 对事件时间的支持;二是 Hive Metastore 作为 Flink 的元数据存储(即 HiveCatalog)。 这两点将会为我们构建实时数仓提供很大的便利。 添加依赖项 示例采用 Hive 版本为 1.1.0,Kafka 版本为 0.11.0.2。 要使 Flink 与 Hive 集成以使用 HiveCatalog,需要先将以下 JAR 包放在 ${FLINK_HOME}/lib 目录下。 flink-connector-hive_2.11-1.10.0.jar flink-shaded-hadoop-2-uber-2.6.5-8.0.jar hive-metastore-1.1.0.jar hive-exec-1.1.0.jar libfb303-0.9.2.jar 后三个 JAR 包都是 Hive 自带的,可以在 ${HIVE_HOME}/lib 目录下找到。前两个可以通过阿里云 Maven 搜索 GAV 找到并手动下载(groupId 都是org.apache.flink)。 再在 pom.xml 内添加相关的 Maven 依赖。 Maven 下载: https://maven.aliyun.com/mvn/search <properties> <scala.bin.version>2.11</scala.bin.version> <flink.version>1.10.0</flink.version> <hive.version>1.1.0</hive.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-sql-connector-kafka-0.11_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_${scala.bin.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> </dependency> </dependencies> 最后,找到 Hive 的配置文件 hive-site.xml,准备工作就完成了。 注册 HiveCatalog、创建数据库 不多废话了,直接上代码,简洁易懂。 val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(5) streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val tableEnvSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build() val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings) val catalog = new HiveCatalog( "rtdw", // catalog name "default", // default database "/Users/lmagic/develop", // Hive config (hive-site.xml) directory "1.1.0" // Hive version ) tableEnv.registerCatalog("rtdw", catalog) tableEnv.useCatalog("rtdw") val createDbSql = "CREATE DATABASE IF NOT EXISTS rtdw.ods" tableEnv.sqlUpdate(createDbSql) 创建 Kafka 流表并指定事件时间 我们的埋点日志存储在指定的 Kafka topic 里,为 JSON 格式,简化版 schema 大致如下。 "eventType": "clickBuyNow", "userId": "97470180", "shareUserId": "", "platform": "xyz", "columnType": "merchDetail", "merchandiseId": "12727495", "fromType": "wxapp", "siteId": "20392", "categoryId": "", "ts": 1585136092541 其中 ts 字段就是埋点事件的时间戳(毫秒)。在 Flink 1.9 时代,用 CREATE TABLE 语句创建流表时是无法指定事件时间的,只能默认用处理时间。而在 Flink 1.10 下,可以这样写。 CREATE TABLE rtdw.ods.streaming_user_active_log ( eventType STRING COMMENT '...', userId STRING, shareUserId STRING, platform STRING, columnType STRING, merchandiseId STRING, fromType STRING, siteId STRING, categoryId STRING, ts BIGINT, procTime AS PROCTIME(), -- 处理时间 eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印 ) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'ng_log_par_extracted', 'connector.startup-mode' = 'latest-offset', -- 指定起始offset位置 'connector.properties.zookeeper.connect' = 'zk109:2181,zk110:2181,zk111:2181', 'connector.properties.bootstrap.servers' = 'kafka112:9092,kafka113:9092,kafka114:9092', 'connector.properties.group.id' = 'rtdw_group_test_1', 'format.type' = 'json', 'format.derive-schema' = 'true', -- 由表schema自动推导解析JSON 'update-mode' = 'append' ) Flink SQL 引入了计算列(computed column)的概念,其语法为 column_name AS computed_column_expression,它的作用是在表中产生数据源 schema 不存在的列,并且可以利用原有的列、各种运算符及内置函数。比如在以上 SQL 语句中,就利用内置的 PROCTIME() 函数生成了处理时间列,并利用原有的 ts 字段与 FROM_UNIXTIME()、TO_TIMESTAMP() 两个时间转换函数生成了事件时间列。 为什么 ts 字段不能直接用作事件时间呢?因为 Flink SQL 规定时间特征必须是 TIMESTAMP(3) 类型,即形如"yyyy-MM-ddTHH:mm:ssZ"格式的字符串,Unix 时间戳自然是不行的,所以要先转换一波。 既然有了事件时间,那么自然要有水印。Flink SQL 引入了 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 的语法来产生水印,有以下两种通用的做法: 单调不减水印(对应 DataStream API 的 AscendingTimestampExtractor) WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND 有界乱序水印(对应 DataStream API 的 BoundedOutOfOrdernessTimestampExtractor) WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'n' TIME_UNIT 上文的 SQL 语句中就是设定了 10 秒的乱序区间。如果看官对水印、AscendingTimestampExtractor 和 BoundedOutOfOrdernessTimestampExtractor 不熟的话,可以参见之前的这篇,就能理解为什么会是这样的语法了。 https://www.jianshu.com/p/c612e95a5028 下面来正式建表。 val createTableSql = """ |上文的SQL语句 |...... """.stripMargin tableEnv.sqlUpdate(createTableSql) 执行完毕后,我们还可以去到 Hive 执行 DESCRIBE FORMATTED ods.streaming_user_active_log 语句,能够发现该表并没有事实上的列,而所有属性(包括 schema、connector、format 等等)都作为元数据记录在了 Hive Metastore 中。 Flink SQL 创建的表都会带有一个标记属性 is_generic=true,图中未示出。 开窗计算 PV、UV 用30秒的滚动窗口,按事件类型来分组,查询语句如下。 SELECT eventType, TUMBLE_START(eventTime, INTERVAL '30' SECOND) AS windowStart, TUMBLE_END(eventTime, INTERVAL '30' SECOND) AS windowEnd, COUNT(userId) AS pv, COUNT(DISTINCT userId) AS uv FROM rtdw.ods.streaming_user_active_log WHERE platform = 'xyz' GROUP BY eventType, TUMBLE(eventTime, INTERVAL '30' SECOND) 关于窗口在 SQL 里的表达方式请参见官方文档。1.10 版本 SQL 的官方文档写的还是比较可以的。 SQL 文档: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows 懒得再输出到一个结果表了,直接转换成流打到屏幕上。 val queryActiveSql = """ |...... |...... """.stripMargin val result = tableEnv.sqlQuery(queryActiveSql) result .toAppendStream[Row] .print() .setParallelism(1) 敏感数据较多,就不一一截图了。以上是我分享的两个示例,感兴趣的同学也可以动手试试。

优秀的个人博客,低调大师

spring-boot项目整合Disruptor的初步使用

1.在项目的pom文件中配置 <dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>r09</version> </dependency> 2.创建BaseQueueHelper /** * lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。 * * 调用init()时才真正启动线程开始处理 系统退出自动清理资源. */ public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> { /** * 记录所有的队列,系统退出时统一清理资源 */ private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>(); /** * Disruptor 对象 */ private Disruptor<E> disruptor; /** * RingBuffer */ private RingBuffer<E> ringBuffer; /** * initQueue */ private List<D> initQueue = new ArrayList<D>(); /** * 队列大小 * * @return 队列长度,必须是2的幂 */ protected abstract int getQueueSize(); /** * 事件工厂 * * @return EventFactory */ protected abstract EventFactory<E> eventFactory(); /** * 事件消费者 * * @return WorkHandler[] */ protected abstract WorkHandler[] getHandler(); /** * 初始化 */ public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //初始化数据发布 for (D data : initQueue) { ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //加入资源清理钩子 synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } /** * 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU, * 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景. * * @return WaitStrategy */ protected abstract WaitStrategy getStrategy(); /** * 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理. */ public synchronized void publishEvent(D data) { if (ringBuffer == null) { initQueue.add(data); return; } ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } /** * 关闭队列 */ public void shutdown() { disruptor.shutdown(); } } 3.创建MyHandlerException public class MyHandlerException implements ExceptionHandler { private Logger logger = LoggerFactory.getLogger(MyHandlerException.class); /* * (non-Javadoc) 运行过程中发生时的异常 * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { ex.printStackTrace(); logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage()); } /* * (non-Javadoc) 启动时的异常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { logger.error("start disruptor error ==[{}]!", ex.getMessage()); } /* * (non-Javadoc) 关闭时的异常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { logger.error("shutdown disruptor error ==[{}]!", ex.getMessage()); } } 4.创建ValueWrapper public abstract class ValueWrapper<T> { private T value; public ValueWrapper() {} public ValueWrapper(T value) { this.value = value; } public T getValue() { return value; } public void setValue(T value) { this.value = value; } } 5.创建EventFactory public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> { @Override public SeriesDataEvent newInstance() { return new SeriesDataEvent(); } } 创建DisruptorConfig @Configuration @ComponentScan(value = {"com.demo.disruptor"}) public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler2 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler2() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler3 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler3() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler4 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler4() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler5 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler5() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler5 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler6() { return new SeriesDataEventHandler(); } } 7.创建SeriesData public class SeriesData { private String deviceInfoStr; public SeriesData() { } public SeriesData(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } public String getDeviceInfoStr() { return deviceInfoStr; } public void setDeviceInfoStr(String deviceInfoStr) { this.deviceInfoStr = deviceInfoStr; } @Override public String toString() { return "SeriesData{" + "deviceInfoStr='" + deviceInfoStr + '\'' + '}'; } } 8.创建SeriesDataEvent public class SeriesDataEvent extends ValueWrapper<SeriesData> { } 9.创建SeriesDataEventQueueHelper @Component public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean { private static final int QUEUE_SIZE = 1024; @Autowired private List<SeriesDataEventHandler> seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this.init(); } } 10.创建SeriesDataEventHandler public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> { private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class); @Autowired private SocketService socketService; @Override public void onEvent(SeriesDataEvent event) { if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) { logger.warn("receiver series data is empty!"); } logger.error("hello word!"); } } 11.使用操作 @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper; @Test public void demo(){ seriesDataEventQueueHelper.publishEvent(new SeriesData("hello word")); } 12.后面推出Disruptor的各种操作

优秀的个人博客,低调大师

Rancher 开源 Rio,整合 Istio、Knative 与 Kubernetes 能力

近日 Rancher 开源了一个可以在任何符合标准的 Kubernetes 集群上使用的 MicroPaaS 平台 Rio。 Rio 由一些 Kubernetes 自定义资源和一个可选的 CLI 构成,用户可以轻松地将服务部署到 Kubernetes,自动获得持续交付、DNS、HTTPS、路由、监控、自动扩缩容、金丝雀部署与 Git 触发构建等功能。 Rio 将 Istio、Knative 和 Kubernetes 的力量结合在一起,并像使用 Docker 一样简单地使用它们。 Rio 主要功能包括: 自动 DNS 和 HTTPS HTTP 负载均衡 HTTP 路由 HTTP 指标 自动扩缩容 金丝雀部署 Git 触发的部署 详情查看官方公告。 查看项目:https://www.oschina.net/p/rio

优秀的个人博客,低调大师

shiro的入门实例-shiro于spring的整合

shiro是一款java安全框架、简单而且可以满足实际的工作需要 第一步、导入maven依赖 <!--shiro--> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-core</artifactId> <version>${org.apache.shiro.version}</version> </dependency> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-web</artifactId> <version>${org.apache.shiro.version}</version> </dependency> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-spring</artifactId> <version>${org.apache.shiro.version}</version> </dependency> <dependency> <groupId>org.apache.shiro</groupId> <artifactId>shiro-ehcache</artifactId> <version>${org.apache.shiro.version}</version> </dependency> 第二步、在项目中定义shiro的过滤器(shiro的实现主要是通过filter实现) <!--ShiroSecurityfilter--> <filter> <filter-name>shiroFilter</filter-name> <filter-class>org.springframework.web.filter.DelegatingFilterProxy</filter-class> <init-param> <param-name>targetFilterLifecycle</param-name> <param-value>true</param-value> </init-param> </filter> <filter-mapping> <filter-name>shiroFilter</filter-name> <url-pattern>/*</url-pattern> <dispatcher>REQUEST</dispatcher> </filter-mapping> 第三步、创建一个Realm publicclassUserRealmextendsAuthorizingRealm{ @Autowired privateUserBizbiz; //验证用户信息,认证的实现 @Override protectedAuthenticationInfodoGetAuthenticationInfo(AuthenticationTokenauthenticationToken)throwsAuthenticationException{ Stringuserno=(String)authenticationToken.getPrincipal(); Stringpassword=newString((char[])authenticationToken.getCredentials()); Result<RcUser>result=biz.login(userno,password); if(result.isStatus()){ Sessionsession=SecurityUtils.getSubject().getSession(); session.setAttribute(Constants.Token.RONCOO,userno); RcUseruser=result.getResultData(); returnnewSimpleAuthenticationInfo(user.getUserNo(),user.getPassword(),getName()); } returnnull; } //验证用户的权限,实现认证 @Override protectedAuthorizationInfodoGetAuthorizationInfo(PrincipalCollectionprincipals){ SimpleAuthorizationInfoauthorizationInfo=newSimpleAuthorizationInfo(); Stringuserno=(String)principals.getPrimaryPrincipal(); Result<RcUser>result=biz.queryByUserNo(userno); if(result.isStatus()){ Result<List<RcRole>>resultRole=biz.queryRoles(result.getResultData().getId()); if(resultRole.isStatus()){ //获取角色 HashSet<String>roles=newHashSet<String>(); for(RcRolercRole:resultRole.getResultData()){ roles.add(rcRole.getRoleValue()); } System.out.println("角色:"+roles); authorizationInfo.setRoles(roles); //获取权限 Result<List<RcPermission>>resultPermission=biz.queryPermissions(resultRole.getResultData()); if(resultPermission.isStatus()){ HashSet<String>permissions=newHashSet<String>(); for(RcPermissionrcPermission:resultPermission.getResultData()){ permissions.add(rcPermission.getPermissionsValue()); } System.out.println("权限:"+permissions); authorizationInfo.setStringPermissions(permissions); } } } returnauthorizationInfo; } } 第四步、添加shiro配置 1、shiro缓存 <?xmlversion="1.0"encoding="UTF-8"?> <!DOCTYPExml> <ehcacheupdateCheck="false"name="shiroCache"> <!--http://ehcache.org/ehcache.xml--> <defaultCache maxElementsInMemory="10000" eternal="false" timeToIdleSeconds="120" timeToLiveSeconds="120" overflowToDisk="false" diskPersistent="false" diskExpiryThreadIntervalSeconds="120" /> </ehcache> 2、在spring的core配置文件中配置shiro <description>Shiro安全配置</description> <beanid="userRealm"class="com.roncoo.adminlte.controller.realm.UserRealm"/> <beanid="securityManager"class="org.apache.shiro.web.mgt.DefaultWebSecurityManager"> <propertyname="realm"ref="userRealm"/> <propertyname="cacheManager"ref="shiroEhcacheManager"/> </bean> <!--Shiro过滤器--> <beanid="shiroFilter"class="org.apache.shiro.spring.web.ShiroFilterFactoryBean"> <!--Shiro的核心安全接口,这个属性是必须的--> <propertyname="securityManager"ref="securityManager"/> <!--身份认证失败,则跳转到登录页面的配置--> <propertyname="loginUrl"value="/login"/> <propertyname="successUrl"value="/certification"/> <propertyname="unauthorizedUrl"value="/error"/> <!--Shiro连接约束配置,即过滤链的定义--> <propertyname="filterChainDefinitions"> <value> /login=authc /exit=anon /admin/security/list=authcBasic,perms[admin:read] /admin/security/save=authcBasic,perms[admin:insert] /admin/security/update=authcBasic,perms[admin:update] /admin/security/delete=authcBasic,perms[admin:delete] </value> </property> </bean> <!--用户授权信息Cache,采用EhCache--> <beanid="shiroEhcacheManager"class="org.apache.shiro.cache.ehcache.EhCacheManager"> <propertyname="cacheManagerConfigFile"value="classpath:ehcache/ehcache-shiro.xml"/> </bean> <!--保证实现了Shiro内部lifecycle函数的bean执行--> <beanid="lifecycleBeanPostProcessor"class="org.apache.shiro.spring.LifecycleBeanPostProcessor"/> <!--AOP式方法级权限检查--> <bean class="org.springframework.aop.framework.autoproxy.DefaultAdvisorAutoProxyCreator" depends-on="lifecycleBeanPostProcessor"> <propertyname="proxyTargetClass"value="true"/> </bean> <bean class="org.apache.shiro.spring.security.interceptor.AuthorizationAttributeSourceAdvisor"> <propertyname="securityManager"ref="securityManager"/> </bean> 第五步、shiro退出登录的实现 第一种方式 /** *退出登陆操作 */ @RequestMapping(value="/exit",method=RequestMethod.GET) publicStringexit(RedirectAttributesredirectAttributes,HttpSessionsession){ session.removeAttribute(Constants.Token.RONCOO); SecurityUtils.getSubject().logout(); redirectAttributes.addFlashAttribute("msg","您已经安全退出"); returnredirect("/login"); } 第二种方式:在shiroFilter的约束配置中配置 <!--Shiro连接约束配置,即过滤链的定义--> <propertyname="filterChainDefinitions"> <value> /exit=logout </value> </property>

优秀的个人博客,低调大师

SpringBoot开发案例之整合Quartz注入Service

前段时间做了一个基于SpringBoot和Quartz任务管理系统(脚手架而已),很多功能不是特别完善,由于工作原因,断断续续一直在更新中,码云上有个小伙伴提问说:Job中service自动注入报错怎么解决?正好之前做的项目中有使用到注入相关的功能,顺便也集成进去。 缘由 简单来说就是quartz中的Job是在quartz中实例化出来的,不受spring的管理,所以就导致注入不进去了。 解决 定义SpringJobFactory类: /** * 解决spring bean注入Job的问题 */ @Component public class SpringJobFactory extends AdaptableJobFactory { @Autowired private AutowireCapableBeanFactory capableBeanFactory; @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { // 调用父类的方法 Object jobInstance = super.createJobInstance(bundle); // 进行注入 capableBeanFactory.autowireBean(jobInstance); return jobInstance; } } quartz配置: /** * quartz配置 * 创建者 科帮网 * 创建时间 2018年4月3日 */ @Configuration public class SchedulerConfig { @Autowired private SpringJobFactory springJobFactory; @Bean(name="SchedulerFactory") public SchedulerFactoryBean schedulerFactoryBean() throws IOException { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setAutoStartup(true); factory.setStartupDelay(5);//延时5秒启动 factory.setQuartzProperties(quartzProperties()); //注意这里是重点 factory.setJobFactory(springJobFactory); return factory; } @Bean public Properties quartzProperties() throws IOException { PropertiesFactoryBean propertiesFactoryBean = new PropertiesFactoryBean(); propertiesFactoryBean.setLocation(new ClassPathResource("/quartz.properties")); propertiesFactoryBean.afterPropertiesSet(); return propertiesFactoryBean.getObject(); } /* * quartz初始化监听器 */ @Bean public QuartzInitializerListener executorListener() { return new QuartzInitializerListener(); } /* * 通过SchedulerFactoryBean获取Scheduler的实例 */ @Bean(name="Scheduler") public Scheduler scheduler() throws IOException { return schedulerFactoryBean().getScheduler(); } } 测试任务案例TestJob: /** * 实现序列化接口、防止重启应用出现quartz Couldn't retrieve job because a required class was not found 的问题 */ public class TestJob implements Job,Serializable { private static final long serialVersionUID = 1L; @Autowired private IJobService jobService; @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println(jobService);//注入jobService 执行相关业务操作 System.out.println("任务执行成功"); } } 项目源码: https://gitee.com/52itstyle/spring-boot-quartz

资源下载

更多资源
优质分享App

优质分享App

近一个月的开发和优化,本站点的第一个app全新上线。该app采用极致压缩,本体才4.36MB。系统里面做了大量数据访问、缓存优化。方便用户在手机上查看文章。后续会推出HarmonyOS的适配版本。

Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle

Oracle

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Eclipse

Eclipse

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。