Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I want to write a Netty-based client. It should have a method public String send(String msg); that returns the response from the server, or some future; it doesn't matter which. It should also be multithreaded. Something like this:

public class Client {
public static void main(String[] args) throws InterruptedException {
    Client client = new Client();

}

private Channel channel;

public Client() throws InterruptedException {
    EventLoopGroup loopGroup = new NioEventLoopGroup();

    Bootstrap b = new Bootstrap();
    b.group(loopGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new StringDecoder()).
                    addLast(new StringEncoder()).
                    addLast(new ClientHandler());
        }
    });
    channel = b.connect("localhost", 9091).sync().channel();
}

public String sendMessage(String msg) {
    channel.writeAndFlush(msg);
    return ??????????;
}

}

I don't understand how to retrieve the response from the server after I invoke writeAndFlush(). What should I do?

I am using Netty 4.0.18.Final.

1 Answer

Returning a Future<String> from the method is simple. We are going to implement the following method signature:

public Future<String> sendMessage(String msg) {

This is relatively easy to do once you are familiar with asynchronous programming constructs. To solve the design problem, we take the following steps:

  1. When a message is written, add a Promise<String> to an ArrayBlockingQueue<Promise<String>>

    The queue serves as a record of the messages that have been sent but not yet answered, and lets us set the result of the corresponding Future<String> objects later.

  2. When a message arrives back in the handler, resolve it against the head of the queue

    Because replies are matched to requests in order, the head of the queue is the promise that belongs to the reply.

  3. Update the state of the Promise<String>

    We call promise.setSuccess() to set the final state on the object; this propagates to the future that was returned to the caller.
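
The three steps can be tried out without a network connection. The following is a minimal sketch, not Netty code: it assumes java.util.concurrent.CompletableFuture as a stand-in for Netty's Promise, and the names PendingReplies, register and onReply are hypothetical, chosen only for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of Netty): pairs replies with requests in FIFO order.
final class PendingReplies {
    private final BlockingQueue<CompletableFuture<String>> pending;

    PendingReplies(int capacity) {
        pending = new ArrayBlockingQueue<>(capacity);
    }

    // Step 1: register a future for the request that is about to be written.
    synchronized CompletableFuture<String> register() {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (!pending.offer(reply)) {
            // Queue full: reject instead of blocking the caller.
            reply.completeExceptionally(
                    new IllegalStateException("too many requests in flight"));
        }
        return reply;
    }

    // Steps 2 and 3: a reply arrived, so complete the oldest pending future.
    synchronized void onReply(String msg) {
        CompletableFuture<String> oldest = pending.poll();
        if (oldest != null) {
            oldest.complete(msg);
        }
    }
}
```

Note that this pairing only works if the server sends exactly one reply per request and sends the replies in request order; otherwise a reply would complete the wrong future.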

Example code

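import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ClientHandler extends SimpleChannelInboundHandler<String> {
    // Set on the event loop, read by caller threads, hence volatile.
    private volatile ChannelHandlerContext ctx;
    // Promises of requests still waiting for a reply, oldest first.
    // Set to null once the connection has been closed.
    private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.ctx = ctx;
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        synchronized(this){
            // Fail every request that will never receive a reply.
            Promise<String> prom;
            while((prom = messageList.poll()) != null) 
                prom.setFailure(new IOException("Connection lost"));
            messageList = null;
        }
    }

    public Future<String> sendMessage(String message) {
        if(ctx == null) 
            throw new IllegalStateException("Not connected yet");
        return sendMessage(message, ctx.executor().<String>newPromise());
    }

    public Future<String> sendMessage(String message, Promise<String> prom) {
        synchronized(this){
            if(messageList == null) {
                // Connection closed
                prom.setFailure(new IllegalStateException("Connection closed"));
            } else if(messageList.offer(prom)) { 
                // Connection open and message accepted
                ctx.writeAndFlush(message);
            } else { 
                // Connection open and message rejected (queue is full)
                prom.setFailure(new BufferOverflowException());
            }
            return prom;
        }
    }

    // Called channelRead0 in Netty 4.x (it is named messageReceived in the Netty 5.0 alphas).
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        synchronized(this){
            if(messageList != null) {
                // The oldest pending request belongs to this reply.
                Promise<String> prom = messageList.poll();
                if(prom != null)
                    prom.setSuccess(msg);
            }
        }
    }
}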

Documentation breakdown

  • private ChannelHandlerContext ctx;

    Stores our reference to the ChannelHandlerContext, which we use to create promises and to write messages.

  • private BlockingQueue<Promise<String>> messageList = new ArrayBlockingQueue<>(16);

    Holds the promises of the messages that are still waiting for a reply, so we can set the result of the matching future later.

  • public void channelActive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes active. We initialize our variables here.

  • public void channelInactive(ChannelHandlerContext ctx)

    Called by Netty when the connection becomes inactive, either due to an error or a normal connection close. All pending promises are failed here.

  • protected void channelRead0(ChannelHandlerContext ctx, String msg) (named messageReceived in the Netty 5.0 alphas)

    Called by Netty when a new message arrives. Here we take the head of the queue and call setSuccess() on it.

Warning

When using futures, there is one thing to watch out for: do not call get() from one of Netty's event loop threads if the future is not done yet. Breaking this rule results in either a deadlock or a BlockingOperationException.
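
To illustrate why a callback is safer than a blocking get() here, the following is a rough sketch under stated assumptions: a single-threaded ExecutorService stands in for a Netty event loop and CompletableFuture stands in for Netty's Future, so none of this is Netty API, and EventLoopDemo is a hypothetical name. With a real Netty Future the comparable approach would be to register a listener with addListener().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one thread plays the role of the event loop.
final class EventLoopDemo {
    static String run() {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        CompletableFuture<String> reply = new CompletableFuture<>();
        CompletableFuture<String> handled = new CompletableFuture<>();

        // Task 1, on the loop thread: register a callback and return at once.
        // Calling reply.get() here instead would block the only loop thread,
        // so task 2 could never run and the reply would never be delivered.
        loop.execute(() -> reply.thenAccept(msg -> handled.complete("got " + msg)));

        // Task 2, on the same loop thread: the reply arrives.
        loop.execute(() -> reply.complete("pong"));

        try {
            // Blocking is fine here because this is not the loop thread.
            return handled.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }
}
```

Blocking on the result from an application thread, as the last step does, is fine; the rule only concerns the threads that deliver the replies.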

