Netty In Action中文版 - 第十三章:通过UDP广播事件
Netty In Action中文版 - 第十三章:通过UDP广播事件
本章介绍
- UDP介绍
- UDP程序结构和设计
- 日志事件POJO
- 编写广播器
- 编写监听者
- 使用广播器和监听者
- Summary
前面的章节都是在示例中使用TCP协议,这一章,我们将使用UDP。UDP是一种无连接协议,若需要很高的性能和对数据的完成性没有严格要求,那使用UDP是一个很好的方法。最著名的基于UDP协议的是用来域名解析的DNS。
Netty使用了统一的传输API,这使得编写基于UDP的应用程序很容易。可以重用现有的ChannelHandler和其他公共组件来编写另外的Netty程序。看完本章后,你就会知道什么事无连接协议以及为什么UDP可能适合你的应用程序。
13.1 UDP介绍
13.2 UDP程序结构和设计
- UDP是一个无连接协议,传输数据之前源端和终端不建立连接,当它想传送时就简单地去抓取来自应用程序的数据,并尽可能快地把它扔到网络上。在发送端,UDP传送数据的速度仅仅是受应用程序生成数据的速度、计算机的能力和传输带宽的限制;在接收端,UDP把每个消息段放在队列中,应用程序每次从队列中读一个消息段。
- 由于传输数据不建立连接,因此也就不需要维护连接状态,包括收发状态等,因此一台服务机可同时向多个客户机传输相同的消息。
- UDP信息包的标题很短,只有8个字节,相对于TCP的20个字节信息包的额外开销很小。
- 吞吐量不受拥挤控制算法的调节,只受应用软件生成数据的速率、传输带宽、源端和终端主机性能的限制。
- UDP使用尽最大努力交付,即不保证可靠交付,因此主机不需要维持复杂的链接状态表(这里面有许多参数)。
- UDP是面向报文的。发送方的UDP对应用程序交下来的报文,在添加首部后就向下交付给IP层。既不拆分,也不合并,而是保留这些报文的边界,因此,应用程序需要选择合适的报文大小。
本章UDP程序例子的示意图入如下:
13.3 日志事件POJO
- package netty.in.action.udp;
- import java.net.InetSocketAddress;
- public class LogEvent {
- public static final byte SEPARATOR = (byte) '|';
- private final InetSocketAddress source;
- private final String logfile;
- private final String msg;
- private final long received;
- public LogEvent(String logfile, String msg) {
- this(null, -1, logfile, msg);
- }
- public LogEvent(InetSocketAddress source, long received, String logfile, String msg) {
- this.source = source;
- this.logfile = logfile;
- this.msg = msg;
- this.received = received;
- }
- public InetSocketAddress getSource() {
- return source;
- }
- public String getLogfile() {
- return logfile;
- }
- public String getMsg() {
- return msg;
- }
- public long getReceived() {
- return received;
- }
- }
接下来的章节,我们将用这个POJO类来实现具体的逻辑。
13.4 编写广播器
- package netty.in.action.udp;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.socket.DatagramPacket;
- import io.netty.handler.codec.MessageToMessageEncoder;
- import io.netty.util.CharsetUtil;
- import java.net.InetSocketAddress;
- import java.util.List;
- public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
- private final InetSocketAddress remoteAddress;
- public LogEventEncoder(InetSocketAddress remoteAddress){
- this.remoteAddress = remoteAddress;
- }
- @Override
- protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out)
- throws Exception {
- ByteBuf buf = ctx.alloc().buffer();
- buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8));
- buf.writeByte(LogEvent.SEPARATOR);
- buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8));
- out.add(new DatagramPacket(buf, remoteAddress));
- }
- }
下面我们再编写一个广播器:
- package netty.in.action.udp;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioDatagramChannel;
- import java.io.File;
- import java.io.IOException;
- import java.io.RandomAccessFile;
- import java.net.InetSocketAddress;
- public class LogEventBroadcaster {
- private final EventLoopGroup group;
- private final Bootstrap bootstrap;
- private final File file;
- public LogEventBroadcaster(InetSocketAddress address, File file) {
- group = new NioEventLoopGroup();
- bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioDatagramChannel.class)
- .option(ChannelOption.SO_BROADCAST, true)
- .handler(new LogEventEncoder(address));
- this.file = file;
- }
- public void run() throws IOException {
- Channel ch = bootstrap.bind(0).syncUninterruptibly().channel();
- long pointer = 0;
- for (;;) {
- long len = file.length();
- if (len < pointer) {
- pointer = len;
- } else {
- RandomAccessFile raf = new RandomAccessFile(file, "r");
- raf.seek(pointer);
- String line;
- while ((line = raf.readLine()) != null) {
- ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line));
- }
- ch.flush();
- pointer = raf.getFilePointer();
- raf.close();
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.interrupted();
- break;
- }
- }
- }
- public void stop() {
- group.shutdownGracefully();
- }
- public static void main(String[] args) throws Exception {
- int port = 4096;
- String path = System.getProperty("user.dir") + "/log.txt";
- LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress(
- "255.255.255.255", port), new File(path));
- try {
- broadcaster.run();
- } finally {
- broadcaster.stop();
- }
- }
- }
13.5 编写监听者
- 接收LogEventBroadcaster广播的DatagramPacket
- 解码LogEvent消息
- 输出LogEvent
EventLogMonitor的示意图如下:
解码器代码如下:
- package netty.in.action.udp;
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.socket.DatagramPacket;
- import io.netty.handler.codec.MessageToMessageDecoder;
- import io.netty.util.CharsetUtil;
- import java.util.List;
- public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
- @Override
- protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out)
- throws Exception {
- ByteBuf buf = msg.content();
- int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR);
- String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8);
- String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8);
- LogEvent event = new LogEvent(msg.sender(),
- System.currentTimeMillis(), filename, logMsg);
- out.add(event);
- }
- }
处理消息的Handler代码如下:
- package netty.in.action.udp;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
- public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception {
- StringBuilder builder = new StringBuilder();
- builder.append(msg.getReceived());
- builder.append(" [");
- builder.append(msg.getSource().toString());
- builder.append("] [");
- builder.append(msg.getLogfile());
- builder.append("] : ");
- builder.append(msg.getMsg());
- System.out.println(builder.toString());
- }
- }
EventLogMonitor代码如下:
- package netty.in.action.udp;
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.Channel;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioDatagramChannel;
- import java.net.InetSocketAddress;
- public class LogEventMonitor {
- private final EventLoopGroup group;
- private final Bootstrap bootstrap;
- public LogEventMonitor(InetSocketAddress address) {
- group = new NioEventLoopGroup();
- bootstrap = new Bootstrap();
- bootstrap.group(group).channel(NioDatagramChannel.class)
- .option(ChannelOption.SO_BROADCAST, true)
- .handler(new ChannelInitializer<Channel>() {
- @Override
- protected void initChannel(Channel channel) throws Exception {
- ChannelPipeline pipeline = channel.pipeline();
- pipeline.addLast(new LogEventDecoder());
- pipeline.addLast(new LogEventHandler());
- }
- }).localAddress(address);
- }
- public Channel bind() {
- return bootstrap.bind().syncUninterruptibly().channel();
- }
- public void stop() {
- group.shutdownGracefully();
- }
- public static void main(String[] args) throws InterruptedException {
- LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096));
- try {
- Channel channel = monitor.bind();
- System.out.println("LogEventMonitor running");
- channel.closeFuture().sync();
- } finally {
- monitor.stop();
- }
- }
- }
