首页 文章 精选 留言 我的

精选列表

搜索[整合],共10003篇文章
优秀的个人博客,低调大师

Java高级面试(一)消息队列问题整合

①消息队列运用场景: 比如有一个订单系统,每次下一个订单的时候就会发送一条消息队列里面(activeMQ)然后又有一个库存系统负责获取消息队列里面的消息,最后更新库存 ②为什么使用消息队列: 解耦: 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 冗余: 有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。 扩展性: 因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。 灵活性 & 峰值处理能力: 当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。 可恢复性: 当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。 送达保证: 消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。 排序保证: 在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。 缓冲: 在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。 理解数据流: 在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。 异步通信: 很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。 ③消息队列的优缺点: 优点: 1)解耦 场景:当A系统需要发送数据到BCD三个系统时。 如果使用接口调用,A系统是和BCD系统耦合在一起的,需要考虑BCD系统挂了怎么办?BCD系统消费失败怎么办?如果E系统也需要这个数据?如果B系统现在不需要这个数据? 如果使用MQ,A系统产生的数据,只要保证消息成功发送到MQ中。各个系统需要数据,自己到MQ中消费。如果新系统需要数据,直接从MQ里消费,如果老系统不需要数据了,直接取消对MQ的消费。 通过MQ,A系统和其他系统彻底解耦了。 2)异步 场景:A系统接到请求,需要在本系统写库,还需要在BCD三个系统写库。 自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。如果同步调用,最终请求总延时是 3 + 300 + 450 + 200 = 953ms。 如果使用MQ,A系统发送三个消息到三个MQ,假设耗时5ms,则A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。 3)消峰 场景:每天 0:00 到 12:00,A 系统风平浪静,每秒并发请求数量就 50 个。结果每次一到 12:00 ~ 13:00 ,每秒并发请求数量突然会暴增到 5k+ 条。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。 一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。 如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。 缺点: 1)系统可用性降低 外部依赖的系统多了,增加了消息队列系统。 2)系统复杂度提高 你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性? 3)一致性问题 数据可能不一致。 ③kafka、activemq、rabbitmq 、rocketmq优缺点: ④如何保证消息队列的高可用: RabbitMQ 的高可用性 RabbitMQ 是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。 RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。 单机模式 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的?,没人生产用单机模式。 普通集群模式(无高可用性) 普通集群模式,意思就是在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都同步 queue 的元数据(元数据可以认为是 queue 的一些配置信息,通过元数据,可以找到 queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从 queue 所在实例上拉取数据过来。 这种方式确实很麻烦,也不怎么好,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。 而且如果那个放 queue 的实例宕机了,会导致接下来其他实例就无法从那个实例拉取,如果你开启了消息持久化,让 RabbitMQ 落地存储消息的话,消息不一定会丢,得等这个实例恢复了,然后才可以继续从这个 queue 拉取数据。 所以这个事儿就比较尴尬了,这就没有什么所谓的高可用性,这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。 镜像集群模式(高可用性) 这种模式,才是所谓的 RabbitMQ 的高可用模式。跟普通集群模式不一样的是,在镜像集群模式下,你创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据的意思。然后每次你写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。 那么如何开启这个镜像集群模式呢?其实很简单,RabbitMQ 有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建 queue 的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。 这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个 queue 的完整数据,别的 consumer 都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!第二,这么玩儿,不是分布式的,就没有扩展性可言了,如果某个 queue 负载很重,你加机器,新增的机器也包含了这个 queue 的所有数据,并没有办法线性扩展你的 queue。你想,如果这个 queue 的数据量很大,大到这个机器上的容量无法容纳了,此时该怎么办呢? Kafka 的高可用性: Kafka 一个最基本的架构认识:由多个 broker 组成,每个 broker 是一个节点;你创建一个 topic,这个 topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 就放一部分数据。 这就是天然的分布式消息队列,就是说一个 topic 的数据,是分散放在多个机器上的,每个机器就放一部分数据。 实际上 RabbmitMQ 之类的,并不是分布式消息队列,它就是传统的消息队列,只不过提供了一些集群、HA(High Availability, 高可用性) 的机制而已,因为无论怎么玩儿,RabbitMQ 一个 queue 的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个 queue 的完整数据。 Kafka 0.8 以前,是没有 HA 机制的,就是任何一个 broker 宕机了,那个 broker 上的 partition 就废了,没法写也没法读,没有什么高可用性可言。 比如说,我们假设创建了一个 topic,指定其 partition 数量是 3 个,分别在三台机器上。但是,如果第二台机器宕机了,会导致这个 topic 的 1/3 的数据就丢了,因此这个是做不到高可用的。 Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。写的时候,leader 会负责把数据同步到所有 follower 上去,读的时候就直接读 leader 上的数据即可。只能读写 leader?很简单,要是你可以随意读写每个 follower,那么就要care 数据一致性的问题,系统复杂度太高,很容易出问题。Kafka 会均匀地将一个 partition 的所有 replica 分布在不同的机器上,这样才可以提高容错性。 这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的。如果这个宕机的 broker 上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。这就有所谓的高可用性了。 写数据的时候,生产者就写 leader,然后 leader 将数据落地写本地磁盘,接着其他 follower 自己主动从 leader 来 pull 数据。一旦所有 follower 同步好数据了,就会发送 ack 给 leader,leader 收到所有 follower 的 ack 之后,就会返回写成功的消息给生产者。(当然,这只是其中一种模式,还可以适当调整这个行为) 消费的时候,只会从 leader 去读,但是只有当一个消息已经被所有 follower 都同步成功返回 ack 的时候,这个消息才会被消费者读到。 看到这里,相信你大致明白了 Kafka 是如何保证高可用机制的了,对吧?不至于一无所知,现场还能给面试官画画图。要是遇上面试官确实是 Kafka 高手,深挖了问,那你只能说不好意思,太深入的你没研究过。 ⑤怎么确保消息的顺序性: 举个例子: 有一个mysql binlog 同步系统,压力是非常大的,日同步数据达到了上亿级别,就是将数据从一个 mysql 库当中原封不动的同步到另一个 mysql 库当中去(比较常见的就是大数据组需要干的事情)。 假设在 mysql 当中增删改了一条数据,对应的生产了三条 增删改的 binlog 日志,接着这三条 binlog 发送到 MQ 里面去,然后进行消费。这时候就得保证消息队列的顺序性了。不然本来是:增加、修改、删除;你愣是换了顺序给执行成删除、修改、增加,不全错了么。 再来看看顺序会错乱的俩场景: RabbitMQ:一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。具体如下图所示: Kafka:比如说建了一个 topic,有三个 partition。生产者在写的时候,其实可以指定一个 key,比如说我们指定了某个订单 id 作为 key,那么这个订单相关的数据,一定会被分发到同一个 partition 中去,而且这个 partition 中的数据一定是有顺序的。消费者从 partition 中取出来数据的时候,也一定是有顺序的。到这里,顺序还是 ok 的,没有错乱。接着,我们在消费者里可能会搞多个线程来并发处理消息。因为如果消费者是单线程消费处理,而处理比较耗时的话,比如处理一条消息耗时几十 ms,那么 1 秒钟只能处理几十条消息,这吞吐量太低了。而多个线程并发跑的话,顺序可能就乱掉了。具体如下图: 解决方案: RabbitMQ: 拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。具体如下图: Kafka: 一个 topic,一个 partition,一个 consumer,内部单线程消费,单线程吞吐量太低,一般不会用这个。 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。具体如下图所示:

优秀的个人博客,低调大师

Django+Django-Celery+Celery的整合实战

本篇文章主要是由于计划使用django写一个计划任务出来,可以定时的轮换值班人员名称或者定时执行脚本等功能,百度无数坑之后,终于可以凑合把这套东西部署上。本人英文不好,英文好或者希望深入学习或使用的人,建议去参考官方文档,而且本篇的记录不一定正确,仅仅实现crontab 的功能而已。 希望深入学习的人可以参考http://docs.jinkan.org/docs/celery/。 首先简单介绍一下,Celery 是一个强大的分布式任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。它的架构组成如下图 可以看到,Celery 主要包含以下几个模块: 任务模块 Task 包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。 消息中间件 Broker Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。 任务执行单元 Worker Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。 任务结果存储 Backend Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, Redis 和 MongoDB 等。 异步任务使用 Celery 实现异步任务主要包含三个步骤: 创建一个 Celery 实例启动 Celery Worker应用程序调用异步任务 一、快速入门 本地环境: OS:centOS6.5 django-1.9 python-2.7.11 celery==3.1.20 django-celery python、pip、django相关安装不在详写,直接参考百度即可; pip install django==1.9 安装django pip install celery==3.1.20 安装celery pip install django-celery 安装django-celery 安装如果有失败,所需要的依赖环境自行解决。例如:mysql-python等;使用做redis作为消息中间件,安装redis:略 二、创建django项目开始测试 1、创建django 工程 命名为djtest django-admin.py startproject djtest1 2、创建app 命名为apps cd djtest python manage.py startapp apps1 3、创建完成后,django 目录结构如下:djtest1├── apps1│ ├── admin.py│ ├── apps.py│ ├── init.py│ ├── migrations│ │ └── init.py│ ├── models.py│ ├── tests.py│ └── views.py├── djtest1│ ├── init.py│ ├── init.pyc│ ├── settings.py│ ├── settings.pyc│ ├── urls.py│ └── wsgi.py└── manage.py 4、修改setting.py django配置文件,增加如下: import djcelery ### djcelery.setup_loader() ### CELERY_TIMEZONE='Asia/Shanghai' #并没有北京时区,与下面TIME_ZONE应该一致 BROKER_URL='redis://192.168.217.77:16379/8' #任何可用的redis都可以,不一定要在django server运行的主机上 CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler' ### INSTALLED_APPS = ( 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.messages', 'django.contrib.staticfiles', 'djcelery', ### 加入djcelery应用 'apps1', ### 加入新创建的apps1 ) TIME_ZONE='Asia/Shanghai' ### 开头增加如上配置文件,根据实际情况配置redis的地址和端口,时区一定要设置为Asia/Shanghai。否则时间不准确回影响定时任务的运行。 上面代码首先导出djcelery模块,并调用setup_loader方法加载有关配置;注意配置时区,不然默认使用UTC时间会比东八区慢8个小时。其中INSTALLED_APPS末尾添加两项,分别表示添加celery服务和自己定义的apps服务。 5、编写celery文件:djtest/djtest/celery.py #!/bin/python from __future__ import absolute_import import os from celery import Celery os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djtest1.settings') #Specifying the settings here means the celery command line program will know where your Django project is. #This statement must always appear before the app instance is created, which is what we do next: from django.conf import settings app = Celery('djtest1') app.config_from_object('django.conf:settings') #This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings. #You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) #With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module. #The tasks.py should be in dir which is added to INSTALLED_APP in settings.py. #So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py. @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) #dumps its own request information 6、修改djtest1/djtest1/init.py #!/bin/python from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app 7、接下来编写你希望django去完成的app,本文中要编写的就是在INSTALLED_APPS中注册的apps。在celery.py中设定了对settings.py中INSTALLED_APPS做autodiscover_tasks,本文希望apps中能够接受这样的目录组织:所有的app都可以放到apps下面,而且每个app都有独立的目录,就和上面的app1、app2一样,每个app各自有各自的init.py和tasks.py(注意,每个app都需要init.py文件,可以是空白的)。但是这样的结构组织在启动时会报错说module apps找不到。然后在apps下增加了一个init.py文件,这时报错没了,但是apps下每个app的tasks.py中的任务函数还是无法被django和celery worker找到。 **然后尝试了在apps1下面写一个__init__.py(空白)和task.py,所有的task function都写到tasks.py中,如下** from __future__ import absolute_import from celery import task from celery import shared_task #from celery.task import tasks #from celery.task import Task @task() #@shared_task def add(x, y): print "%d + %d = %d"%(x,y,x+y) return x+y #class AddClass(Task): # def run(x,y): # print "%d + %d = %d"%(x,y,x+y) # return x+y #tasks.register(AddClass) @shared_task def mul(x, y): print "%d * %d = %d"%(x,y,x*y) return x*y @shared_task def sub(x, y): print "%d - %d = %d"%(x,y,x-y) return x-y 8、同步数据库 python manage.py makemigrations python manage.py migrate 9、创建超级用户 python manage.py createsuperuser Username (leave blank to use 'work'): admin Email address: yyc@taihe.com Password: Password (again): Superuser created successfully. 10、启动django-web、启动celery beat 启动 celery worker进程 python manage.py runserver 0.0.0.0:8001#启动django的应用,可以动态的使用django-admin来管理任务 python manage.py celery beat #应该是用来监控任务变化的 python manage.py celery worker -c 6 -l debug #任务执行进程,worker进程 11、通过django-admin添加已注册的任务,并查看输出是否正常。 http://192.168.217.77:8001/admin/ 输入密码登录(1)登录后添加任务:点击红线标记的列表,通过add来添加;(2) 点击进入以后,可以看到已经存在的任务,点击添加即可;(3)按照提示,输入name,通过task(registered) 选择已经注册的函数服务。选择运行模式,阻塞模式,为多长时间间隔运行一次,或者crontab形式运行。点击Arguments(show),添加需要传入注册函数的参数。(4)实例,具体名称以及运行时间以及传入参数等。(5)保存之后,可以查看到列表。 (6)在python manage.py celery worker -c 6 -l debug启动的窗口可以看到如下的运行过程,证明已经生效。 第一行红色标记,可以看到注册函数被调用,第二行红色标记,可以看到函数的返回值。 到此已经基本完成。在实际运用中,我们只需要修改或者添加到tasks.py文件里一些函数,让他注册到里边。我们从前台django-web写入任务,可以使其动态加载到任务。并且把正确的参数传过去,就可以正常执行。完成我们所想要的通过这个django-celery工具制作定期的备份、统一管理的crontab平台等。 参考文章: http://blog.csdn.net/vintage_1/article/details/47664297 http://docs.jinkan.org/docs/celery/getting-started/introduction.html http://www.jianshu.com/p/f78ed01969b3 http://www.jianshu.com/p/b7f843f21c46

优秀的个人博客,低调大师

补习系列(14)-springboot redis 整合-数据读写

一、简介 在 [补习系列(A3)-springboot redis 与发布订阅]() 一文中,我们介绍了使用 Redis 实现消息订阅发布的机制,并且给出了一个真实用例。然而,绝大多数场景下 Redis 是作为缓存被使用的(这是其主要优势)。除此之外,由于Redis 提供了 AOF以及RDB两种持久化机制,某些情况下也可以作为临时数据库使用。本次将介绍 SpringBoot 中如何使用 Redis 进行缓存读写。 Redis 的基本命令在学习之前,需要先了解一些Redis 的基本命令,可以参考这里 http://www.redis.cn/ 二、SpringBoot Redis 读写 A. 引入 spring-data-redis 添加依赖 <!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>${spring-boot.version}</version> </dependency> spring-boot-starter-redis在1.4版本已经废弃 配置redis连接application.properties # redis 连接配置 spring.redis.database=0 spring.redis.host=127.0.0.1 spring.redis.password= spring.redis.port=6379 spring.redis.ssl=false # 连接池最大数 spring.redis.pool.max-active=10 # 空闲连接最大数 spring.redis.pool.max-idle=10 # 获取连接最大等待时间(s) spring.redis.pool.max-wait=600000 B. 序列化 同样,我们需要指定 JSON作为 Key/HashKey/Value的主要方式: /** * 序列化定制 * * @return */ @Bean public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>( Object.class); // 初始化objectmapper ObjectMapper mapper = new ObjectMapper(); mapper.setSerializationInclusion(Include.NON_NULL); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(mapper); return jackson2JsonRedisSerializer; } /** * 操作模板 * * @param connectionFactory * @param jackson2JsonRedisSerializer * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory, Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(connectionFactory); // 设置key/hashkey序列化 RedisSerializer<String> stringSerializer = new StringRedisSerializer(); template.setKeySerializer(stringSerializer); template.setHashKeySerializer(stringSerializer); // 设置值序列化 template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); // template.setValueSerializer(new // GenericToStringSerializer<Object>(Object.class)); return template; } Jackson2JsonRedisSerializer是Jackson转换的桥接器;RedisTemplate是用于读写的主要操作类; C. 读写样例 首先定义一个Pet实体类 public class RedisPet { private String name; private String type; ... ignore get set 利用RedisTemplate封装一层Repository,如下: @Repository public static class PetRepository { private static final String KEY = "Pets"; @Autowired private RedisTemplate<String, Object> redisTemplate; private HashOperations<String, String, Object> hashOperations; @PostConstruct private void init() { hashOperations = redisTemplate.opsForHash(); } public void add(RedisPet pet) { hashOperations.put(KEY, pet.getName(), pet); } public RedisPet find(String name) { return (RedisPet) hashOperations.get(KEY, name); } public Map<String, Object> findAll() { return hashOperations.entries(KEY); } public void clear() { hashOperations.getOperations().delete(KEY); } } 在PetRepository 的实现中,我们利用Hash结构来存储 Pet信息(Pet.name是key)分别实现了添加(add)/查找(get)/清除(clear)等方法。 最后,实现读写调用: @Service public class RedisDataOperation { private static final Logger logger = LoggerFactory.getLogger(RedisDataOperation.class); @Autowired private PetRepository petRepo; @PostConstruct public void start() { RedisPet pet1 = new RedisPet("Polly", "Bird"); RedisPet pet2 = new RedisPet("Tom", "Cat"); //写入宠物信息 petRepo.add(pet1); petRepo.add(pet2); //打印宠物信息 logger.info("polly {}", JsonUtil.toJson(petRepo.find("Polly"))); logger.info("pets {}", JsonUtil.toJson(petRepo.findAll())); //清空 petRepo.clear(); } 上面的代码在应用启动时,会写入两个Pet信息,之后完成清理,控制台输出如下: RedisDataOperation : polly {"name":"Polly","type":"Bird"} RedisDataOperation : pets {"Tom":{"name":"Tom","type":"Cat"},"Polly":{"name":"Polly","type":"Bird"}} 三、方法级缓存 除了上面的RedisTemplate,spring-data-redis还提供了方法级缓存,就是将业务方法的执行结果缓存起来,后面再次调用直接从缓存中取得结果返回。 这种方式可以简化缓存逻辑的代码,比如配置类数据的读取,通过方法注解就可以实现,下面是一个样例: /** * 方法级缓存样例 * * @author atp * */ @Service public class RedisCacheOperation { private static final Logger logger = LoggerFactory.getLogger(RedisCacheOperation.class); public static final String PREFIX = "pets:"; public static final String WRAP_PREFIX = "'pets:'"; /** * 当结果不为空时缓存 * * @param name * @return */ @Cacheable(value = "petCache", key = WRAP_PREFIX + "+#name", unless = "#result==null") public RedisPet getPet(String name) { logger.info("get pet {}", name); return new RedisPet(name, "Bird"); } /** * 当结果不为空时淘汰缓存 * * @param pet * @return */ @CacheEvict(value = "petCache", key = WRAP_PREFIX + "+#pet.name", condition = "#result!=null") public RedisPet updatePet(RedisPet pet) { logger.info("update pet {}", pet.getName()); return new RedisPet(pet.getName(), "Bird1"); } /** * 当结果为true时淘汰缓存 * * @param name * @return */ @CacheEvict(value = "petCache", key = WRAP_PREFIX + "+#name", condition = "#result==true") public boolean deletePet(String name) { logger.info("delete pet {}", name); return true; } } 涉及到几个注解: 注解 说明 @Cachable 方法执行结果缓存 @CachePut 方法执行结果缓存(强制) @CacheEvict 方法执行时触发删除 其中 @CachePut 与 @Cachable 的区别在于,前者一定会执行方法,并尝试刷新缓存(条件满足),而后者则是当缓存中不存在时才会执行方法并更新。注解中的属性 key/condition 都支持通过 Spring EL 表达式来引用参数对象。 启用注解 除了上面的代码,我们还需要使用 @EnableCaching 启用注解: @EnableCaching @Configuration public class RedisConfig { private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class); /** * 缓存管理,支持方法级注解 * * @param template * @return */ @Bean public RedisCacheManager cacheManager(RedisTemplate<String, Object> template) { RedisCacheManager redisCacheManager = new RedisCacheManager(template); // 默认过期时间 redisCacheManager.setDefaultExpiration(30 * 60 * 1000); return redisCacheManager; } 当@Cacheable 的key属性为空时,框架会自动生成,格式类似: param1,param2,param3... 如果希望修改默认的行为,可以使用自定义的 KeyGenerator: /** * 定制方法缓存的key生成策略 * * @return */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... args) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object arg : args) { sb.append(arg.toString()); } return sb.toString(); } }; } 单元测试 使用一小段单元测试代码来测试方法级缓存功能 @RunWith(SpringRunner.class) @SpringBootTest(classes =BootSampleRedis.class) public class RedisCacheOperationTest { private static final Logger logger = LoggerFactory.getLogger(RedisCacheOperationTest.class); @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private RedisCacheOperation operation; private RedisPet pet1 = new RedisPet("Polly", "Bird"); @Test public void testGet() { operation.getPet(pet1.getName()); Object object = redisTemplate.opsForValue().get(RedisCacheOperation.PREFIX + pet1.getName()); logger.info(String.valueOf(object)); assertNotNull(object); } @Test public void testUpdate() { operation.updatePet(pet1); Object object = redisTemplate.opsForValue().get(RedisCacheOperation.PREFIX + pet1.getName()); logger.info(String.valueOf(object)); assertNull(object); } @Test public void testDelete() { operation.getPet(pet1.getName()); // delete cache operation.deletePet(pet1.getName()); Object object = redisTemplate.opsForValue().get(RedisCacheOperation.PREFIX + pet1.getName()); logger.info(String.valueOf(object)); assertNull(object); } } 四、连接池 如果希望通过代码来配置 Jedis 的连接池(熟悉的方式),可以声明 JedisConnectionFactory 实现: /** * 连接池配置 * * @return */ @Bean public JedisConnectionFactory jedisConnectionFactory() { JedisPoolConfig config = new JedisPoolConfig(); // 最大连接 config.setMaxTotal(10); // 最大空闲,与最大连接保持一致,可减少频繁键链的开销 config.setMaxIdle(10); // 连接最大空闲时间 config.setMinEvictableIdleTimeMillis(10 * 60 * 1000); // 获取连接等待的最大时长 config.setMaxWaitMillis(30000); // 进行空闲连接检测的时间间隔 config.setTimeBetweenEvictionRunsMillis(30 * 1000); // 取消不必要的test,有利于性能提升 config.setTestOnBorrow(false);![](https://img2018.cnblogs.com/blog/242916/201812/242916-20181206231048870-1133770725.png) config.setTestOnReturn(false); JedisConnectionFactory factory = new JedisConnectionFactory(config); factory.setHostName("127.0.0.1"); factory.setPort(6379); logger.info("redis config init first"); return factory; } 更多配置可参考这里 示例代码可从 码云gitee 下载。https://gitee.com/littleatp/springboot-samples/ 小结 Redis 在大多数项目中的核心用途是缓存,spring-data-redis 为 SpringBoot 中集成 Redis 读写的封装。除了 RedisTemplate之外,还实现了方法级的缓存注解,一定程度上简化了业务的使用。Redis 在分布式系统中的应用场景有很多,后续有机会将进行更多的探讨。 欢迎继续关注"美码师的补习系列-springboot篇" ,期待更多精彩内容^-^ 原文链接:https://www.cnblogs.com/littleatp/p/10080296.html

优秀的个人博客,低调大师

SpringBoot开发案例之整合日志管理

概述 参考文档:Logging 这里顺便引用以下部分原文,当然看不明白也没关系,我们有有道翻译,如果翻译的不准确,后面会提供详细配置分享给大家。 SpringBootusesCommonsLoggingforallinternallogging,butleavestheunderlyinglogimplementationopen.DefaultconfigurationsareprovidedforJavaUtilLogging,Log4J2andLogback.Ineachcaseloggersarepre-configuredtouseconsoleoutputwithoptionalfileoutputalsoavailable. Bydefault,Ifyouusethe‘Starters’,Logbackwillbeusedforlogging.AppropriateLogbackroutingisalsoincludedtoensurethatdependentlibrariesthatuseJavaUtilLogging,CommonsLogging,Log4JorSLF4Jwillallworkcorrectly. 使用 pom.xml相关配置就不跟大家在这里扯了,依赖自行添加。 Console output 配置日志的相关参数也只需要写在 application.properties或者application.yml中就可以了,当然,这仅仅是基础的配置。 #官方文档中有提到,SpringBoot的Logging配置的级别有7个:TRACE,DEBUG,INFO,WARN,ERROR,FATAL,OFF #root日志以INFO级别输出 logging.level.root=INFO #springframework.web日志以WARN级别输出 logging.level.org.springframework.web=WARN #hibernate日志以ERROR级别输出 logging.level.org.hibernate=ERROR 加入以上配置后,我们启动项目,就可以在控制台打印Log信息了。 但是,在生产环境中,日志往往要以文件形式存放到服务器,下面介绍一下spring-boot日志的文件输出方式。 File output logging.file=spring_boot.log logging.pattern.console=%d{yyyy/MM/dd-HH:mm:ss}[%thread]%-5level%logger-%msg%n logging.pattern.file=%d{yyyy/MM/dd-HH:mm}[%thread]%-5level%logger-%msg%n 配置完成以后,我们再次启动项目,这时候会在根目录下生成一个spring_boot.log日志文件。 但是,经历过项目上线的小伙伴,其实这样配置远远达不到生产要求。比如,区分普通日志和错误日志,按照日期存储日志,配置单个日志文件最大容量,删除多少天之前的文件等等!下面与大家分享一款更高级的配置。 Custom log configuration Depending on your logging system, the following files will be loaded: Logback:logback-spring.xml, logback-spring.groovy, logback.xml or logback.groovy Log4j2:log4j2-spring.xml or log4j2.xml JDK (Java Util Logging):logging.properties spring-boot日志管理支持Logback,Log4j2以及Log4j,根据以上说明,我们可以定义文件命名。 下面我们使用Logback的指定配置文件实现更高级的日志配置。 logback-spring.xml: <?xmlversion="1.0"encoding="UTF-8"?> <!--scan配置文件如果发生改变,将会被重新加载scanPeriod检测间隔时间--> <configurationscan="true"scanPeriod="60seconds"debug="false"> <contextName>spring-boot-log</contextName> <includeresource="org/springframework/boot/logging/logback/base.xml"/> <!--普通日志--> <appendername="INFO_FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>log/spring-boot-log-info.log</file> <!--循环政策:基于时间创建日志文件--> <rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--日志命名:单个文件大于128MB按照时间+自增i生成log文件--> <fileNamePattern>log/spring-boot-log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicyclass="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>128MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--最大保存时间:30天--> <maxHistory>30</maxHistory> </rollingPolicy> <append>true</append> <encoderclass="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-ddHH:mm:ss.SSS}%-5level%loggerLine:%-3L-%msg%n</pattern> <charset>utf-8</charset> </encoder> <filterclass="ch.qos.logback.classic.filter.LevelFilter"> <level>info</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <!--错误日志--> <appendername="ERROR_FILE"class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>log/spring-boot-log-error.log</file> <!--循环政策:基于时间创建日志文件--> <rollingPolicyclass="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!--日志命名:单个文件大于2MB按照时间+自增i生成log文件--> <fileNamePattern>log/spring-boot-log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicyclass="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>2MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--最大保存时间:180天--> <maxHistory>180</maxHistory> </rollingPolicy> <append>true</append> <!--日志格式--> <encoderclass="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> <pattern>%d{yyyy-MM-ddHH:mm:ss.SSS}%-5level%loggerLine:%-3L-%msg%n</pattern> <charset>utf-8</charset> </encoder> <!--日志级别过滤器--> <filterclass="ch.qos.logback.classic.filter.LevelFilter"> <!--过滤的级别--> <level>ERROR</level> <!--匹配时的操作:接收(记录)--> <onMatch>ACCEPT</onMatch> <!--不匹配时的操作:拒绝(不记录)--> <onMismatch>DENY</onMismatch> </filter> </appender> <!--控制台--> <appendername="STDOUT"class="ch.qos.logback.core.ConsoleAppender"> <!--日志格式--> <encoder> <pattern>%d{yyyy-MM-ddHH:mm:ss.SSS}%-5level%loggerLine:%-3L-%msg%n</pattern> <charset>utf-8</charset> </encoder> <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--> <filterclass="ch.qos.logback.classic.filter.ThresholdFilter"> <level>INFO</level> </filter> </appender> <!--additivity避免执行2次--> <loggername="com.itstyle"level="INFO"additivity="false"> <appender-refref="STDOUT"/> <appender-refref="INFO_FILE"/> <appender-refref="ERROR_FILE"/> </logger> <rootlevel="INFO"> <appender-refref="STDOUT"/> <appender-refref="INFO_FILE"/> <appender-refref="ERROR_FILE"/> </root> </configuration> 如果我们使用了 Logback 的指定配置文件的话,那么application.properties 中的配置可以取消了。 测试 配置完成后,我们做一个测试,为了测试方便把info和error日志容量maxFileSize都改为2MB。 然后启动程序: /** *创建者https://blog.52itstyle.com *创建时间2017年7月24日 */ @EnableAutoConfiguration publicclassApplication{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(Application.class); publicstaticvoidmain(String[]args)throwsInterruptedException{ SpringApplication.run(Application.class,args); while(true){ logger.info("普通日志"); logger.error("错误日志"); } } } 执行一段时间,如果在项目路径下生成以下日志文件说明配置成功。 代码:http://git.oschina.net/52itstyle/spring-boot-log 作者: 小柒 出处: https://blog.52itstyle.com

优秀的个人博客,低调大师

Spring-boot整合vue遇到的坑

spring-boot合并vue遇到的坑 第一次用spring-boot,项目经理让把spring-boot和vue整个在一起,我说分开有利于分布式部署,经理说不需要,直接整一起,没办法,整! 网上找资料,有2种方式,随便搜下都有,我选最简单的那种,直接将vue代码编译,然后将static文件夹和index.html直接丢到maven项目的src/main/resouces文件夹下,放在另外一个static文件内,结构如下: 展开: 照网上说的弄了下,始终不能访问,不知道是怎么回事,也搜不到相关问题,不知道是不是我搜的不对.最后找到一个spring-boot的applicationn.yml配置文件说明,看到一个配置项: resources: add-mappings: 是否开放默认的资源处理,默认true 不太明白是啥意思,但是我的项目里的application.yml里配置的是false,我想是不是这个问题,我改成true,能成功访问,so happy.原来这个是处理static文件夹下的静态资源的选项,get到新技能. 因为这个项目是请的别的架构师搭的,结果隔了几天之后,这个项目的pom.xml文件改了,在里面加了打包文件的拦截: WTF,加了这个之后我原先能访问的静态资源文件顿时访问不了,刚开始我以为是我什么地方的配置文件改动了,但是并没有,之后才意识到是pom.xml文件加了东西,咋整,查资料,公司也没人可以问. 查了资料才知道是资源拦截,resources里的文件本身是都打包的,加这个后就只打包resouceces文件夹下的logback.xml文件了.主要是看target中是否有已经包括了需要编译的文件,想把static文件整个加进去,我在incldues里又加了一条 看了网上的资料都是说怎么弄配置文件,没说怎么把整个static文件夹都放到编译之后的文件中,只好自己试了下,static后面一个只编译子文件,不编译其中的子文件夹以及子文件夹里的资源,加*则是所有都编译.终于是大功告成. 写给自己,遇见更好的自己!

资源下载

更多资源
Mario

Mario

马里奥是站在游戏界顶峰的超人气多面角色。马里奥靠吃蘑菇成长,特征是大鼻子、头戴帽子、身穿背带裤,还留着胡子。与他的双胞胎兄弟路易基一起,长年担任任天堂的招牌角色。

Oracle

Oracle

Oracle Database,又名Oracle RDBMS,或简称Oracle。是甲骨文公司的一款关系数据库管理系统。它是在数据库领域一直处于领先地位的产品。可以说Oracle数据库系统是目前世界上流行的关系数据库管理系统,系统可移植性好、使用方便、功能强,适用于各类大、中、小、微机环境。它是一种高效率、可靠性好的、适应高吞吐量的数据库方案。

Eclipse

Eclipse

Eclipse 是一个开放源代码的、基于Java的可扩展开发平台。就其本身而言,它只是一个框架和一组服务,用于通过插件组件构建开发环境。幸运的是,Eclipse 附带了一个标准的插件集,包括Java开发工具(Java Development Kit,JDK)。

Sublime Text

Sublime Text

Sublime Text具有漂亮的用户界面和强大的功能,例如代码缩略图,Python的插件,代码段等。还可自定义键绑定,菜单和工具栏。Sublime Text 的主要功能包括:拼写检查,书签,完整的 Python API , Goto 功能,即时项目切换,多选择,多窗口等等。Sublime Text 是一个跨平台的编辑器,同时支持Windows、Linux、Mac OS X等操作系统。