「Netty」UDP廣播事件

鏡音雙子 Line 廣播 技術 達人科技 2017-06-05

一、前言

前面學習了WebSocket協議,並且通過示例講解了WebSocket的具體使用,接著學習如何使用無連接的UDP來廣播事件。

二、UDP廣播事件

2.1 UDP基礎

面向連接的TCP協議管理端到端的連接,在連接生命週期中,發送的消息會有序並且可靠地進行傳輸,最後連接有序地終止。然而,在無連接協議(如UDP)中,沒有持久連接的概念,每個消息(UDP數據報)都是獨立的傳輸,此外,UDP沒有TCP的糾錯機制(即每個對等體會確認其接收到的分組,並且發送者會重傳未確認的分組)。

UDP的限制比TCP多,但是比TCP快很多,這是因為消除了握手和消息管理的所有開銷,UDP非常適合處理或容忍消息丟失的應用。

2.2 UDP廣播

迄今為止所有的示例都使用了單播的傳輸模式,其被定義為將消息發送到由唯一地址標識的單個網絡目的地,有連接和無連接的協議都支持這種模式,UDP為多個收件人發送消息提供了額外的傳輸模式:

· 組播--傳輸到定義的主機組。

· 廣播--傳輸到網絡(或子網)上的所有主機。

本章中的示例將通過發送在同一網絡上的所有主機接收的消息來使用UDP廣播。

2.3 UDP簡單示例

示例將打開一個文件,並通過UDP將每一行廣播為指定端口。下圖展示了應用的結構圖。

「Netty」UDP廣播事件

2.4 LogEvent POJO

在消息應用中,消息經常以POJO形式展現,LogEvent的POJO如下。

public final class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) {
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received,
        String logfile, String msg) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource {
        return source;
    }
    public String getLogfile {
        return logfile;
    }
    public String getMsg {
        return msg;
    }
    public long getReceivedTimestamp {
        return received;
    }
}

2.5 編寫broadcaster

Netty提供了許多類來支持UDP應用程序,如Netty的DatagramPacket是DatagramChannel實現與遠程對等體進行通信的簡單消息容器,我們需要一個編碼器將EventLog消息轉換為DatagramPackets,可以擴展Netty的MessageToMessageEncoder,LogEventEncoder的代碼如下。

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;
    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
        LogEvent logEvent, List<Object> out) throws Exception {
        byte file = logEvent.getLogfile.getBytes(CharsetUtil.UTF_8);
        byte msg = logEvent.getMsg.getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc
 .buffer(file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

完成編碼器後,即可以開始啟動服務端,其中服務端LogEventBroadcaster的代碼如下。

public class LogEventBroadcaster {
    private final Bootstrap bootstrap;
    private final File file;
    private final EventLoopGroup group;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup;
        bootstrap = new Bootstrap;
        bootstrap.group(group)
 .channel(NioDatagramChannel.class)
 .option(ChannelOption.SO_BROADCAST, true)
 .handler(new LogEventEncoder(address));

        this.file = file;
    }

    public void run throws IOException {
        Channel ch = bootstrap.bind(0).syncUninterruptibly.channel;
        System.out.println("LogEventBroadcaster running");
        long pointer = 0;
        for (;;) {
 long len = file.length;
 if (len < pointer) {
 // file was reset
 pointer = len;
 } else if (len > pointer) {
 // Content was added
 RandomAccessFile raf = new RandomAccessFile(file, "r");
 raf.seek(pointer);
 String line;
 while ((line = raf.readLine) != null) {
 ch.writeAndFlush(new LogEvent(null, -1, file.getAbsolutePath, line));
 }
 pointer = raf.getFilePointer;
 raf.close;
 }
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 Thread.interrupted;
 break;
 }
        }
    }

    public void stop {
        group.shutdownGracefully;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
 throw new IllegalArgumentException;
        }

        LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
 Integer.parseInt(args[0])), new File(args[1]));
        try {
 broadcaster.run;
        } finally {
 broadcaster.stop;
        }
    }
}

2.6 編寫monitor

在應用中

· 接收由LogEventBroadcaster廣播的UDP DatagramPackets。

· 將其解碼為LogEvent。

· 將LogEvent寫入輸出流System.out。

下圖展示LogEvent的流動。

LogEventDecoder負責將傳入的DatagramPackets解碼為LogEvent消息,其代碼如下。

public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content;
        int i = data.indexOf(0, data.readableBytes, LogEvent.SEPARATOR);
        String filename = data.slice(0, i).toString(CharsetUtil.UTF_8);
        String logMsg =  data.slice(i + 1, data.readableBytes).toString(CharsetUtil.UTF_8);

        LogEvent event = new LogEvent(datagramPacket.recipient, System.currentTimeMillis,
 filename,logMsg);
        out.add(event);
    }
}

而LogEventHandler用於處理LogEvent,其代碼如下。

public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace;
        ctx.close;
    }

    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder;
        builder.append(event.getReceivedTimestamp);
        builder.append(" [");
        builder.append(event.getSource.toString);
        builder.append("] [");
        builder.append(event.getLogfile);
        builder.append("] : ");
        builder.append(event.getMsg);

        System.out.println(builder.toString);
    }
}

LogEventMonitor用於將處理器添加至管道中,其代碼如下。

public class LogEventMonitor {

    private final Bootstrap bootstrap;
    private final EventLoopGroup group;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup;
        bootstrap = new Bootstrap;
        bootstrap.group(group)
 .channel(NioDatagramChannel.class)
 .option(ChannelOption.SO_BROADCAST, true)
 .handler(new ChannelInitializer<Channel> {
 @Override
 protected void initChannel(Channel channel) throws Exception {
 ChannelPipeline pipeline = channel.pipeline;
 pipeline.addLast(new LogEventDecoder);
 pipeline.addLast(new LogEventHandler);
 }
 }).localAddress(address);

    }

    public Channel bind {
        return bootstrap.bind.syncUninterruptibly.channel;
    }

    public void stop {
        group.shutdownGracefully;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
 throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
        try {
 Channel channel = monitor.bind;
 System.out.println("LogEventMonitor running");

 channel.closeFuture.await;
        } finally {
 monitor.stop;
        }
    }
}

運行LogEventBroadcaster和LogEventMonitor

三、總結

本篇博文講解了UDP協議,以及其示例,在實際應用中需要根據不同的應用場景選擇不同的協議,謝謝各位園友的觀看~

相關推薦

推薦中...